-
Notifications
You must be signed in to change notification settings - Fork 551
Feature:3963 Step HeartBeat components #4073
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Conversation
src/zenml/steps/heartbeat.py
Outdated
| _thread.interrupt_main() # raises KeyboardInterrupt in main thread | ||
| # Ensure we stop our own loop as well. | ||
| self._running = False | ||
| except Exception: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: Improve this. For sure try to capture HTTP errors in more verbose logs to avoid excessive log generation if the error is for instance server raising 500 status code.
123c1c4 to
18d8b76
Compare
|
Questions/Comments for reviewers @schustmi @bcdurak : Log info records I see that in general core components (StepLauncher, StepRunner, etc.) we display a very small number of log records. For better visibility during development I have some log records in the heartbeat worker, should these be removed? I am assuming we display few systemic logs to avoid polluting the user experience as they would be interested in their step function logs only? Some follow-up recommendations would be - a) use structured logs with context variables (https://www.structlog.org/en/stable/) to easily filter records by metadata values b) introduce a systemic logger that is configurable. Suppressed by default, when activated it would present all systemic logs. Handling of constants Currently heartbeat interval is hard set as a class variable for the StepHeartBeatWorker cls. For sure I don't want to expose this to user-provided settings as this should be a system setting (too frequent heartbeats from multiple steps may end-up overloading the rest server). I believe a good value would be somewhere in the range of 30-60 seconds. Where would you organize this value? Under Interrupt implementation I went over our signals/daemonize implementations. While that would be the proper implementation for any unix-based system it is not compatible with Windows. I opted to use _thead.interrupt_main() instead which raises a KeyboardInterrupt exception by default, capture it with a context manager that reraises it with a custom exception. Let me know your thoughts. |
18d8b76 to
1352677
Compare
src/zenml/utils/exception_utils.py
Outdated
| self._target_exception = target_exception | ||
| self._message = message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess to simplify this, we could just pass an instance of the exception here instead of the class and message? That would additionally also allow some exceptions which can/need to be instantiated with multiple arguments.
src/zenml/models/v2/core/step_run.py
Outdated
| """Light-weight model for Step Heartbeat responses.""" | ||
|
|
||
| id: UUID | ||
| status: str |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably be of type ExecutionStatus?
| "interrupting main thread", | ||
| self.name, | ||
| ) | ||
| _thread.interrupt_main() # raises KeyboardInterrupt in main thread |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My dynamic pipelines PR introduces running multiple steps in different threads, which doesn't work with this I think.
Can we somehow store the thread from which the heartbeat worker was started, and then interrupt that thread instead of the main one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that is an important change, good point. interrupt_main will not work here, we will need to change the pattern a bit. Should I work my changes from your branch?
| step = zen_store().get_run_step(step_run_id, hydrate=True) | ||
| pipeline_run = zen_store().get_run(step.pipeline_run_id) | ||
| verify_permission_for_model(pipeline_run, action=Action.UPDATE) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering whether this RBAC check is even necessary, as running all of this will take quite some time (two calls to the DB, then a request to the RBAC service).
Is there any real harm in leaving this unprotected? I guess it would allow users potential access to the status of the step, which I'm not sure really is a concern.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, we can probably do both authenication & authorization with pipeline tokens. Will discuss with @stefannica for directions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me suggest an alternative: we could limit this endpoint to only be accessed by running pipelines.
Running pipelines (the containerized environment where the steps are running actually) use something called "a workload API token" which is only valid as long as the pipeline run itself is not yet finalized. These workload API tokens are tied to a particular pipeline run (or schedule, in case of scheduled pipelines). So we can also use their scope to limit the range of targets that they can update.
Some references:
- this is the code that verifies the pipeline scoped tokens (you can see some leeway is involved): https://github.com/zenml-io/zenml/blob/main/src/zenml/zen_server/auth.py#L406-L475
- same thing for the schedule-scoped tokens: https://github.com/zenml-io/zenml/blob/main/src/zenml/zen_server/auth.py#L363-L404
A sketch of how you can use this in your endpoint:
def update_heartbeat(
step_run_id: UUID,
auth_context: AuthContext = Security(authorize),
) -> StepHeartbeatResponse:
...
if not auth_context.access_token or not auth_context.access_token.schedule_id and not auth_context.access_token.pipeline_run_id:
raise AuthorizationException("Not authorized")
if auth_context.access_token.pipeline_run_id:
# optionally, check that the step ID is part of this run ID
else: # if auth_context.access_token.schedule_id
# optionally, check that the step ID is part of a run ID that was scheduled with this schedule
This will no longer rely on RBAC calls, but it might still flood the database with a lot of requests, so maybe you could also implement a mini-caching system like the ones used in the previous code references, to reduce its impact.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stefannica That was my initial idea as well, but do we use those tokens also when running pipelines with service accounts? I thought at some point we used the API key directly when running scheduled pipelines, but I might be misremembering.
I know for sure though that there is a way to generate a generic unscoped token instead of a workload token when running a pipeline (by setting some token expiration env variable), so we'll have to think about how we handle this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@schustmi yes, even when running pipelines with service accounts, we generate workload API tokens scoped to the pipeline run or schedule. The only case where we use a generic unscoped token is if you set the ZENML_PIPELINE_API_TOKEN_EXPIRATION env variable. But this is a very obscure case, which I don't think we need to handle separately. In that case, we can just not run any RBAC like checks on this endpoint.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is that this is a client-side env variable and I'm not sure we can recognize this case in the server endpoint that receives the heartbeat requests. It will just be a generic token that is not scoped to the run, and if we allow those to call the endpoint without any checks then everyone can do so, no?
- Backend heartbeat support (DB, API) - Heartbeat monitoring worker
3db4503 to
6d077e6
Compare
54b63a9 to
d416056
Compare
d416056 to
1d19a9f
Compare
1d19a9f to
1649b45
Compare
- Updates migration down revision refs - context-reraise exception - changes in the step-heartbeat logic - fix null heartbeat in list/get endpoints
1649b45 to
c40b9bf
Compare
| input_artifacts=step_run.regular_inputs, | ||
| output_artifact_uris=output_artifact_uris, | ||
| ) | ||
| except StepHeartBeatTerminationException as exc: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I'm getting the logic wrong here, but this somehow seems off to me:
- We're in the main thread, and in the outermost layer we have the
ContextReraisecontext manager, that convertsKeyboardInterrupttoStepHeartBeatTerminationException. - Here, we catch the
StepHeartBeatTerminationExceptionand potentially reraise it as aKeyboardInterruptif the heatbeat worker isn't terminated. This will anyway be converted back to aStepHeartBeatTerminationExceptionby theContextReraise
Shouldn't the logic rather be the following:
- No
ContextReraiseneeded at all
try:
heartbeat_worker.start()
run_step()
except KeyboardInterrupt:
if heartbeat_worker.is_terminated:
raise StepHeartBeatTerminationException(...)
else:
raise
except:
...
finally:
heartbeat_worker.stop()
Describe changes
I implemented/fixed _ to achieve _.
Pre-requisites
Please ensure you have done the following:
developand the open PR is targetingdevelop. If your branch wasn't based on develop read Contribution guide on rebasing branch to develop.Types of changes