Skip to content

Commit cb9695e

Browse files
committed
cleanup tests
1 parent 38eeabc commit cb9695e

File tree

3 files changed

+34
-33
lines changed

3 files changed

+34
-33
lines changed

tests/durabletask/test_concurrency_options.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33

44
import os
55

6-
from durabletask import worker
6+
from durabletask.worker import ConcurrencyOptions, TaskHubGrpcWorker
77

88

99
def test_default_concurrency_options():
1010
"""Test that default concurrency options work correctly."""
11-
options = worker.ConcurrencyOptions()
11+
options = ConcurrencyOptions()
1212
processor_count = os.cpu_count() or 1
1313
expected_default = 100 * processor_count
1414
expected_workers = processor_count + 4
@@ -20,7 +20,7 @@ def test_default_concurrency_options():
2020

2121
def test_custom_concurrency_options():
2222
"""Test that custom concurrency options work correctly."""
23-
options = worker.ConcurrencyOptions(
23+
options = ConcurrencyOptions(
2424
maximum_concurrent_activity_work_items=50,
2525
maximum_concurrent_orchestration_work_items=25,
2626
maximum_thread_pool_workers=30,
@@ -37,7 +37,7 @@ def test_partial_custom_options():
3737
expected_default = 100 * processor_count
3838
expected_workers = processor_count + 4
3939

40-
options = worker.ConcurrencyOptions(
40+
options = ConcurrencyOptions(
4141
maximum_concurrent_activity_work_items=30
4242
)
4343

@@ -48,44 +48,44 @@ def test_partial_custom_options():
4848

4949
def test_worker_with_concurrency_options():
5050
"""Test that TaskHubGrpcWorker accepts concurrency options."""
51-
options = worker.ConcurrencyOptions(
51+
options = ConcurrencyOptions(
5252
maximum_concurrent_activity_work_items=10,
5353
maximum_concurrent_orchestration_work_items=20,
5454
maximum_thread_pool_workers=15,
5555
)
5656

57-
grpc_worker = worker.TaskHubGrpcWorker(concurrency_options=options)
57+
worker = TaskHubGrpcWorker(concurrency_options=options)
5858

59-
assert grpc_worker.concurrency_options == options
59+
assert worker.concurrency_options == options
6060

6161

6262
def test_worker_default_options():
6363
"""Test that TaskHubGrpcWorker uses default options when no parameters are provided."""
64-
grpc_worker = worker.TaskHubGrpcWorker()
64+
worker = TaskHubGrpcWorker()
6565

6666
processor_count = os.cpu_count() or 1
6767
expected_default = 100 * processor_count
6868
expected_workers = processor_count + 4
6969

7070
assert (
71-
grpc_worker.concurrency_options.maximum_concurrent_activity_work_items == expected_default
71+
worker.concurrency_options.maximum_concurrent_activity_work_items == expected_default
7272
)
7373
assert (
74-
grpc_worker.concurrency_options.maximum_concurrent_orchestration_work_items == expected_default
74+
worker.concurrency_options.maximum_concurrent_orchestration_work_items == expected_default
7575
)
76-
assert grpc_worker.concurrency_options.maximum_thread_pool_workers == expected_workers
76+
assert worker.concurrency_options.maximum_thread_pool_workers == expected_workers
7777

7878

7979
def test_concurrency_options_property_access():
8080
"""Test that the concurrency_options property works correctly."""
81-
options = worker.ConcurrencyOptions(
81+
options = ConcurrencyOptions(
8282
maximum_concurrent_activity_work_items=15,
8383
maximum_concurrent_orchestration_work_items=25,
8484
maximum_thread_pool_workers=30,
8585
)
8686

87-
grpc_worker = worker.TaskHubGrpcWorker(concurrency_options=options)
88-
retrieved_options = grpc_worker.concurrency_options
87+
worker = TaskHubGrpcWorker(concurrency_options=options)
88+
retrieved_options = worker.concurrency_options
8989

9090
# Should be the same object
9191
assert retrieved_options is options

tests/durabletask/test_worker_concurrency_loop.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import threading
33
import time
44

5-
from durabletask import worker
5+
from durabletask.worker import ConcurrencyOptions, TaskHubGrpcWorker
66

77

88
class DummyStub:
@@ -15,6 +15,7 @@ def CompleteOrchestratorTask(self, res):
1515
def CompleteActivityTask(self, res):
1616
self.completed.append(('activity', res))
1717

18+
1819
class DummyRequest:
1920
def __init__(self, kind, instance_id):
2021
self.kind = kind
@@ -33,16 +34,18 @@ def HasField(self, field):
3334
def WhichOneof(self, _):
3435
return f'{self.kind}Request'
3536

37+
3638
class DummyCompletionToken:
3739
pass
3840

41+
3942
def test_worker_concurrency_loop_sync():
40-
options = worker.ConcurrencyOptions(
43+
options = ConcurrencyOptions(
4144
maximum_concurrent_activity_work_items=2,
4245
maximum_concurrent_orchestration_work_items=1,
4346
maximum_thread_pool_workers=2,
4447
)
45-
grpc_worker = worker.TaskHubGrpcWorker(concurrency_options=options)
48+
worker = TaskHubGrpcWorker(concurrency_options=options)
4649
stub = DummyStub()
4750

4851
def dummy_orchestrator(req, stub, completionToken):
@@ -54,39 +57,42 @@ def dummy_activity(req, stub, completionToken):
5457
stub.CompleteActivityTask('ok')
5558

5659
# Patch the worker's _execute_orchestrator and _execute_activity
57-
grpc_worker._execute_orchestrator = dummy_orchestrator
58-
grpc_worker._execute_activity = dummy_activity
60+
worker._execute_orchestrator = dummy_orchestrator
61+
worker._execute_activity = dummy_activity
5962

6063
orchestrator_requests = [DummyRequest('orchestrator', f'orch{i}') for i in range(3)]
6164
activity_requests = [DummyRequest('activity', f'act{i}') for i in range(4)]
6265

6366
async def run_test():
6467
# Start the worker manager's run loop in the background
65-
worker_task = asyncio.create_task(grpc_worker._async_worker_manager.run())
68+
worker_task = asyncio.create_task(worker._async_worker_manager.run())
6669
for req in orchestrator_requests:
67-
grpc_worker._async_worker_manager.submit_orchestration(dummy_orchestrator, req, stub, DummyCompletionToken())
70+
worker._async_worker_manager.submit_orchestration(dummy_orchestrator, req, stub, DummyCompletionToken())
6871
for req in activity_requests:
69-
grpc_worker._async_worker_manager.submit_activity(dummy_activity, req, stub, DummyCompletionToken())
72+
worker._async_worker_manager.submit_activity(dummy_activity, req, stub, DummyCompletionToken())
7073
await asyncio.sleep(1.0)
7174
orchestrator_count = sum(1 for t, _ in stub.completed if t == 'orchestrator')
7275
activity_count = sum(1 for t, _ in stub.completed if t == 'activity')
7376
assert orchestrator_count == 3, f"Expected 3 orchestrator completions, got {orchestrator_count}"
7477
assert activity_count == 4, f"Expected 4 activity completions, got {activity_count}"
75-
grpc_worker._async_worker_manager._shutdown = True
78+
worker._async_worker_manager._shutdown = True
7679
await worker_task
7780
asyncio.run(run_test())
7881

82+
7983
# Dummy orchestrator and activity for sync context
8084
def dummy_orchestrator(ctx, input):
8185
# Simulate some work
8286
time.sleep(0.1)
8387
return "orchestrator-done"
8488

89+
8590
def dummy_activity(ctx, input):
8691
# Simulate some work
8792
time.sleep(0.1)
8893
return "activity-done"
8994

95+
9096
def test_worker_concurrency_sync():
9197
# Use small concurrency to make test observable
9298
options = ConcurrencyOptions(
@@ -124,16 +130,11 @@ def run_manager():
124130
t = threading.Thread(target=run_manager)
125131
t.start()
126132
time.sleep(1.5) # Let work process
127-
128-
# Signal shutdown but don't actually call shutdown() yet
129-
manager._shutdown = True
133+
manager.shutdown()
130134
# Unblock the consumers by putting dummy items in the queues
131135
manager.activity_queue.put_nowait((lambda: None, (), {}))
132136
manager.orchestration_queue.put_nowait((lambda: None, (), {}))
133137
t.join(timeout=2)
134138

135-
# Now shutdown the thread pool
136-
manager.thread_pool.shutdown(wait=True)
137-
138139
# Check that all work items completed
139-
assert len(results) == 10
140+
assert len(results) == 10

tests/durabletask/test_worker_concurrency_loop_async.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import asyncio
22

3-
from durabletask import worker
3+
from durabletask.worker import ConcurrencyOptions, TaskHubGrpcWorker
44

55

66
class DummyStub:
@@ -38,12 +38,12 @@ class DummyCompletionToken:
3838

3939

4040
def test_worker_concurrency_loop_async():
41-
options = worker.ConcurrencyOptions(
41+
options = ConcurrencyOptions(
4242
maximum_concurrent_activity_work_items=2,
4343
maximum_concurrent_orchestration_work_items=1,
4444
maximum_thread_pool_workers=2,
4545
)
46-
grpc_worker = worker.TaskHubGrpcWorker(concurrency_options=options)
46+
grpc_worker = TaskHubGrpcWorker(concurrency_options=options)
4747
stub = DummyStub()
4848

4949
async def dummy_orchestrator(req, stub, completionToken):

0 commit comments

Comments
 (0)