Skip to content

Commit a5e973f

Browse files
authored
Debouncing (#455)
Workflows can now be debounced. Debouncing delays workflow execution until some time has passed since the workflow has last been called. This is useful for preventing wasted work when a workflow may be triggered multiple times in quick succession. For example, if a user is editing a form, we can debounce every change to the form and perform a synchronization workflow only after they haven't edited the form for a certain period of time. ### Debouncer.create ```python Debouncer.create( workflow: Callable[P, R], *, debounce_key: str, debounce_timeout_sec: Optional[float] = None, queue: Optional[Queue] = None, ) -> Debouncer[P, R] ``` **Parameters:** - `workflow`: The workflow to debounce. - `debounce_key`: The **unique** debounce key for this debouncer. Used to group workflows that will be debounced. - `debounce_timeout_sec`: After this time elapses since the first time a workflow is submitted from this debouncer, the workflow is started regardless of the debounce period. - `queue`: When starting a workflow after debouncing, enqueue it on this queue instead of executing it directly. ### debounce ```python debouncer.debounce( debounce_period_sec: float, *args: P.args, **kwargs: P.kwargs, ) -> WorkflowHandle[R] ``` Submit a workflow for execution but delay it by `debounce_period_sec`. Returns a handle to the workflow. The workflow may be debounced again, which further delays its execution (up to `debounce_timeout_sec`). When the workflow eventually executes, it uses the **last** set of inputs passed into `debounce`. After the workflow begins execution, the next call to `debounce` starts the debouncing process again for a new workflow execution. **Example Syntax**: ```python @DBOS.workflow() def process_input(user_input): ... # Each time a user submits a new input, debounce the process_input workflow. # The workflow will wait until 60 seconds after the user stops submitting new inputs, # then process the last input submitted. def on_user_input_submit(user_id, user_input): debounce_period_sec = 60 debouncer = Debouncer.create(process_input, debounce_key=user_id) debouncer.debounce(debounce_period_sec, user_input) ```
1 parent f198a7c commit a5e973f

File tree

12 files changed

+822
-31
lines changed

12 files changed

+822
-31
lines changed

.github/workflows/unit-test.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ jobs:
8787
- name: Run Unit Tests
8888
run: pdm run pytest tests
8989
working-directory: ./
90+
timeout-minutes: 30
9091
env:
9192
PGPASSWORD: a!b@c$d()e*_,/:;=?@ff[]22
9293
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

dbos/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
)
1010
from ._dbos import DBOS, DBOSConfiguredInstance, WorkflowHandle, WorkflowHandleAsync
1111
from ._dbos_config import DBOSConfig
12+
from ._debouncer import Debouncer, DebouncerClient
1213
from ._kafka_message import KafkaMessage
1314
from ._queue import Queue
1415
from ._sys_db import GetWorkflowsInput, WorkflowStatus, WorkflowStatusString
@@ -32,4 +33,6 @@
3233
"WorkflowStatusString",
3334
"error",
3435
"Queue",
36+
"Debouncer",
37+
"DebouncerClient",
3538
]

dbos/_client.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import time
44
import uuid
55
from typing import (
6+
TYPE_CHECKING,
67
Any,
78
AsyncGenerator,
89
Generator,
@@ -24,7 +25,10 @@
2425
from typing import NotRequired
2526

2627
from dbos import _serialization
27-
from dbos._dbos import WorkflowHandle, WorkflowHandleAsync
28+
29+
if TYPE_CHECKING:
30+
from dbos._dbos import WorkflowHandle, WorkflowHandleAsync
31+
2832
from dbos._dbos_config import (
2933
get_application_database_url,
3034
get_system_database_url,
@@ -224,23 +228,25 @@ def _enqueue(self, options: EnqueueOptions, *args: Any, **kwargs: Any) -> str:
224228

225229
def enqueue(
226230
self, options: EnqueueOptions, *args: Any, **kwargs: Any
227-
) -> WorkflowHandle[R]:
231+
) -> "WorkflowHandle[R]":
228232
workflow_id = self._enqueue(options, *args, **kwargs)
229233
return WorkflowHandleClientPolling[R](workflow_id, self._sys_db)
230234

231235
async def enqueue_async(
232236
self, options: EnqueueOptions, *args: Any, **kwargs: Any
233-
) -> WorkflowHandleAsync[R]:
237+
) -> "WorkflowHandleAsync[R]":
234238
workflow_id = await asyncio.to_thread(self._enqueue, options, *args, **kwargs)
235239
return WorkflowHandleClientAsyncPolling[R](workflow_id, self._sys_db)
236240

237-
def retrieve_workflow(self, workflow_id: str) -> WorkflowHandle[R]:
241+
def retrieve_workflow(self, workflow_id: str) -> "WorkflowHandle[R]":
238242
status = get_workflow(self._sys_db, workflow_id)
239243
if status is None:
240244
raise DBOSNonExistentWorkflowError(workflow_id)
241245
return WorkflowHandleClientPolling[R](workflow_id, self._sys_db)
242246

243-
async def retrieve_workflow_async(self, workflow_id: str) -> WorkflowHandleAsync[R]:
247+
async def retrieve_workflow_async(
248+
self, workflow_id: str
249+
) -> "WorkflowHandleAsync[R]":
244250
status = await asyncio.to_thread(get_workflow, self._sys_db, workflow_id)
245251
if status is None:
246252
raise DBOSNonExistentWorkflowError(workflow_id)
@@ -311,11 +317,13 @@ def cancel_workflow(self, workflow_id: str) -> None:
311317
async def cancel_workflow_async(self, workflow_id: str) -> None:
312318
await asyncio.to_thread(self.cancel_workflow, workflow_id)
313319

314-
def resume_workflow(self, workflow_id: str) -> WorkflowHandle[Any]:
320+
def resume_workflow(self, workflow_id: str) -> "WorkflowHandle[Any]":
315321
self._sys_db.resume_workflow(workflow_id)
316322
return WorkflowHandleClientPolling[Any](workflow_id, self._sys_db)
317323

318-
async def resume_workflow_async(self, workflow_id: str) -> WorkflowHandleAsync[Any]:
324+
async def resume_workflow_async(
325+
self, workflow_id: str
326+
) -> "WorkflowHandleAsync[Any]":
319327
await asyncio.to_thread(self.resume_workflow, workflow_id)
320328
return WorkflowHandleClientAsyncPolling[Any](workflow_id, self._sys_db)
321329

@@ -451,7 +459,7 @@ def fork_workflow(
451459
start_step: int,
452460
*,
453461
application_version: Optional[str] = None,
454-
) -> WorkflowHandle[Any]:
462+
) -> "WorkflowHandle[Any]":
455463
forked_workflow_id = fork_workflow(
456464
self._sys_db,
457465
self._app_db,
@@ -467,7 +475,7 @@ async def fork_workflow_async(
467475
start_step: int,
468476
*,
469477
application_version: Optional[str] = None,
470-
) -> WorkflowHandleAsync[Any]:
478+
) -> "WorkflowHandleAsync[Any]":
471479
forked_workflow_id = await asyncio.to_thread(
472480
fork_workflow,
473481
self._sys_db,

dbos/_context.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,7 @@ def __init__(self, attributes: TracedAttributes) -> None:
524524
self.saved_workflow_timeout: Optional[int] = None
525525
self.saved_deduplication_id: Optional[str] = None
526526
self.saved_priority: Optional[int] = None
527+
self.saved_is_within_set_workflow_id_block: bool = False
527528

528529
def __enter__(self) -> DBOSContext:
529530
# Code to create a basic context
@@ -533,6 +534,9 @@ def __enter__(self) -> DBOSContext:
533534
ctx = DBOSContext()
534535
_set_local_dbos_context(ctx)
535536
assert not ctx.is_within_workflow()
537+
# Unset is_within_set_workflow_id_block as the workflow is not within a block
538+
self.saved_is_within_set_workflow_id_block = ctx.is_within_set_workflow_id_block
539+
ctx.is_within_set_workflow_id_block = False
536540
# Unset the workflow_timeout_ms context var so it is not applied to this
537541
# workflow's children (instead we propagate the deadline)
538542
self.saved_workflow_timeout = ctx.workflow_timeout_ms
@@ -557,6 +561,8 @@ def __exit__(
557561
ctx = assert_current_dbos_context()
558562
assert ctx.is_within_workflow()
559563
ctx.end_workflow(exc_value)
564+
# Restore is_within_set_workflow_id_block
565+
ctx.is_within_set_workflow_id_block = self.saved_is_within_set_workflow_id_block
560566
# Restore the saved workflow timeout
561567
ctx.workflow_timeout_ms = self.saved_workflow_timeout
562568
# Clear any propagating timeout

dbos/_core.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
DBOSException,
5151
DBOSMaxStepRetriesExceeded,
5252
DBOSNonExistentWorkflowError,
53+
DBOSQueueDeduplicatedError,
5354
DBOSRecoveryError,
5455
DBOSUnexpectedStepError,
5556
DBOSWorkflowCancelledError,
@@ -95,6 +96,7 @@
9596
F = TypeVar("F", bound=Callable[..., Any])
9697

9798
TEMP_SEND_WF_NAME = "<temp>.temp_send_workflow"
99+
DEBOUNCER_WORKFLOW_NAME = "_dbos_debouncer_workflow"
98100

99101

100102
def check_is_in_coroutine() -> bool:
@@ -310,10 +312,22 @@ def _init_workflow(
310312
}
311313

312314
# Synchronously record the status and inputs for workflows
313-
wf_status, workflow_deadline_epoch_ms = dbos._sys_db.init_workflow(
314-
status,
315-
max_recovery_attempts=max_recovery_attempts,
316-
)
315+
try:
316+
wf_status, workflow_deadline_epoch_ms = dbos._sys_db.init_workflow(
317+
status,
318+
max_recovery_attempts=max_recovery_attempts,
319+
)
320+
except DBOSQueueDeduplicatedError as e:
321+
if ctx.has_parent():
322+
result: OperationResultInternal = {
323+
"workflow_uuid": ctx.parent_workflow_id,
324+
"function_id": ctx.parent_workflow_fid,
325+
"function_name": wf_name,
326+
"output": None,
327+
"error": _serialization.serialize_exception(e),
328+
}
329+
dbos._sys_db.record_operation_result(result)
330+
raise
317331

318332
if workflow_deadline_epoch_ms is not None:
319333
evt = threading.Event()

dbos/_dbos.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,14 @@
3232
from rich import print
3333

3434
from dbos._conductor.conductor import ConductorWebsocket
35+
from dbos._debouncer import debouncer_workflow
3536
from dbos._sys_db import SystemDatabase, WorkflowStatus
3637
from dbos._utils import INTERNAL_QUEUE_NAME, GlobalParams
3738
from dbos._workflow_commands import fork_workflow, list_queued_workflows, list_workflows
3839

3940
from ._classproperty import classproperty
4041
from ._core import (
42+
DEBOUNCER_WORKFLOW_NAME,
4143
TEMP_SEND_WF_NAME,
4244
WorkflowHandleAsyncPolling,
4345
WorkflowHandlePolling,
@@ -390,11 +392,12 @@ def send_temp_workflow(
390392
) -> None:
391393
self.send(destination_id, message, topic)
392394

393-
temp_send_wf = workflow_wrapper(self._registry, send_temp_workflow)
394-
set_dbos_func_name(send_temp_workflow, TEMP_SEND_WF_NAME)
395-
set_dbos_func_name(temp_send_wf, TEMP_SEND_WF_NAME)
396-
set_temp_workflow_type(send_temp_workflow, "send")
397-
self._registry.register_wf_function(TEMP_SEND_WF_NAME, temp_send_wf, "send")
395+
decorate_workflow(self._registry, TEMP_SEND_WF_NAME, None)(send_temp_workflow)
396+
397+
# Register the debouncer workflow
398+
decorate_workflow(self._registry, DEBOUNCER_WORKFLOW_NAME, None)(
399+
debouncer_workflow
400+
)
398401

399402
for handler in dbos_logger.handlers:
400403
handler.flush()

0 commit comments

Comments
 (0)