Skip to content

Commit f3442db

Browse files
committed
add missing grpc option in worker grpc client
Signed-off-by: Filinto Duran <[email protected]>
1 parent 76444c8 commit f3442db

File tree

3 files changed

+25
-11
lines changed

3 files changed

+25
-11
lines changed

durabletask/worker.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,13 +223,15 @@ def __init__(
223223
secure_channel: bool = False,
224224
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
225225
concurrency_options: Optional[ConcurrencyOptions] = None,
226+
channel_options: Optional[Sequence[tuple[str, Any]]] = None,
226227
):
227228
self._registry = _Registry()
228229
self._host_address = host_address if host_address else shared.get_default_host_address()
229230
self._logger = shared.get_logger("worker", log_handler, log_formatter)
230231
self._shutdown = Event()
231232
self._is_running = False
232233
self._secure_channel = secure_channel
234+
self._channel_options = channel_options
233235

234236
# Use provided concurrency options or create default ones
235237
self._concurrency_options = (
@@ -306,7 +308,7 @@ def create_fresh_connection():
306308
current_stub = None
307309
try:
308310
current_channel = shared.get_grpc_channel(
309-
self._host_address, self._secure_channel, self._interceptors
311+
self._host_address, self._secure_channel, self._interceptors, options=self._channel_options
310312
)
311313
current_stub = stubs.TaskHubSidecarServiceStub(current_channel)
312314
current_stub.Hello(empty_pb2.Empty())

tests/durabletask/test_orchestration_e2e.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
from durabletask import client, task, worker
1212

1313
# NOTE: These tests assume a sidecar process is running. Example command:
14-
# docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' --rm cgillum/durabletask-sidecar:latest start --backend Emulator
14+
# dapr init || true
15+
# dapr run --app-id test-app --dapr-grpc-port 4001
1516
pytestmark = pytest.mark.e2e
1617

1718

@@ -22,16 +23,18 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _):
2223
nonlocal invoked # don't do this in a real app!
2324
invoked = True
2425

26+
channel_options = [
27+
("grpc.max_send_message_length", 1024 * 1024), # 1MB
28+
]
29+
2530
# Start a worker, which will connect to the sidecar in a background thread
26-
with worker.TaskHubGrpcWorker() as w:
31+
with worker.TaskHubGrpcWorker(channel_options=channel_options) as w:
2732
w.add_orchestrator(empty_orchestrator)
2833
w.start()
2934

3035
# set a custom max send length option
3136
c = client.TaskHubGrpcClient(
32-
channel_options=[
33-
("grpc.max_send_message_length", 1024 * 1024), # 1MB
34-
]
37+
channel_options=channel_options
3538
)
3639
id = c.schedule_new_orchestration(empty_orchestrator)
3740
state = c.wait_for_orchestration_completion(id, timeout=30)

tests/durabletask/test_orchestration_e2e_async.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from durabletask.client import OrchestrationStatus
1414

1515
# NOTE: These tests assume a sidecar process is running. Example command:
16-
# go install github.com/microsoft/durabletask-go@main
16+
# go install github.com/dapr/durabletask-go@main
1717
# durabletask-go --port 4001
1818
pytestmark = [pytest.mark.e2e, pytest.mark.asyncio]
1919

@@ -25,12 +25,16 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _):
2525
nonlocal invoked # don't do this in a real app!
2626
invoked = True
2727

28+
channel_options = [
29+
("grpc.max_send_message_length", 1024 * 1024), # 1MB
30+
]
31+
2832
# Start a worker, which will connect to the sidecar in a background thread
29-
with worker.TaskHubGrpcWorker() as w:
33+
with worker.TaskHubGrpcWorker(channel_options=channel_options) as w:
3034
w.add_orchestrator(empty_orchestrator)
3135
w.start()
3236

33-
c = AsyncTaskHubGrpcClient()
37+
c = AsyncTaskHubGrpcClient(channel_options=channel_options)
3438
id = await c.schedule_new_orchestration(empty_orchestrator)
3539
state = await c.wait_for_orchestration_completion(id, timeout=30)
3640
await c.aclose()
@@ -58,13 +62,18 @@ def sequence(ctx: task.OrchestrationContext, start_val: int):
5862
numbers.append(current)
5963
return numbers
6064

65+
channel_options =[
66+
("grpc.max_send_message_length", 1024 * 1024), # 1MB
67+
]
6168
# Start a worker, which will connect to the sidecar in a background thread
62-
with worker.TaskHubGrpcWorker() as w:
69+
with worker.TaskHubGrpcWorker(
70+
channel_options=channel_options
71+
) as w:
6372
w.add_orchestrator(sequence)
6473
w.add_activity(plus_one)
6574
w.start()
6675

67-
client = AsyncTaskHubGrpcClient()
76+
client = AsyncTaskHubGrpcClient(channel_options=channel_options)
6877
id = await client.schedule_new_orchestration(sequence, input=1)
6978
state = await client.wait_for_orchestration_completion(id, timeout=30)
7079
await client.aclose()

0 commit comments

Comments
 (0)