diff --git a/cadence/_internal/workflow/context.py b/cadence/_internal/workflow/context.py new file mode 100644 index 0000000..5038e48 --- /dev/null +++ b/cadence/_internal/workflow/context.py @@ -0,0 +1,15 @@ +from cadence.client import Client +from cadence.workflow import WorkflowContext, WorkflowInfo + + +class Context(WorkflowContext): + + def __init__(self, client: Client, info: WorkflowInfo): + self._client = client + self._info = info + + def info(self) -> WorkflowInfo: + return self._info + + def client(self) -> Client: + return self._client diff --git a/cadence/workflow/deterministic_event_loop.py b/cadence/_internal/workflow/deterministic_event_loop.py similarity index 100% rename from cadence/workflow/deterministic_event_loop.py rename to cadence/_internal/workflow/deterministic_event_loop.py diff --git a/cadence/_internal/workflow/history_event_iterator.py b/cadence/_internal/workflow/history_event_iterator.py new file mode 100644 index 0000000..3d99497 --- /dev/null +++ b/cadence/_internal/workflow/history_event_iterator.py @@ -0,0 +1,25 @@ + +from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse +from cadence.api.v1.service_workflow_pb2 import GetWorkflowExecutionHistoryRequest, GetWorkflowExecutionHistoryResponse +from cadence.client import Client + +async def iterate_history_events(decision_task: PollForDecisionTaskResponse, client: Client): + PAGE_SIZE = 1000 + + current_page = decision_task.history.events + next_page_token = decision_task.next_page_token + workflow_execution = decision_task.workflow_execution + + while True: + for event in current_page: + yield event + if not next_page_token: + break + response: GetWorkflowExecutionHistoryResponse = await client.workflow_stub.GetWorkflowExecutionHistory(GetWorkflowExecutionHistoryRequest( + domain=client.domain, + workflow_execution=workflow_execution, + next_page_token=next_page_token, + page_size=PAGE_SIZE, + )) + current_page = response.history.events + next_page_token = response.next_page_token diff --git a/cadence/workflow/workflow_engine.py b/cadence/_internal/workflow/workflow_engine.py similarity index 52% rename from cadence/workflow/workflow_engine.py rename to cadence/_internal/workflow/workflow_engine.py index bfde8c3..8c2f8da 100644 --- a/cadence/workflow/workflow_engine.py +++ b/cadence/_internal/workflow/workflow_engine.py @@ -1,28 +1,21 @@ from dataclasses import dataclass -from typing import Callable +from cadence._internal.workflow.context import Context from cadence.api.v1.decision_pb2 import Decision from cadence.client import Client -from cadence.data_converter import DataConverter from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse +from cadence.workflow import WorkflowInfo -@dataclass -class WorkflowContext: - domain: str - workflow_id: str - run_id: str - client: Client - workflow_func: Callable - data_converter: DataConverter @dataclass class DecisionResult: decisions: list[Decision] class WorkflowEngine: - def __init__(self, context: WorkflowContext): - self._context = context + def __init__(self, info: WorkflowInfo, client: Client): + self._context = Context(client, info) # TODO: Implement this def process_decision(self, decision_task: PollForDecisionTaskResponse) -> DecisionResult: - return DecisionResult(decisions=[]) + with self._context._activate(): + return DecisionResult(decisions=[]) diff --git a/cadence/client.py b/cadence/client.py index 8294da5..10abc39 100644 --- a/cadence/client.py +++ b/cadence/client.py @@ -10,6 +10,7 @@ from cadence.api.v1.service_domain_pb2_grpc import DomainAPIStub from cadence.api.v1.service_worker_pb2_grpc import WorkerAPIStub from grpc.aio import Channel, ClientInterceptor, secure_channel, insecure_channel +from cadence.api.v1.service_workflow_pb2_grpc import WorkflowAPIStub from cadence.data_converter import DataConverter, DefaultDataConverter @@ -42,6 +43,7 @@ def __init__(self, **kwargs: Unpack[ClientOptions]) -> None: self._channel = _create_channel(self._options) self._worker_stub = WorkerAPIStub(self._channel) self._domain_stub = DomainAPIStub(self._channel) + self._workflow_stub = WorkflowAPIStub(self._channel) @property def data_converter(self) -> DataConverter: @@ -63,6 +65,10 @@ def domain_stub(self) -> DomainAPIStub: def worker_stub(self) -> WorkerAPIStub: return self._worker_stub + @property + def workflow_stub(self) -> WorkflowAPIStub: + return self._workflow_stub + async def ready(self) -> None: await self._channel.channel_ready() @@ -99,4 +105,4 @@ def _create_channel(options: ClientOptions) -> Channel: if options["credentials"]: return secure_channel(options["target"], options["credentials"], options["channel_arguments"], options["compression"], interceptors) else: - return insecure_channel(options["target"], options["channel_arguments"], options["compression"], interceptors) \ No newline at end of file + return insecure_channel(options["target"], options["channel_arguments"], options["compression"], interceptors) diff --git a/cadence/workflow.py b/cadence/workflow.py new file mode 100644 index 0000000..51b968f --- /dev/null +++ b/cadence/workflow.py @@ -0,0 +1,39 @@ +from abc import ABC, abstractmethod +from contextlib import contextmanager +from contextvars import ContextVar +from dataclasses import dataclass +from typing import Iterator + +from cadence.client import Client + +@dataclass +class WorkflowInfo: + workflow_type: str + workflow_domain: str + workflow_id: str + workflow_run_id: str + +class WorkflowContext(ABC): + _var: ContextVar['WorkflowContext'] = ContextVar("workflow") + + @abstractmethod + def info(self) -> WorkflowInfo: + ... + + @abstractmethod + def client(self) -> Client: + ... + + @contextmanager + def _activate(self) -> Iterator[None]: + token = WorkflowContext._var.set(self) + yield None + WorkflowContext._var.reset(token) + + @staticmethod + def is_set() -> bool: + return WorkflowContext._var.get(None) is not None + + @staticmethod + def get() -> 'WorkflowContext': + return WorkflowContext._var.get() diff --git a/tests/cadence/workflow/test_deterministic_event_loop.py b/tests/cadence/_internal/workflow/test_deterministic_event_loop.py similarity index 96% rename from tests/cadence/workflow/test_deterministic_event_loop.py rename to tests/cadence/_internal/workflow/test_deterministic_event_loop.py index 51fcb68..9d2e7fb 100644 --- a/tests/cadence/workflow/test_deterministic_event_loop.py +++ b/tests/cadence/_internal/workflow/test_deterministic_event_loop.py @@ -1,6 +1,6 @@ import pytest import asyncio -from cadence.workflow.deterministic_event_loop import DeterministicEventLoop +from cadence._internal.workflow.deterministic_event_loop import DeterministicEventLoop async def coro_append(results: list, i: int): diff --git a/tests/cadence/_internal/workflow/test_history_event_iterator.py b/tests/cadence/_internal/workflow/test_history_event_iterator.py new file mode 100644 index 0000000..430f4cf --- /dev/null +++ b/tests/cadence/_internal/workflow/test_history_event_iterator.py @@ -0,0 +1,178 @@ +import pytest +from unittest.mock import Mock, AsyncMock + +from cadence.client import Client +from cadence.api.v1.common_pb2 import WorkflowExecution +from cadence.api.v1.history_pb2 import HistoryEvent, History +from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse +from cadence.api.v1.service_workflow_pb2 import GetWorkflowExecutionHistoryResponse +from cadence._internal.workflow.history_event_iterator import iterate_history_events + + +@pytest.fixture +def mock_client(): + """Create a mock client with workflow_stub.""" + client = Mock(spec=Client) + client.workflow_stub = AsyncMock() + client.domain = "test-domain" + return client + + +@pytest.fixture +def mock_workflow_execution(): + """Create a mock workflow execution.""" + return WorkflowExecution( + workflow_id="test-workflow-id", + run_id="test-run-id" + ) + + +def create_history_event(event_id: int) -> HistoryEvent: + return HistoryEvent(event_id=event_id) + + +async def test_iterate_history_events_single_page_no_next_token(mock_client, mock_workflow_execution): + """Test iterating over a single page of events with no next page token.""" + # Create test events + events = [ + create_history_event(1), + create_history_event(2), + create_history_event(3) + ] + + # Create decision task response with events but no next page token + decision_task = PollForDecisionTaskResponse( + history=History(events=events), + next_page_token=b"", # Empty token means no more pages + workflow_execution=mock_workflow_execution + ) + + # Iterate and collect events + result_events = [e async for e in iterate_history_events(decision_task, mock_client)] + + # Verify all events were returned + assert len(result_events) == 3 + assert result_events[0].event_id == 1 + assert result_events[1].event_id == 2 + assert result_events[2].event_id == 3 + + # Verify no additional API calls were made + mock_client.workflow_stub.GetWorkflowExecutionHistory.assert_not_called() + + +async def test_iterate_history_events_empty_events(mock_client, mock_workflow_execution): + """Test iterating over empty events list.""" + # Create decision task response with no events + decision_task = PollForDecisionTaskResponse( + history=History(events=[]), + next_page_token=b"", + workflow_execution=mock_workflow_execution + ) + + # Iterate and collect events + result_events = [e async for e in iterate_history_events(decision_task, mock_client)] + + # Verify no events were returned + assert len(result_events) == 0 + + # Verify no additional API calls were made + mock_client.workflow_stub.GetWorkflowExecutionHistory.assert_not_called() + +async def test_iterate_history_events_multiple_pages(mock_client, mock_workflow_execution): + """Test iterating over multiple pages of events.""" + + # Create decision task response with first page and next page token + decision_task = PollForDecisionTaskResponse( + history=History(events=[ + create_history_event(1), + create_history_event(2) + ]), + next_page_token=b"page2_token", + workflow_execution=mock_workflow_execution + ) + + # Mock the subsequent API calls + second_response = GetWorkflowExecutionHistoryResponse( + history=History(events=[ + create_history_event(3), + create_history_event(4) + ]), + next_page_token=b"page3_token" + ) + + third_response = GetWorkflowExecutionHistoryResponse( + history=History(events=[ + create_history_event(5) + ]), + next_page_token=b"" # No more pages + ) + + # Configure mock to return responses in sequence + mock_client.workflow_stub.GetWorkflowExecutionHistory.side_effect = [ + second_response, + third_response + ] + + # Iterate and collect events + result_events = [e async for e in iterate_history_events(decision_task, mock_client)] + + # Verify all events from all pages were returned + assert len(result_events) == 5 + assert result_events[0].event_id == 1 + assert result_events[1].event_id == 2 + assert result_events[2].event_id == 3 + assert result_events[3].event_id == 4 + assert result_events[4].event_id == 5 + + # Verify correct API calls were made + assert mock_client.workflow_stub.GetWorkflowExecutionHistory.call_count == 2 + + # Verify first API call + first_call = mock_client.workflow_stub.GetWorkflowExecutionHistory.call_args_list[0] + first_request = first_call[0][0] + assert first_request.domain == "test-domain" + assert first_request.workflow_execution == mock_workflow_execution + assert first_request.next_page_token == b"page2_token" + assert first_request.page_size == 1000 + + # Verify second API call + second_call = mock_client.workflow_stub.GetWorkflowExecutionHistory.call_args_list[1] + second_request = second_call[0][0] + assert second_request.domain == "test-domain" + assert second_request.workflow_execution == mock_workflow_execution + assert second_request.next_page_token == b"page3_token" + assert second_request.page_size == 1000 + +async def test_iterate_history_events_single_page_with_next_token_then_empty(mock_client, mock_workflow_execution): + """Test case where first page has next token but second page is empty.""" + # Create first page of events + first_page_events = [ + create_history_event(1), + create_history_event(2) + ] + + # Create decision task response with first page and next page token + decision_task = PollForDecisionTaskResponse( + history=History(events=first_page_events), + next_page_token=b"page2_token", + workflow_execution=mock_workflow_execution + ) + + # Mock the second API call to return empty page + second_response = GetWorkflowExecutionHistoryResponse( + history=History(events=[]), + next_page_token=b"" # No more pages + ) + + mock_client.workflow_stub.GetWorkflowExecutionHistory.return_value = second_response + + # Iterate and collect events + result_events = [e async for e in iterate_history_events(decision_task, mock_client)] + + # Verify only first page events were returned + assert len(result_events) == 2 + assert result_events[0].event_id == 1 + assert result_events[1].event_id == 2 + + # Verify one API call was made + assert mock_client.workflow_stub.GetWorkflowExecutionHistory.call_count == 1