diff --git a/sdks/python/apache_beam/runners/worker/channel_factory.py b/sdks/python/apache_beam/runners/worker/channel_factory.py index 6ad0f7235e9d..afb4d182cabd 100644 --- a/sdks/python/apache_beam/runners/worker/channel_factory.py +++ b/sdks/python/apache_beam/runners/worker/channel_factory.py @@ -23,8 +23,14 @@ class GRPCChannelFactory(grpc.StreamStreamClientInterceptor): DEFAULT_OPTIONS = [ - ("grpc.keepalive_time_ms", 20000), - ("grpc.keepalive_timeout_ms", 300000), + # Setting keepalive_time_ms is needed for other options to work. + ("grpc.keepalive_time_ms", 20_000), + # Default: 20s. Increasing to 5 min. + ("grpc.keepalive_timeout_ms", 300_000), + # Default: 2, set to 0 to allow unlimited pings without data + ("grpc.http2.max_pings_without_data", 0), + # Default: False, set to True to allow keepalive pings when no calls + ("grpc.keepalive_permit_without_calls", True), ] def __init__(self): diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py index 7fb692e66ea7..ff1a0d9c46aa 100644 --- a/sdks/python/apache_beam/utils/subprocess_server.py +++ b/sdks/python/apache_beam/utils/subprocess_server.py @@ -185,8 +185,20 @@ def start(self): try: process, endpoint = self.start_process() wait_secs = .1 - channel_options = [("grpc.max_receive_message_length", -1), - ("grpc.max_send_message_length", -1)] + channel_options = [ + ("grpc.max_receive_message_length", -1), + ("grpc.max_send_message_length", -1), + # Default: 20000ms (20s), increased to 10 minutes for stability + ("grpc.keepalive_timeout_ms", 600_000), + # Default: 2, set to 0 to allow unlimited pings without data + ("grpc.http2.max_pings_without_data", 0), + # Default: False, set to True to allow keepalive pings when no calls + ("grpc.keepalive_permit_without_calls", True), + # Default: 2, set to 0 to allow unlimited ping strikes + ("grpc.http2.max_ping_strikes", 0), + # Default: 0 (disabled), enable socket reuse for better handling + ("grpc.so_reuseport", 1), + ] self._grpc_channel = grpc.insecure_channel( endpoint, options=channel_options) channel_ready = grpc.channel_ready_future(self._grpc_channel)