Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
from dapr.clients.retry import RetryPolicy
from dapr.common.pubsub.subscription import StreamCancelledError
from dapr.conf import settings
from dapr.conf.helpers import GrpcEndpoint
from dapr.conf.helpers import GrpcEndpoint, build_grpc_channel_options
from dapr.proto import api_service_v1, api_v1, common_v1
from dapr.proto.runtime.v1.dapr_pb2 import UnsubscribeConfigurationResponse
from dapr.version import __version__
Expand Down Expand Up @@ -146,11 +146,9 @@ def __init__(

useragent = f'dapr-sdk-python/{__version__}'
if not max_grpc_message_length:
options = [
('grpc.primary_user_agent', useragent),
]
base_options = [('grpc.primary_user_agent', useragent)]
else:
options = [
base_options = [
('grpc.max_send_message_length', max_grpc_message_length), # type: ignore
('grpc.max_receive_message_length', max_grpc_message_length), # type: ignore
('grpc.primary_user_agent', useragent),
Expand All @@ -166,6 +164,9 @@ def __init__(
except ValueError as error:
raise DaprInternalError(f'{error}') from error

# Merge standard + keepalive + retry options
options = build_grpc_channel_options(base_options)

if self._uri.tls:
self._channel = grpc.secure_channel( # type: ignore
self._uri.endpoint,
Expand Down
26 changes: 23 additions & 3 deletions dapr/conf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ def __init__(self):
default_value = getattr(global_settings, setting)
env_variable = os.environ.get(setting)
if env_variable:
val = (
type(default_value)(env_variable) if default_value is not None else env_variable
)
val = self._coerce_env_value(default_value, env_variable)
setattr(self, setting, val)
else:
setattr(self, setting, default_value)
Expand All @@ -36,5 +34,27 @@ def __getattr__(self, name):
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")
return getattr(self, name)

@staticmethod
def _coerce_env_value(default_value, env_variable: str):
if default_value is None:
return env_variable
# Handle booleans explicitly to avoid bool('false') == True
if isinstance(default_value, bool):
s = env_variable.strip().lower()
if s in ('1', 'true', 't', 'yes', 'y', 'on'):
return True
if s in ('0', 'false', 'f', 'no', 'n', 'off'):
return False
# Fallback: non-empty -> True for backward-compat
return bool(s)
# Integers
if isinstance(default_value, int) and not isinstance(default_value, bool):
return int(env_variable)
# Floats
if isinstance(default_value, float):
return float(env_variable)
# Other types: try to cast as before
return type(default_value)(env_variable)


settings = Settings()
17 changes: 17 additions & 0 deletions dapr/conf/global_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,23 @@

DAPR_HTTP_TIMEOUT_SECONDS = 60

# gRPC keepalive (disabled by default; enable via env to help with idle debugging sessions)
DAPR_GRPC_KEEPALIVE_ENABLED: bool = False
DAPR_GRPC_KEEPALIVE_TIME_MS: int = 120000  # send keepalive pings every 120s
DAPR_GRPC_KEEPALIVE_TIMEOUT_MS: int = (
    20000  # wait 20s for ack before considering the connection dead
)
# When True, keepalive pings are permitted even while there are no active calls.
DAPR_GRPC_KEEPALIVE_PERMIT_WITHOUT_CALLS: bool = False

# gRPC retries (disabled by default; enable via env to apply channel service config)
DAPR_GRPC_RETRY_ENABLED: bool = False
DAPR_GRPC_RETRY_MAX_ATTEMPTS: int = 4
DAPR_GRPC_RETRY_INITIAL_BACKOFF_MS: int = 100  # milliseconds
DAPR_GRPC_RETRY_MAX_BACKOFF_MS: int = 1000  # milliseconds
DAPR_GRPC_RETRY_BACKOFF_MULTIPLIER: float = 2.0
# Comma-separated list of status codes, e.g., 'UNAVAILABLE,DEADLINE_EXCEEDED'
DAPR_GRPC_RETRY_CODES: str = 'UNAVAILABLE,DEADLINE_EXCEEDED'

# ----- Conversation API settings ------

# Configuration for handling large enums to avoid massive JSON schemas that can exceed LLM token limits
Expand Down
81 changes: 81 additions & 0 deletions dapr/conf/helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
"""
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import json
from urllib.parse import ParseResult, parse_qs, urlparse
from warnings import warn

from dapr.conf import settings


class URIParseConfig:
DEFAULT_SCHEME = 'dns'
Expand Down Expand Up @@ -189,3 +205,68 @@ def _validate_path_and_query(self) -> None:
f'query parameters are not supported for gRPC endpoints:'
f" '{self._parsed_url.query}'"
)


# ------------------------------
# gRPC channel options helpers
# ------------------------------


def get_grpc_keepalive_options():
    """Build the keepalive channel options from settings.

    Returns a list of (key, value) tuples suitable for passing to
    grpc.{secure,insecure}_channel, or an empty list when keepalive
    is disabled.
    """
    if not settings.DAPR_GRPC_KEEPALIVE_ENABLED:
        return []
    permit_without_calls = 1 if settings.DAPR_GRPC_KEEPALIVE_PERMIT_WITHOUT_CALLS else 0
    return [
        ('grpc.keepalive_time_ms', int(settings.DAPR_GRPC_KEEPALIVE_TIME_MS)),
        ('grpc.keepalive_timeout_ms', int(settings.DAPR_GRPC_KEEPALIVE_TIMEOUT_MS)),
        ('grpc.keepalive_permit_without_calls', permit_without_calls),
    ]


def get_grpc_retry_service_config_option():
    """Return the ('grpc.service_config', <json>) option if retry is enabled, else None.

    Builds a gRPC service config that applies a single retry policy to every
    service/method on the channel.

    Returns:
        A ('grpc.service_config', json_str) tuple, or None when
        DAPR_GRPC_RETRY_ENABLED is false.
    """
    # Access the setting directly, consistent with get_grpc_keepalive_options();
    # the default is declared in dapr.conf.global_settings, so the getattr
    # fallback used previously was unnecessary.
    if not settings.DAPR_GRPC_RETRY_ENABLED:
        return None
    retry_policy = {
        'maxAttempts': int(settings.DAPR_GRPC_RETRY_MAX_ATTEMPTS),
        # Service-config durations are decimal seconds with an 's' suffix.
        'initialBackoff': f'{int(settings.DAPR_GRPC_RETRY_INITIAL_BACKOFF_MS) / 1000.0}s',
        'maxBackoff': f'{int(settings.DAPR_GRPC_RETRY_MAX_BACKOFF_MS) / 1000.0}s',
        'backoffMultiplier': float(settings.DAPR_GRPC_RETRY_BACKOFF_MULTIPLIER),
        'retryableStatusCodes': [
            c.strip() for c in str(settings.DAPR_GRPC_RETRY_CODES).split(',') if c.strip()
        ],
    }
    service_config = {
        'methodConfig': [
            {
                'name': [{'service': ''}],  # empty service name: apply to all services
                'retryPolicy': retry_policy,
            }
        ]
    }
    return ('grpc.service_config', json.dumps(service_config))


def build_grpc_channel_options(base_options=None):
    """Combine base options with keepalive and retry policy options.

    Args:
        base_options: optional iterable of (key, value) tuples.

    Returns:
        list of (key, value) tuples.
    """
    combined = [] if base_options is None else list(base_options)
    combined += get_grpc_keepalive_options()
    retry_option = get_grpc_retry_service_config_option()
    if retry_option is not None:
        combined.append(retry_option)
    return combined
2 changes: 2 additions & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ mypy>=1.2.0
mypy-extensions>=0.4.3
mypy-protobuf>=2.9
tox>=4.3.0
pip>=23.0.0
coverage>=5.3
pytest
wheel
# used in unit test only
opentelemetry-sdk
Expand Down
1 change: 1 addition & 0 deletions examples/grpc_proxying/invoke-receiver.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import logging

import grpc
Expand Down
15 changes: 15 additions & 0 deletions examples/workflow-async/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Dapr Workflow Async Examples (Python)

These examples mirror `examples/workflow/` but author orchestrators with `async def` using the
async workflow APIs. Activities remain regular functions unless noted.

How to run:
- Ensure a Dapr sidecar is running locally. If needed, set `DURABLETASK_GRPC_ENDPOINT`, or
`DURABLETASK_GRPC_HOST/PORT`.
- Install requirements: `pip install -r requirements.txt`
- Run any example: `python simple.py`

Notes:
- Orchestrators use `await ctx.activity(...)`, `await ctx.sleep(...)`, `await ctx.when_all/when_any(...)`, etc.
- No event loop is started manually; the Durable Task worker drives the async orchestrators.
- You can also launch instances using `DaprWorkflowClient` as in the non-async examples.
47 changes: 47 additions & 0 deletions examples/workflow-async/child_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-

"""
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from dapr.ext.workflow import (
AsyncWorkflowContext,
DaprWorkflowClient,
WorkflowRuntime,
)

wfr = WorkflowRuntime()


@wfr.async_workflow(name='child_async')
async def child(ctx: AsyncWorkflowContext, n: int) -> int:
    """Child workflow: return double the input."""
    return 2 * n


@wfr.async_workflow(name='parent_async')
async def parent(ctx: AsyncWorkflowContext, n: int) -> int:
    """Parent workflow: run the child workflow, then add one to its result."""
    child_result = await ctx.call_child_workflow(child, input=n)
    print(f'Child workflow returned {child_result}')
    return child_result + 1


def main():
    """Start the runtime, run the parent workflow once, and shut down."""
    wfr.start()
    wf_client = DaprWorkflowClient()
    instance_id = 'parent_async_instance'
    wf_client.schedule_new_workflow(workflow=parent, input=5, instance_id=instance_id)
    wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
    wfr.shutdown()


if __name__ == '__main__':
    main()
49 changes: 49 additions & 0 deletions examples/workflow-async/fan_out_fan_in.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
"""
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from dapr.ext.workflow import (
AsyncWorkflowContext,
DaprWorkflowClient,
WorkflowActivityContext,
WorkflowRuntime,
)

wfr = WorkflowRuntime()


@wfr.activity(name='square')
def square(ctx: WorkflowActivityContext, x: int) -> int:
    """Activity: return the square of x."""
    return x ** 2


@wfr.async_workflow(name='fan_out_fan_in_async')
async def orchestrator(ctx: AsyncWorkflowContext):
    """Fan out five 'square' activities in parallel and return the sum."""
    pending = [ctx.call_activity(square, input=i) for i in range(1, 6)]
    squares = await ctx.when_all(pending)
    return sum(squares)


def main():
    """Run the fan-out/fan-in workflow end to end against a local sidecar."""
    wfr.start()
    wf_client = DaprWorkflowClient()
    instance_id = 'fofi_async'
    wf_client.schedule_new_workflow(workflow=orchestrator, instance_id=instance_id)
    state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
    print(f'Workflow state: {state}')
    wfr.shutdown()


if __name__ == '__main__':
    main()
47 changes: 47 additions & 0 deletions examples/workflow-async/human_approval.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-

"""
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from dapr.ext.workflow import AsyncWorkflowContext, DaprWorkflowClient, WorkflowRuntime

wfr = WorkflowRuntime()


@wfr.async_workflow(name='human_approval_async')
async def orchestrator(ctx: AsyncWorkflowContext, request_id: str):
    """Wait for an approve/reject event for *request_id*, or time out after 300s."""
    approve_event = ctx.wait_for_external_event(f'approve:{request_id}')
    reject_event = ctx.wait_for_external_event(f'reject:{request_id}')
    deadline = ctx.create_timer(300.0)
    decision = await ctx.when_any([approve_event, reject_event, deadline])
    if isinstance(decision, dict):
        if decision.get('approved'):
            return 'APPROVED'
        if decision.get('rejected'):
            return 'REJECTED'
    return 'TIMEOUT'


def main():
    # Starts the runtime, schedules one human-approval workflow instance,
    # then shuts the runtime down.
    wfr.start()
    client = DaprWorkflowClient()
    instance_id = 'human_approval_async_1'
    client.schedule_new_workflow(workflow=orchestrator, input='REQ-1', instance_id=instance_id)
    # In a real scenario, raise approve/reject event from another service.
    # NOTE(review): unlike the sibling examples, this does not wait for
    # completion before shutdown, so the orchestrator is left pending —
    # confirm this is intentional for the example.
    wfr.shutdown()


if __name__ == '__main__':
    main()
2 changes: 2 additions & 0 deletions examples/workflow-async/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
dapr-ext-workflow-dev>=1.15.0.dev
dapr-dev>=1.15.0.dev
Loading