Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions cadence/_internal/workflow/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from cadence.client import Client
from cadence.workflow import WorkflowContext, WorkflowInfo


class Context(WorkflowContext):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: importing this context from cadence._internal.workflow.context import Context will confuse people with from cadence.workflow import WorkflowContext

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think that's a misunderstanding. This is just an internal implementation that user will never get exposed to.


def __init__(self, client: Client, info: WorkflowInfo):
self._client = client
self._info = info

def info(self) -> WorkflowInfo:
return self._info

def client(self) -> Client:
return self._client
25 changes: 25 additions & 0 deletions cadence/_internal/workflow/history_event_iterator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
from cadence.api.v1.service_workflow_pb2 import GetWorkflowExecutionHistoryRequest, GetWorkflowExecutionHistoryResponse
from cadence.client import Client

async def iterate_history_events(decision_task: PollForDecisionTaskResponse, client: Client):
PAGE_SIZE = 1000

current_page = decision_task.history.events
next_page_token = decision_task.next_page_token
workflow_execution = decision_task.workflow_execution

while True:
for event in current_page:
yield event
if not next_page_token:
break
response: GetWorkflowExecutionHistoryResponse = await client.workflow_stub.GetWorkflowExecutionHistory(GetWorkflowExecutionHistoryRequest(
domain=client.domain,
workflow_execution=workflow_execution,
next_page_token=next_page_token,
page_size=PAGE_SIZE,
))
current_page = response.history.events
next_page_token = response.next_page_token
Original file line number Diff line number Diff line change
@@ -1,28 +1,21 @@
from dataclasses import dataclass
from typing import Callable

from cadence._internal.workflow.context import Context
from cadence.api.v1.decision_pb2 import Decision
from cadence.client import Client
from cadence.data_converter import DataConverter
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
from cadence.workflow import WorkflowInfo

@dataclass
class WorkflowContext:
domain: str
workflow_id: str
run_id: str
client: Client
workflow_func: Callable
data_converter: DataConverter

@dataclass
class DecisionResult:
decisions: list[Decision]

class WorkflowEngine:
def __init__(self, context: WorkflowContext):
self._context = context
def __init__(self, info: WorkflowInfo, client: Client):
self._context = Context(client, info)

# TODO: Implement this
def process_decision(self, decision_task: PollForDecisionTaskResponse) -> DecisionResult:
return DecisionResult(decisions=[])
with self._context._activate():
return DecisionResult(decisions=[])
8 changes: 7 additions & 1 deletion cadence/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from cadence.api.v1.service_domain_pb2_grpc import DomainAPIStub
from cadence.api.v1.service_worker_pb2_grpc import WorkerAPIStub
from grpc.aio import Channel, ClientInterceptor, secure_channel, insecure_channel
from cadence.api.v1.service_workflow_pb2_grpc import WorkflowAPIStub
from cadence.data_converter import DataConverter, DefaultDataConverter


Expand Down Expand Up @@ -42,6 +43,7 @@ def __init__(self, **kwargs: Unpack[ClientOptions]) -> None:
self._channel = _create_channel(self._options)
self._worker_stub = WorkerAPIStub(self._channel)
self._domain_stub = DomainAPIStub(self._channel)
self._workflow_stub = WorkflowAPIStub(self._channel)

@property
def data_converter(self) -> DataConverter:
Expand All @@ -63,6 +65,10 @@ def domain_stub(self) -> DomainAPIStub:
def worker_stub(self) -> WorkerAPIStub:
return self._worker_stub

@property
def workflow_stub(self) -> WorkflowAPIStub:
return self._workflow_stub

async def ready(self) -> None:
await self._channel.channel_ready()

Expand Down Expand Up @@ -99,4 +105,4 @@ def _create_channel(options: ClientOptions) -> Channel:
if options["credentials"]:
return secure_channel(options["target"], options["credentials"], options["channel_arguments"], options["compression"], interceptors)
else:
return insecure_channel(options["target"], options["channel_arguments"], options["compression"], interceptors)
return insecure_channel(options["target"], options["channel_arguments"], options["compression"], interceptors)
39 changes: 39 additions & 0 deletions cadence/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from abc import ABC, abstractmethod
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Iterator

from cadence.client import Client

@dataclass
class WorkflowInfo:
workflow_type: str
workflow_domain: str
workflow_id: str
workflow_run_id: str

class WorkflowContext(ABC):
_var: ContextVar['WorkflowContext'] = ContextVar("workflow")

@abstractmethod
def info(self) -> WorkflowInfo:
...

@abstractmethod
def client(self) -> Client:
...

@contextmanager
def _activate(self) -> Iterator[None]:
token = WorkflowContext._var.set(self)
yield None
WorkflowContext._var.reset(token)

@staticmethod
def is_set() -> bool:
return WorkflowContext._var.get(None) is not None

@staticmethod
def get() -> 'WorkflowContext':
return WorkflowContext._var.get()
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest
import asyncio
from cadence.workflow.deterministic_event_loop import DeterministicEventLoop
from cadence._internal.workflow.deterministic_event_loop import DeterministicEventLoop


async def coro_append(results: list, i: int):
Expand Down
178 changes: 178 additions & 0 deletions tests/cadence/_internal/workflow/test_history_event_iterator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import pytest
from unittest.mock import Mock, AsyncMock

from cadence.client import Client
from cadence.api.v1.common_pb2 import WorkflowExecution
from cadence.api.v1.history_pb2 import HistoryEvent, History
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
from cadence.api.v1.service_workflow_pb2 import GetWorkflowExecutionHistoryResponse
from cadence._internal.workflow.history_event_iterator import iterate_history_events


@pytest.fixture
def mock_client():
"""Create a mock client with workflow_stub."""
client = Mock(spec=Client)
client.workflow_stub = AsyncMock()
client.domain = "test-domain"
return client


@pytest.fixture
def mock_workflow_execution():
"""Create a mock workflow execution."""
return WorkflowExecution(
workflow_id="test-workflow-id",
run_id="test-run-id"
)


def create_history_event(event_id: int) -> HistoryEvent:
return HistoryEvent(event_id=event_id)


async def test_iterate_history_events_single_page_no_next_token(mock_client, mock_workflow_execution):
"""Test iterating over a single page of events with no next page token."""
# Create test events
events = [
create_history_event(1),
create_history_event(2),
create_history_event(3)
]

# Create decision task response with events but no next page token
decision_task = PollForDecisionTaskResponse(
history=History(events=events),
next_page_token=b"", # Empty token means no more pages
workflow_execution=mock_workflow_execution
)

# Iterate and collect events
result_events = [e async for e in iterate_history_events(decision_task, mock_client)]

# Verify all events were returned
assert len(result_events) == 3
assert result_events[0].event_id == 1
assert result_events[1].event_id == 2
assert result_events[2].event_id == 3

# Verify no additional API calls were made
mock_client.workflow_stub.GetWorkflowExecutionHistory.assert_not_called()


async def test_iterate_history_events_empty_events(mock_client, mock_workflow_execution):
"""Test iterating over empty events list."""
# Create decision task response with no events
decision_task = PollForDecisionTaskResponse(
history=History(events=[]),
next_page_token=b"",
workflow_execution=mock_workflow_execution
)

# Iterate and collect events
result_events = [e async for e in iterate_history_events(decision_task, mock_client)]

# Verify no events were returned
assert len(result_events) == 0

# Verify no additional API calls were made
mock_client.workflow_stub.GetWorkflowExecutionHistory.assert_not_called()

async def test_iterate_history_events_multiple_pages(mock_client, mock_workflow_execution):
"""Test iterating over multiple pages of events."""

# Create decision task response with first page and next page token
decision_task = PollForDecisionTaskResponse(
history=History(events=[
create_history_event(1),
create_history_event(2)
]),
next_page_token=b"page2_token",
workflow_execution=mock_workflow_execution
)

# Mock the subsequent API calls
second_response = GetWorkflowExecutionHistoryResponse(
history=History(events=[
create_history_event(3),
create_history_event(4)
]),
next_page_token=b"page3_token"
)

third_response = GetWorkflowExecutionHistoryResponse(
history=History(events=[
create_history_event(5)
]),
next_page_token=b"" # No more pages
)

# Configure mock to return responses in sequence
mock_client.workflow_stub.GetWorkflowExecutionHistory.side_effect = [
second_response,
third_response
]

# Iterate and collect events
result_events = [e async for e in iterate_history_events(decision_task, mock_client)]

# Verify all events from all pages were returned
assert len(result_events) == 5
assert result_events[0].event_id == 1
assert result_events[1].event_id == 2
assert result_events[2].event_id == 3
assert result_events[3].event_id == 4
assert result_events[4].event_id == 5

# Verify correct API calls were made
assert mock_client.workflow_stub.GetWorkflowExecutionHistory.call_count == 2

# Verify first API call
first_call = mock_client.workflow_stub.GetWorkflowExecutionHistory.call_args_list[0]
first_request = first_call[0][0]
assert first_request.domain == "test-domain"
assert first_request.workflow_execution == mock_workflow_execution
assert first_request.next_page_token == b"page2_token"
assert first_request.page_size == 1000

# Verify second API call
second_call = mock_client.workflow_stub.GetWorkflowExecutionHistory.call_args_list[1]
second_request = second_call[0][0]
assert second_request.domain == "test-domain"
assert second_request.workflow_execution == mock_workflow_execution
assert second_request.next_page_token == b"page3_token"
assert second_request.page_size == 1000

async def test_iterate_history_events_single_page_with_next_token_then_empty(mock_client, mock_workflow_execution):
"""Test case where first page has next token but second page is empty."""
# Create first page of events
first_page_events = [
create_history_event(1),
create_history_event(2)
]

# Create decision task response with first page and next page token
decision_task = PollForDecisionTaskResponse(
history=History(events=first_page_events),
next_page_token=b"page2_token",
workflow_execution=mock_workflow_execution
)

# Mock the second API call to return empty page
second_response = GetWorkflowExecutionHistoryResponse(
history=History(events=[]),
next_page_token=b"" # No more pages
)

mock_client.workflow_stub.GetWorkflowExecutionHistory.return_value = second_response

# Iterate and collect events
result_events = [e async for e in iterate_history_events(decision_task, mock_client)]

# Verify only first page events were returned
assert len(result_events) == 2
assert result_events[0].event_id == 1
assert result_events[1].event_id == 2

# Verify one API call was made
assert mock_client.workflow_stub.GetWorkflowExecutionHistory.call_count == 1