Skip to content

Commit bfaee4c

Browse files
authored
Merge branch 'main' into 728
2 parents 708e74b + 7665bf5 commit bfaee4c

File tree

5 files changed

+1423
-2
lines changed

5 files changed

+1423
-2
lines changed

tests/worker/test_replayer.py

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,27 @@
44
from dataclasses import dataclass
55
from datetime import timedelta
66
from pathlib import Path
7-
from typing import Dict
7+
from typing import Any, Dict, Optional, Type
88

99
import pytest
1010

1111
from temporalio import activity, workflow
1212
from temporalio.client import Client, WorkflowFailureError, WorkflowHistory
1313
from temporalio.exceptions import ApplicationError
1414
from temporalio.testing import WorkflowEnvironment
15-
from temporalio.worker import Replayer, Worker
15+
from temporalio.worker import (
16+
ExecuteWorkflowInput,
17+
Interceptor,
18+
Replayer,
19+
Worker,
20+
WorkflowInboundInterceptor,
21+
WorkflowInterceptorClassInput,
22+
)
1623
from tests.helpers import assert_eq_eventually
24+
from tests.worker.test_workflow import (
25+
ActivityAndSignalsWhileWorkflowDown,
26+
SignalsActivitiesTimersUpdatesTracingWorkflow,
27+
)
1728

1829

1930
@activity.defn
@@ -393,3 +404,97 @@ async def test_replayer_command_reordering_backward_compatibility() -> None:
393404
await Replayer(workflows=[UpdateCompletionAfterWorkflowReturn]).replay_workflow(
394405
WorkflowHistory.from_json("fake", history)
395406
)
407+
408+
409+
test_replayer_workflow_res = None
410+
411+
412+
class WorkerWorkflowResultInterceptor(Interceptor):
413+
def workflow_interceptor_class(
414+
self, input: WorkflowInterceptorClassInput
415+
) -> Optional[Type[WorkflowInboundInterceptor]]:
416+
return WorkflowResultInterceptor
417+
418+
419+
class WorkflowResultInterceptor(WorkflowInboundInterceptor):
420+
async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
421+
global test_replayer_workflow_res
422+
res = await super().execute_workflow(input)
423+
test_replayer_workflow_res = res
424+
return res
425+
426+
427+
async def test_replayer_async_ordering() -> None:
428+
"""
429+
This test verifies that the order that asyncio tasks/coroutines are woken up matches the
430+
order they were before changes to apply all jobs and then run the event loop, where previously
431+
the event loop was ran after each "batch" of jobs.
432+
"""
433+
histories_and_expecteds = [
434+
(
435+
"test_replayer_event_tracing.json",
436+
[
437+
"sig-before-sync",
438+
"sig-before-1",
439+
"sig-before-2",
440+
"timer-sync",
441+
"act-sync",
442+
"act-1",
443+
"act-2",
444+
"sig-1-sync",
445+
"sig-1-1",
446+
"sig-1-2",
447+
"update-1-sync",
448+
"update-1-1",
449+
"update-1-2",
450+
"timer-1",
451+
"timer-2",
452+
],
453+
),
454+
(
455+
"test_replayer_event_tracing_double_sig_at_start.json",
456+
[
457+
"sig-before-sync",
458+
"sig-before-1",
459+
"sig-1-sync",
460+
"sig-1-1",
461+
"sig-before-2",
462+
"sig-1-2",
463+
"timer-sync",
464+
"act-sync",
465+
"update-1-sync",
466+
"update-1-1",
467+
"update-1-2",
468+
"act-1",
469+
"act-2",
470+
"timer-1",
471+
"timer-2",
472+
],
473+
),
474+
]
475+
for history, expected in histories_and_expecteds:
476+
with Path(__file__).with_name(history).open() as f:
477+
history = f.read()
478+
await Replayer(
479+
workflows=[SignalsActivitiesTimersUpdatesTracingWorkflow],
480+
interceptors=[WorkerWorkflowResultInterceptor()],
481+
).replay_workflow(WorkflowHistory.from_json("fake", history))
482+
assert test_replayer_workflow_res == expected
483+
484+
485+
async def test_replayer_alternate_async_ordering() -> None:
486+
with Path(__file__).with_name(
487+
"test_replayer_event_tracing_alternate.json"
488+
).open() as f:
489+
history = f.read()
490+
await Replayer(
491+
workflows=[ActivityAndSignalsWhileWorkflowDown],
492+
interceptors=[WorkerWorkflowResultInterceptor()],
493+
).replay_workflow(WorkflowHistory.from_json("fake", history))
494+
assert test_replayer_workflow_res == [
495+
"act-start",
496+
"sig-1",
497+
"sig-2",
498+
"counter-2",
499+
"act-done",
500+
]

0 commit comments

Comments
 (0)