Skip to content

Commit 9e35db5

Browse files
committed
Fix channel handling
1 parent 1459a55 commit 9e35db5

File tree

1 file changed

+36
-8
lines changed

1 file changed

+36
-8
lines changed

durabletask/worker.py

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
import grpc
1212
from google.protobuf import empty_pb2
1313

14-
import durabletask.internal.helpers as ph
1514
import durabletask.internal.helpers as pbh
15+
import durabletask.internal.helpers as ph
1616
import durabletask.internal.orchestrator_service_pb2 as pb
1717
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
1818
import durabletask.internal.shared as shared
@@ -129,9 +129,6 @@ def add_activity(self, fn: task.Activity) -> str:
129129

130130
def start(self):
131131
"""Starts the worker on a background thread and begins listening for work items."""
132-
channel = shared.get_grpc_channel(self._host_address, self._secure_channel, self._interceptors)
133-
stub = stubs.TaskHubSidecarServiceStub(channel)
134-
135132
if self._is_running:
136133
raise RuntimeError('The worker is already running.')
137134

@@ -140,13 +137,30 @@ def run_loop():
140137
# functions. We'd need to know ahead of time whether a function is async or not.
141138
# TODO: Max concurrency configuration settings
142139
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
140+
current_channel = None
141+
current_stub = None
142+
143143
while not self._shutdown.is_set():
144+
# Only create a new channel and stub if we don't have a current one
145+
# or if we need to retry after a failure
146+
if current_channel is None or current_stub is None:
147+
# Clean up any existing channel before creating a new one
148+
if current_channel is not None:
149+
try:
150+
current_channel.close()
151+
except Exception:
152+
pass # Ignore any errors during channel cleanup
153+
154+
# Create a fresh channel and stub for this connection attempt
155+
current_channel = shared.get_grpc_channel(self._host_address, self._secure_channel, self._interceptors)
156+
current_stub = stubs.TaskHubSidecarServiceStub(current_channel)
157+
144158
try:
145159
# send a "Hello" message to the sidecar to ensure that it's listening
146-
stub.Hello(empty_pb2.Empty())
160+
current_stub.Hello(empty_pb2.Empty())
147161

148162
# stream work items
149-
self._response_stream = stub.GetWorkItems(pb.GetWorkItemsRequest())
163+
self._response_stream = current_stub.GetWorkItems(pb.GetWorkItemsRequest())
150164
self._logger.info(f'Successfully connected to {self._host_address}. Waiting for work items...')
151165

152166
# The stream blocks until either a work item is received or the stream is canceled
@@ -155,15 +169,19 @@ def run_loop():
155169
request_type = work_item.WhichOneof('request')
156170
self._logger.debug(f'Received "{request_type}" work item')
157171
if work_item.HasField('orchestratorRequest'):
158-
executor.submit(self._execute_orchestrator, work_item.orchestratorRequest, stub, work_item.completionToken)
172+
executor.submit(self._execute_orchestrator, work_item.orchestratorRequest, current_stub, work_item.completionToken)
159173
elif work_item.HasField('activityRequest'):
160-
executor.submit(self._execute_activity, work_item.activityRequest, stub, work_item.completionToken)
174+
executor.submit(self._execute_activity, work_item.activityRequest, current_stub, work_item.completionToken)
161175
elif work_item.HasField('healthPing'):
162176
pass # no-op
163177
else:
164178
self._logger.warning(f'Unexpected work item type: {request_type}')
165179

166180
except grpc.RpcError as rpc_error:
181+
# Mark current channel/stub as invalid so they'll be recreated on next retry
182+
current_channel = None
183+
current_stub = None
184+
167185
if rpc_error.code() == grpc.StatusCode.CANCELLED: # type: ignore
168186
self._logger.info(f'Disconnected from {self._host_address}')
169187
elif rpc_error.code() == grpc.StatusCode.UNAVAILABLE: # type: ignore
@@ -172,10 +190,20 @@ def run_loop():
172190
else:
173191
self._logger.warning(f'Unexpected error: {rpc_error}')
174192
except Exception as ex:
193+
# Mark current channel/stub as invalid so they'll be recreated on next retry
194+
current_channel = None
195+
current_stub = None
175196
self._logger.warning(f'Unexpected error: {ex}')
176197

177198
# CONSIDER: exponential backoff
178199
self._shutdown.wait(5)
200+
201+
# Clean up the final channel when the loop exits
202+
if current_channel is not None:
203+
try:
204+
current_channel.close()
205+
except Exception:
206+
pass # Ignore any errors during channel cleanup
179207
self._logger.info("No longer listening for work items")
180208
return
181209

0 commit comments

Comments
 (0)