Skip to content

Commit c470b47

Browse files
authored
Cherrypick #37013 #37021 to Release 2.70 (#37086)
* Fix too-many-pings on FnAPI runner under grpc mode (#37013) * Fix too-many-pings on FnAPI runner under grpc mode * Fix lints * Set keepalive policy for prism grpc server. (#37021)
1 parent 0de9914 commit c470b47

File tree

2 files changed

+14
-4
lines changed

2 files changed

+14
-4
lines changed

sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
3131
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
3232
"google.golang.org/grpc"
33+
"google.golang.org/grpc/keepalive"
3334
)
3435

3536
type Server struct {
@@ -80,6 +81,10 @@ func NewServer(port int, execute func(*Job)) *Server {
8081
s.logger.Info("Serving JobManagement", slog.String("endpoint", s.Endpoint()))
8182
opts := []grpc.ServerOption{
8283
grpc.MaxRecvMsgSize(math.MaxInt32),
84+
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
85+
MinTime: 20 * time.Second, // Minimum duration a client should wait before sending a keepalive ping
86+
PermitWithoutStream: true, // Allow pings even if there are no active streams
87+
}),
8388
}
8489
s.server = grpc.NewServer(opts...)
8590
jobpb.RegisterJobServiceServer(s.server, s)

sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -465,10 +465,15 @@ def __init__(
465465
# received or sent over the data plane. The actual buffer size
466466
# is controlled in a layer above. Also, options to keep the server alive
467467
# when too many pings are received.
468-
options = [("grpc.max_receive_message_length", -1),
469-
("grpc.max_send_message_length", -1),
470-
("grpc.http2.max_pings_without_data", 0),
471-
("grpc.http2.max_ping_strikes", 0)]
468+
options = [
469+
("grpc.max_receive_message_length", -1),
470+
("grpc.max_send_message_length", -1),
471+
("grpc.http2.max_pings_without_data", 0),
472+
("grpc.http2.max_ping_strikes", 0),
473+
# match `grpc.keepalive_time_ms` defined in the client
474+
# (channel_factory.py)
475+
("grpc.http2.min_ping_interval_without_data_ms", 20_000),
476+
]
472477

473478
self.state = state
474479
self.provision_info = provision_info

0 commit comments

Comments
 (0)