Skip to content

Commit 42285e3

Browse files
committed
add grpc helpers
Signed-off-by: Filinto Duran <[email protected]>
1 parent 8e17bff commit 42285e3

File tree

3 files changed

+111
-6
lines changed

3 files changed

+111
-6
lines changed

dapr/conf/__init__.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@ def __init__(self):
2424
default_value = getattr(global_settings, setting)
2525
env_variable = os.environ.get(setting)
2626
if env_variable:
27-
val = (
28-
type(default_value)(env_variable) if default_value is not None else env_variable
29-
)
27+
val = self._coerce_env_value(default_value, env_variable)
3028
setattr(self, setting, val)
3129
else:
3230
setattr(self, setting, default_value)
@@ -36,5 +34,27 @@ def __getattr__(self, name):
3634
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")
3735
return getattr(self, name)
3836

37+
@staticmethod
38+
def _coerce_env_value(default_value, env_variable: str):
39+
if default_value is None:
40+
return env_variable
41+
# Handle booleans explicitly to avoid bool('false') == True
42+
if isinstance(default_value, bool):
43+
s = env_variable.strip().lower()
44+
if s in ('1', 'true', 't', 'yes', 'y', 'on'):
45+
return True
46+
if s in ('0', 'false', 'f', 'no', 'n', 'off'):
47+
return False
48+
# Fallback: non-empty -> True for backward-compat
49+
return bool(s)
50+
# Integers
51+
if isinstance(default_value, int) and not isinstance(default_value, bool):
52+
return int(env_variable)
53+
# Floats
54+
if isinstance(default_value, float):
55+
return float(env_variable)
56+
# Other types: try to cast as before
57+
return type(default_value)(env_variable)
58+
3959

4060
settings = Settings()

dapr/conf/global_settings.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,23 @@
3434

3535
DAPR_HTTP_TIMEOUT_SECONDS = 60
3636

37+
# gRPC keepalive (disabled by default; enable via env to help with idle debugging sessions)
38+
DAPR_GRPC_KEEPALIVE_ENABLED: bool = False
39+
DAPR_GRPC_KEEPALIVE_TIME_MS: int = 120000 # send keepalive pings every 120s
40+
DAPR_GRPC_KEEPALIVE_TIMEOUT_MS: int = (
41+
20000 # wait 20s for ack before considering the connection dead
42+
)
43+
DAPR_GRPC_KEEPALIVE_PERMIT_WITHOUT_CALLS: bool = False # allow pings when there are no active calls
44+
45+
# gRPC retries (disabled by default; enable via env to apply channel service config)
46+
DAPR_GRPC_RETRY_ENABLED: bool = False
47+
DAPR_GRPC_RETRY_MAX_ATTEMPTS: int = 4
48+
DAPR_GRPC_RETRY_INITIAL_BACKOFF_MS: int = 100
49+
DAPR_GRPC_RETRY_MAX_BACKOFF_MS: int = 1000
50+
DAPR_GRPC_RETRY_BACKOFF_MULTIPLIER: float = 2.0
51+
# Comma-separated list of status codes, e.g., 'UNAVAILABLE,DEADLINE_EXCEEDED'
52+
DAPR_GRPC_RETRY_CODES: str = 'UNAVAILABLE,DEADLINE_EXCEEDED'
53+
3754
# ----- Conversation API settings ------
3855

3956
# Configuration for handling large enums to avoid massive JSON schemas that can exceed LLM token limits

dapr/conf/helpers.py

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
import json
12
from urllib.parse import ParseResult, parse_qs, urlparse
23
from warnings import warn
34

5+
from dapr.conf import settings
6+
47

58
class URIParseConfig:
69
DEFAULT_SCHEME = 'dns'
@@ -126,9 +129,9 @@ def _preprocess_uri(self, url: str) -> str:
126129
# A URI like dns:mydomain:5000 or vsock:mycid:5000 was used
127130
url = url.replace(':', '://', 1)
128131
elif (
129-
len(url_list) >= 2
130-
and '://' not in url
131-
and url_list[0] in URIParseConfig.ACCEPTED_SCHEMES
132+
len(url_list) >= 2
133+
and '://' not in url
134+
and url_list[0] in URIParseConfig.ACCEPTED_SCHEMES
132135
):
133136
# A URI like dns:mydomain or dns:[2001:db8:1f70::999:de8:7648:6e8]:mydomain was used
134137
# Possibly a URI like dns:[2001:db8:1f70::999:de8:7648:6e8]:mydomain was used
@@ -189,3 +192,68 @@ def _validate_path_and_query(self) -> None:
189192
f'query parameters are not supported for gRPC endpoints:'
190193
f" '{self._parsed_url.query}'"
191194
)
195+
196+
197+
# ------------------------------
198+
# gRPC channel options helpers
199+
# ------------------------------
200+
201+
202+
def get_grpc_keepalive_options():
203+
"""Return a list of keepalive channel options if enabled, else empty list.
204+
205+
Options are tuples suitable for passing to grpc.{secure,insecure}_channel.
206+
"""
207+
if not settings.DAPR_GRPC_KEEPALIVE_ENABLED:
208+
return []
209+
return [
210+
('grpc.keepalive_time_ms', int(settings.DAPR_GRPC_KEEPALIVE_TIME_MS)),
211+
('grpc.keepalive_timeout_ms', int(settings.DAPR_GRPC_KEEPALIVE_TIMEOUT_MS)),
212+
(
213+
'grpc.keepalive_permit_without_calls',
214+
1 if settings.DAPR_GRPC_KEEPALIVE_PERMIT_WITHOUT_CALLS else 0,
215+
),
216+
]
217+
218+
219+
def get_grpc_retry_service_config_option():
220+
"""Return ('grpc.service_config', json) option if retry is enabled, else None.
221+
222+
Applies a universal retry policy via gRPC service config.
223+
"""
224+
if not getattr(settings, 'DAPR_GRPC_RETRY_ENABLED', False):
225+
return None
226+
retry_policy = {
227+
'maxAttempts': int(settings.DAPR_GRPC_RETRY_MAX_ATTEMPTS),
228+
'initialBackoff': f'{int(settings.DAPR_GRPC_RETRY_INITIAL_BACKOFF_MS) / 1000.0}s',
229+
'maxBackoff': f'{int(settings.DAPR_GRPC_RETRY_MAX_BACKOFF_MS) / 1000.0}s',
230+
'backoffMultiplier': float(settings.DAPR_GRPC_RETRY_BACKOFF_MULTIPLIER),
231+
'retryableStatusCodes': [
232+
c.strip() for c in str(settings.DAPR_GRPC_RETRY_CODES).split(',') if c.strip()
233+
],
234+
}
235+
service_config = {
236+
'methodConfig': [
237+
{
238+
'name': [{'service': ''}], # apply to all services
239+
'retryPolicy': retry_policy,
240+
}
241+
]
242+
}
243+
return ('grpc.service_config', json.dumps(service_config))
244+
245+
246+
def build_grpc_channel_options(base_options=None):
247+
"""Combine base options with keepalive and retry policy options.
248+
249+
Args:
250+
base_options: optional iterable of (key, value) tuples.
251+
Returns:
252+
list of (key, value) tuples.
253+
"""
254+
options = list(base_options or [])
255+
options.extend(get_grpc_keepalive_options())
256+
retry_opt = get_grpc_retry_service_config_option()
257+
if retry_opt is not None:
258+
options.append(retry_opt)
259+
return options

0 commit comments

Comments
 (0)