Skip to content

Commit 411dc01

Browse files
authored
Worker Concurrency Fix (#336)
Fix an issue where workflows of the wrong version count against worker concurrency limits. (#333)
1 parent f07760a commit 411dc01

File tree

2 files changed

+43
-10
lines changed

2 files changed

+43
-10
lines changed

dbos/_sys_db.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1739,15 +1739,30 @@ def start_queued_workflows(
17391739
available_tasks = max(0, queue.concurrency - total_running_tasks)
17401740
max_tasks = min(max_tasks, available_tasks)
17411741

1742-
# Lookup unstarted/uncompleted tasks (not running)
1742+
# Retrieve the first max_tasks workflows in the queue.
1743+
# Only retrieve workflows of the appropriate version (or without version set)
17431744
query = (
17441745
sa.select(
17451746
SystemSchema.workflow_queue.c.workflow_uuid,
17461747
)
1748+
.select_from(
1749+
SystemSchema.workflow_queue.join(
1750+
SystemSchema.workflow_status,
1751+
SystemSchema.workflow_queue.c.workflow_uuid
1752+
== SystemSchema.workflow_status.c.workflow_uuid,
1753+
)
1754+
)
17471755
.where(SystemSchema.workflow_queue.c.queue_name == queue.name)
17481756
.where(SystemSchema.workflow_queue.c.started_at_epoch_ms == None)
17491757
.where(SystemSchema.workflow_queue.c.completed_at_epoch_ms == None)
17501758
.order_by(SystemSchema.workflow_queue.c.created_at_epoch_ms.asc())
1759+
.where(
1760+
sa.or_(
1761+
SystemSchema.workflow_status.c.application_version
1762+
== app_version,
1763+
SystemSchema.workflow_status.c.application_version.is_(None),
1764+
)
1765+
)
17511766
.with_for_update(nowait=True) # Error out early
17521767
)
17531768
# Apply limit only if max_tasks is finite
@@ -1779,15 +1794,6 @@ def start_queued_workflows(
17791794
SystemSchema.workflow_status.c.status
17801795
== WorkflowStatusString.ENQUEUED.value
17811796
)
1782-
.where(
1783-
sa.or_(
1784-
SystemSchema.workflow_status.c.application_version
1785-
== app_version,
1786-
SystemSchema.workflow_status.c.application_version.is_(
1787-
None
1788-
),
1789-
)
1790-
)
17911797
.values(
17921798
status=WorkflowStatusString.PENDING.value,
17931799
application_version=app_version,

tests/test_queue.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from dbos import (
1515
DBOS,
1616
ConfigFile,
17+
DBOSClient,
1718
DBOSConfiguredInstance,
1819
Queue,
1920
SetEnqueueOptions,
@@ -1210,3 +1211,29 @@ async def test_workflow(var1: str) -> str:
12101211
with SetWorkflowID(wfid2):
12111212
handle2 = await queue.enqueue_async(test_workflow, "def")
12121213
assert (await handle2.get_result()) == "def-c-p"
1214+
1215+
def test_worker_concurrency_across_versions(dbos: DBOS, client: DBOSClient) -> None:
1216+
queue = Queue("test_worker_concurrency_across_versions", worker_concurrency=1)
1217+
1218+
@DBOS.workflow()
1219+
def test_workflow() -> str:
1220+
return DBOS.workflow_id
1221+
1222+
# First enqueue a workflow on the other version, then on the current version
1223+
other_version = "other_version"
1224+
other_version_handle: WorkflowHandle[None] = client.enqueue(
1225+
{
1226+
"queue_name": "test_worker_concurrency_across_versions",
1227+
"workflow_name": test_workflow.__qualname__,
1228+
"app_version": other_version,
1229+
}
1230+
)
1231+
handle = queue.enqueue(test_workflow)
1232+
1233+
# Verify the workflow on the current version completes, but the other version is still ENQUEUED
1234+
assert handle.get_result()
1235+
assert other_version_handle.get_status().status == "ENQUEUED"
1236+
1237+
# Change the version, verify the other version complets
1238+
GlobalParams.app_version = other_version
1239+
assert other_version_handle.get_result()

0 commit comments

Comments
 (0)