Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
543d5c4
Remove pyright exclusion for temporalio/bridge/worker.py and fix type…
tconley1428 Dec 2, 2025
cef1ff9
Remove pyright exclusion for temporalio/worker/_replayer.py and fix t…
tconley1428 Dec 2, 2025
6c94f4e
Remove pyright exclusion for temporalio/worker/_worker.py and fix typ…
tconley1428 Dec 3, 2025
e34a967
Remove pyright exclusion for temporalio/worker/workflow_sandbox/_impo…
tconley1428 Dec 3, 2025
6f4c651
Remove pyright exclusion for temporalio/worker/workflow_sandbox/_rest…
tconley1428 Dec 3, 2025
c20cf0b
Remove pyright exclusion for temporalio/workflow.py and fix type errors
tconley1428 Dec 3, 2025
86ba426
Clean up after claude
tconley1428 Dec 3, 2025
aace0eb
Remove pyright exclusion for tests/api/test_grpc_stub.py and fix type…
tconley1428 Dec 3, 2025
240d8b5
Remove pyright exclusion for tests/conftest.py and fix type error
tconley1428 Dec 3, 2025
a8034d7
Remove pyright exclusion for tests/contrib/test_opentelemetry.py
tconley1428 Dec 3, 2025
957c4bf
Remove pyright exclusion for tests/contrib/pydantic/models.py
tconley1428 Dec 3, 2025
fcad786
Remove pyright exclusion for tests/contrib/pydantic/models_2.py
tconley1428 Dec 3, 2025
faabe0f
Remove pyright exclusion for tests/contrib/pydantic/test_pydantic.py
tconley1428 Dec 3, 2025
4243766
Remove pyright exclusion for tests/contrib/pydantic/workflows.py
tconley1428 Dec 3, 2025
cf14c92
Remove pyright exclusion for tests/test_converter.py and fix type error
tconley1428 Dec 3, 2025
4fdb431
Remove pyright exclusion for tests/test_service.py and fix type errors
tconley1428 Dec 3, 2025
a5e3642
Remove pyright exclusion for tests/worker/test_activity.py and add mi…
tconley1428 Dec 3, 2025
f669d2b
Remove pyright exclusion for tests/worker/workflow_sandbox/test_impor…
tconley1428 Dec 3, 2025
4d0dfb9
Remove pyright exclusion for tests/worker/workflow_sandbox/test_restr…
tconley1428 Dec 3, 2025
02e69e3
Cleanup
tconley1428 Dec 3, 2025
00d0970
Fix on 3.10
tconley1428 Dec 3, 2025
eeeb627
Merge branch 'main' into pyright_exclusions
tconley1428 Dec 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 1 addition & 25 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -199,34 +199,10 @@ reportUnnecessaryTypeIgnoreComment = "none"
reportUnusedCallResult = "none"
include = ["temporalio", "tests"]
exclude = [
# Exclude auto generated files
"temporalio/api",
"temporalio/bridge/proto",
"tests/worker/workflow_sandbox/testmodules/proto",
"temporalio/bridge/worker.py",
"temporalio/worker/_replayer.py",
"temporalio/worker/_worker.py",
"temporalio/worker/workflow_sandbox/_importer.py",
"temporalio/worker/workflow_sandbox/_restrictions.py",
"temporalio/workflow.py",
"tests/api/test_grpc_stub.py",
"tests/conftest.py",
"tests/contrib/test_opentelemetry.py",
"tests/contrib/pydantic/models.py",
"tests/contrib/pydantic/models_2.py",
"tests/contrib/pydantic/test_pydantic.py",
"tests/contrib/pydantic/workflows.py",
"tests/test_converter.py",
"tests/test_service.py",
"tests/worker/test_activity.py",
"tests/worker/workflow_sandbox/test_importer.py",
"tests/worker/workflow_sandbox/test_restrictions.py",
# TODO: these pass locally but fail in CI with
# error: Import "temporalio.bridge.temporal_sdk_bridge" could not be resolved
"temporalio/bridge/client.py",
"temporalio/bridge/metric.py",
"temporalio/bridge/runtime.py",
"temporalio/bridge/testing.py",
"temporalio/envconfig.py",
]

[tool.ruff]
Expand Down
24 changes: 12 additions & 12 deletions temporalio/bridge/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,15 @@ def __init__(self, ref: temporalio.bridge.temporal_sdk_bridge.WorkerRef) -> None

async def validate(self) -> None:
"""Validate the bridge worker."""
await self._ref.validate()
await self._ref.validate() # type: ignore[reportOptionalMemberAccess]

async def poll_workflow_activation(
self,
) -> temporalio.bridge.proto.workflow_activation.WorkflowActivation:
"""Poll for a workflow activation."""
return (
temporalio.bridge.proto.workflow_activation.WorkflowActivation.FromString(
await self._ref.poll_workflow_activation()
await self._ref.poll_workflow_activation() # type: ignore[reportOptionalMemberAccess]
)
)

Expand All @@ -230,53 +230,53 @@ async def poll_activity_task(
) -> temporalio.bridge.proto.activity_task.ActivityTask:
"""Poll for an activity task."""
return temporalio.bridge.proto.activity_task.ActivityTask.FromString(
await self._ref.poll_activity_task()
await self._ref.poll_activity_task() # type: ignore[reportOptionalMemberAccess]
)

async def poll_nexus_task(
self,
) -> temporalio.bridge.proto.nexus.NexusTask:
"""Poll for a nexus task."""
return temporalio.bridge.proto.nexus.NexusTask.FromString(
await self._ref.poll_nexus_task()
await self._ref.poll_nexus_task() # type: ignore[reportOptionalMemberAccess]
)

async def complete_workflow_activation(
self,
comp: temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion,
) -> None:
"""Complete a workflow activation."""
await self._ref.complete_workflow_activation(comp.SerializeToString())
await self._ref.complete_workflow_activation(comp.SerializeToString()) # type: ignore[reportOptionalMemberAccess]

async def complete_activity_task(
self, comp: temporalio.bridge.proto.ActivityTaskCompletion
) -> None:
"""Complete an activity task."""
await self._ref.complete_activity_task(comp.SerializeToString())
await self._ref.complete_activity_task(comp.SerializeToString()) # type: ignore[reportOptionalMemberAccess]

async def complete_nexus_task(
self, comp: temporalio.bridge.proto.nexus.NexusTaskCompletion
) -> None:
"""Complete a nexus task."""
await self._ref.complete_nexus_task(comp.SerializeToString())
await self._ref.complete_nexus_task(comp.SerializeToString()) # type: ignore[reportOptionalMemberAccess]

def record_activity_heartbeat(
self, comp: temporalio.bridge.proto.ActivityHeartbeat
) -> None:
"""Record an activity heartbeat."""
self._ref.record_activity_heartbeat(comp.SerializeToString())
self._ref.record_activity_heartbeat(comp.SerializeToString()) # type: ignore[reportOptionalMemberAccess]

def request_workflow_eviction(self, run_id: str) -> None:
"""Request a workflow be evicted."""
self._ref.request_workflow_eviction(run_id)
self._ref.request_workflow_eviction(run_id) # type: ignore[reportOptionalMemberAccess]

def replace_client(self, client: temporalio.bridge.client.Client) -> None:
"""Replace the worker client."""
self._ref.replace_client(client._ref)
self._ref.replace_client(client._ref) # type: ignore[reportOptionalMemberAccess]

def initiate_shutdown(self) -> None:
"""Start shutdown of the worker."""
self._ref.initiate_shutdown()
self._ref.initiate_shutdown() # type: ignore[reportOptionalMemberAccess]

async def finalize_shutdown(self) -> None:
"""Finalize the worker.
Expand All @@ -286,7 +286,7 @@ async def finalize_shutdown(self) -> None:
"""
ref = self._ref
self._ref = None
await ref.finalize_shutdown()
await ref.finalize_shutdown() # type: ignore[reportOptionalMemberAccess]


class _Visitor(VisitorFunctions):
Expand Down
69 changes: 45 additions & 24 deletions temporalio/worker/_replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import temporalio.client
import temporalio.converter
import temporalio.runtime
import temporalio.worker
import temporalio.workflow

from ..common import HeaderCodecBehavior
Expand Down Expand Up @@ -89,7 +90,7 @@ def __init__(
self._config = plugin.configure_replayer(self._config)

# Validate workflows after plugin configuration
if not self._config["workflows"]:
if not self._config.get("workflows"):
raise ValueError("At least one workflow must be specified")

def config(self, *, active_config: bool = False) -> ReplayerConfig:
Expand All @@ -103,7 +104,7 @@ def config(self, *, active_config: bool = False) -> ReplayerConfig:
Configuration, shallow-copied.
"""
config = self._config.copy() if active_config else self._initial_config.copy()
config["workflows"] = list(config["workflows"])
config["workflows"] = list(config.get("workflows", []))
return config

async def replay_workflow(
Expand Down Expand Up @@ -191,6 +192,11 @@ def make_lambda(plugin, next):
async def _workflow_replay_iterator(
self, histories: AsyncIterator[temporalio.client.WorkflowHistory]
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
# Initialize variables to avoid unbound variable errors
pusher = None
workflow_worker_task = None
bridge_worker_scope = None

try:
last_replay_failure: Optional[Exception]
last_replay_complete = asyncio.Event()
Expand Down Expand Up @@ -223,39 +229,50 @@ def on_eviction_hook(

# Create worker referencing bridge worker
bridge_worker: temporalio.bridge.worker.Worker
task_queue = f"replay-{self._config['build_id']}"
runtime = self._config["runtime"] or temporalio.runtime.Runtime.default()
task_queue = f"replay-{self._config.get('build_id')}"
runtime = (
self._config.get("runtime") or temporalio.runtime.Runtime.default()
)
workflow_worker = _WorkflowWorker(
bridge_worker=lambda: bridge_worker,
namespace=self._config["namespace"],
namespace=self._config.get("namespace", "ReplayNamespace"),
task_queue=task_queue,
workflows=self._config["workflows"],
workflow_task_executor=self._config["workflow_task_executor"],
workflows=self._config.get("workflows", []),
workflow_task_executor=self._config.get("workflow_task_executor"),
max_concurrent_workflow_tasks=5,
workflow_runner=self._config["workflow_runner"],
unsandboxed_workflow_runner=self._config["unsandboxed_workflow_runner"],
data_converter=self._config["data_converter"],
interceptors=self._config["interceptors"],
workflow_failure_exception_types=self._config[
"workflow_failure_exception_types"
],
debug_mode=self._config["debug_mode"],
workflow_runner=self._config.get("workflow_runner")
or SandboxedWorkflowRunner(),
unsandboxed_workflow_runner=self._config.get(
"unsandboxed_workflow_runner"
)
or UnsandboxedWorkflowRunner(),
data_converter=self._config.get("data_converter")
or temporalio.converter.DataConverter.default,
interceptors=self._config.get("interceptors", []),
workflow_failure_exception_types=self._config.get(
"workflow_failure_exception_types", []
),
debug_mode=self._config.get("debug_mode", False),
metric_meter=runtime.metric_meter,
on_eviction_hook=on_eviction_hook,
disable_eager_activity_execution=False,
disable_safe_eviction=self._config["disable_safe_workflow_eviction"],
disable_safe_eviction=self._config.get(
"disable_safe_workflow_eviction", False
),
should_enforce_versioning_behavior=False,
assert_local_activity_valid=lambda a: None,
encode_headers=self._config["header_codec_behavior"]
encode_headers=self._config.get(
"header_codec_behavior", HeaderCodecBehavior.NO_CODEC
)
!= HeaderCodecBehavior.NO_CODEC,
)
# Create bridge worker
bridge_worker, pusher = temporalio.bridge.worker.Worker.for_replay(
runtime._core_runtime,
temporalio.bridge.worker.WorkerConfig(
namespace=self._config["namespace"],
namespace=self._config.get("namespace", "ReplayNamespace"),
task_queue=task_queue,
identity_override=self._config["identity"],
identity_override=self._config.get("identity"),
# Need to tell core whether we want to consider all
# non-determinism exceptions as workflow fail, and whether we do
# per workflow type
Expand Down Expand Up @@ -292,7 +309,7 @@ def on_eviction_hook(
max_task_queue_activities_per_second=None,
graceful_shutdown_period_millis=0,
versioning_strategy=temporalio.bridge.worker.WorkerVersioningStrategyNone(
build_id_no_versioning=self._config["build_id"]
build_id_no_versioning=self._config.get("build_id")
or load_default_build_id(),
),
workflow_task_poller_behavior=temporalio.bridge.worker.PollerBehaviorSimpleMaximum(
Expand All @@ -307,6 +324,8 @@ def on_eviction_hook(
plugins=[plugin.name() for plugin in self.plugins],
),
)
bridge_worker_scope = bridge_worker

# Start worker
workflow_worker_task = asyncio.create_task(workflow_worker.run())

Expand Down Expand Up @@ -347,18 +366,20 @@ async def replay_iterator() -> AsyncIterator[WorkflowReplayResult]:
yield replay_iterator()
finally:
# Close the pusher
pusher.close()
if pusher is not None:
pusher.close()
# If the workflow worker task is not done, wait for it
try:
if not workflow_worker_task.done():
if workflow_worker_task is not None and not workflow_worker_task.done():
await workflow_worker_task
except Exception:
logger.warning("Failed to shutdown worker", exc_info=True)
finally:
# We must shutdown here
try:
bridge_worker.initiate_shutdown()
await bridge_worker.finalize_shutdown()
if bridge_worker_scope is not None:
bridge_worker_scope.initiate_shutdown()
await bridge_worker_scope.finalize_shutdown()
except Exception:
logger.warning("Failed to finalize shutdown", exc_info=True)

Expand Down
Loading
Loading