Skip to content

Commit 7173cbd

Browse files
authored
Update Debouncer API (#462)
On reflection, the debounce key should be passed into the `debounce` function instead of the Debouncer constructor because a Debouncer may be used with multiple different keys (for example, if the key is customer ID, the same debouncer may dynamically debounce different customers).
1 parent a9beb30 commit 7173cbd

File tree

2 files changed

+67
-64
lines changed

2 files changed

+67
-64
lines changed

dbos/_debouncer.py

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,6 @@ def __init__(
147147
self,
148148
workflow_name: str,
149149
*,
150-
debounce_key: str,
151150
debounce_timeout_sec: Optional[float] = None,
152151
queue: Optional[Queue] = None,
153152
):
@@ -157,13 +156,11 @@ def __init__(
157156
"queue_name": queue.name if queue else None,
158157
"workflow_name": workflow_name,
159158
}
160-
self.debounce_key = debounce_key
161159

162160
@staticmethod
163161
def create(
164162
workflow: Callable[P, R],
165163
*,
166-
debounce_key: str,
167164
debounce_timeout_sec: Optional[float] = None,
168165
queue: Optional[Queue] = None,
169166
) -> "Debouncer[P, R]":
@@ -172,7 +169,6 @@ def create(
172169
raise TypeError("Only workflow functions may be debounced, not methods")
173170
return Debouncer[P, R](
174171
get_dbos_func_name(workflow),
175-
debounce_key=debounce_key,
176172
debounce_timeout_sec=debounce_timeout_sec,
177173
queue=queue,
178174
)
@@ -181,7 +177,6 @@ def create(
181177
def create_async(
182178
workflow: Callable[P, Coroutine[Any, Any, R]],
183179
*,
184-
debounce_key: str,
185180
debounce_timeout_sec: Optional[float] = None,
186181
queue: Optional[Queue] = None,
187182
) -> "Debouncer[P, R]":
@@ -190,13 +185,16 @@ def create_async(
190185
raise TypeError("Only workflow functions may be debounced, not methods")
191186
return Debouncer[P, R](
192187
get_dbos_func_name(workflow),
193-
debounce_key=debounce_key,
194188
debounce_timeout_sec=debounce_timeout_sec,
195189
queue=queue,
196190
)
197191

198192
def debounce(
199-
self, debounce_period_sec: float, *args: P.args, **kwargs: P.kwargs
193+
self,
194+
debounce_key: str,
195+
debounce_period_sec: float,
196+
*args: P.args,
197+
**kwargs: P.kwargs,
200198
) -> "WorkflowHandle[R]":
201199
from dbos._dbos import DBOS, _get_dbos_instance
202200

@@ -232,9 +230,7 @@ def assign_debounce_ids() -> tuple[str, str]:
232230
while True:
233231
try:
234232
# Attempt to enqueue a debouncer for this workflow.
235-
deduplication_id = (
236-
f"{self.options['workflow_name']}-{self.debounce_key}"
237-
)
233+
deduplication_id = f"{self.options['workflow_name']}-{debounce_key}"
238234
with SetEnqueueOptions(deduplication_id=deduplication_id):
239235
with SetWorkflowTimeout(None):
240236
internal_queue.enqueue(
@@ -284,6 +280,7 @@ def get_deduplicated_workflow() -> Optional[str]:
284280

285281
async def debounce_async(
286282
self,
283+
debounce_key: str,
287284
debounce_period_sec: float,
288285
*args: P.args,
289286
**kwargs: P.kwargs,
@@ -292,7 +289,7 @@ async def debounce_async(
292289

293290
dbos = _get_dbos_instance()
294291
handle = await asyncio.to_thread(
295-
self.debounce, debounce_period_sec, *args, **kwargs
292+
self.debounce, debounce_key, debounce_period_sec, *args, **kwargs
296293
)
297294
return WorkflowHandleAsyncPolling(handle.workflow_id, dbos)
298295

@@ -304,7 +301,6 @@ def __init__(
304301
client: DBOSClient,
305302
workflow_options: EnqueueOptions,
306303
*,
307-
debounce_key: str,
308304
debounce_timeout_sec: Optional[float] = None,
309305
queue: Optional[Queue] = None,
310306
):
@@ -314,11 +310,10 @@ def __init__(
314310
"queue_name": queue.name if queue else None,
315311
"workflow_name": workflow_options["workflow_name"],
316312
}
317-
self.debounce_key = debounce_key
318313
self.client = client
319314

320315
def debounce(
321-
self, debounce_period_sec: float, *args: Any, **kwargs: Any
316+
self, debounce_key: str, debounce_period_sec: float, *args: Any, **kwargs: Any
322317
) -> "WorkflowHandle[R]":
323318

324319
ctxOptions: ContextOptions = {
@@ -337,7 +332,7 @@ def debounce(
337332
try:
338333
# Attempt to enqueue a debouncer for this workflow.
339334
deduplication_id = (
340-
f"{self.debouncer_options['workflow_name']}-{self.debounce_key}"
335+
f"{self.debouncer_options['workflow_name']}-{debounce_key}"
341336
)
342337
debouncer_options: EnqueueOptions = {
343338
"workflow_name": DEBOUNCER_WORKFLOW_NAME,
@@ -390,10 +385,10 @@ def debounce(
390385
)
391386

392387
async def debounce_async(
393-
self, debounce_period_sec: float, *args: Any, **kwargs: Any
388+
self, deboucne_key: str, debounce_period_sec: float, *args: Any, **kwargs: Any
394389
) -> "WorkflowHandleAsync[R]":
395390
handle: "WorkflowHandle[R]" = await asyncio.to_thread(
396-
self.debounce, debounce_period_sec, *args, **kwargs
391+
self.debounce, deboucne_key, debounce_period_sec, *args, **kwargs
397392
)
398393
return WorkflowHandleClientAsyncPolling[R](
399394
handle.workflow_id, self.client._sys_db

tests/test_debouncer.py

Lines changed: 55 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,18 @@ def debouncer_test() -> None:
3333

3434
debounce_period = 2
3535

36-
debouncer = Debouncer.create(workflow, debounce_key="key")
37-
first_handle = debouncer.debounce(debounce_period, first_value)
38-
debouncer = Debouncer.create(workflow, debounce_key="key")
39-
second_handle = debouncer.debounce(debounce_period, second_value)
36+
debouncer = Debouncer.create(workflow)
37+
first_handle = debouncer.debounce("key", debounce_period, first_value)
38+
debouncer = Debouncer.create(workflow)
39+
second_handle = debouncer.debounce("key", debounce_period, second_value)
4040
assert first_handle.workflow_id == second_handle.workflow_id
4141
assert first_handle.get_result() == second_value
4242
assert second_handle.get_result() == second_value
4343

44-
debouncer = Debouncer.create(workflow, debounce_key="key")
45-
third_handle = debouncer.debounce(debounce_period, third_value)
46-
debouncer = Debouncer.create(workflow, debounce_key="key")
47-
fourth_handle = debouncer.debounce(debounce_period, fourth_value)
44+
debouncer = Debouncer.create(workflow)
45+
third_handle = debouncer.debounce("key", debounce_period, third_value)
46+
debouncer = Debouncer.create(workflow)
47+
fourth_handle = debouncer.debounce("key", debounce_period, fourth_value)
4848
assert third_handle.workflow_id != first_handle.workflow_id
4949
assert third_handle.workflow_id == fourth_handle.workflow_id
5050
assert third_handle.get_result() == fourth_value
@@ -53,7 +53,7 @@ def debouncer_test() -> None:
5353
# Test SetWorkflowID works
5454
wfid = generate_uuid()
5555
with SetWorkflowID(wfid):
56-
handle = debouncer.debounce(debounce_period, first_value)
56+
handle = debouncer.debounce("key", debounce_period, first_value)
5757
assert handle.workflow_id == wfid
5858
assert handle.get_result() == first_value
5959

@@ -83,19 +83,18 @@ def workflow(x: int) -> int:
8383
# Set a huge period but small timeout, verify workflows start after the timeout
8484
debouncer = Debouncer.create(
8585
workflow,
86-
debounce_key="key",
8786
debounce_timeout_sec=2,
8887
)
8988
long_debounce_period = 10000000
9089

91-
first_handle = debouncer.debounce(long_debounce_period, first_value)
92-
second_handle = debouncer.debounce(long_debounce_period, second_value)
90+
first_handle = debouncer.debounce("key", long_debounce_period, first_value)
91+
second_handle = debouncer.debounce("key", long_debounce_period, second_value)
9392
assert first_handle.workflow_id == second_handle.workflow_id
9493
assert first_handle.get_result() == second_value
9594
assert second_handle.get_result() == second_value
9695

97-
third_handle = debouncer.debounce(long_debounce_period, third_value)
98-
fourth_handle = debouncer.debounce(long_debounce_period, fourth_value)
96+
third_handle = debouncer.debounce("key", long_debounce_period, third_value)
97+
fourth_handle = debouncer.debounce("key", long_debounce_period, fourth_value)
9998
assert third_handle.workflow_id != first_handle.workflow_id
10099
assert third_handle.workflow_id == fourth_handle.workflow_id
101100
assert third_handle.get_result() == fourth_value
@@ -104,12 +103,11 @@ def workflow(x: int) -> int:
104103
# Submit first with a long period then with a short one, verify workflows start on time
105104
debouncer = Debouncer.create(
106105
workflow,
107-
debounce_key="key",
108106
)
109107
short_debounce_period = 1
110108

111-
first_handle = debouncer.debounce(long_debounce_period, first_value)
112-
second_handle = debouncer.debounce(short_debounce_period, second_value)
109+
first_handle = debouncer.debounce("key", long_debounce_period, first_value)
110+
second_handle = debouncer.debounce("key", short_debounce_period, second_value)
113111
assert fourth_handle.workflow_id != first_handle.workflow_id
114112
assert first_handle.workflow_id == second_handle.workflow_id
115113
assert first_handle.get_result() == second_value
@@ -125,14 +123,14 @@ def workflow(x: int) -> int:
125123
first_value, second_value, third_value, fourth_value = 0, 1, 2, 3
126124

127125
# Set a huge period but small timeout, verify workflows start after the timeout
128-
debouncer_one = Debouncer.create(workflow, debounce_key="key_one")
129-
debouncer_two = Debouncer.create(workflow, debounce_key="key_two")
126+
debouncer_one = Debouncer.create(workflow)
127+
debouncer_two = Debouncer.create(workflow)
130128
debounce_period = 2
131129

132-
first_handle = debouncer_one.debounce(debounce_period, first_value)
133-
second_handle = debouncer_one.debounce(debounce_period, second_value)
134-
third_handle = debouncer_two.debounce(debounce_period, third_value)
135-
fourth_handle = debouncer_two.debounce(debounce_period, fourth_value)
130+
first_handle = debouncer_one.debounce("key_one", debounce_period, first_value)
131+
second_handle = debouncer_one.debounce("key_one", debounce_period, second_value)
132+
third_handle = debouncer_two.debounce("key_two", debounce_period, third_value)
133+
fourth_handle = debouncer_two.debounce("key_two", debounce_period, fourth_value)
136134
assert first_handle.workflow_id == second_handle.workflow_id
137135
assert first_handle.workflow_id != third_handle.workflow_id
138136
assert third_handle.workflow_id == fourth_handle.workflow_id
@@ -151,20 +149,20 @@ def workflow(x: int) -> int:
151149
first_value, second_value, third_value, fourth_value = 0, 1, 2, 3
152150
queue = Queue("test-queue")
153151

154-
debouncer = Debouncer.create(workflow, debounce_key="key", queue=queue)
152+
debouncer = Debouncer.create(workflow, queue=queue)
155153
debounce_period_sec = 2
156154

157-
first_handle = debouncer.debounce(debounce_period_sec, first_value)
158-
second_handle = debouncer.debounce(debounce_period_sec, second_value)
155+
first_handle = debouncer.debounce("key", debounce_period_sec, first_value)
156+
second_handle = debouncer.debounce("key", debounce_period_sec, second_value)
159157
assert first_handle.workflow_id == second_handle.workflow_id
160158
assert first_handle.get_result() == second_value
161159
assert second_handle.get_result() == second_value
162160
assert second_handle.get_status().queue_name == queue.name
163161

164162
# Test SetWorkflowTimeout works
165163
with SetWorkflowTimeout(5.0):
166-
third_handle = debouncer.debounce(debounce_period_sec, third_value)
167-
fourth_handle = debouncer.debounce(debounce_period_sec, fourth_value)
164+
third_handle = debouncer.debounce("key", debounce_period_sec, third_value)
165+
fourth_handle = debouncer.debounce("key", debounce_period_sec, fourth_value)
168166
assert third_handle.workflow_id != first_handle.workflow_id
169167
assert third_handle.workflow_id == fourth_handle.workflow_id
170168
assert third_handle.get_result() == fourth_value
@@ -176,7 +174,7 @@ def workflow(x: int) -> int:
176174
# Test SetWorkflowID works
177175
wfid = str(uuid.uuid4())
178176
with SetWorkflowID(wfid):
179-
handle = debouncer.debounce(debounce_period_sec, first_value)
177+
handle = debouncer.debounce("key", debounce_period_sec, first_value)
180178
assert handle.workflow_id == wfid
181179
assert handle.get_result() == first_value
182180
assert handle.get_status().queue_name == queue.name
@@ -187,7 +185,7 @@ def workflow(x: int) -> int:
187185
with SetEnqueueOptions(
188186
priority=1, deduplication_id="test", app_version=test_version
189187
):
190-
handle = debouncer.debounce(debounce_period_sec, first_value)
188+
handle = debouncer.debounce("key", debounce_period_sec, first_value)
191189
assert handle.get_result() == first_value
192190
assert handle.get_status().queue_name == queue.name
193191
assert handle.get_status().app_version == test_version
@@ -202,17 +200,25 @@ async def workflow_async(x: int) -> int:
202200

203201
first_value, second_value, third_value, fourth_value = 0, 1, 2, 3
204202

205-
debouncer = Debouncer.create_async(workflow_async, debounce_key="key")
203+
debouncer = Debouncer.create_async(workflow_async)
206204
debounce_period_sec = 2
207205

208-
first_handle = await debouncer.debounce_async(debounce_period_sec, first_value)
209-
second_handle = await debouncer.debounce_async(debounce_period_sec, second_value)
206+
first_handle = await debouncer.debounce_async(
207+
"key", debounce_period_sec, first_value
208+
)
209+
second_handle = await debouncer.debounce_async(
210+
"key", debounce_period_sec, second_value
211+
)
210212
assert first_handle.workflow_id == second_handle.workflow_id
211213
assert await first_handle.get_result() == second_value
212214
assert await second_handle.get_result() == second_value
213215

214-
third_handle = await debouncer.debounce_async(debounce_period_sec, third_value)
215-
fourth_handle = await debouncer.debounce_async(debounce_period_sec, fourth_value)
216+
third_handle = await debouncer.debounce_async(
217+
"key", debounce_period_sec, third_value
218+
)
219+
fourth_handle = await debouncer.debounce_async(
220+
"key", debounce_period_sec, fourth_value
221+
)
216222
assert third_handle.workflow_id != first_handle.workflow_id
217223
assert third_handle.workflow_id == fourth_handle.workflow_id
218224
assert await third_handle.get_result() == fourth_value
@@ -232,24 +238,24 @@ def workflow(x: int) -> int:
232238
"workflow_name": workflow.__qualname__,
233239
"queue_name": queue.name,
234240
}
235-
debouncer = DebouncerClient(client, options, debounce_key="key")
241+
debouncer = DebouncerClient(client, options)
236242
debounce_period_sec = 2
237243

238244
first_handle: WorkflowHandle[int] = debouncer.debounce(
239-
debounce_period_sec, first_value
245+
"key", debounce_period_sec, first_value
240246
)
241247
second_handle: WorkflowHandle[int] = debouncer.debounce(
242-
debounce_period_sec, second_value
248+
"key", debounce_period_sec, second_value
243249
)
244250
assert first_handle.workflow_id == second_handle.workflow_id
245251
assert first_handle.get_result() == second_value
246252
assert second_handle.get_result() == second_value
247253

248254
third_handle: WorkflowHandle[int] = debouncer.debounce(
249-
debounce_period_sec, third_value
255+
"key", debounce_period_sec, third_value
250256
)
251257
fourth_handle: WorkflowHandle[int] = debouncer.debounce(
252-
debounce_period_sec, fourth_value
258+
"key", debounce_period_sec, fourth_value
253259
)
254260
assert third_handle.workflow_id != first_handle.workflow_id
255261
assert third_handle.workflow_id == fourth_handle.workflow_id
@@ -258,7 +264,9 @@ def workflow(x: int) -> int:
258264

259265
wfid = str(uuid.uuid4())
260266
options["workflow_id"] = wfid
261-
handle: WorkflowHandle[int] = debouncer.debounce(debounce_period_sec, first_value)
267+
handle: WorkflowHandle[int] = debouncer.debounce(
268+
"key", debounce_period_sec, first_value
269+
)
262270
assert handle.workflow_id == wfid
263271
assert handle.get_result() == first_value
264272

@@ -277,24 +285,24 @@ async def workflow_async(x: int) -> int:
277285
"workflow_name": workflow_async.__qualname__,
278286
"queue_name": queue.name,
279287
}
280-
debouncer = DebouncerClient(client, options, debounce_key="key")
288+
debouncer = DebouncerClient(client, options)
281289
debounce_period_sec = 2
282290

283291
first_handle: WorkflowHandleAsync[int] = await debouncer.debounce_async(
284-
debounce_period_sec, first_value
292+
"key", debounce_period_sec, first_value
285293
)
286294
second_handle: WorkflowHandleAsync[int] = await debouncer.debounce_async(
287-
debounce_period_sec, second_value
295+
"key", debounce_period_sec, second_value
288296
)
289297
assert first_handle.workflow_id == second_handle.workflow_id
290298
assert await first_handle.get_result() == second_value
291299
assert await second_handle.get_result() == second_value
292300

293301
third_handle: WorkflowHandleAsync[int] = await debouncer.debounce_async(
294-
debounce_period_sec, third_value
302+
"key", debounce_period_sec, third_value
295303
)
296304
fourth_handle: WorkflowHandleAsync[int] = await debouncer.debounce_async(
297-
debounce_period_sec, fourth_value
305+
"key", debounce_period_sec, fourth_value
298306
)
299307
assert third_handle.workflow_id != first_handle.workflow_id
300308
assert third_handle.workflow_id == fourth_handle.workflow_id
@@ -304,7 +312,7 @@ async def workflow_async(x: int) -> int:
304312
wfid = str(uuid.uuid4())
305313
options["workflow_id"] = wfid
306314
handle: WorkflowHandleAsync[int] = await debouncer.debounce_async(
307-
debounce_period_sec, first_value
315+
"key", debounce_period_sec, first_value
308316
)
309317
assert handle.workflow_id == wfid
310318
assert await handle.get_result() == first_value

0 commit comments

Comments
 (0)