Skip to content

Commit b1e6d76

Browse files
committed
Revert debug stuff. Update core to latest
1 parent 50ccb37 commit b1e6d76

File tree

7 files changed

+21
-72
lines changed

7 files changed

+21
-72
lines changed

temporalio/bridge/Cargo.lock

Lines changed: 0 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

temporalio/bridge/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@ pyo3 = { version = "0.25", features = [
2727
] }
2828
pyo3-async-runtimes = { version = "0.25", features = ["tokio-runtime"] }
2929
pythonize = "0.25"
30-
temporal-client = { git = "https://github.com/temporalio/sdk-core", branch = "shutdown_notify" }
31-
temporal-sdk-core = { git = "https://github.com/temporalio/sdk-core", branch = "shutdown_notify", features = [
30+
temporal-client = { version = "0.1.0", path = "./sdk-core/client" }
31+
temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = [
3232
"ephemeral-server",
3333
] }
34-
temporal-sdk-core-api = { git = "https://github.com/temporalio/sdk-core", branch = "shutdown_notify", features = [
34+
temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api", features = [
3535
"envconfig",
3636
] }
37-
temporal-sdk-core-protos = { git = "https://github.com/temporalio/sdk-core", branch = "shutdown_notify" }
37+
temporal-sdk-core-protos = { version = "0.1.0", path = "./sdk-core/sdk-core-protos" }
3838
tokio = "1.26"
3939
tokio-stream = "0.1"
4040
tonic = "0.13"

temporalio/bridge/worker.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
from __future__ import annotations
77

8-
import logging
98
from dataclasses import dataclass
109
from typing import (
1110
TYPE_CHECKING,
@@ -43,8 +42,6 @@
4342
)
4443
from temporalio.bridge.temporal_sdk_bridge import PollShutdownError # type: ignore
4544

46-
logger = logging.getLogger(__name__)
47-
4845

4946
@dataclass
5047
class WorkerConfig:
@@ -222,8 +219,6 @@ async def poll_activity_task(
222219
self,
223220
) -> temporalio.bridge.proto.activity_task.ActivityTask:
224221
"""Poll for an activity task."""
225-
logger.info("Polling for activity task")
226-
227222
return temporalio.bridge.proto.activity_task.ActivityTask.FromString(
228223
await self._ref.poll_activity_task()
229224
)
@@ -247,7 +242,6 @@ async def complete_activity_task(
247242
self, comp: temporalio.bridge.proto.ActivityTaskCompletion
248243
) -> None:
249244
"""Complete an activity task."""
250-
logger.info("Completing activity task: %s", comp.task_token)
251245
await self._ref.complete_activity_task(comp.SerializeToString())
252246

253247
async def complete_nexus_task(

temporalio/worker/_activity.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -169,13 +169,10 @@ async def raise_from_exception_queue() -> NoReturn:
169169
else:
170170
raise RuntimeError(f"Unrecognized activity task: {task}")
171171
except temporalio.bridge.worker.PollShutdownError: # type: ignore[reportPrivateLocalImportUsage]
172-
logger.info("Activity worker finished1")
173172
exception_task.cancel()
174-
logger.info("Activity worker finished2")
175173
return
176174
except Exception as err:
177175
exception_task.cancel()
178-
logger.info("Activity worker failed: %s", err)
179176
raise RuntimeError("Activity worker failed") from err
180177

181178
def notify_shutdown(self) -> None:
@@ -186,18 +183,14 @@ def notify_shutdown(self) -> None:
186183
async def drain_poll_queue(self) -> None:
187184
while True:
188185
try:
189-
logger.info("Poll activity tasks")
190-
191186
# Just take all tasks and say we can't handle them
192187
task = await self._bridge_worker().poll_activity_task()
193188
completion = temporalio.bridge.proto.ActivityTaskCompletion( # type: ignore[reportAttributeAccessIssue]
194189
task_token=task.task_token
195190
)
196191
completion.result.failed.failure.message = "Worker shutting down"
197-
logger.info("Complete activity task")
198192
await self._bridge_worker().complete_activity_task(completion)
199193
except temporalio.bridge.worker.PollShutdownError: # type: ignore[reportPrivateLocalImportUsage]
200-
logger.info("Return from drain")
201194
return
202195

203196
# Only call this after run()/drain_poll_queue() have returned. This will not

temporalio/worker/_worker.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -755,34 +755,25 @@ async def raise_on_shutdown():
755755
# Initiate core worker shutdown
756756
self._bridge_worker.initiate_shutdown()
757757

758-
logger.info("Bridge shut down")
759-
760758
# If any worker task had an exception, replace that task with a queue drain
761759
for worker, task in tasks.items():
762760
if worker and task.done() and task.exception():
763-
logger.info("Adding a drain task")
764761
tasks[worker] = asyncio.create_task(worker.drain_poll_queue())
765762

766-
logger.info("Task drain done")
767-
768763
# Notify shutdown occurring
769764
if self._activity_worker:
770765
self._activity_worker.notify_shutdown()
771766
if self._workflow_worker:
772767
self._workflow_worker.notify_shutdown()
773-
logger.info("workers notified")
774768

775769
# Wait for all tasks to complete (i.e. for poller loops to stop)
776770
await asyncio.wait(tasks.values())
777-
logger.info("tasks waited")
778-
779771
# Sometimes both workers throw an exception and since we only take the
780772
# first, Python may complain with "Task exception was never retrieved"
781773
# if we don't get the others. Therefore we call cancel on each task
782774
# which suppresses this.
783775
for task in tasks.values():
784776
task.cancel()
785-
logger.info("tasks cancelled")
786777

787778
# Let all activity / nexus operations completions finish. We cannot guarantee that
788779
# because poll shutdown completed (which means activities/operations completed)
@@ -791,7 +782,6 @@ async def raise_on_shutdown():
791782
await self._activity_worker.wait_all_completed()
792783
if self._nexus_worker:
793784
await self._nexus_worker.wait_all_completed()
794-
logger.info("Waited for workers")
795785

796786
# Do final shutdown
797787
try:
@@ -800,7 +790,6 @@ async def raise_on_shutdown():
800790
# Ignore errors here that can arise in some tests where the bridge
801791
# worker still has a reference
802792
pass
803-
logger.info("Set shutdown complete")
804793

805794
# Mark as shutdown complete and re-raise exception if present
806795
self._shutdown_complete_event.set()
@@ -858,8 +847,6 @@ async def __aexit__(self, exc_type: Optional[Type[BaseException]], *args) -> Non
858847
if not self._async_context_run_task:
859848
raise RuntimeError("Never started")
860849
await self.shutdown()
861-
logger.info("Shutdown complete")
862-
863850
# Cancel our run task
864851
self._async_context_run_task.cancel()
865852
# Only re-raise our exception if present and exc_type is cancel

tests/worker/test_workflow.py

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,6 @@
152152
with workflow.unsafe.imports_passed_through():
153153
import pytest
154154

155-
logger = logging.getLogger(__name__)
156-
157155

158156
@workflow.defn
159157
class HelloWorkflow:
@@ -890,9 +888,7 @@ async def wait_cancel(self) -> str:
890888
except asyncio.CancelledError:
891889
return "Got cancelled error, cancelled? " + str(activity.is_cancelled())
892890
finally:
893-
logger.info("ActivityWaitCancelNotify setting event")
894891
self.wait_cancel_complete.set()
895-
logger.info("ActivityWaitCancelNotify set event")
896892

897893

898894
@dataclass
@@ -953,8 +949,6 @@ async def test_workflow_cancel_activity(client: Client, local: bool):
953949
async with new_worker(
954950
client, CancelActivityWorkflow, activities=[activity_inst.wait_cancel]
955951
) as worker:
956-
logger.info("Start first workflow")
957-
958952
# Try cancel - confirm error and activity was sent the cancel
959953
handle = await client.start_workflow(
960954
CancelActivityWorkflow.run,
@@ -970,18 +964,12 @@ async def test_workflow_cancel_activity(client: Client, local: bool):
970964
async def activity_result() -> str:
971965
return await handle.query(CancelActivityWorkflow.activity_result)
972966

973-
logger.info("Wait for cancelled")
974967
await assert_eq_eventually(
975968
"Error: CancelledError", activity_result, timeout=assert_timeout
976969
)
977-
978-
logger.info("Wait for activity complete")
979970
await activity_inst.wait_cancel_complete.wait()
980-
981-
logger.info("Wait for workflow cancel")
982971
await handle.cancel()
983972

984-
logger.info("Start second workflow")
985973
# Wait cancel - confirm no error due to graceful cancel handling
986974
handle = await client.start_workflow(
987975
CancelActivityWorkflow.run,
@@ -993,18 +981,14 @@ async def activity_result() -> str:
993981
task_queue=worker.task_queue,
994982
task_timeout=task_timeout,
995983
)
996-
logger.info("Wait for cancelled")
997984
await assert_eq_eventually(
998985
"Got cancelled error, cancelled? True",
999986
activity_result,
1000987
timeout=assert_timeout,
1001988
)
1002-
logger.info("Wait for activity complete")
1003989
await activity_inst.wait_cancel_complete.wait()
1004-
logger.info("Wait for workflow cancel")
1005990
await handle.cancel()
1006991

1007-
logger.info("Start third workflow")
1008992
# Abandon - confirm error and that activity stays running
1009993
handle = await client.start_workflow(
1010994
CancelActivityWorkflow.run,
@@ -1016,16 +1000,12 @@ async def activity_result() -> str:
10161000
task_queue=worker.task_queue,
10171001
task_timeout=task_timeout,
10181002
)
1019-
logger.info("Wait for cancelled")
10201003
await assert_eq_eventually(
10211004
"Error: CancelledError", activity_result, timeout=assert_timeout
10221005
)
1023-
logger.info("sleep")
10241006
await asyncio.sleep(0.5)
10251007
assert not activity_inst.wait_cancel_complete.is_set()
1026-
logger.info("Wait for workflow cancel")
10271008
await handle.cancel()
1028-
logger.info("Wait for activity complete")
10291009
await activity_inst.wait_cancel_complete.wait()
10301010

10311011

@@ -2966,12 +2946,21 @@ async def waiting_signal() -> bool:
29662946
task_queue=task_queue,
29672947
)
29682948

2949+
# Need to wait until it has gotten halfway through, otherwise the post_patch workflow may never complete
2950+
async def waiting_signal() -> bool:
2951+
return await post_patch_handle.query(
2952+
PatchMemoizedWorkflowPatched.waiting_signal
2953+
)
2954+
2955+
await assert_eq_eventually(True, waiting_signal)
2956+
29692957
# Send signal to both and check results
29702958
await pre_patch_handle.signal(PatchMemoizedWorkflowUnpatched.signal)
29712959
await post_patch_handle.signal(PatchMemoizedWorkflowPatched.signal)
29722960

29732961
# Confirm expected values
29742962
assert ["some-value"] == await pre_patch_handle.result()
2963+
29752964
assert [
29762965
"pre-patch",
29772966
"some-value",
@@ -6111,22 +6100,21 @@ def __init__(
61116100
self.main_workflow_returns_before_signal_completions = (
61126101
main_workflow_returns_before_signal_completions
61136102
)
6114-
self.ping_pong_val = 1
6115-
self.ping_pong_counter = 0
6116-
self.ping_pong_max_count = 4
6103+
self.run_finished = False
61176104

61186105
@workflow.run
61196106
async def run(self) -> str:
61206107
await workflow.wait_condition(
61216108
lambda: self.seen_first_signal and self.seen_second_signal
61226109
)
6110+
self.run_finished = True
61236111
return "workflow-result"
61246112

61256113
@workflow.signal
61266114
async def this_signal_executes_first(self):
61276115
self.seen_first_signal = True
61286116
if self.main_workflow_returns_before_signal_completions:
6129-
await self.ping_pong(lambda: self.ping_pong_val > 0)
6117+
await workflow.wait_condition(lambda: self.run_finished)
61306118
raise ApplicationError(
61316119
"Client should see this error unless doing ping-pong "
61326120
"(in which case main coroutine returns first)"
@@ -6137,18 +6125,12 @@ async def this_signal_executes_second(self):
61376125
await workflow.wait_condition(lambda: self.seen_first_signal)
61386126
self.seen_second_signal = True
61396127
if self.main_workflow_returns_before_signal_completions:
6140-
await self.ping_pong(lambda: self.ping_pong_val < 0)
6128+
await workflow.wait_condition(lambda: self.run_finished)
61416129
raise ApplicationError("Client should never see this error!")
61426130

6143-
async def ping_pong(self, cond: Callable[[], bool]):
6144-
while self.ping_pong_counter < self.ping_pong_max_count:
6145-
await workflow.wait_condition(cond)
6146-
self.ping_pong_val = -self.ping_pong_val
6147-
self.ping_pong_counter += 1
6148-
61496131

61506132
@workflow.defn
6151-
class FirstCompletionCommandIsHonoredPingPongWorkflow(
6133+
class FirstCompletionCommandIsHonoredSignalWaitWorkflow(
61526134
FirstCompletionCommandIsHonoredWorkflow
61536135
):
61546136
def __init__(self) -> None:
@@ -6177,10 +6159,10 @@ async def _do_first_completion_command_is_honored_test(
61776159
client: Client, main_workflow_returns_before_signal_completions: bool
61786160
):
61796161
workflow_cls: Union[
6180-
Type[FirstCompletionCommandIsHonoredPingPongWorkflow],
6162+
Type[FirstCompletionCommandIsHonoredSignalWaitWorkflow],
61816163
Type[FirstCompletionCommandIsHonoredWorkflow],
61826164
] = (
6183-
FirstCompletionCommandIsHonoredPingPongWorkflow
6165+
FirstCompletionCommandIsHonoredSignalWaitWorkflow
61846166
if main_workflow_returns_before_signal_completions
61856167
else FirstCompletionCommandIsHonoredWorkflow
61866168
)

0 commit comments

Comments
 (0)