Skip to content

Commit e257ebd

Browse files
committed
increase test coverage on new code
Signed-off-by: Filinto Duran <[email protected]>
1 parent c4d7fd5 commit e257ebd

File tree

3 files changed

+169
-1
lines changed

3 files changed

+169
-1
lines changed

tests/durabletask/test_client.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from unittest.mock import patch
1+
from unittest.mock import MagicMock, patch
22

33
from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
44
from durabletask.internal.shared import get_default_host_address, get_grpc_channel
@@ -140,3 +140,17 @@ def test_sync_channel_passes_base_options_and_max_lengths():
140140
assert ("grpc.max_send_message_length", 1234) in opts
141141
assert ("grpc.max_receive_message_length", 5678) in opts
142142
assert ("grpc.primary_user_agent", "durabletask-tests") in opts
143+
144+
145+
def test_taskhub_client_close_handles_exceptions():
146+
"""Test that close() handles exceptions gracefully (edge case not easily testable in E2E)."""
147+
with patch("durabletask.internal.shared.get_grpc_channel") as mock_get_channel:
148+
mock_channel = MagicMock()
149+
mock_channel.close.side_effect = Exception("close failed")
150+
mock_get_channel.return_value = mock_channel
151+
152+
from durabletask import client
153+
154+
task_hub_client = client.TaskHubGrpcClient()
155+
# Should not raise exception
156+
task_hub_client.close()

tests/durabletask/test_orchestration_e2e.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _):
6565
id = c.schedule_new_orchestration(empty_orchestrator)
6666
state = c.wait_for_orchestration_completion(id, timeout=30)
6767

68+
# Test calling wait again on already-completed orchestration (should return immediately)
69+
state2 = c.wait_for_orchestration_completion(id, timeout=30)
70+
assert state2 is not None
71+
assert state2.runtime_status == client.OrchestrationStatus.COMPLETED
72+
6873
assert invoked
6974
assert state is not None
7075
assert state.name == task.get_name(empty_orchestrator)
@@ -76,6 +81,41 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _):
7681
assert state.serialized_custom_status is None
7782

7883

84+
def test_wait_for_idle():
85+
"""Test that wait_for_idle properly waits for in-flight activities to complete."""
86+
import time
87+
88+
def slow_activity(ctx: task.ActivityContext, input: int):
89+
# Simulate slow activity
90+
time.sleep(0.2)
91+
return input + 1
92+
93+
def orchestrator(ctx: task.OrchestrationContext, input: int):
94+
# Schedule multiple activities without waiting
95+
tasks = [ctx.call_activity(slow_activity, input=i) for i in range(3)]
96+
# Wait for all to complete
97+
results = yield task.when_all(tasks)
98+
return sum(results)
99+
100+
with worker.TaskHubGrpcWorker() as w:
101+
w.add_orchestrator(orchestrator)
102+
w.add_activity(slow_activity)
103+
w.start()
104+
w.wait_for_ready(timeout=10)
105+
106+
with client.TaskHubGrpcClient() as c:
107+
id = c.schedule_new_orchestration(orchestrator, input=1)
108+
109+
# Wait for orchestration to complete
110+
state = c.wait_for_orchestration_completion(id, timeout=30)
111+
assert state is not None
112+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
113+
114+
# Wait for any lingering activities to finish
115+
idle = w.wait_for_idle(timeout=5.0)
116+
assert idle is True
117+
118+
79119
def test_activity_sequence():
80120
def plus_one(_: task.ActivityContext, input: int) -> int:
81121
return input + 1
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
"""
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
"""
13+
14+
from unittest.mock import MagicMock, Mock, patch
15+
16+
import grpc
17+
18+
from durabletask import worker
19+
20+
21+
def test_execute_orchestrator_grpc_error_benign_cancelled():
22+
"""Test that benign gRPC errors in orchestrator execution are handled gracefully."""
23+
w = worker.TaskHubGrpcWorker()
24+
25+
# Add a dummy orchestrator
26+
def test_orchestrator(ctx, input):
27+
return "result"
28+
29+
w.add_orchestrator(test_orchestrator)
30+
31+
# Mock the stub to raise a benign error
32+
mock_stub = MagicMock()
33+
mock_error = grpc.RpcError()
34+
mock_error.code = Mock(return_value=grpc.StatusCode.CANCELLED)
35+
mock_stub.CompleteOrchestratorTask.side_effect = mock_error
36+
37+
# Create a mock request with proper structure
38+
mock_req = MagicMock()
39+
mock_req.instanceId = "test-id"
40+
mock_req.pastEvents = []
41+
mock_req.newEvents = [MagicMock()]
42+
mock_req.newEvents[0].HasField = lambda x: x == "executionStarted"
43+
mock_req.newEvents[0].executionStarted.name = "test_orchestrator"
44+
mock_req.newEvents[0].executionStarted.input = None
45+
mock_req.newEvents[0].router.targetAppID = None
46+
mock_req.newEvents[0].router.sourceAppID = None
47+
mock_req.newEvents[0].timestamp.ToDatetime = Mock(return_value=None)
48+
49+
# Should not raise exception (benign error)
50+
w._execute_orchestrator(mock_req, mock_stub, "token")
51+
52+
53+
def test_execute_orchestrator_grpc_error_non_benign():
54+
"""Test that non-benign gRPC errors in orchestrator execution are logged."""
55+
w = worker.TaskHubGrpcWorker()
56+
57+
# Add a dummy orchestrator
58+
def test_orchestrator(ctx, input):
59+
return "result"
60+
61+
w.add_orchestrator(test_orchestrator)
62+
63+
# Mock the stub to raise a non-benign error
64+
mock_stub = MagicMock()
65+
mock_error = grpc.RpcError()
66+
mock_error.code = Mock(return_value=grpc.StatusCode.INTERNAL)
67+
mock_stub.CompleteOrchestratorTask.side_effect = mock_error
68+
69+
# Create a mock request with proper structure
70+
mock_req = MagicMock()
71+
mock_req.instanceId = "test-id"
72+
mock_req.pastEvents = []
73+
mock_req.newEvents = [MagicMock()]
74+
mock_req.newEvents[0].HasField = lambda x: x == "executionStarted"
75+
mock_req.newEvents[0].executionStarted.name = "test_orchestrator"
76+
mock_req.newEvents[0].executionStarted.input = None
77+
mock_req.newEvents[0].router.targetAppID = None
78+
mock_req.newEvents[0].router.sourceAppID = None
79+
mock_req.newEvents[0].timestamp.ToDatetime = Mock(return_value=None)
80+
81+
# Should not raise exception (error is logged but handled)
82+
with patch.object(w._logger, "exception") as mock_log:
83+
w._execute_orchestrator(mock_req, mock_stub, "token")
84+
# Verify error was logged
85+
assert mock_log.called
86+
87+
88+
def test_execute_activity_grpc_error_benign():
89+
"""Test that benign gRPC errors in activity execution are handled gracefully."""
90+
w = worker.TaskHubGrpcWorker()
91+
92+
# Add a dummy activity
93+
def test_activity(ctx, input):
94+
return "result"
95+
96+
w.add_activity(test_activity)
97+
98+
# Mock the stub to raise a benign error
99+
mock_stub = MagicMock()
100+
mock_error = grpc.RpcError()
101+
mock_error.code = Mock(return_value=grpc.StatusCode.CANCELLED)
102+
str_return = "unknown instance ID/task ID combo"
103+
mock_error.__str__ = Mock(return_value=str_return)
104+
mock_stub.CompleteActivityTask.side_effect = mock_error
105+
106+
# Create a mock request
107+
mock_req = MagicMock()
108+
mock_req.orchestrationInstance.instanceId = "test-id"
109+
mock_req.name = "test_activity"
110+
mock_req.taskId = 1
111+
mock_req.input.value = '""'
112+
113+
# Should not raise exception (benign error)
114+
w._execute_activity(mock_req, mock_stub, "token")

0 commit comments

Comments
 (0)