Skip to content

Commit 9d5ec58

Browse files
committed
Test async activity completion
1 parent 553efca commit 9d5ec58

File tree

1 file changed

+100
-0
lines changed

1 file changed

+100
-0
lines changed

tests/test_serialization_context.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,106 @@ async def test_workflow_payload_conversion(
314314
pprint(result.items)
315315

316316

317+
# Async activity completion test
318+
@activity.defn
319+
async def async_activity() -> TraceData:
320+
activity.raise_complete_async()
321+
322+
323+
@workflow.defn
324+
class AsyncActivityCompletionSerializationContextTestWorkflow:
325+
@workflow.run
326+
async def run(self) -> TraceData:
327+
return await workflow.execute_activity(
328+
async_activity,
329+
start_to_close_timeout=timedelta(seconds=10),
330+
activity_id="async-activity-id",
331+
)
332+
333+
334+
async def test_async_activity_completion_payload_conversion(
335+
client: Client,
336+
):
337+
workflow_id = str(uuid.uuid4())
338+
task_queue = str(uuid.uuid4())
339+
340+
config = client.config()
341+
config["data_converter"] = data_converter
342+
client = Client(**config)
343+
344+
async with Worker(
345+
client,
346+
task_queue=task_queue,
347+
workflows=[AsyncActivityCompletionSerializationContextTestWorkflow],
348+
activities=[async_activity],
349+
workflow_runner=UnsandboxedWorkflowRunner(), # so that we can use isinstance
350+
):
351+
wf_handle = await client.start_workflow(
352+
AsyncActivityCompletionSerializationContextTestWorkflow.run,
353+
id=workflow_id,
354+
task_queue=task_queue,
355+
)
356+
activity_handle = client.get_async_activity_handle(
357+
workflow_id=workflow_id,
358+
run_id=wf_handle.first_execution_run_id,
359+
activity_id="async-activity-id",
360+
)
361+
await activity_handle.complete(TraceData())
362+
result = await wf_handle.result()
363+
364+
workflow_context = dataclasses.asdict(
365+
WorkflowSerializationContext(
366+
namespace="default",
367+
workflow_id=workflow_id,
368+
)
369+
)
370+
activity_context = dataclasses.asdict(
371+
ActivitySerializationContext(
372+
namespace="default",
373+
workflow_id=workflow_id,
374+
workflow_type="AsyncActivityCompletionSerializationContextTestWorkflow",
375+
activity_type="async_activity",
376+
activity_task_queue=task_queue,
377+
is_local=False,
378+
)
379+
)
380+
assert_trace(
381+
result.items,
382+
[
383+
TraceItem(
384+
context_type="activity",
385+
in_workflow=False,
386+
method="to_payload",
387+
context=activity_context, # Outbound activity input
388+
),
389+
TraceItem(
390+
context_type="activity",
391+
in_workflow=False,
392+
method="from_payload",
393+
context=activity_context, # Inbound activity input
394+
),
395+
TraceItem(
396+
context_type="activity",
397+
in_workflow=False,
398+
method="to_payload",
399+
context=activity_context, # Outbound activity result
400+
),
401+
TraceItem(
402+
context_type="activity",
403+
in_workflow=False,
404+
method="from_payload",
405+
context=activity_context, # Inbound activity result
406+
),
407+
TraceItem(
408+
context_type="workflow",
409+
in_workflow=True,
410+
method="to_payload",
411+
context=workflow_context, # Inbound activity result
412+
),
413+
],
414+
)
415+
416+
317417
# Signal test
318418

319419

0 commit comments

Comments
 (0)