Skip to content

Commit 1d112be

Browse files
authored
Use the same column to manage queue executor_id (#235)
1 parent 3304101 commit 1d112be

File tree

2 files changed

+26
-23
lines changed

2 files changed

+26
-23
lines changed

dbos/_schemas/system_database.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ class SystemSchema:
154154
nullable=False,
155155
primary_key=True,
156156
),
157-
Column("executor_id", Text),
157+
# Column("executor_id", Text), # This column is deprecated. Do *not* use it.
158158
Column("queue_name", Text, nullable=False),
159159
Column(
160160
"created_at_epoch_ms",

dbos/_sys_db.py

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1324,24 +1324,32 @@ def start_queued_workflows(self, queue: "Queue", executor_id: str) -> List[str]:
13241324
# If there is a global or local concurrency limit N, select only the N oldest enqueued
13251325
# functions, else select all of them.
13261326

1327-
# First lets figure out how many tasks the worker can dequeue
1327+
# First lets figure out how many tasks are eligible for dequeue.
1328+
# This means figuring out how many unstarted tasks are within the local and global concurrency limits
13281329
running_tasks_query = (
13291330
sa.select(
1330-
SystemSchema.workflow_queue.c.executor_id,
1331+
SystemSchema.workflow_status.c.executor_id,
13311332
sa.func.count().label("task_count"),
13321333
)
1334+
.select_from(
1335+
SystemSchema.workflow_queue.join(
1336+
SystemSchema.workflow_status,
1337+
SystemSchema.workflow_queue.c.workflow_uuid
1338+
== SystemSchema.workflow_status.c.workflow_uuid,
1339+
)
1340+
)
13331341
.where(SystemSchema.workflow_queue.c.queue_name == queue.name)
13341342
.where(
1335-
SystemSchema.workflow_queue.c.executor_id.isnot(
1343+
SystemSchema.workflow_queue.c.started_at_epoch_ms.isnot(
13361344
None
1337-
) # Task is dequeued
1345+
) # Task is started
13381346
)
13391347
.where(
13401348
SystemSchema.workflow_queue.c.completed_at_epoch_ms.is_(
13411349
None
1342-
) # Task is not completed
1350+
) # Task is not completed.
13431351
)
1344-
.group_by(SystemSchema.workflow_queue.c.executor_id)
1352+
.group_by(SystemSchema.workflow_status.c.executor_id)
13451353
)
13461354
running_tasks_result = c.execute(running_tasks_query).fetchall()
13471355
running_tasks_result_dict = {row[0]: row[1] for row in running_tasks_result}
@@ -1351,12 +1359,6 @@ def start_queued_workflows(self, queue: "Queue", executor_id: str) -> List[str]:
13511359

13521360
max_tasks = float("inf")
13531361
if queue.worker_concurrency is not None:
1354-
# Worker local concurrency limit should always be >= running_tasks_for_this_worker
1355-
# This should never happen but a check + warning doesn't hurt
1356-
if running_tasks_for_this_worker > queue.worker_concurrency:
1357-
dbos_logger.warning(
1358-
f"Number of tasks on this worker ({running_tasks_for_this_worker}) exceeds the worker concurrency limit ({queue.worker_concurrency})"
1359-
)
13601362
max_tasks = max(
13611363
0, queue.worker_concurrency - running_tasks_for_this_worker
13621364
)
@@ -1371,16 +1373,14 @@ def start_queued_workflows(self, queue: "Queue", executor_id: str) -> List[str]:
13711373
available_tasks = max(0, queue.concurrency - total_running_tasks)
13721374
max_tasks = min(max_tasks, available_tasks)
13731375

1374-
# Lookup tasks
1376+
# Lookup unstarted/uncompleted tasks (not running)
13751377
query = (
13761378
sa.select(
13771379
SystemSchema.workflow_queue.c.workflow_uuid,
1378-
SystemSchema.workflow_queue.c.started_at_epoch_ms,
1379-
SystemSchema.workflow_queue.c.executor_id,
13801380
)
13811381
.where(SystemSchema.workflow_queue.c.queue_name == queue.name)
1382+
.where(SystemSchema.workflow_queue.c.started_at_epoch_ms == None)
13821383
.where(SystemSchema.workflow_queue.c.completed_at_epoch_ms == None)
1383-
.where(SystemSchema.workflow_queue.c.executor_id == None)
13841384
.order_by(SystemSchema.workflow_queue.c.created_at_epoch_ms.asc())
13851385
.with_for_update(nowait=True) # Error out early
13861386
)
@@ -1423,7 +1423,7 @@ def start_queued_workflows(self, queue: "Queue", executor_id: str) -> List[str]:
14231423
c.execute(
14241424
SystemSchema.workflow_queue.update()
14251425
.where(SystemSchema.workflow_queue.c.workflow_uuid == id)
1426-
.values(started_at_epoch_ms=start_time_ms, executor_id=executor_id)
1426+
.values(started_at_epoch_ms=start_time_ms)
14271427
)
14281428
ret_ids.append(id)
14291429

@@ -1468,23 +1468,26 @@ def clear_queue_assignment(self, workflow_id: str) -> bool:
14681468

14691469
with self.engine.connect() as conn:
14701470
with conn.begin() as transaction:
1471+
# Reset the start time in the queue to mark it as not started
14711472
res = conn.execute(
14721473
sa.update(SystemSchema.workflow_queue)
14731474
.where(SystemSchema.workflow_queue.c.workflow_uuid == workflow_id)
1474-
.values(executor_id=None, started_at_epoch_ms=None)
1475+
.where(
1476+
SystemSchema.workflow_queue.c.completed_at_epoch_ms.is_(None)
1477+
)
1478+
.values(started_at_epoch_ms=None)
14751479
)
14761480

1477-
# If no rows were affected, the workflow is not anymore in the queue
1481+
# If no rows were affected, the workflow is not anymore in the queue or was already completed
14781482
if res.rowcount == 0:
14791483
transaction.rollback()
14801484
return False
14811485

1486+
# Reset the status of the task to "ENQUEUED"
14821487
res = conn.execute(
14831488
sa.update(SystemSchema.workflow_status)
14841489
.where(SystemSchema.workflow_status.c.workflow_uuid == workflow_id)
1485-
.values(
1486-
executor_id=None, status=WorkflowStatusString.ENQUEUED.value
1487-
)
1490+
.values(status=WorkflowStatusString.ENQUEUED.value)
14881491
)
14891492
if res.rowcount == 0:
14901493
# This should never happen

0 commit comments

Comments
 (0)