Skip to content

Commit bc8c707

Browse files
pymilvus-bothaorenfsaclaude
authored
[Backport 2.6] enhance: optimize gRPC keepalive defaults and support user-configurable grpc_options (#3258) (#3259)
Backport of #3258 to `2.6`. Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: shaoyue <haorenfsa@gmail.com> Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 111eae3 commit bc8c707

File tree

3 files changed

+196
-12
lines changed

3 files changed

+196
-12
lines changed

pymilvus/client/async_grpc_handler.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ def __init__(
8282
self._address = addr if addr is not None else self.__get_address(uri, host, port)
8383
self._log_level = None
8484
self._user = kwargs.get("user")
85+
self._grpc_options = kwargs.get("grpc_options", {})
8586
self._set_authorization(**kwargs)
8687
self._setup_grpc_channel(**kwargs)
8788
self._is_channel_ready = False
@@ -140,12 +141,18 @@ def _setup_authorization_interceptor(self, user: str, password: str, token: str)
140141

141142
def _setup_grpc_channel(self, **kwargs):
142143
if self._async_channel is None:
143-
opts = [
144-
(cygrpc.ChannelArgKey.max_send_message_length, -1),
145-
(cygrpc.ChannelArgKey.max_receive_message_length, -1),
146-
("grpc.enable_retries", 1),
147-
("grpc.keepalive_time_ms", 55000),
148-
]
144+
# Default gRPC options
145+
default_opts = {
146+
cygrpc.ChannelArgKey.max_send_message_length: -1,
147+
cygrpc.ChannelArgKey.max_receive_message_length: -1,
148+
"grpc.enable_retries": 1,
149+
"grpc.keepalive_time_ms": 10000,
150+
"grpc.keepalive_timeout_ms": 5000,
151+
"grpc.keepalive_permit_without_calls": True,
152+
}
153+
# Merge user-provided options (user options override defaults)
154+
default_opts.update(self._grpc_options)
155+
opts = list(default_opts.items())
149156
if not self._secure:
150157
self._async_channel = grpc.aio.insecure_channel(
151158
self._address,

pymilvus/client/grpc_handler.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ def __init__(
160160
self._log_level = None
161161
self._user = kwargs.get("user")
162162
self._server_info_cache = None
163+
self._grpc_options = kwargs.get("grpc_options", {})
163164
self._set_authorization(**kwargs)
164165
self._setup_grpc_channel()
165166
self.callbacks = []
@@ -263,12 +264,18 @@ def _setup_authorization_interceptor(self, user: str, password: str, token: str)
263264
def _setup_grpc_channel(self):
264265
"""Create a ddl grpc channel"""
265266
if self._channel is None:
266-
opts = [
267-
(cygrpc.ChannelArgKey.max_send_message_length, -1),
268-
(cygrpc.ChannelArgKey.max_receive_message_length, -1),
269-
("grpc.enable_retries", 1),
270-
("grpc.keepalive_time_ms", 55000),
271-
]
267+
# Default gRPC options
268+
default_opts = {
269+
cygrpc.ChannelArgKey.max_send_message_length: -1,
270+
cygrpc.ChannelArgKey.max_receive_message_length: -1,
271+
"grpc.enable_retries": 1,
272+
"grpc.keepalive_time_ms": 10000,
273+
"grpc.keepalive_timeout_ms": 5000,
274+
"grpc.keepalive_permit_without_calls": True,
275+
}
276+
# Merge user-provided options (user options override defaults)
277+
default_opts.update(self._grpc_options)
278+
opts = list(default_opts.items())
272279
if not self._secure:
273280
self._channel = grpc.insecure_channel(
274281
self._address,

tests/test_grpc_channel_options.py

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
"""
2+
Tests for gRPC channel options: keepalive defaults and user-configurable grpc_options.
3+
"""
4+
5+
from unittest.mock import MagicMock, patch
6+
7+
from grpc._cython import cygrpc
8+
from pymilvus.client.async_grpc_handler import AsyncGrpcHandler
9+
from pymilvus.client.grpc_handler import GrpcHandler
10+
11+
12+
class TestGrpcHandlerChannelOptions:
13+
"""Tests for GrpcHandler gRPC channel options."""
14+
15+
def test_default_grpc_options_empty(self):
16+
"""grpc_options defaults to empty dict when not provided."""
17+
handler = GrpcHandler(channel=MagicMock())
18+
assert handler._grpc_options == {}
19+
20+
def test_custom_grpc_options_stored(self):
21+
"""User-provided grpc_options are stored on the handler."""
22+
custom_opts = {"grpc.keepalive_time_ms": 3000}
23+
handler = GrpcHandler(channel=MagicMock(), grpc_options=custom_opts)
24+
assert handler._grpc_options == custom_opts
25+
26+
@patch("pymilvus.client.grpc_handler.grpc.insecure_channel")
27+
def test_default_keepalive_values(self, mock_insecure_channel):
28+
"""Default channel options include optimized keepalive settings."""
29+
mock_insecure_channel.return_value = MagicMock()
30+
31+
handler = GrpcHandler.__new__(GrpcHandler)
32+
handler._channel = None
33+
handler._secure = False
34+
handler._grpc_options = {}
35+
handler._address = "localhost:19530"
36+
handler._log_level = None
37+
handler._authorization_interceptor = None
38+
handler._setup_grpc_channel()
39+
40+
mock_insecure_channel.assert_called_once()
41+
_, call_kwargs = mock_insecure_channel.call_args
42+
opts = dict(call_kwargs["options"])
43+
44+
assert opts["grpc.keepalive_time_ms"] == 10000
45+
assert opts["grpc.keepalive_timeout_ms"] == 5000
46+
assert opts["grpc.keepalive_permit_without_calls"] is True
47+
assert opts["grpc.enable_retries"] == 1
48+
assert opts[cygrpc.ChannelArgKey.max_send_message_length] == -1
49+
assert opts[cygrpc.ChannelArgKey.max_receive_message_length] == -1
50+
51+
@patch("pymilvus.client.grpc_handler.grpc.insecure_channel")
52+
def test_user_override_keepalive(self, mock_insecure_channel):
53+
"""User-provided grpc_options override default keepalive values."""
54+
mock_insecure_channel.return_value = MagicMock()
55+
56+
handler = GrpcHandler.__new__(GrpcHandler)
57+
handler._channel = None
58+
handler._secure = False
59+
handler._grpc_options = {"grpc.keepalive_time_ms": 3000, "grpc.keepalive_timeout_ms": 3000}
60+
handler._address = "localhost:19530"
61+
handler._log_level = None
62+
handler._authorization_interceptor = None
63+
handler._setup_grpc_channel()
64+
65+
_, call_kwargs = mock_insecure_channel.call_args
66+
opts = dict(call_kwargs["options"])
67+
68+
assert opts["grpc.keepalive_time_ms"] == 3000
69+
assert opts["grpc.keepalive_timeout_ms"] == 3000
70+
# Non-overridden defaults remain
71+
assert opts["grpc.keepalive_permit_without_calls"] is True
72+
73+
@patch("pymilvus.client.grpc_handler.grpc.insecure_channel")
74+
def test_user_adds_new_option(self, mock_insecure_channel):
75+
"""User can add new gRPC options not in the defaults."""
76+
mock_insecure_channel.return_value = MagicMock()
77+
78+
handler = GrpcHandler.__new__(GrpcHandler)
79+
handler._channel = None
80+
handler._secure = False
81+
handler._grpc_options = {"grpc.http2.max_pings_without_data": 0}
82+
handler._address = "localhost:19530"
83+
handler._log_level = None
84+
handler._authorization_interceptor = None
85+
handler._setup_grpc_channel()
86+
87+
_, call_kwargs = mock_insecure_channel.call_args
88+
opts = dict(call_kwargs["options"])
89+
90+
assert opts["grpc.http2.max_pings_without_data"] == 0
91+
# Defaults still present
92+
assert opts["grpc.keepalive_time_ms"] == 10000
93+
94+
@patch("pymilvus.client.grpc_handler.grpc.secure_channel")
95+
def test_secure_channel_has_keepalive(self, mock_secure_channel):
96+
"""Secure channel also receives keepalive options."""
97+
mock_secure_channel.return_value = MagicMock()
98+
99+
handler = GrpcHandler.__new__(GrpcHandler)
100+
handler._channel = None
101+
handler._secure = True
102+
handler._server_name = ""
103+
handler._server_pem_path = ""
104+
handler._client_pem_path = ""
105+
handler._client_key_path = ""
106+
handler._ca_pem_path = ""
107+
handler._grpc_options = {}
108+
handler._address = "localhost:19530"
109+
handler._log_level = None
110+
handler._authorization_interceptor = None
111+
handler._setup_grpc_channel()
112+
113+
_, call_kwargs = mock_secure_channel.call_args
114+
opts = dict(call_kwargs["options"])
115+
116+
assert opts["grpc.keepalive_time_ms"] == 10000
117+
assert opts["grpc.keepalive_timeout_ms"] == 5000
118+
assert opts["grpc.keepalive_permit_without_calls"] is True
119+
120+
121+
class TestAsyncGrpcHandlerChannelOptions:
122+
"""Tests for AsyncGrpcHandler gRPC channel options."""
123+
124+
def test_async_default_grpc_options_empty(self):
125+
"""grpc_options defaults to empty dict when not provided."""
126+
handler = AsyncGrpcHandler.__new__(AsyncGrpcHandler)
127+
handler._async_channel = MagicMock()
128+
handler._grpc_options = {}
129+
assert handler._grpc_options == {}
130+
131+
@patch("pymilvus.client.async_grpc_handler.grpc.aio.insecure_channel")
132+
def test_async_default_keepalive_values(self, mock_insecure_channel):
133+
"""Async handler default channel options include optimized keepalive settings."""
134+
mock_insecure_channel.return_value = MagicMock()
135+
136+
handler = self._create_handler()
137+
handler._grpc_options = {}
138+
handler._setup_grpc_channel()
139+
140+
_, call_kwargs = mock_insecure_channel.call_args
141+
opts = dict(call_kwargs["options"])
142+
143+
assert opts["grpc.keepalive_time_ms"] == 10000
144+
assert opts["grpc.keepalive_timeout_ms"] == 5000
145+
assert opts["grpc.keepalive_permit_without_calls"] is True
146+
147+
@patch("pymilvus.client.async_grpc_handler.grpc.aio.insecure_channel")
148+
def test_async_user_override(self, mock_insecure_channel):
149+
"""User-provided grpc_options override defaults in async handler."""
150+
mock_insecure_channel.return_value = MagicMock()
151+
152+
handler = self._create_handler()
153+
handler._grpc_options = {"grpc.keepalive_time_ms": 3000}
154+
handler._setup_grpc_channel()
155+
156+
_, call_kwargs = mock_insecure_channel.call_args
157+
opts = dict(call_kwargs["options"])
158+
159+
assert opts["grpc.keepalive_time_ms"] == 3000
160+
assert opts["grpc.keepalive_timeout_ms"] == 5000
161+
162+
def _create_handler(self):
163+
handler = AsyncGrpcHandler.__new__(AsyncGrpcHandler)
164+
handler._async_channel = None
165+
handler._secure = False
166+
handler._grpc_options = {}
167+
handler._address = "localhost:19530"
168+
handler._log_level = None
169+
handler._async_authorization_interceptor = None
170+
return handler

0 commit comments

Comments
 (0)