Skip to content

Commit 2dbe51e

Browse files
authored
Update Debouncer Tests (#460)
1 parent a5e973f commit 2dbe51e

File tree

2 files changed

+66
-21
lines changed

2 files changed

+66
-21
lines changed

dbos/_debouncer.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,10 @@ def assign_debounce_ids() -> tuple[str, str]:
232232
while True:
233233
try:
234234
# Attempt to enqueue a debouncer for this workflow.
235-
with SetEnqueueOptions(deduplication_id=self.debounce_key):
235+
deduplication_id = (
236+
f"{self.options['workflow_name']}-{self.debounce_key}"
237+
)
238+
with SetEnqueueOptions(deduplication_id=deduplication_id):
236239
with SetWorkflowTimeout(None):
237240
internal_queue.enqueue(
238241
debouncer_workflow,
@@ -249,7 +252,7 @@ def assign_debounce_ids() -> tuple[str, str]:
249252
def get_deduplicated_workflow() -> Optional[str]:
250253
return dbos._sys_db.get_deduplicated_workflow(
251254
queue_name=internal_queue.name,
252-
deduplication_id=self.debounce_key,
255+
deduplication_id=deduplication_id,
253256
)
254257

255258
dedup_wfid = dbos._sys_db.call_function_as_step(
@@ -333,10 +336,13 @@ def debounce(
333336
while True:
334337
try:
335338
# Attempt to enqueue a debouncer for this workflow.
339+
deduplication_id = (
340+
f"{self.debouncer_options['workflow_name']}-{self.debounce_key}"
341+
)
336342
debouncer_options: EnqueueOptions = {
337343
"workflow_name": DEBOUNCER_WORKFLOW_NAME,
338344
"queue_name": INTERNAL_QUEUE_NAME,
339-
"deduplication_id": self.debounce_key,
345+
"deduplication_id": deduplication_id,
340346
}
341347
self.client.enqueue(
342348
debouncer_options,
@@ -353,7 +359,7 @@ def debounce(
353359
# If there is already a debouncer, send a message to it.
354360
dedup_wfid = self.client._sys_db.get_deduplicated_workflow(
355361
queue_name=INTERNAL_QUEUE_NAME,
356-
deduplication_id=self.debounce_key,
362+
deduplication_id=deduplication_id,
357363
)
358364
if dedup_wfid is None:
359365
continue

tests/test_debouncer.py

Lines changed: 56 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,35 +17,33 @@
1717
from dbos._utils import GlobalParams
1818

1919

20-
def workflow(x: int) -> int:
21-
return x
22-
23-
24-
async def workflow_async(x: int) -> int:
25-
return x
26-
27-
2820
def test_debouncer(dbos: DBOS) -> None:
2921

30-
DBOS.workflow()(workflow)
22+
@DBOS.workflow()
23+
def workflow(x: int) -> int:
24+
return x
25+
3126
first_value, second_value, third_value, fourth_value = 0, 1, 2, 3
3227

3328
@DBOS.step()
3429
def generate_uuid() -> str:
3530
return str(uuid.uuid4())
3631

3732
def debouncer_test() -> None:
38-
debouncer = Debouncer.create(workflow, debounce_key="key")
3933

4034
debounce_period = 2
4135

36+
debouncer = Debouncer.create(workflow, debounce_key="key")
4237
first_handle = debouncer.debounce(debounce_period, first_value)
38+
debouncer = Debouncer.create(workflow, debounce_key="key")
4339
second_handle = debouncer.debounce(debounce_period, second_value)
4440
assert first_handle.workflow_id == second_handle.workflow_id
4541
assert first_handle.get_result() == second_value
4642
assert second_handle.get_result() == second_value
4743

44+
debouncer = Debouncer.create(workflow, debounce_key="key")
4845
third_handle = debouncer.debounce(debounce_period, third_value)
46+
debouncer = Debouncer.create(workflow, debounce_key="key")
4947
fourth_handle = debouncer.debounce(debounce_period, fourth_value)
5048
assert third_handle.workflow_id != first_handle.workflow_id
5149
assert third_handle.workflow_id == fourth_handle.workflow_id
@@ -76,7 +74,10 @@ def debouncer_test() -> None:
7674

7775
def test_debouncer_timeout(dbos: DBOS) -> None:
7876

79-
DBOS.workflow()(workflow)
77+
@DBOS.workflow()
78+
def workflow(x: int) -> int:
79+
return x
80+
8081
first_value, second_value, third_value, fourth_value = 0, 1, 2, 3
8182

8283
# Set a huge period but small timeout, verify workflows start after the timeout
@@ -115,9 +116,38 @@ def test_debouncer_timeout(dbos: DBOS) -> None:
115116
assert second_handle.get_result() == second_value
116117

117118

119+
def test_multiple_debouncers(dbos: DBOS) -> None:
120+
121+
@DBOS.workflow()
122+
def workflow(x: int) -> int:
123+
return x
124+
125+
first_value, second_value, third_value, fourth_value = 0, 1, 2, 3
126+
127+
# Set a huge period but small timeout, verify workflows start after the timeout
128+
debouncer_one = Debouncer.create(workflow, debounce_key="key_one")
129+
debouncer_two = Debouncer.create(workflow, debounce_key="key_two")
130+
debounce_period = 2
131+
132+
first_handle = debouncer_one.debounce(debounce_period, first_value)
133+
second_handle = debouncer_one.debounce(debounce_period, second_value)
134+
third_handle = debouncer_two.debounce(debounce_period, third_value)
135+
fourth_handle = debouncer_two.debounce(debounce_period, fourth_value)
136+
assert first_handle.workflow_id == second_handle.workflow_id
137+
assert first_handle.workflow_id != third_handle.workflow_id
138+
assert third_handle.workflow_id == fourth_handle.workflow_id
139+
assert first_handle.get_result() == second_value
140+
assert second_handle.get_result() == second_value
141+
assert third_handle.get_result() == fourth_value
142+
assert fourth_handle.get_result() == fourth_value
143+
144+
118145
def test_debouncer_queue(dbos: DBOS) -> None:
119146

120-
DBOS.workflow()(workflow)
147+
@DBOS.workflow()
148+
def workflow(x: int) -> int:
149+
return x
150+
121151
first_value, second_value, third_value, fourth_value = 0, 1, 2, 3
122152
queue = Queue("test-queue")
123153

@@ -166,7 +196,10 @@ def test_debouncer_queue(dbos: DBOS) -> None:
166196
@pytest.mark.asyncio
167197
async def test_debouncer_async(dbos: DBOS) -> None:
168198

169-
DBOS.workflow()(workflow_async)
199+
@DBOS.workflow()
200+
async def workflow_async(x: int) -> int:
201+
return x
202+
170203
first_value, second_value, third_value, fourth_value = 0, 1, 2, 3
171204

172205
debouncer = Debouncer.create_async(workflow_async, debounce_key="key")
@@ -188,12 +221,15 @@ async def test_debouncer_async(dbos: DBOS) -> None:
188221

189222
def test_debouncer_client(dbos: DBOS, client: DBOSClient) -> None:
190223

191-
DBOS.workflow()(workflow)
224+
@DBOS.workflow()
225+
def workflow(x: int) -> int:
226+
return x
227+
192228
first_value, second_value, third_value, fourth_value = 0, 1, 2, 3
193229
queue = Queue("test-queue")
194230

195231
options: EnqueueOptions = {
196-
"workflow_name": workflow.__name__,
232+
"workflow_name": workflow.__qualname__,
197233
"queue_name": queue.name,
198234
}
199235
debouncer = DebouncerClient(client, options, debounce_key="key")
@@ -230,12 +266,15 @@ def test_debouncer_client(dbos: DBOS, client: DBOSClient) -> None:
230266
@pytest.mark.asyncio
231267
async def test_debouncer_client_async(dbos: DBOS, client: DBOSClient) -> None:
232268

233-
DBOS.workflow()(workflow_async)
269+
@DBOS.workflow()
270+
async def workflow_async(x: int) -> int:
271+
return x
272+
234273
first_value, second_value, third_value, fourth_value = 0, 1, 2, 3
235274
queue = Queue("test-queue")
236275

237276
options: EnqueueOptions = {
238-
"workflow_name": workflow_async.__name__,
277+
"workflow_name": workflow_async.__qualname__,
239278
"queue_name": queue.name,
240279
}
241280
debouncer = DebouncerClient(client, options, debounce_key="key")

0 commit comments

Comments
 (0)