Skip to content

Commit d799b6a

Browse files
haorenfsa and claude authored
enhance: optimize gRPC keepalive defaults and support user-configurable grpc_options (#3258)
## Summary

- Optimize gRPC keepalive defaults: `keepalive_time_ms` 55s→10s, add `keepalive_timeout_ms` 5s, enable `keepalive_permit_without_calls`
- Support user-configurable `grpc_options` dict parameter that merges with and can override defaults
- Apply changes to both sync `GrpcHandler` and async `AsyncGrpcHandler`

## Motivation

In environments with network instability (e.g., GCP Private Service Connect with NAT), the previous 55s keepalive interval was too slow to detect dead connections. Testing showed that 10s keepalive detects connection-level failures ~45s faster, enabling gRPC to automatically reconnect before user requests time out.

Users can now also pass custom `grpc_options` to fine-tune channel behavior:

```python
client = MilvusClient(
    uri='https://...',
    grpc_options={
        'grpc.keepalive_time_ms': 3000,
        'grpc.keepalive_timeout_ms': 3000,
    },
)
```

## Test plan

- [x] Unit tests for default keepalive values (sync + async)
- [x] Unit tests for user override of keepalive options
- [x] Unit tests for user adding new gRPC options
- [x] Unit tests for secure channel keepalive options

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 2a2e898 commit d799b6a

File tree

3 files changed

+196
-12
lines changed

3 files changed

+196
-12
lines changed

pymilvus/client/async_grpc_handler.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ def __init__(
8484
self._address = addr if addr is not None else self.__get_address(uri, host, port)
8585
self._log_level = None
8686
self._user = kwargs.get("user")
87+
self._grpc_options = kwargs.get("grpc_options", {})
8788
self._set_authorization(**kwargs)
8889
self._setup_grpc_channel(**kwargs)
8990
self._is_channel_ready = False
@@ -142,12 +143,18 @@ def _setup_authorization_interceptor(self, user: str, password: str, token: str)
142143

143144
def _setup_grpc_channel(self, **kwargs):
144145
if self._async_channel is None:
145-
opts = [
146-
(cygrpc.ChannelArgKey.max_send_message_length, -1),
147-
(cygrpc.ChannelArgKey.max_receive_message_length, -1),
148-
("grpc.enable_retries", 1),
149-
("grpc.keepalive_time_ms", 55000),
150-
]
146+
# Default gRPC options
147+
default_opts = {
148+
cygrpc.ChannelArgKey.max_send_message_length: -1,
149+
cygrpc.ChannelArgKey.max_receive_message_length: -1,
150+
"grpc.enable_retries": 1,
151+
"grpc.keepalive_time_ms": 10000,
152+
"grpc.keepalive_timeout_ms": 5000,
153+
"grpc.keepalive_permit_without_calls": True,
154+
}
155+
# Merge user-provided options (user options override defaults)
156+
default_opts.update(self._grpc_options)
157+
opts = list(default_opts.items())
151158
if not self._secure:
152159
self._async_channel = grpc.aio.insecure_channel(
153160
self._address,

pymilvus/client/grpc_handler.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ def __init__(
169169
self._log_level = None
170170
self._user = kwargs.get("user")
171171
self._server_info_cache = None
172+
self._grpc_options = kwargs.get("grpc_options", {})
172173
self._set_authorization(**kwargs)
173174
self._setup_grpc_channel()
174175
self.callbacks = []
@@ -318,12 +319,18 @@ def _setup_authorization_interceptor(self, user: str, password: str, token: str)
318319
def _setup_grpc_channel(self):
319320
"""Create a ddl grpc channel"""
320321
if self._channel is None:
321-
opts = [
322-
(cygrpc.ChannelArgKey.max_send_message_length, -1),
323-
(cygrpc.ChannelArgKey.max_receive_message_length, -1),
324-
("grpc.enable_retries", 1),
325-
("grpc.keepalive_time_ms", 55000),
326-
]
322+
# Default gRPC options
323+
default_opts = {
324+
cygrpc.ChannelArgKey.max_send_message_length: -1,
325+
cygrpc.ChannelArgKey.max_receive_message_length: -1,
326+
"grpc.enable_retries": 1,
327+
"grpc.keepalive_time_ms": 10000,
328+
"grpc.keepalive_timeout_ms": 5000,
329+
"grpc.keepalive_permit_without_calls": True,
330+
}
331+
# Merge user-provided options (user options override defaults)
332+
default_opts.update(self._grpc_options)
333+
opts = list(default_opts.items())
327334
if not self._secure:
328335
self._channel = grpc.insecure_channel(
329336
self._address,

tests/test_grpc_channel_options.py

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
"""
2+
Tests for gRPC channel options: keepalive defaults and user-configurable grpc_options.
3+
"""
4+
5+
from unittest.mock import MagicMock, patch
6+
7+
from grpc._cython import cygrpc
8+
from pymilvus.client.async_grpc_handler import AsyncGrpcHandler
9+
from pymilvus.client.grpc_handler import GrpcHandler
10+
11+
12+
class TestGrpcHandlerChannelOptions:
    """Checks on the channel options the synchronous GrpcHandler passes to gRPC."""

    @staticmethod
    def _bare_handler(grpc_options, secure=False):
        """Return a GrpcHandler built without __init__, primed for _setup_grpc_channel().

        Only the attributes the tests assign before calling _setup_grpc_channel()
        are populated; the TLS-related paths are set only for the secure variant.
        """
        handler = GrpcHandler.__new__(GrpcHandler)
        attrs = {
            "_channel": None,
            "_secure": secure,
            "_grpc_options": grpc_options,
            "_address": "localhost:19530",
            "_log_level": None,
            "_authorization_interceptor": None,
        }
        if secure:
            attrs.update(
                _server_name="",
                _server_pem_path="",
                _client_pem_path="",
                _client_key_path="",
                _ca_pem_path="",
            )
        for attr_name, attr_value in attrs.items():
            setattr(handler, attr_name, attr_value)
        return handler

    @staticmethod
    def _channel_options(channel_factory):
        """Return the ``options`` keyword of the factory's last call as a dict."""
        return dict(channel_factory.call_args[1]["options"])

    def test_default_grpc_options_empty(self):
        """Without a grpc_options argument the handler stores an empty dict."""
        assert GrpcHandler(channel=MagicMock())._grpc_options == {}

    def test_custom_grpc_options_stored(self):
        """A grpc_options argument is kept on the handler."""
        supplied = {"grpc.keepalive_time_ms": 3000}
        handler = GrpcHandler(channel=MagicMock(), grpc_options=supplied)
        assert handler._grpc_options == supplied

    @patch("pymilvus.client.grpc_handler.grpc.insecure_channel")
    def test_default_keepalive_values(self, mock_insecure_channel):
        """With no user options the insecure channel receives the default settings."""
        mock_insecure_channel.return_value = MagicMock()

        self._bare_handler({})._setup_grpc_channel()

        mock_insecure_channel.assert_called_once()
        passed = self._channel_options(mock_insecure_channel)

        assert passed["grpc.keepalive_time_ms"] == 10000
        assert passed["grpc.keepalive_timeout_ms"] == 5000
        assert passed["grpc.keepalive_permit_without_calls"] is True
        assert passed["grpc.enable_retries"] == 1
        assert passed[cygrpc.ChannelArgKey.max_send_message_length] == -1
        assert passed[cygrpc.ChannelArgKey.max_receive_message_length] == -1

    @patch("pymilvus.client.grpc_handler.grpc.insecure_channel")
    def test_user_override_keepalive(self, mock_insecure_channel):
        """User-supplied keepalive values replace the defaults; other defaults stay."""
        mock_insecure_channel.return_value = MagicMock()

        overrides = {"grpc.keepalive_time_ms": 3000, "grpc.keepalive_timeout_ms": 3000}
        self._bare_handler(overrides)._setup_grpc_channel()

        passed = self._channel_options(mock_insecure_channel)

        assert passed["grpc.keepalive_time_ms"] == 3000
        assert passed["grpc.keepalive_timeout_ms"] == 3000
        # A default the user did not touch is still passed through.
        assert passed["grpc.keepalive_permit_without_calls"] is True

    @patch("pymilvus.client.grpc_handler.grpc.insecure_channel")
    def test_user_adds_new_option(self, mock_insecure_channel):
        """An option absent from the defaults is passed alongside them."""
        mock_insecure_channel.return_value = MagicMock()

        extra = {"grpc.http2.max_pings_without_data": 0}
        self._bare_handler(extra)._setup_grpc_channel()

        passed = self._channel_options(mock_insecure_channel)

        assert passed["grpc.http2.max_pings_without_data"] == 0
        # The defaults are not displaced by the extra key.
        assert passed["grpc.keepalive_time_ms"] == 10000

    @patch("pymilvus.client.grpc_handler.grpc.secure_channel")
    def test_secure_channel_has_keepalive(self, mock_secure_channel):
        """The secure channel is created with the same keepalive defaults."""
        mock_secure_channel.return_value = MagicMock()

        self._bare_handler({}, secure=True)._setup_grpc_channel()

        passed = self._channel_options(mock_secure_channel)

        assert passed["grpc.keepalive_time_ms"] == 10000
        assert passed["grpc.keepalive_timeout_ms"] == 5000
        assert passed["grpc.keepalive_permit_without_calls"] is True
120+
121+
class TestAsyncGrpcHandlerChannelOptions:
    """Tests for AsyncGrpcHandler gRPC channel options."""

    def test_async_default_grpc_options_empty(self):
        """grpc_options defaults to empty dict when not provided."""
        # Build the handler through __init__ so the assertion checks the default the
        # handler itself picks; assigning `_grpc_options` in the test and reading it
        # back would pass no matter what the handler does.
        # NOTE(review): assumes AsyncGrpcHandler accepts a pre-built `channel`
        # keyword, as GrpcHandler does in the sync tests -- confirm.
        handler = AsyncGrpcHandler(channel=MagicMock())
        assert handler._grpc_options == {}

    @patch("pymilvus.client.async_grpc_handler.grpc.aio.insecure_channel")
    def test_async_default_keepalive_values(self, mock_insecure_channel):
        """Async handler default channel options include optimized keepalive settings."""
        mock_insecure_channel.return_value = MagicMock()

        handler = self._create_handler()
        handler._setup_grpc_channel()

        mock_insecure_channel.assert_called_once()
        _, call_kwargs = mock_insecure_channel.call_args
        opts = dict(call_kwargs["options"])

        assert opts["grpc.keepalive_time_ms"] == 10000
        assert opts["grpc.keepalive_timeout_ms"] == 5000
        assert opts["grpc.keepalive_permit_without_calls"] is True

    @patch("pymilvus.client.async_grpc_handler.grpc.aio.insecure_channel")
    def test_async_user_override(self, mock_insecure_channel):
        """User-provided grpc_options override defaults in async handler."""
        mock_insecure_channel.return_value = MagicMock()

        handler = self._create_handler({"grpc.keepalive_time_ms": 3000})
        handler._setup_grpc_channel()

        mock_insecure_channel.assert_called_once()
        _, call_kwargs = mock_insecure_channel.call_args
        opts = dict(call_kwargs["options"])

        assert opts["grpc.keepalive_time_ms"] == 3000
        # Not overridden by the user, so the default is still passed.
        assert opts["grpc.keepalive_timeout_ms"] == 5000

    def _create_handler(self, grpc_options=None):
        """Return an AsyncGrpcHandler built without __init__, ready for _setup_grpc_channel().

        Args:
            grpc_options: value stored as ``_grpc_options``; an empty dict when omitted.
        """
        handler = AsyncGrpcHandler.__new__(AsyncGrpcHandler)
        handler._async_channel = None
        handler._secure = False
        handler._grpc_options = {} if grpc_options is None else grpc_options
        handler._address = "localhost:19530"
        handler._log_level = None
        handler._async_authorization_interceptor = None
        return handler

0 commit comments

Comments
 (0)