Commit cf48898
Partitioned Queues (#458)
This PR allows you to create "partitioned queues." Partitioned queues let you distribute work across dynamically created queue partitions. When you enqueue a workflow on a partitioned queue, you must supply a "queue partition key." Partitioned queues dequeue workflows and apply flow-control limits per partition, not for the queue as a whole. Essentially, each partition is a "virtual queue" you create dynamically just by enqueueing a workflow with a new partition key.

For example, suppose you want to run only one task at a time per user. You can do this with a partitioned queue whose partition key is the user ID:

```python
queue = Queue("queue", partition_queue=True, concurrency=1)

@DBOS.workflow()
def process_task(task: Task):
    ...

def on_user_task_submission(user_id: str, task: Task):
    with SetEnqueueOptions(queue_partition_key=user_id):
        queue.enqueue(process_task, task)
```
1 parent 6b6cd27 commit cf48898

File tree

8 files changed: +195 −7 lines changed

dbos/_client.py

Lines changed: 4 additions & 0 deletions
```diff
@@ -62,6 +62,7 @@ class EnqueueOptions(_EnqueueOptionsRequired, total=False):
     deduplication_id: str
     priority: int
     max_recovery_attempts: int
+    queue_partition_key: str


 def validate_enqueue_options(options: EnqueueOptions) -> None:
@@ -185,6 +186,7 @@ def _enqueue(self, options: EnqueueOptions, *args: Any, **kwargs: Any) -> str:
             "deduplication_id": options.get("deduplication_id"),
             "priority": options.get("priority"),
             "app_version": options.get("app_version"),
+            "queue_partition_key": options.get("queue_partition_key"),
         }

         inputs: WorkflowInputs = {
@@ -221,6 +223,7 @@ def _enqueue(self, options: EnqueueOptions, *args: Any, **kwargs: Any) -> str:
                 else 0
             ),
             "inputs": self._serializer.serialize(inputs),
+            "queue_partition_key": enqueue_options_internal["queue_partition_key"],
         }

         self._sys_db.init_workflow(
@@ -286,6 +289,7 @@ def send(
             "deduplication_id": None,
             "priority": 0,
             "inputs": self._serializer.serialize({"args": (), "kwargs": {}}),
+            "queue_partition_key": None,
         }
         with self._sys_db.engine.begin() as conn:
             self._sys_db._insert_workflow_status(
```
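With the new `queue_partition_key` field, a client outside the application can target a specific partition. A minimal sketch, assuming `DBOSClient` is constructed from a database URL and that a `process_task` workflow is registered on a partition-enabled queue named "queue" (connection string, names, and arguments are all illustrative):

```python
from dbos import DBOSClient, EnqueueOptions

# Hypothetical connection string, for illustration only.
client = DBOSClient("postgresql://localhost:5432/example_app")

options: EnqueueOptions = {
    "queue_name": "queue",               # must be a partition-enabled queue
    "workflow_name": "process_task",
    "queue_partition_key": "user-123",   # the field added by this PR
}
# Enqueues the workflow under partition "user-123".
handle = client.enqueue(options, {"payload": "example"})
```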

dbos/_context.py

Lines changed: 8 additions & 0 deletions
```diff
@@ -120,6 +120,8 @@ def __init__(self) -> None:
         self.deduplication_id: Optional[str] = None
         # A user-specified priority for the enqueuing workflow.
         self.priority: Optional[int] = None
+        # If the workflow is enqueued on a partitioned queue, its partition key
+        self.queue_partition_key: Optional[str] = None

     def create_child(self) -> DBOSContext:
         rv = DBOSContext()
@@ -479,6 +481,7 @@ def __init__(
         deduplication_id: Optional[str] = None,
         priority: Optional[int] = None,
         app_version: Optional[str] = None,
+        queue_partition_key: Optional[str] = None,
     ) -> None:
         self.created_ctx = False
         self.deduplication_id: Optional[str] = deduplication_id
@@ -491,6 +494,8 @@ def __init__(
         self.saved_priority: Optional[int] = None
         self.app_version: Optional[str] = app_version
         self.saved_app_version: Optional[str] = None
+        self.queue_partition_key = queue_partition_key
+        self.saved_queue_partition_key: Optional[str] = None

     def __enter__(self) -> SetEnqueueOptions:
         # Code to create a basic context
@@ -505,6 +510,8 @@ def __enter__(self) -> SetEnqueueOptions:
         ctx.priority = self.priority
         self.saved_app_version = ctx.app_version
         ctx.app_version = self.app_version
+        self.saved_queue_partition_key = ctx.queue_partition_key
+        ctx.queue_partition_key = self.queue_partition_key
         return self

     def __exit__(
@@ -517,6 +524,7 @@ def __exit__(
         curr_ctx.deduplication_id = self.saved_deduplication_id
         curr_ctx.priority = self.saved_priority
         curr_ctx.app_version = self.saved_app_version
+        curr_ctx.queue_partition_key = self.saved_queue_partition_key
         # Code to clean up the basic context if we created it
         if self.created_ctx:
            _clear_local_dbos_context()
```
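Because `__enter__` saves the previous key and `__exit__` restores it, `SetEnqueueOptions` blocks nest cleanly. A short sketch, reusing `queue` and `process_task` from the commit message's example (the task variables are illustrative):

```python
with SetEnqueueOptions(queue_partition_key="user-1"):
    queue.enqueue(process_task, task_a)      # partition "user-1"
    with SetEnqueueOptions(queue_partition_key="user-2"):
        queue.enqueue(process_task, task_b)  # partition "user-2"
    queue.enqueue(process_task, task_c)      # restored to "user-1" on exit
```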

dbos/_core.py

Lines changed: 11 additions & 0 deletions
```diff
@@ -303,6 +303,11 @@ def _init_workflow(
             else 0
         ),
         "inputs": dbos._serializer.serialize(inputs),
+        "queue_partition_key": (
+            enqueue_options["queue_partition_key"]
+            if enqueue_options is not None
+            else None
+        ),
     }

     # Synchronously record the status and inputs for workflows
@@ -571,6 +576,9 @@ def start_workflow(
         deduplication_id=local_ctx.deduplication_id if local_ctx is not None else None,
         priority=local_ctx.priority if local_ctx is not None else None,
         app_version=local_ctx.app_version if local_ctx is not None else None,
+        queue_partition_key=(
+            local_ctx.queue_partition_key if local_ctx is not None else None
+        ),
     )
     new_wf_id, new_wf_ctx = _get_new_wf()

@@ -664,6 +672,9 @@ async def start_workflow_async(
         deduplication_id=local_ctx.deduplication_id if local_ctx is not None else None,
         priority=local_ctx.priority if local_ctx is not None else None,
         app_version=local_ctx.app_version if local_ctx is not None else None,
+        queue_partition_key=(
+            local_ctx.queue_partition_key if local_ctx is not None else None
+        ),
     )
     new_wf_id, new_wf_ctx = _get_new_wf()
```
dbos/_migration.py

Lines changed: 12 additions & 2 deletions
```diff
@@ -203,8 +203,14 @@ def get_dbos_migration_one(schema: str) -> str:
     """


+def get_dbos_migration_two(schema: str) -> str:
+    return f"""
+    ALTER TABLE \"{schema}\".workflow_status ADD COLUMN queue_partition_key TEXT;
+    """
+
+
 def get_dbos_migrations(schema: str) -> list[str]:
-    return [get_dbos_migration_one(schema)]
+    return [get_dbos_migration_one(schema), get_dbos_migration_two(schema)]


 def get_sqlite_timestamp_expr() -> str:
@@ -293,4 +299,8 @@ def get_sqlite_timestamp_expr() -> str:
     );
     """

-sqlite_migrations = [sqlite_migration_one]
+sqlite_migration_two = """
+ALTER TABLE workflow_status ADD COLUMN queue_partition_key TEXT;
+"""
+
+sqlite_migrations = [sqlite_migration_one, sqlite_migration_two]
```
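Rendered against a concrete schema name ("dbos" here is an assumption; the schema is configurable), the new Postgres migration produces roughly:

```python
>>> print(get_dbos_migration_two("dbos"))

    ALTER TABLE "dbos".workflow_status ADD COLUMN queue_partition_key TEXT;

```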

dbos/_queue.py

Lines changed: 29 additions & 4 deletions
```diff
@@ -43,6 +43,7 @@ def __init__(
         *,  # Disable positional arguments from here on
         worker_concurrency: Optional[int] = None,
         priority_enabled: bool = False,
+        partition_queue: bool = False,
     ) -> None:
         if (
             worker_concurrency is not None
@@ -57,6 +58,7 @@ def __init__(
         self.worker_concurrency = worker_concurrency
         self.limiter = limiter
         self.priority_enabled = priority_enabled
+        self.partition_queue = partition_queue
         from ._dbos import _get_or_create_dbos_registry

         registry = _get_or_create_dbos_registry()
@@ -78,6 +80,18 @@ def enqueue(
             raise Exception(
                 f"Priority is not enabled for queue {self.name}. Setting priority will not have any effect."
             )
+        if self.partition_queue and (
+            context is None or context.queue_partition_key is None
+        ):
+            raise Exception(
+                f"A workflow cannot be enqueued on partitioned queue {self.name} without a partition key"
+            )
+        if context and context.queue_partition_key and not self.partition_queue:
+            raise Exception(
+                f"You can only use a partition key on a partition-enabled queue. Key {context.queue_partition_key} was used with non-partitioned queue {self.name}"
+            )
+        if context and context.queue_partition_key and context.deduplication_id:
+            raise Exception("Deduplication is not supported for partitioned queues")

         dbos = _get_dbos_instance()
         return start_workflow(dbos, func, self.name, False, *args, **kwargs)
@@ -105,10 +119,21 @@ def queue_thread(stop_event: threading.Event, dbos: "DBOS") -> None:
         queues = dict(dbos._registry.queue_info_map)
         for _, queue in queues.items():
             try:
-                wf_ids = dbos._sys_db.start_queued_workflows(
-                    queue, GlobalParams.executor_id, GlobalParams.app_version
-                )
-                for id in wf_ids:
+                if queue.partition_queue:
+                    dequeued_workflows = []
+                    queue_partition_keys = dbos._sys_db.get_queue_partitions(queue.name)
+                    for key in queue_partition_keys:
+                        dequeued_workflows += dbos._sys_db.start_queued_workflows(
+                            queue,
+                            GlobalParams.executor_id,
+                            GlobalParams.app_version,
+                            key,
+                        )
+                else:
+                    dequeued_workflows = dbos._sys_db.start_queued_workflows(
+                        queue, GlobalParams.executor_id, GlobalParams.app_version, None
+                    )
+                for id in dequeued_workflows:
                     execute_workflow_by_id(dbos, id)
             except OperationalError as e:
                 if isinstance(
```
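The three new checks in `enqueue` reject misconfigured combinations up front. A minimal sketch of each failure mode, assuming a DBOS application has been configured and launched (queue names, workflow, and task strings are illustrative):

```python
from dbos import DBOS, Queue, SetEnqueueOptions

partitioned = Queue("partitioned", partition_queue=True)
plain = Queue("plain")

@DBOS.workflow()
def process_task(task: str) -> None:
    ...

# 1. A partitioned queue requires a partition key.
try:
    partitioned.enqueue(process_task, "t1")
except Exception as e:
    print(e)  # "A workflow cannot be enqueued on partitioned queue ..."

# 2. Partition keys are rejected on non-partitioned queues.
try:
    with SetEnqueueOptions(queue_partition_key="user-1"):
        plain.enqueue(process_task, "t2")
except Exception as e:
    print(e)  # "You can only use a partition key on a partition-enabled queue ..."

# 3. Deduplication cannot be combined with partitioning.
try:
    with SetEnqueueOptions(queue_partition_key="user-1", deduplication_id="d1"):
        partitioned.enqueue(process_task, "t3")
except Exception as e:
    print(e)  # "Deduplication is not supported for partitioned queues"
```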

dbos/_schemas/system_database.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -77,6 +77,7 @@ def set_schema(cls, schema_name: Optional[str]) -> None:
         Column("deduplication_id", Text(), nullable=True),
         Column("inputs", Text()),
         Column("priority", Integer(), nullable=False, server_default=text("'0'::int")),
+        Column("queue_partition_key", Text()),
         Index("workflow_status_created_at_index", "created_at"),
         Index("workflow_status_executor_id_index", "executor_id"),
         Index("workflow_status_status_index", "status"),
```

dbos/_sys_db.py

Lines changed: 53 additions & 1 deletion
```diff
@@ -152,6 +152,8 @@ class WorkflowStatusInternal(TypedDict):
     priority: int
     # Serialized workflow inputs
     inputs: str
+    # If this workflow is enqueued on a partitioned queue, its partition key
+    queue_partition_key: Optional[str]


 class EnqueueOptionsInternal(TypedDict):
@@ -161,6 +163,8 @@ class EnqueueOptionsInternal(TypedDict):
     priority: Optional[int]
     # On what version the workflow is enqueued. Current version if not specified.
     app_version: Optional[str]
+    # If the workflow is enqueued on a partitioned queue, its partition key
+    queue_partition_key: Optional[str]


 class RecordedResult(TypedDict):
@@ -490,6 +494,7 @@ def _insert_workflow_status(
                 deduplication_id=status["deduplication_id"],
                 priority=status["priority"],
                 inputs=status["inputs"],
+                queue_partition_key=status["queue_partition_key"],
             )
             .on_conflict_do_update(
                 index_elements=["workflow_uuid"],
@@ -761,6 +766,7 @@ def get_workflow_status(
                     SystemSchema.workflow_status.c.deduplication_id,
                     SystemSchema.workflow_status.c.priority,
                     SystemSchema.workflow_status.c.inputs,
+                    SystemSchema.workflow_status.c.queue_partition_key,
                 ).where(SystemSchema.workflow_status.c.workflow_uuid == workflow_uuid)
             ).fetchone()
             if row is None:
@@ -788,6 +794,7 @@ def get_workflow_status(
                 "deduplication_id": row[16],
                 "priority": row[17],
                 "inputs": row[18],
+                "queue_partition_key": row[19],
             }
             return status

@@ -1714,8 +1721,41 @@ def get_event(
             )
         return value

+    @db_retry()
+    def get_queue_partitions(self, queue_name: str) -> List[str]:
+        """
+        Get all unique partition names associated with a queue for ENQUEUED workflows.
+
+        Args:
+            queue_name: The name of the queue to get partitions for
+
+        Returns:
+            A list of unique partition names for the queue
+        """
+        with self.engine.begin() as c:
+            query = (
+                sa.select(SystemSchema.workflow_status.c.queue_partition_key)
+                .distinct()
+                .where(SystemSchema.workflow_status.c.queue_name == queue_name)
+                .where(
+                    SystemSchema.workflow_status.c.status.in_(
+                        [
+                            WorkflowStatusString.ENQUEUED.value,
+                        ]
+                    )
+                )
+                .where(SystemSchema.workflow_status.c.queue_partition_key.isnot(None))
+            )
+
+            rows = c.execute(query).fetchall()
+            return [row[0] for row in rows]

     def start_queued_workflows(
-        self, queue: "Queue", executor_id: str, app_version: str
+        self,
+        queue: "Queue",
+        executor_id: str,
+        app_version: str,
+        queue_partition_key: Optional[str],
     ) -> List[str]:
         if self._debug_mode:
             return []
@@ -1734,6 +1774,10 @@ def start_queued_workflows(
                 sa.select(sa.func.count())
                 .select_from(SystemSchema.workflow_status)
                 .where(SystemSchema.workflow_status.c.queue_name == queue.name)
+                .where(
+                    SystemSchema.workflow_status.c.queue_partition_key
+                    == queue_partition_key
+                )
                 .where(
                     SystemSchema.workflow_status.c.status
                     != WorkflowStatusString.ENQUEUED.value
@@ -1758,6 +1802,10 @@ def start_queued_workflows(
                 )
                 .select_from(SystemSchema.workflow_status)
                 .where(SystemSchema.workflow_status.c.queue_name == queue.name)
+                .where(
+                    SystemSchema.workflow_status.c.queue_partition_key
+                    == queue_partition_key
+                )
                 .where(
                     SystemSchema.workflow_status.c.status
                     == WorkflowStatusString.PENDING.value
@@ -1799,6 +1847,10 @@ def start_queued_workflows(
                 )
                 .select_from(SystemSchema.workflow_status)
                 .where(SystemSchema.workflow_status.c.queue_name == queue.name)
+                .where(
+                    SystemSchema.workflow_status.c.queue_partition_key
+                    == queue_partition_key
+                )
                 .where(
                     SystemSchema.workflow_status.c.status
                     == WorkflowStatusString.ENQUEUED.value
```
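Because every flow-control query in `start_queued_workflows` now also filters on `queue_partition_key` (SQLAlchemy compiles a comparison against a Python `None` to `IS NULL`, so non-partitioned queues keep their old behavior), concurrency limits apply per partition rather than per queue. A hedged sketch of the consequence, reusing the commit message's example names:

```python
# concurrency=1 now means one task at a time *per partition*.
queue = Queue("queue", partition_queue=True, concurrency=1)

with SetEnqueueOptions(queue_partition_key="alice"):
    queue.enqueue(process_task, task_a)  # starts immediately
with SetEnqueueOptions(queue_partition_key="bob"):
    queue.enqueue(process_task, task_b)  # also starts: different partition
with SetEnqueueOptions(queue_partition_key="alice"):
    queue.enqueue(process_task, task_c)  # waits until task_a finishes
```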
