Skip to content

Commit 2156df6

Browse files
committed
Fix up reconnection for the new concurrency model
1 parent fed58ce commit 2156df6

File tree

1 file changed

+36
-8
lines changed

1 file changed

+36
-8
lines changed

durabletask/worker.py

Lines changed: 36 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -300,6 +300,7 @@ async def _async_run_loop(self):
300300
# Connection state management for retry fix
301301
current_channel = None
302302
current_stub = None
303+
current_reader_thread = None
303304
conn_retry_count = 0
304305
conn_max_retry_delay = 60
305306

@@ -327,7 +328,26 @@ def create_fresh_connection():
327328
raise
328329

329330
def invalidate_connection():
330-
nonlocal current_channel, current_stub
331+
nonlocal current_channel, current_stub, current_reader_thread
332+
# Cancel the response stream first to signal the reader thread to stop
333+
if self._response_stream is not None:
334+
try:
335+
self._response_stream.cancel()
336+
except Exception:
337+
pass
338+
self._response_stream = None
339+
340+
# Wait for the reader thread to finish
341+
if current_reader_thread is not None:
342+
try:
343+
current_reader_thread.join(timeout=2)
344+
if current_reader_thread.is_alive():
345+
self._logger.warning("Stream reader thread did not shut down gracefully")
346+
except Exception:
347+
pass
348+
current_reader_thread = None
349+
350+
# Close the channel
331351
if current_channel:
332352
try:
333353
current_channel.close()
@@ -389,8 +409,8 @@ def stream_reader():
389409

390410
import threading
391411

392-
reader_thread = threading.Thread(target=stream_reader, daemon=True)
393-
reader_thread.start()
412+
current_reader_thread = threading.Thread(target=stream_reader, daemon=True)
413+
current_reader_thread.start()
394414
loop = asyncio.get_running_loop()
395415
while not self._shutdown.is_set():
396416
try:
@@ -423,21 +443,29 @@ def stream_reader():
423443
)
424444
except Exception as e:
425445
self._logger.warning(f"Error in work item stream: {e}")
426-
break
427-
reader_thread.join(timeout=1)
446+
raise e
447+
current_reader_thread.join(timeout=1)
428448
self._logger.info("Work item stream ended normally")
429449
except grpc.RpcError as rpc_error:
430450
should_invalidate = should_invalidate_connection(rpc_error)
431451
if should_invalidate:
432452
invalidate_connection()
433453
error_code = rpc_error.code() # type: ignore
454+
error_details = str(rpc_error)
455+
434456
if error_code == grpc.StatusCode.CANCELLED:
435457
self._logger.info(f"Disconnected from {self._host_address}")
436458
break
437459
elif error_code == grpc.StatusCode.UNAVAILABLE:
438-
self._logger.warning(
439-
f"The sidecar at address {self._host_address} is unavailable - will continue retrying"
440-
)
460+
# Check if this is a connection timeout scenario
461+
if "Timeout occurred" in error_details or "Failed to connect to remote host" in error_details:
462+
self._logger.warning(
463+
f"Connection timeout to {self._host_address}: {error_details} - will retry with fresh connection"
464+
)
465+
else:
466+
self._logger.warning(
467+
f"The sidecar at address {self._host_address} is unavailable: {error_details} - will continue retrying"
468+
)
441469
elif should_invalidate:
442470
self._logger.warning(
443471
f"Connection-level gRPC error ({error_code}): {rpc_error} - resetting connection"

0 commit comments

Comments
 (0)