-
Notifications
You must be signed in to change notification settings - Fork 3
Integrate decision handler into worker #31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: Tim Li <[email protected]>
a313c42
to
324de45
Compare
c14ba05
to
428f0ac
Compare
Signed-off-by: Tim Li <[email protected]>
Signed-off-by: Tim Li <[email protected]>
a40f6a6
to
073dc34
Compare
# Deserialize the input using the client's data converter | ||
try: | ||
# Use from_data method with a single type hint of None (no type conversion) | ||
input_data_list = await self._context.client().data_converter.from_data(started_attrs.input, [None]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type hints are needed here. I recommend creating a WorkflowDefinition class similar to activitydefinition. And pass it from registry.
Signed-off-by: Tim Li <[email protected]>
Signed-off-by: Tim Li <[email protected]>
Signed-off-by: Tim Li <[email protected]>
Signed-off-by: Tim Li <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DecisionTaskHandler should not have WorkflowEngine field but instead will have a cache to hold workflowEngines. DecisionWorker will be multithreaded on handling decision tasks and handler needs to be threadsafe.
"task_token": task.task_token[:16].hex() if task.task_token else None | ||
} | ||
) | ||
except Exception as e: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
error needs to be thrown out
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For this part, I think we actually don't need to raise error here since if decision task failure responses failed to sent to server, it will timeout and retry the decision task anyway, there is no need to disrupt the worker's execution flow as that would be counterproductive
Signed-off-by: Tim Li <[email protected]>
Signed-off-by: Tim Li <[email protected]>
) | ||
# Use thread-safe cache to get or create workflow engine | ||
cache_key = (workflow_id, run_id) | ||
with self._cache_lock: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
current cache is unbounded. We need a LFU cache here. Maybe add a TODO for next PR
Signed-off-by: Tim Li <[email protected]>
Signed-off-by: Tim Li <[email protected]>
What changed?
integrate decision handler into worker
Why?
now that handler is implemented, we will integrate that with decision worker
How did you test it?
unit test
Potential risks
Release notes
Documentation Changes