Skip to content

Commit 7d85c76

Browse files
committed
more fixup
1 parent f9713e3 commit 7d85c76

File tree

3 files changed

+33
-4
lines changed

3 files changed

+33
-4
lines changed

durabletask-azuremanaged/durabletask/azuremanaged/worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
3636
>>> from azure.identity import DefaultAzureCredential
3737
>>> from durabletask.azuremanaged import DurableTaskSchedulerWorker
3838
>>> from durabletask.worker import ConcurrencyOptions
39-
>>>
39+
>>>
4040
>>> credential = DefaultAzureCredential()
4141
>>> concurrency = ConcurrencyOptions(max_concurrent_activities=10)
4242
>>> worker = DurableTaskSchedulerWorker(

durabletask/worker.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ class TaskHubGrpcWorker:
185185
... result = yield context.call_activity("my_activity", input="hello")
186186
... return result
187187
>>>
188-
>>> @worker.add_activity
188+
>>> @worker.add_activity
189189
... def my_activity(context, input):
190190
... return f"Processed: {input}"
191191
>>>
@@ -1351,11 +1351,35 @@ def _ensure_queues_for_current_loop(self):
13511351
return
13521352

13531353
# Need to recreate queues for the current event loop
1354-
# Create fresh queues - any items from previous event loops are dropped
1354+
# First, preserve any existing work items
1355+
existing_activity_items = []
1356+
existing_orchestration_items = []
1357+
1358+
if hasattr(self, 'activity_queue'):
1359+
try:
1360+
while not self.activity_queue.empty():
1361+
existing_activity_items.append(self.activity_queue.get_nowait())
1362+
except Exception:
1363+
pass
1364+
1365+
if hasattr(self, 'orchestration_queue'):
1366+
try:
1367+
while not self.orchestration_queue.empty():
1368+
existing_orchestration_items.append(self.orchestration_queue.get_nowait())
1369+
except Exception:
1370+
pass
1371+
1372+
# Create fresh queues for the current event loop
13551373
self.activity_queue = asyncio.Queue()
13561374
self.orchestration_queue = asyncio.Queue()
13571375
self._queue_event_loop = current_loop
13581376

1377+
# Restore the work items to the new queues
1378+
for item in existing_activity_items:
1379+
self.activity_queue.put_nowait(item)
1380+
for item in existing_orchestration_items:
1381+
self.orchestration_queue.put_nowait(item)
1382+
13591383
async def run(self):
13601384
# Reset shutdown flag in case this manager is being reused
13611385
self._shutdown = False

tests/durabletask/test_worker_concurrency_loop.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,16 @@ def run_manager():
124124
t = threading.Thread(target=run_manager)
125125
t.start()
126126
time.sleep(1.5) # Let work process
127-
manager.shutdown()
127+
128+
# Signal shutdown but don't actually call shutdown() yet
129+
manager._shutdown = True
128130
# Unblock the consumers by putting dummy items in the queues
129131
manager.activity_queue.put_nowait((lambda: None, (), {}))
130132
manager.orchestration_queue.put_nowait((lambda: None, (), {}))
131133
t.join(timeout=2)
132134

135+
# Now shutdown the thread pool
136+
manager.thread_pool.shutdown(wait=True)
137+
133138
# Check that all work items completed
134139
assert len(results) == 10

0 commit comments

Comments
 (0)