diff --git a/tests/worker/test_replayer.py b/tests/worker/test_replayer.py index 9f6e6ac53..390e7d79f 100644 --- a/tests/worker/test_replayer.py +++ b/tests/worker/test_replayer.py @@ -4,7 +4,7 @@ from dataclasses import dataclass from datetime import timedelta from pathlib import Path -from typing import Dict +from typing import Any, Dict, Optional, Type import pytest @@ -12,8 +12,19 @@ from temporalio.client import Client, WorkflowFailureError, WorkflowHistory from temporalio.exceptions import ApplicationError from temporalio.testing import WorkflowEnvironment -from temporalio.worker import Replayer, Worker +from temporalio.worker import ( + ExecuteWorkflowInput, + Interceptor, + Replayer, + Worker, + WorkflowInboundInterceptor, + WorkflowInterceptorClassInput, +) from tests.helpers import assert_eq_eventually +from tests.worker.test_workflow import ( + ActivityAndSignalsWhileWorkflowDown, + SignalsActivitiesTimersUpdatesTracingWorkflow, +) @activity.defn @@ -385,3 +396,97 @@ async def test_replayer_command_reordering_backward_compatibility() -> None: await Replayer(workflows=[UpdateCompletionAfterWorkflowReturn]).replay_workflow( WorkflowHistory.from_json("fake", history) ) + + +test_replayer_workflow_res = None + + +class WorkerWorkflowResultInterceptor(Interceptor): + def workflow_interceptor_class( + self, input: WorkflowInterceptorClassInput + ) -> Optional[Type[WorkflowInboundInterceptor]]: + return WorkflowResultInterceptor + + +class WorkflowResultInterceptor(WorkflowInboundInterceptor): + async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any: + global test_replayer_workflow_res + res = await super().execute_workflow(input) + test_replayer_workflow_res = res + return res + + +async def test_replayer_async_ordering() -> None: + """ + This test verifies that the order that asyncio tasks/coroutines are woken up matches the + order they were before changes to apply all jobs and then run the event loop, where previously + the event loop was ran after each "batch" of jobs. + """ + histories_and_expecteds = [ + ( + "test_replayer_event_tracing.json", + [ + "sig-before-sync", + "sig-before-1", + "sig-before-2", + "timer-sync", + "act-sync", + "act-1", + "act-2", + "sig-1-sync", + "sig-1-1", + "sig-1-2", + "update-1-sync", + "update-1-1", + "update-1-2", + "timer-1", + "timer-2", + ], + ), + ( + "test_replayer_event_tracing_double_sig_at_start.json", + [ + "sig-before-sync", + "sig-before-1", + "sig-1-sync", + "sig-1-1", + "sig-before-2", + "sig-1-2", + "timer-sync", + "act-sync", + "update-1-sync", + "update-1-1", + "update-1-2", + "act-1", + "act-2", + "timer-1", + "timer-2", + ], + ), + ] + for history, expected in histories_and_expecteds: + with Path(__file__).with_name(history).open() as f: + history = f.read() + await Replayer( + workflows=[SignalsActivitiesTimersUpdatesTracingWorkflow], + interceptors=[WorkerWorkflowResultInterceptor()], + ).replay_workflow(WorkflowHistory.from_json("fake", history)) + assert test_replayer_workflow_res == expected + + +async def test_replayer_alternate_async_ordering() -> None: + with Path(__file__).with_name( + "test_replayer_event_tracing_alternate.json" + ).open() as f: + history = f.read() + await Replayer( + workflows=[ActivityAndSignalsWhileWorkflowDown], + interceptors=[WorkerWorkflowResultInterceptor()], + ).replay_workflow(WorkflowHistory.from_json("fake", history)) + assert test_replayer_workflow_res == [ + "act-start", + "sig-1", + "sig-2", + "counter-2", + "act-done", + ] diff --git a/tests/worker/test_replayer_event_tracing.json b/tests/worker/test_replayer_event_tracing.json new file mode 100644 index 000000000..c70599292 --- /dev/null +++ b/tests/worker/test_replayer_event_tracing.json @@ -0,0 +1,469 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2024-12-30T22:38:44.668149481Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", + "taskId": "1049178", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "SignalsActivitiesTimersUpdatesTracingWorkflow" + }, + "taskQueue": { + "name": "tq-a41dde3c-1ed4-4a67-a808-d9d99da337c6", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "workflowTaskTimeout": "10s", + "originalExecutionRunId": "af6b802e-0485-4953-9625-c46e9b2243e6", + "identity": "19041@monolith", + "firstExecutionRunId": "af6b802e-0485-4953-9625-c46e9b2243e6", + "attempt": 1, + "firstWorkflowTaskBackoff": "0s", + "workflowId": "wf-13b9e507-6f00-42e7-b9f3-3c07ba101ff4" + } + }, + { + "eventId": "2", + "eventTime": "2024-12-30T22:38:44.668193778Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049179", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "tq-a41dde3c-1ed4-4a67-a808-d9d99da337c6", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2024-12-30T22:38:44.670247658Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED", + "taskId": "1049184", + "workflowExecutionSignaledEventAttributes": { + "signalName": "dosig", + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "ImJlZm9yZSI=" + } + ] + }, + "identity": "19041@monolith" + } + }, + { + "eventId": "4", + "eventTime": "2024-12-30T22:38:44.773914284Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049186", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "19041@monolith", + "requestId": "550a3619-9085-434a-806e-a1f3f36f0d81", + "historySizeBytes": "432", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "5", + "eventTime": "2024-12-30T22:38:44.855333558Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049191", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "4", + "identity": "19041@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": { + "coreUsedFlags": [ + 2, + 1, + 3 + ] + }, + "meteringMetadata": {} + } + }, + { + "eventId": "6", + "eventTime": "2024-12-30T22:38:44.855357649Z", + "eventType": "EVENT_TYPE_TIMER_STARTED", + "taskId": "1049192", + "timerStartedEventAttributes": { + "timerId": "1", + "startToFireTimeout": "0.100s", + "workflowTaskCompletedEventId": "5" + } + }, + { + "eventId": "7", + "eventTime": "2024-12-30T22:38:44.855373343Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_SCHEDULED", + "taskId": "1049193", + "activityTaskScheduledEventAttributes": { + "activityId": "1", + "activityType": { + "name": "say_hello" + }, + "taskQueue": { + "name": "tq-a41dde3c-1ed4-4a67-a808-d9d99da337c6", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "header": {}, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IkVuY2hpIg==" + } + ] + }, + "scheduleToCloseTimeout": "5s", + "scheduleToStartTimeout": "5s", + "startToCloseTimeout": "5s", + "heartbeatTimeout": "0s", + "workflowTaskCompletedEventId": "5", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s" + }, + "useWorkflowBuildId": true + } + }, + { + "eventId": "8", + "eventTime": "2024-12-30T22:38:44.855391748Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_STARTED", + "taskId": "1049198", + "activityTaskStartedEventAttributes": { + "scheduledEventId": "7", + "identity": "19041@monolith", + "requestId": "1cf0b17b-c708-4690-88e7-238facf002b1", + "attempt": 1, + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "9", + "eventTime": "2024-12-30T22:38:44.858633171Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_COMPLETED", + "taskId": "1049199", + "activityTaskCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IkhlbGxvLCBFbmNoaSEi" + } + ] + }, + "scheduledEventId": "7", + "startedEventId": "8", + "identity": "19041@monolith" + } + }, + { + "eventId": "10", + "eventTime": "2024-12-30T22:38:44.858637777Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049200", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "19041@monolith-bf301b3177254f0e9f28bbea3fcd3bb5", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "tq-a41dde3c-1ed4-4a67-a808-d9d99da337c6" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "11", + "eventTime": "2024-12-30T22:38:44.859901888Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049204", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "10", + "identity": "19041@monolith", + "requestId": "f3d5bdba-6830-43fa-a0ca-ee4c49435118", + "historySizeBytes": "1187", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "12", + "eventTime": "2024-12-30T22:38:44.862902108Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049208", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "10", + "startedEventId": "11", + "identity": "19041@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "13", + "eventTime": "2024-12-30T22:38:44.977333487Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED", + "taskId": "1049210", + "workflowExecutionSignaledEventAttributes": { + "signalName": "dosig", + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjEi" + } + ] + }, + "identity": "19041@monolith" + } + }, + { + "eventId": "14", + "eventTime": "2024-12-30T22:38:44.977342033Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049211", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "19041@monolith-bf301b3177254f0e9f28bbea3fcd3bb5", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "tq-a41dde3c-1ed4-4a67-a808-d9d99da337c6" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "15", + "eventTime": "2024-12-30T22:38:44.981296169Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049215", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "14", + "identity": "19041@monolith", + "requestId": "e0f19a52-c283-4898-a4c2-18b8dbd54e87", + "historySizeBytes": "1603", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "16", + "eventTime": "2024-12-30T22:38:44.984426814Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049219", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "14", + "startedEventId": "15", + "identity": "19041@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "17", + "eventTime": "2024-12-30T22:38:44.984766655Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049222", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "19041@monolith-bf301b3177254f0e9f28bbea3fcd3bb5", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "tq-a41dde3c-1ed4-4a67-a808-d9d99da337c6" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "18", + "eventTime": "2024-12-30T22:38:44.984768595Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049223", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "17", + "identity": "19041@monolith", + "requestId": "request-from-RespondWorkflowTaskCompleted", + "historySizeBytes": "1810", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "19", + "eventTime": "2024-12-30T22:38:44.987111164Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049224", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "17", + "startedEventId": "18", + "identity": "19041@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "20", + "eventTime": "2024-12-30T22:38:44.987144596Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED", + "taskId": "1049225", + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "3899d0f3-2269-485e-bea0-d1a6f23bffc6", + "acceptedRequestMessageId": "3899d0f3-2269-485e-bea0-d1a6f23bffc6/request", + "acceptedRequestSequencingEventId": "17", + "acceptedRequest": { + "meta": { + "updateId": "3899d0f3-2269-485e-bea0-d1a6f23bffc6", + "identity": "19041@monolith" + }, + "input": { + "name": "doupdate", + "args": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjEi" + } + ] + } + } + } + } + }, + { + "eventId": "21", + "eventTime": "2024-12-30T22:38:44.987169359Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED", + "taskId": "1049226", + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "3899d0f3-2269-485e-bea0-d1a6f23bffc6", + "identity": "19041@monolith" + }, + "acceptedEventId": "20", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "YmluYXJ5L251bGw=" + } + } + ] + } + } + } + }, + { + "eventId": "22", + "eventTime": "2024-12-30T22:38:45.670816108Z", + "eventType": "EVENT_TYPE_TIMER_FIRED", + "taskId": "1049229", + "timerFiredEventAttributes": { + "timerId": "1", + "startedEventId": "6" + } + }, + { + "eventId": "23", + "eventTime": "2024-12-30T22:38:45.670833156Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049230", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "19041@monolith-bf301b3177254f0e9f28bbea3fcd3bb5", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "tq-a41dde3c-1ed4-4a67-a808-d9d99da337c6" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "24", + "eventTime": "2024-12-30T22:38:45.675574932Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049234", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "23", + "identity": "19041@monolith", + "requestId": "9d2c2c0b-3f6f-4e67-9693-150e2eec6fc6", + "historySizeBytes": "2640", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "25", + "eventTime": "2024-12-30T22:38:45.690634379Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049238", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "23", + "startedEventId": "24", + "identity": "19041@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "26", + "eventTime": "2024-12-30T22:38:45.690715818Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED", + "taskId": "1049239", + "workflowExecutionCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "WyJzaWctYmVmb3JlLXN5bmMiLCJzaWctYmVmb3JlLTEiLCJ0aW1lci1zeW5jIiwiYWN0LXN5bmMiLCJzaWctYmVmb3JlLTIiLCJhY3QtMSIsImFjdC0yIiwic2lnLTEtc3luYyIsInNpZy0xLTEiLCJzaWctMS0yIiwidXBkYXRlLTEtc3luYyIsInVwZGF0ZS0xLTEiLCJ1cGRhdGUtMS0yIiwidGltZXItMSIsInRpbWVyLTIiXQ==" + } + ] + }, + "workflowTaskCompletedEventId": "25" + } + } + ] +} \ No newline at end of file diff --git a/tests/worker/test_replayer_event_tracing_alternate.json b/tests/worker/test_replayer_event_tracing_alternate.json new file mode 100644 index 000000000..f01c06242 --- /dev/null +++ b/tests/worker/test_replayer_event_tracing_alternate.json @@ -0,0 +1,265 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2025-01-14T00:22:21.334998040Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", + "taskId": "1048663", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "ActivityAndSignalsWhileWorkflowDown" + }, + "taskQueue": { + "name": "tq-3a5a912b-654f-438d-8798-b2f01e10cd8b", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "InRxLTYwYzdkMTlhLTgxYTYtNDBlZi05MGY1LTIxMmNlZWRmY2RiMSI=" + } + ] + }, + "workflowTaskTimeout": "10s", + "originalExecutionRunId": "a6dc1b12-c29a-40c9-be78-9e412de7bcc9", + "identity": "200001@monolith", + "firstExecutionRunId": "a6dc1b12-c29a-40c9-be78-9e412de7bcc9", + "attempt": 1, + "firstWorkflowTaskBackoff": "0s", + "workflowId": "wf-052172dc-68ba-449a-94b9-c02ee901d0d8" + } + }, + { + "eventId": "2", + "eventTime": "2025-01-14T00:22:21.335034841Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1048664", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "tq-3a5a912b-654f-438d-8798-b2f01e10cd8b", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2025-01-14T00:22:21.436966989Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1048669", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "200001@monolith", + "requestId": "bbbfaa92-5121-48d0-b06a-2935e42bcdde", + "historySizeBytes": "406", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "4", + "eventTime": "2025-01-14T00:22:21.513746285Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1048673", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "3", + "identity": "200001@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": { + "coreUsedFlags": [ + 2, + 1, + 3 + ] + }, + "meteringMetadata": {} + } + }, + { + "eventId": "5", + "eventTime": "2025-01-14T00:22:21.513782761Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_SCHEDULED", + "taskId": "1048674", + "activityTaskScheduledEventAttributes": { + "activityId": "1", + "activityType": { + "name": "say_hello" + }, + "taskQueue": { + "name": "tq-60c7d19a-81a6-40ef-90f5-212ceedfcdb1", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "header": {}, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IkVuY2hpIg==" + } + ] + }, + "scheduleToCloseTimeout": "30s", + "scheduleToStartTimeout": "30s", + "startToCloseTimeout": "30s", + "heartbeatTimeout": "0s", + "workflowTaskCompletedEventId": "4", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s" + } + } + }, + { + "eventId": "6", + "eventTime": "2025-01-14T00:22:21.653032295Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_STARTED", + "taskId": "1048681", + "activityTaskStartedEventAttributes": { + "scheduledEventId": "5", + "identity": "200001@monolith", + "requestId": "f4db5e63-7f33-4e35-ada8-7d2941ff4434", + "attempt": 1, + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "7", + "eventTime": "2025-01-14T00:22:21.661985708Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_COMPLETED", + "taskId": "1048682", + "activityTaskCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IkhlbGxvLCBFbmNoaSEi" + } + ] + }, + "scheduledEventId": "5", + "startedEventId": "6", + "identity": "200001@monolith" + } + }, + { + "eventId": "8", + "eventTime": "2025-01-14T00:22:21.662002915Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1048683", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "200001@monolith-c1916baa725b4c65a2748ad7243b3910", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "tq-3a5a912b-654f-438d-8798-b2f01e10cd8b" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "9", + "eventTime": "2025-01-14T00:22:22.648893473Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED", + "taskId": "1048687", + "workflowExecutionSignaledEventAttributes": { + "signalName": "dosig", + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjEi" + } + ] + }, + "identity": "200001@monolith" + } + }, + { + "eventId": "10", + "eventTime": "2025-01-14T00:22:22.650727279Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED", + "taskId": "1048689", + "workflowExecutionSignaledEventAttributes": { + "signalName": "dosig", + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjIi" + } + ] + }, + "identity": "200001@monolith" + } + }, + { + "eventId": "11", + "eventTime": "2025-01-14T00:22:23.670330822Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1048691", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "8", + "identity": "200001@monolith", + "requestId": "87a1f77f-6400-43af-81ae-2ba88389243b", + "historySizeBytes": "1293", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "12", + "eventTime": "2025-01-14T00:22:23.773536352Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1048695", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "8", + "startedEventId": "11", + "identity": "200001@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "13", + "eventTime": "2025-01-14T00:22:23.773564892Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED", + "taskId": "1048696", + "workflowExecutionCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "WyJhY3Qtc3RhcnQiLCJzaWctMSIsInNpZy0yIiwiY291bnRlci0yIiwiYWN0LWRvbmUiXQ==" + } + ] + }, + "workflowTaskCompletedEventId": "12" + } + } + ] +} \ No newline at end of file diff --git a/tests/worker/test_replayer_event_tracing_double_sig_at_start.json b/tests/worker/test_replayer_event_tracing_double_sig_at_start.json new file mode 100644 index 000000000..77e6d7b1b --- /dev/null +++ b/tests/worker/test_replayer_event_tracing_double_sig_at_start.json @@ -0,0 +1,419 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2024-12-30T23:32:32.093973251Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", + "taskId": "1049984", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "SignalsActivitiesTimersUpdatesTracingWorkflow" + }, + "taskQueue": { + "name": "tq-a50c15b0-72ae-4dfb-abb3-8d4889bfe0b9", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "workflowTaskTimeout": "10s", + "originalExecutionRunId": "2de83810-23a0-462e-ac13-bb7396196fe7", + "identity": "45920@monolith", + "firstExecutionRunId": "2de83810-23a0-462e-ac13-bb7396196fe7", + "attempt": 1, + "firstWorkflowTaskBackoff": "0s", + "workflowId": "wf-22dd253f-47e3-4235-975b-1de4722a65ca" + } + }, + { + "eventId": "2", + "eventTime": "2024-12-30T23:32:32.094013742Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1049985", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "tq-a50c15b0-72ae-4dfb-abb3-8d4889bfe0b9", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2024-12-30T23:32:32.095679382Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED", + "taskId": "1049990", + "workflowExecutionSignaledEventAttributes": { + "signalName": "dosig", + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "ImJlZm9yZSI=" + } + ] + }, + "identity": "45920@monolith" + } + }, + { + "eventId": "4", + "eventTime": "2024-12-30T23:32:32.196636794Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED", + "taskId": "1049992", + "workflowExecutionSignaledEventAttributes": { + "signalName": "dosig", + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjEi" + } + ] + }, + "identity": "45920@monolith" + } + }, + { + "eventId": "5", + "eventTime": "2024-12-30T23:32:32.198643629Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1049994", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "45920@monolith", + "requestId": "830905ad-736a-49c4-bb60-36636b6bfcbd", + "historySizeBytes": "511", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "6", + "eventTime": "2024-12-30T23:32:32.280201513Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1049999", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "5", + "identity": "45920@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": { + "coreUsedFlags": [ + 3, + 1, + 2 + ] + }, + "meteringMetadata": {} + } + }, + { + "eventId": "7", + "eventTime": "2024-12-30T23:32:32.280226107Z", + "eventType": "EVENT_TYPE_TIMER_STARTED", + "taskId": "1050000", + "timerStartedEventAttributes": { + "timerId": "1", + "startToFireTimeout": "0.100s", + "workflowTaskCompletedEventId": "6" + } + }, + { + "eventId": "8", + "eventTime": "2024-12-30T23:32:32.280241347Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_SCHEDULED", + "taskId": "1050001", + "activityTaskScheduledEventAttributes": { + "activityId": "1", + "activityType": { + "name": "say_hello" + }, + "taskQueue": { + "name": "tq-a50c15b0-72ae-4dfb-abb3-8d4889bfe0b9", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "header": {}, + "input": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IkVuY2hpIg==" + } + ] + }, + "scheduleToCloseTimeout": "5s", + "scheduleToStartTimeout": "5s", + "startToCloseTimeout": "5s", + "heartbeatTimeout": "0s", + "workflowTaskCompletedEventId": "6", + "retryPolicy": { + "initialInterval": "1s", + "backoffCoefficient": 2, + "maximumInterval": "100s" + }, + "useWorkflowBuildId": true + } + }, + { + "eventId": "9", + "eventTime": "2024-12-30T23:32:32.280932517Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1050007", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "45920@monolith-4969e79c76e14402a358a6c48afa41fa", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "tq-a50c15b0-72ae-4dfb-abb3-8d4889bfe0b9" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "10", + "eventTime": "2024-12-30T23:32:32.280935332Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1050008", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "9", + "requestId": "request-from-RespondWorkflowTaskCompleted", + "historySizeBytes": "926" + } + }, + { + "eventId": "11", + "eventTime": "2024-12-30T23:32:32.285406568Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1050011", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "9", + "startedEventId": "10", + "identity": "45920@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "12", + "eventTime": "2024-12-30T23:32:32.285444845Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED", + "taskId": "1050012", + "workflowExecutionUpdateAcceptedEventAttributes": { + "protocolInstanceId": "e4b5bdec-c887-422f-b28d-ac217874bc2b", + "acceptedRequestMessageId": "e4b5bdec-c887-422f-b28d-ac217874bc2b/request", + "acceptedRequestSequencingEventId": "9", + "acceptedRequest": { + "meta": { + "updateId": "e4b5bdec-c887-422f-b28d-ac217874bc2b", + "identity": "45920@monolith" + }, + "input": { + "name": "doupdate", + "args": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IjEi" + } + ] + } + } + } + } + }, + { + "eventId": "13", + "eventTime": "2024-12-30T23:32:32.285476357Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED", + "taskId": "1050013", + "workflowExecutionUpdateCompletedEventAttributes": { + "meta": { + "updateId": "e4b5bdec-c887-422f-b28d-ac217874bc2b", + "identity": "45920@monolith" + }, + "acceptedEventId": "12", + "outcome": { + "success": { + "payloads": [ + { + "metadata": { + "encoding": "YmluYXJ5L251bGw=" + } + } + ] + } + } + } + }, + { + "eventId": "14", + "eventTime": "2024-12-30T23:32:32.280263855Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_STARTED", + "taskId": "1050014", + "activityTaskStartedEventAttributes": { + "scheduledEventId": "8", + "identity": "45920@monolith", + "requestId": "3a230ac0-ff57-4016-b199-ce716a4eb331", + "attempt": 1, + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "15", + "eventTime": "2024-12-30T23:32:32.284262316Z", + "eventType": "EVENT_TYPE_ACTIVITY_TASK_COMPLETED", + "taskId": "1050015", + "activityTaskCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "IkhlbGxvLCBFbmNoaSEi" + } + ] + }, + "scheduledEventId": "8", + "startedEventId": "14", + "identity": "45920@monolith" + } + }, + { + "eventId": "16", + "eventTime": "2024-12-30T23:32:32.285491511Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1050016", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "45920@monolith-4969e79c76e14402a358a6c48afa41fa", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "tq-a50c15b0-72ae-4dfb-abb3-8d4889bfe0b9" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "17", + "eventTime": "2024-12-30T23:32:32.285493889Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1050017", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "16", + "identity": "45920@monolith", + "requestId": "request-from-RespondWorkflowTaskCompleted", + "historySizeBytes": "1126", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "18", + "eventTime": "2024-12-30T23:32:32.288461384Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1050020", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "16", + "startedEventId": "17", + "identity": "45920@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "19", + "eventTime": "2024-12-30T23:32:33.096788296Z", + "eventType": "EVENT_TYPE_TIMER_FIRED", + "taskId": "1050022", + "timerFiredEventAttributes": { + "timerId": "1", + "startedEventId": "7" + } + }, + { + "eventId": "20", + "eventTime": "2024-12-30T23:32:33.096806096Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1050023", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "45920@monolith-4969e79c76e14402a358a6c48afa41fa", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "tq-a50c15b0-72ae-4dfb-abb3-8d4889bfe0b9" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "21", + "eventTime": "2024-12-30T23:32:33.101429716Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1050027", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "20", + "identity": "45920@monolith", + "requestId": "7ceeadc0-a095-4298-80ba-44b7b91d02d8", + "historySizeBytes": "2253", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + } + } + }, + { + "eventId": "22", + "eventTime": "2024-12-30T23:32:33.117619050Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1050031", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "20", + "startedEventId": "21", + "identity": "45920@monolith", + "workerVersion": { + "buildId": "57e24bd7b09fe919fbd6100294185967" + }, + "sdkMetadata": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "23", + "eventTime": "2024-12-30T23:32:33.117743945Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED", + "taskId": "1050032", + "workflowExecutionCompletedEventAttributes": { + "result": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "WyJzaWctYmVmb3JlLXN5bmMiLCJzaWctYmVmb3JlLTEiLCJzaWctMS1zeW5jIiwic2lnLTEtMSIsInNpZy1iZWZvcmUtMiIsInNpZy0xLTIiLCJ0aW1lci1zeW5jIiwiYWN0LXN5bmMiLCJ1cGRhdGUtMS1zeW5jIiwidXBkYXRlLTEtMSIsInVwZGF0ZS0xLTIiLCJhY3QtMSIsImFjdC0yIiwidGltZXItMSIsInRpbWVyLTIiXQ==" + } + ] + }, + "workflowTaskCompletedEventId": "22" + } + } + ] +} \ No newline at end of file diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index ca82bebc2..59129d5f7 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -6448,3 +6448,166 @@ async def test_concurrent_sleeps_use_proper_options( # Force replay with a query to ensure determinism await handle.query("__temporal_workflow_metadata") + + +@workflow.defn +class SignalsActivitiesTimersUpdatesTracingWorkflow: + """ + These handlers all do different things that will cause the event loop to yield, sometimes + until the next workflow task (ex: timer) sometimes within the workflow task (ex: future resolve + or wait condition). + """ + + def __init__(self) -> None: + self.events: List[str] = [] + + @workflow.run + async def run(self) -> List[str]: + tt = asyncio.create_task(self.run_timer()) + at = asyncio.create_task(self.run_act()) + await asyncio.gather(tt, at) + return self.events + + @workflow.signal + async def dosig(self, name: str): + self.events.append(f"sig-{name}-sync") + fut: asyncio.Future[bool] = asyncio.Future() + fut.set_result(True) + await fut + self.events.append(f"sig-{name}-1") + await workflow.wait_condition(lambda: True) + self.events.append(f"sig-{name}-2") + + @workflow.update + async def doupdate(self, name: str): + self.events.append(f"update-{name}-sync") + fut: asyncio.Future[bool] = asyncio.Future() + fut.set_result(True) + await fut + self.events.append(f"update-{name}-1") + await workflow.wait_condition(lambda: True) + self.events.append(f"update-{name}-2") + + async def run_timer(self): + self.events.append("timer-sync") + await workflow.sleep(0.1) + fut: asyncio.Future[bool] = asyncio.Future() + fut.set_result(True) + await fut + self.events.append("timer-1") + await workflow.wait_condition(lambda: True) + self.events.append("timer-2") + + async def run_act(self): + self.events.append("act-sync") + await workflow.execute_activity( + say_hello, "Enchi", schedule_to_close_timeout=timedelta(seconds=30) + ) + fut: asyncio.Future[bool] = asyncio.Future() + fut.set_result(True) + await fut + self.events.append("act-1") + await workflow.wait_condition(lambda: True) + self.events.append("act-2") + + +async def test_async_loop_ordering(client: Client, env: WorkflowEnvironment): + """This test mostly exists to generate histories for test_replayer_async_ordering. + See that test for more.""" + + if env.supports_time_skipping: + pytest.skip("This test doesn't work right with time skipping for some reason") + task_queue = f"tq-{uuid.uuid4()}" + handle = await client.start_workflow( + SignalsActivitiesTimersUpdatesTracingWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=task_queue, + ) + await handle.signal(SignalsActivitiesTimersUpdatesTracingWorkflow.dosig, "before") + + async with new_worker( + client, + SignalsActivitiesTimersUpdatesTracingWorkflow, + activities=[say_hello], + task_queue=task_queue, + ): + await asyncio.sleep(0.2) + await handle.signal(SignalsActivitiesTimersUpdatesTracingWorkflow.dosig, "1") + await handle.execute_update( + SignalsActivitiesTimersUpdatesTracingWorkflow.doupdate, "1" + ) + await handle.result() + + +@workflow.defn +class ActivityAndSignalsWhileWorkflowDown: + def __init__(self) -> None: + self.events: List[str] = [] + self.counter = 0 + + @workflow.run + async def run(self, activity_tq: str) -> List[str]: + act_task = asyncio.create_task(self.run_act(activity_tq)) + await workflow.wait_condition(lambda: self.counter >= 2) + self.events.append(f"counter-{self.counter}") + await act_task + return self.events + + @workflow.signal + async def dosig(self, name: str): + self.events.append(f"sig-{name}") + self.counter += 1 + + async def run_act(self, activity_tq: str): + self.events.append("act-start") + await workflow.execute_activity( + say_hello, + "Enchi", + schedule_to_close_timeout=timedelta(seconds=30), + task_queue=activity_tq, + ) + self.counter += 1 + self.events.append("act-done") + + +async def test_alternate_async_loop_ordering(client: Client, env: WorkflowEnvironment): + """This test mostly exists to generate histories for test_replayer_alternate_async_ordering. + See that test for more.""" + + if env.supports_time_skipping: + pytest.skip("This test doesn't work right with time skipping for some reason") + task_queue = f"tq-{uuid.uuid4()}" + activity_tq = f"tq-{uuid.uuid4()}" + handle = await client.start_workflow( + ActivityAndSignalsWhileWorkflowDown.run, + activity_tq, + id=f"wf-{uuid.uuid4()}", + task_queue=task_queue, + ) + + async with new_worker( + client, + ActivityAndSignalsWhileWorkflowDown, + activities=[say_hello], + task_queue=task_queue, + ): + # This sleep exists to make sure the first WFT is processed + await asyncio.sleep(0.2) + + async with new_worker( + client, + activities=[say_hello], + task_queue=activity_tq, + ): + # Make sure the activity starts being processed before sending signals + await asyncio.sleep(1) + await handle.signal(ActivityAndSignalsWhileWorkflowDown.dosig, "1") + await handle.signal(ActivityAndSignalsWhileWorkflowDown.dosig, "2") + + async with new_worker( + client, + ActivityAndSignalsWhileWorkflowDown, + activities=[say_hello], + task_queue=task_queue, + ): + await handle.result()