feat(tiers): enforce workflow concurrency limits #1962
base: main
Conversation
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: efb6fbe9f3
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you:
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
# Exponential backoff with jitter, capped at max_backoff
base = min(2**attempt, max_backoff)
jitter = random.uniform(0.8, 1.2)  # +/- 20%
sleep_duration = base * jitter
Replace nondeterministic random jitter in workflow backoff
The backoff jitter uses random.uniform inside workflow code imported via workflow.unsafe.imports_passed_through, which makes the sleep duration nondeterministic; on replay the jitter can differ and Temporal will raise a nondeterminism error. This only happens when the concurrency limit is hit and the backoff path is exercised. Use a deterministic source like workflow.random() or move the jitter calculation into an activity to keep workflow history replayable.
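A minimal sketch of the deterministic variant, assuming the surrounding retry loop keeps the same names (`attempt`, `max_backoff`) and runs inside workflow code; the helper name is illustrative. `workflow.random()` is the Temporal Python SDK's replay-safe RNG, and `asyncio.sleep` inside a workflow is backed by a durable timer:

```python
import asyncio

from temporalio import workflow


async def _backoff_sleep(attempt: int, max_backoff: float) -> None:
    # Exponential backoff with jitter, capped at max_backoff.
    # workflow.random() is seeded from workflow history, so replay
    # produces the same jitter and the history stays deterministic.
    base = min(2**attempt, max_backoff)
    jitter = workflow.random().uniform(0.8, 1.2)  # +/- 20%, replay-safe
    await asyncio.sleep(base * jitter)  # durable timer in the workflow event loop
```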
if result.acquired:
    self._workflow_permit_acquired = True
    self.logger.info(
        "Workflow permit acquired",
        current_count=result.current_count,
        limit=limit,
    )
    return
Keep workflow permits alive while long runs execute
Permits are stored in a Redis sorted set with TTL-based pruning (DEFAULT_TTL_SECONDS is 3600), but after acquiring a permit there is no periodic heartbeat to refresh the timestamp. Any workflow running longer than the TTL will have its permit pruned, allowing new workflows to start and exceed max_concurrent_workflows until the original finishes. This only affects long-running workflows; consider scheduling heartbeat_workflow_permit_activity or renewing the TTL while the workflow runs.
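A sketch of one way to do that, started as a background task after the permit is acquired; the activity is referenced by the name used in this PR, while the payload shape, interval, and helper name are assumptions:

```python
import asyncio
from datetime import timedelta

from temporalio import workflow


async def _heartbeat_permit_loop(org_id: str, interval_seconds: int = 600) -> None:
    """Refresh the permit's sorted-set timestamp well before the 3600 s TTL lapses."""
    while True:
        await asyncio.sleep(interval_seconds)  # durable workflow timer
        await workflow.execute_activity(
            "heartbeat_workflow_permit_activity",  # activity name from this PR
            {"org_id": org_id, "workflow_id": workflow.info().workflow_id},  # assumed payload
            start_to_close_timeout=timedelta(seconds=30),
        )
```

The workflow could start this with `asyncio.create_task(...)` right after the permit is acquired and cancel the task when the run finishes, so the permit stays live exactly as long as the execution.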
# Cap batch size based on tier limits for concurrent actions
if (
    self._tier_limits is not None
    and self._tier_limits.max_concurrent_actions is not None
):
    batch_size = min(batch_size, self._tier_limits.max_concurrent_actions)
Enforce org-wide action concurrency instead of per-workflow batch
max_concurrent_actions is described as an org-wide cap, but this change only reduces the batch size within a single workflow. If multiple workflows execute concurrently, each can still run up to max_concurrent_actions actions in parallel, so the org-wide limit is exceeded. This only shows up when multiple workflows for the same org run at once; if the intent is truly org-scoped, use a shared semaphore instead of per-workflow batching.
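If the org-scoped reading is the intent, one rough shape is to acquire a per-action slot from the same Redis-backed semaphore instead of shrinking the local batch. All names and the member-key scheme below are assumptions (`acquire` mirrors the call shape used elsewhere in this PR, `release` is assumed), and the Redis calls would have to live in activities to keep the workflow deterministic:

```python
# Rough sketch: gate each action on an org-wide slot rather than a per-workflow batch cap.
async def run_action_with_org_cap(semaphore, org_id, action_key, limit, run_action):
    result = await semaphore.acquire(
        org_id=org_id,
        workflow_id=f"action:{action_key}",  # one member per in-flight action
        limit=limit,                         # the tier's max_concurrent_actions
    )
    if not result.acquired:
        raise RuntimeError("org-wide max_concurrent_actions reached; back off and retry")
    try:
        return await run_action()
    finally:
        await semaphore.release(org_id=org_id, workflow_id=f"action:{action_key}")
```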
4 issues found across 6 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="tracecat/tiers/activities.py">
<violation number="1" location="tracecat/tiers/activities.py:69">
P3: Avoid `type: ignore` here; adjust the input model to use the correct type (e.g., UUID) or convert `org_id` before calling the service/semaphore.</violation>
</file>
<file name="tracecat/dsl/workflow.py">
<violation number="1" location="tracecat/dsl/workflow.py:1192">
P1: Guard against `max_concurrent_actions <= 0` when capping batch size; otherwise the batch loop can become infinite if the tier limit is 0.</violation>
<violation number="2" location="tracecat/dsl/workflow.py:1928">
P2: Workflow permits are stored with a 1-hour TTL but no heartbeat mechanism refreshes them during execution. For workflows running longer than the TTL, the permit will be pruned from Redis, allowing additional workflows to start and exceed the `max_concurrent_workflows` limit. Consider scheduling periodic calls to `heartbeat_workflow_permit_activity` while the workflow runs, or starting a background timer loop after permit acquisition.</violation>
<violation number="3" location="tracecat/dsl/workflow.py:1938">
P1: Avoid using `random.uniform()` directly in workflow code because it makes the workflow nondeterministic on replay. Generate jitter deterministically (e.g., via workflow side effects) or move jitter generation into an activity.</violation>
</file>
    self._tier_limits is not None
    and self._tier_limits.max_concurrent_actions is not None
):
    batch_size = min(batch_size, self._tier_limits.max_concurrent_actions)
P1: Guard against max_concurrent_actions <= 0 when capping batch size; otherwise the batch loop can become infinite if the tier limit is 0.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At tracecat/dsl/workflow.py, line 1192:
<comment>Guard against `max_concurrent_actions <= 0` when capping batch size; otherwise the batch loop can become infinite if the tier limit is 0.</comment>
<file context>
@@ -1135,6 +1184,13 @@ async def _execute_child_workflow_loop_prepared(
+ self._tier_limits is not None
+ and self._tier_limits.max_concurrent_actions is not None
+ ):
+ batch_size = min(batch_size, self._tier_limits.max_concurrent_actions)
+
# Process in batches for concurrency control
</file context>
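A minimal version of the guard, keeping the diff's structure and simply skipping the cap when the tier value is not a positive integer (clamping with `max(1, ...)` would be an equally valid choice):

```python
# Cap batch size based on tier limits for concurrent actions
if (
    self._tier_limits is not None
    and self._tier_limits.max_concurrent_actions is not None
    and self._tier_limits.max_concurrent_actions > 0  # guard: a 0 cap would stall the batch loop
):
    batch_size = min(batch_size, self._tier_limits.max_concurrent_actions)
```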
# Exponential backoff with jitter, capped at max_backoff
base = min(2**attempt, max_backoff)
jitter = random.uniform(0.8, 1.2)  # +/- 20%
P1: Avoid using random.uniform() directly in workflow code because it makes the workflow nondeterministic on replay. Generate jitter deterministically (e.g., via workflow side effects) or move jitter generation into an activity.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At tracecat/dsl/workflow.py, line 1938:
<comment>Avoid using `random.uniform()` directly in workflow code because it makes the workflow nondeterministic on replay. Generate jitter deterministically (e.g., via workflow side effects) or move jitter generation into an activity.</comment>
<file context>
@@ -1835,3 +1898,102 @@ async def _run_error_handler_workflow(
+
+ # Exponential backoff with jitter, capped at max_backoff
+ base = min(2**attempt, max_backoff)
+ jitter = random.uniform(0.8, 1.2) # +/- 20%
+ sleep_duration = base * jitter
+
</file context>
)

if result.acquired:
    self._workflow_permit_acquired = True
P2: Workflow permits are stored with a 1-hour TTL but no heartbeat mechanism refreshes them during execution. For workflows running longer than the TTL, the permit will be pruned from Redis, allowing additional workflows to start and exceed the max_concurrent_workflows limit. Consider scheduling periodic calls to heartbeat_workflow_permit_activity while the workflow runs, or starting a background timer loop after permit acquisition.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At tracecat/dsl/workflow.py, line 1928:
<comment>Workflow permits are stored with a 1-hour TTL but no heartbeat mechanism refreshes them during execution. For workflows running longer than the TTL, the permit will be pruned from Redis, allowing additional workflows to start and exceed the `max_concurrent_workflows` limit. Consider scheduling periodic calls to `heartbeat_workflow_permit_activity` while the workflow runs, or starting a background timer loop after permit acquisition.</comment>
<file context>
@@ -1835,3 +1898,102 @@ async def _run_error_handler_workflow(
+ )
+
+ if result.acquired:
+ self._workflow_permit_acquired = True
+ self.logger.info(
+ "Workflow permit acquired",
</file context>
semaphore = RedisSemaphore(client)

result = await semaphore.acquire(
    org_id=input.org_id,  # type: ignore[arg-type]
P3: Avoid type: ignore here; adjust the input model to use the correct type (e.g., UUID) or convert org_id before calling the service/semaphore.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At tracecat/tiers/activities.py, line 69:
<comment>Avoid `type: ignore` here; adjust the input model to use the correct type (e.g., UUID) or convert `org_id` before calling the service/semaphore.</comment>
<file context>
@@ -0,0 +1,169 @@
+ semaphore = RedisSemaphore(client)
+
+ result = await semaphore.acquire(
+ org_id=input.org_id, # type: ignore[arg-type]
+ workflow_id=input.workflow_id,
+ limit=input.limit,
</file context>
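A sketch of the suggested fix, assuming the activity input is a Pydantic model; the model and field names here are illustrative, not the PR's actual definitions:

```python
import uuid

from pydantic import BaseModel


class AcquireWorkflowPermitInput(BaseModel):
    """Illustrative input model: typing org_id as a UUID removes the need for the cast."""

    org_id: uuid.UUID
    workflow_id: str
    limit: int


# The call site can then pass the field straight through:
#     result = await semaphore.acquire(
#         org_id=input.org_id,  # already a uuid.UUID, no `# type: ignore[arg-type]`
#         workflow_id=input.workflow_id,
#         limit=input.limit,
#     )
```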
Add tier-based enforcement of workflow execution limits:
- max_concurrent_workflows (Redis semaphore per org)
- max_action_executions_per_workflow (counter per workflow)
- max_concurrent_actions (batch size cap)

Gated by WORKFLOW_CONCURRENCY_LIMITS feature flag.
Force-pushed from efb6fbe to 02822b7 (Compare)
Summary
- `max_concurrent_workflows` - Redis semaphore per organization
- `max_action_executions_per_workflow` - counter per workflow run
- `max_concurrent_actions` - caps batch sizes for child workflows

Implementation
- `RedisSemaphore` class using sorted sets with TTL-based pruning (see the sketch below)
- `WORKFLOW_CONCURRENCY_LIMITS` feature flag
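For context, a compressed sketch of the sorted-set pattern named above; the key layout and function name are assumptions, and a production implementation would run the prune/check/add sequence atomically (Lua script or MULTI/EXEC) rather than as separate calls:

```python
import time
import uuid

from redis.asyncio import Redis

DEFAULT_TTL_SECONDS = 3600  # permits older than this are considered stale and pruned


async def acquire_permit(client: Redis, org_id: uuid.UUID, workflow_id: str, limit: int) -> bool:
    """One sorted set per org: member = workflow id, score = acquisition timestamp."""
    key = f"tiers:workflow-permits:{org_id}"  # assumed key layout
    now = time.time()
    # Prune permits whose timestamp has aged past the TTL.
    await client.zremrangebyscore(key, "-inf", now - DEFAULT_TTL_SECONDS)
    if await client.zcard(key) >= limit:
        return False  # over the tier's max_concurrent_workflows
    await client.zadd(key, {workflow_id: now})
    await client.expire(key, DEFAULT_TTL_SECONDS)
    return True
```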
Test plan
- `tracecat-admin tier update`

Summary by cubic
Enforce tier-based workflow concurrency to prevent orgs from running too many workflows or actions at once. Adds Redis-backed permits and caps action throughput, gated by a feature flag.
New Features
Migration
Written for commit 02822b7. Summary will update on new commits.