Skip to content

Commit a3be1bb

Browse files
authored
Job Queue (#106)
Add a queue abstraction for asynchronous processing. Example usage: ```python queue = Queue("example_queue") @dbos.step() def process_job(job): ... @DBOS.workflow() def process_jobs(jobs): for job in jobs: queue.enqueue(process_job, job) ``` You can enqueue any DBOS-annotated function (workflows, transactions, or steps). `enqueue` returns a [workflow handle](https://docs.dbos.dev/python/reference/workflow_handles) so you can track the status of enqueued tasks. You can set a maximum concurrency level for a queue to cap the maximum number of tasks from that queue that can be processed at once. Scheduled workflows and Kafka workflows now use queues, making them more robust in a distributed setting.
1 parent 0db5c43 commit a3be1bb

File tree

16 files changed

+438
-40
lines changed

16 files changed

+438
-40
lines changed

dbos/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from .dbos import DBOS, DBOSConfiguredInstance, WorkflowHandle, WorkflowStatus
44
from .dbos_config import ConfigFile, get_dbos_database_url, load_config
55
from .kafka_message import KafkaMessage
6+
from .queue import Queue
67
from .system_database import GetWorkflowsInput, WorkflowStatusString
78

89
__all__ = [
@@ -19,4 +20,5 @@
1920
"load_config",
2021
"get_dbos_database_url",
2122
"error",
23+
"Queue",
2224
]

dbos/core.py

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
OperationResultInternal,
6464
WorkflowInputs,
6565
WorkflowStatusInternal,
66+
WorkflowStatusString,
6667
)
6768

6869
if TYPE_CHECKING:
@@ -126,6 +127,7 @@ def _init_workflow(
126127
class_name: Optional[str],
127128
config_name: Optional[str],
128129
temp_wf_type: Optional[str],
130+
queue: Optional[str] = None,
129131
) -> WorkflowStatusInternal:
130132
wfid = (
131133
ctx.workflow_id
@@ -134,7 +136,11 @@ def _init_workflow(
134136
)
135137
status: WorkflowStatusInternal = {
136138
"workflow_uuid": wfid,
137-
"status": "PENDING",
139+
"status": (
140+
WorkflowStatusString.PENDING.value
141+
if queue is None
142+
else WorkflowStatusString.ENQUEUED.value
143+
),
138144
"name": wf_name,
139145
"class_name": class_name,
140146
"config_name": config_name,
@@ -150,21 +156,26 @@ def _init_workflow(
150156
json.dumps(ctx.authenticated_roles) if ctx.authenticated_roles else None
151157
),
152158
"assumed_role": ctx.assumed_role,
159+
"queue_name": queue,
153160
}
154161

155162
# If we have a class name, the first arg is the instance and do not serialize
156163
if class_name is not None:
157164
inputs = {"args": inputs["args"][1:], "kwargs": inputs["kwargs"]}
158165

159-
if temp_wf_type != "transaction":
166+
if temp_wf_type != "transaction" or queue is not None:
160167
# Synchronously record the status and inputs for workflows and single-step workflows
161168
# We also have to do this for single-step workflows because of the foreign key constraint on the operation outputs table
169+
# TODO: Make this transactional (and with the queue step below)
162170
dbos._sys_db.update_workflow_status(status, False, ctx.in_recovery)
163171
dbos._sys_db.update_workflow_inputs(wfid, utils.serialize(inputs))
164172
else:
165173
# Buffer the inputs for single-transaction workflows, but don't buffer the status
166174
dbos._sys_db.buffer_workflow_inputs(wfid, utils.serialize(inputs))
167175

176+
if queue is not None:
177+
dbos._sys_db.enqueue(wfid, queue)
178+
168179
return status
169180

170181

@@ -179,6 +190,8 @@ def _execute_workflow(
179190
output = func(*args, **kwargs)
180191
status["status"] = "SUCCESS"
181192
status["output"] = utils.serialize(output)
193+
if status["queue_name"] is not None:
194+
dbos._sys_db.remove_from_queue(status["workflow_uuid"])
182195
dbos._sys_db.buffer_workflow_status(status)
183196
except DBOSWorkflowConflictIDError:
184197
# Retrieve the workflow handle and wait for the result.
@@ -191,6 +204,8 @@ def _execute_workflow(
191204
except Exception as error:
192205
status["status"] = "ERROR"
193206
status["error"] = utils.serialize(error)
207+
if status["queue_name"] is not None:
208+
dbos._sys_db.remove_from_queue(status["workflow_uuid"])
194209
dbos._sys_db.update_workflow_status(status)
195210
raise
196211

@@ -249,6 +264,8 @@ def _execute_workflow_id(dbos: "DBOS", workflow_id: str) -> "WorkflowHandle[Any]
249264
return _start_workflow(
250265
dbos,
251266
wf_func,
267+
status["queue_name"],
268+
True,
252269
dbos._registry.instance_info_map[iname],
253270
*inputs["args"],
254271
**inputs["kwargs"],
@@ -264,14 +281,21 @@ def _execute_workflow_id(dbos: "DBOS", workflow_id: str) -> "WorkflowHandle[Any]
264281
return _start_workflow(
265282
dbos,
266283
wf_func,
284+
status["queue_name"],
285+
True,
267286
dbos._registry.class_info_map[class_name],
268287
*inputs["args"],
269288
**inputs["kwargs"],
270289
)
271290
else:
272291
with SetWorkflowID(workflow_id):
273292
return _start_workflow(
274-
dbos, wf_func, *inputs["args"], **inputs["kwargs"]
293+
dbos,
294+
wf_func,
295+
status["queue_name"],
296+
True,
297+
*inputs["args"],
298+
**inputs["kwargs"],
275299
)
276300

277301

@@ -331,6 +355,8 @@ def _workflow_decorator(func: F) -> F:
331355
def _start_workflow(
332356
dbos: "DBOS",
333357
func: "Workflow[P, R]",
358+
queue_name: Optional[str],
359+
execute_workflow: bool,
334360
*args: P.args,
335361
**kwargs: P.kwargs,
336362
) -> "WorkflowHandle[R]":
@@ -384,8 +410,12 @@ def _start_workflow(
384410
class_name=get_dbos_class_name(fi, func, gin_args),
385411
config_name=get_config_name(fi, func, gin_args),
386412
temp_wf_type=get_temp_workflow_type(func),
413+
queue=queue_name,
387414
)
388415

416+
if not execute_workflow:
417+
return _WorkflowHandlePolling(new_wf_id, dbos)
418+
389419
if fself is not None:
390420
future = dbos._executor.submit(
391421
cast(Callable[..., R], _execute_workflow_wthread),
@@ -529,6 +559,7 @@ def temp_wf(*args: Any, **kwargs: Any) -> Any:
529559
set_dbos_func_name(temp_wf, "<temp>." + func.__qualname__)
530560
set_temp_workflow_type(temp_wf, "transaction")
531561
dbosreg.register_wf_function(get_dbos_func_name(temp_wf), wrapped_wf)
562+
wrapper.__orig_func = temp_wf # type: ignore
532563

533564
return cast(F, wrapper)
534565

@@ -645,6 +676,7 @@ def temp_wf(*args: Any, **kwargs: Any) -> Any:
645676
set_dbos_func_name(temp_wf, "<temp>." + func.__qualname__)
646677
set_temp_workflow_type(temp_wf, "step")
647678
dbosreg.register_wf_function(get_dbos_func_name(temp_wf), wrapped_wf)
679+
wrapper.__orig_func = temp_wf # type: ignore
648680

649681
return cast(F, wrapper)
650682

dbos/dbos.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
Tuple,
2121
Type,
2222
TypeVar,
23-
cast,
2423
)
2524

2625
from opentelemetry.trace import Span
@@ -40,6 +39,7 @@
4039
_WorkflowHandlePolling,
4140
)
4241
from dbos.decorators import classproperty
42+
from dbos.queue import Queue, queue_thread
4343
from dbos.recovery import _recover_pending_workflows, _startup_recovery_thread
4444
from dbos.registrations import (
4545
DBOSClassInfo,
@@ -138,6 +138,7 @@ def __init__(self) -> None:
138138
self.workflow_info_map: dict[str, Workflow[..., Any]] = {}
139139
self.class_info_map: dict[str, type] = {}
140140
self.instance_info_map: dict[str, object] = {}
141+
self.queue_info_map: dict[str, Queue] = {}
141142
self.pollers: list[_RegisteredJob] = []
142143
self.dbos: Optional[DBOS] = None
143144
self.config: Optional[ConfigFile] = None
@@ -356,6 +357,11 @@ def _launch(self) -> None:
356357
# Start flush workflow buffers thread
357358
self._executor.submit(self._sys_db.flush_workflow_buffers)
358359

360+
# Start the queue thread
361+
evt = threading.Event()
362+
self.stop_events.append(evt)
363+
self._executor.submit(queue_thread, evt, self)
364+
359365
# Grab any pollers that were deferred and start them
360366
for evt, func, args, kwargs in self._registry.pollers:
361367
self.stop_events.append(evt)
@@ -488,13 +494,18 @@ def scheduled(cls, cron: str) -> Callable[[ScheduledWorkflow], ScheduledWorkflow
488494

489495
@classmethod
490496
def kafka_consumer(
491-
cls, config: dict[str, Any], topics: list[str]
497+
cls,
498+
config: dict[str, Any],
499+
topics: list[str],
500+
in_order: bool = False,
492501
) -> Callable[[KafkaConsumerWorkflow], KafkaConsumerWorkflow]:
493502
"""Decorate a function to be used as a Kafka consumer."""
494503
try:
495504
from dbos.kafka import kafka_consumer
496505

497-
return kafka_consumer(_get_or_create_dbos_registry(), config, topics)
506+
return kafka_consumer(
507+
_get_or_create_dbos_registry(), config, topics, in_order
508+
)
498509
except ModuleNotFoundError as e:
499510
raise DBOSException(
500511
f"{e.name} dependency not found. Please install {e.name} via your package manager."
@@ -508,7 +519,7 @@ def start_workflow(
508519
**kwargs: P.kwargs,
509520
) -> WorkflowHandle[R]:
510521
"""Invoke a workflow function in the background, returning a handle to the ongoing execution."""
511-
return _start_workflow(_get_dbos_instance(), func, *args, **kwargs)
522+
return _start_workflow(_get_dbos_instance(), func, None, True, *args, **kwargs)
512523

513524
@classmethod
514525
def get_workflow_status(cls, workflow_id: str) -> Optional[WorkflowStatus]:

dbos/kafka.py

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,30 @@
11
import threading
2-
import traceback
3-
from dataclasses import dataclass
4-
from typing import TYPE_CHECKING, Any, Callable, Generator, NoReturn, Optional, Union
2+
from typing import TYPE_CHECKING, Any, Callable, NoReturn
53

64
from confluent_kafka import Consumer, KafkaError, KafkaException
7-
from confluent_kafka import Message as CTypeMessage
5+
6+
from dbos.queue import Queue
87

98
if TYPE_CHECKING:
109
from dbos.dbos import _DBOSRegistry
1110

1211
from .context import SetWorkflowID
12+
from .error import DBOSInitializationError
1313
from .kafka_message import KafkaMessage
1414
from .logger import dbos_logger
1515

1616
KafkaConsumerWorkflow = Callable[[KafkaMessage], None]
1717

18+
kafka_queue: Queue
19+
in_order_kafka_queues: dict[str, Queue] = {}
20+
1821

1922
def _kafka_consumer_loop(
2023
func: KafkaConsumerWorkflow,
2124
config: dict[str, Any],
2225
topics: list[str],
2326
stop_event: threading.Event,
27+
in_order: bool,
2428
) -> None:
2529

2630
def on_error(err: KafkaError) -> NoReturn:
@@ -70,24 +74,35 @@ def on_error(err: KafkaError) -> NoReturn:
7074
with SetWorkflowID(
7175
f"kafka-unique-id-{msg.topic}-{msg.partition}-{msg.offset}"
7276
):
73-
try:
74-
func(msg)
75-
except Exception as e:
76-
dbos_logger.error(
77-
f"Exception encountered in Kafka consumer: {traceback.format_exc()}"
78-
)
77+
if in_order:
78+
assert msg.topic is not None
79+
queue = in_order_kafka_queues[msg.topic]
80+
queue.enqueue(func, msg)
81+
else:
82+
kafka_queue.enqueue(func, msg)
7983

8084
finally:
8185
consumer.close()
8286

8387

8488
def kafka_consumer(
85-
dbosreg: "_DBOSRegistry", config: dict[str, Any], topics: list[str]
89+
dbosreg: "_DBOSRegistry", config: dict[str, Any], topics: list[str], in_order: bool
8690
) -> Callable[[KafkaConsumerWorkflow], KafkaConsumerWorkflow]:
8791
def decorator(func: KafkaConsumerWorkflow) -> KafkaConsumerWorkflow:
92+
if in_order:
93+
for topic in topics:
94+
if topic.startswith("^"):
95+
raise DBOSInitializationError(
96+
f"Error: in-order processing is not supported for regular expression topic selectors ({topic})"
97+
)
98+
queue = Queue(f"_dbos_kafka_queue_topic_{topic}", concurrency=1)
99+
in_order_kafka_queues[topic] = queue
100+
else:
101+
global kafka_queue
102+
kafka_queue = Queue("_dbos_internal_queue")
88103
stop_event = threading.Event()
89104
dbosreg.register_poller(
90-
stop_event, _kafka_consumer_loop, func, config, topics, stop_event
105+
stop_event, _kafka_consumer_loop, func, config, topics, stop_event, in_order
91106
)
92107
return func
93108

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
"""job_queue
2+
3+
Revision ID: eab0cc1d9a14
4+
Revises: a3b18ad34abe
5+
Create Date: 2024-09-13 14:50:00.531294
6+
7+
"""
8+
9+
from typing import Sequence, Union
10+
11+
import sqlalchemy as sa
12+
from alembic import op
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = "eab0cc1d9a14"
16+
down_revision: Union[str, None] = "a3b18ad34abe"
17+
branch_labels: Union[str, Sequence[str], None] = None
18+
depends_on: Union[str, Sequence[str], None] = None
19+
20+
21+
def upgrade() -> None:
22+
op.create_table(
23+
"job_queue",
24+
sa.Column("workflow_uuid", sa.Text(), nullable=False),
25+
sa.Column("queue_name", sa.Text(), nullable=False),
26+
sa.Column(
27+
"created_at_epoch_ms",
28+
sa.BigInteger(),
29+
server_default=sa.text(
30+
"(EXTRACT(epoch FROM now()) * 1000::numeric)::bigint"
31+
),
32+
nullable=False,
33+
primary_key=True,
34+
),
35+
sa.ForeignKeyConstraint(
36+
["workflow_uuid"],
37+
["dbos.workflow_status.workflow_uuid"],
38+
onupdate="CASCADE",
39+
ondelete="CASCADE",
40+
),
41+
schema="dbos",
42+
)
43+
op.add_column(
44+
"workflow_status",
45+
sa.Column(
46+
"queue_name",
47+
sa.Text(),
48+
),
49+
schema="dbos",
50+
)
51+
52+
53+
def downgrade() -> None:
54+
op.drop_table("job_queue", schema="dbos")
55+
op.drop_column("workflow_status", "queue_name", schema="dbos")

dbos/queue.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import threading
2+
import time
3+
from typing import TYPE_CHECKING, Optional
4+
5+
from dbos.core import P, R, _execute_workflow_id, _start_workflow
6+
from dbos.error import DBOSInitializationError
7+
8+
if TYPE_CHECKING:
9+
from dbos.dbos import DBOS, Workflow, WorkflowHandle
10+
11+
12+
class Queue:
13+
def __init__(self, name: str, concurrency: Optional[int] = None) -> None:
14+
self.name = name
15+
self.concurrency = concurrency
16+
from dbos.dbos import _get_or_create_dbos_registry
17+
18+
registry = _get_or_create_dbos_registry()
19+
registry.queue_info_map[self.name] = self
20+
21+
def enqueue(
22+
self, func: "Workflow[P, R]", *args: P.args, **kwargs: P.kwargs
23+
) -> "WorkflowHandle[R]":
24+
from dbos.dbos import _get_dbos_instance
25+
26+
dbos = _get_dbos_instance()
27+
return _start_workflow(dbos, func, self.name, False, *args, **kwargs)
28+
29+
30+
def queue_thread(stop_event: threading.Event, dbos: "DBOS") -> None:
31+
while not stop_event.is_set():
32+
time.sleep(1)
33+
for queue_name, queue in dbos._registry.queue_info_map.items():
34+
wf_ids = dbos._sys_db.start_queued_workflows(queue_name, queue.concurrency)
35+
for id in wf_ids:
36+
_execute_workflow_id(dbos, id)

0 commit comments

Comments
 (0)