Skip to content

Commit 0c9847b

Browse files
authored
add heartbeat flag for grpc server and broker (#464)
Signed-off-by: Wei Liu <liuweixa@redhat.com>
1 parent d33854d commit 0c9847b

File tree

5 files changed

+38
-29
lines changed

5 files changed

+38
-29
lines changed

cmd/maestro/server/grpc_broker.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,10 @@ func NewGRPCBroker(ctx context.Context, eventBroadcaster *event.EventBroadcaster
184184

185185
// TODO after the sdk go support source grpc server
186186
grpcServer := grpc.NewServer(grpcServerOptions...)
187-
eventServer := servergrpc.NewGRPCBroker()
187+
eventServer := servergrpc.NewGRPCBroker(&servergrpc.BrokerOptions{
188+
HeartbeatDisabled: config.HeartbeatDisable,
189+
HeartbeatCheckInterval: config.HeartbeatCheckInterval,
190+
})
188191
pbv1.RegisterCloudEventServiceServer(grpcServer, eventServer)
189192
svc := NewGRPCBrokerService(resourceService, statusEventService)
190193
eventServer.RegisterService(context.Background(), workpayload.ManifestBundleEventDataType, svc)

cmd/maestro/server/grpc_server.go

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type GRPCServer struct {
4343
grpcAuthorizer grpcauthorizer.GRPCAuthorizer
4444
bindAddress string
4545
heartbeatCheckInterval time.Duration
46+
heartbeatDisable bool
4647
}
4748

4849
// NewGRPCServer creates a new GRPCServer
@@ -143,6 +144,7 @@ func NewGRPCServer(
143144
grpcAuthorizer: grpcAuthorizer,
144145
bindAddress: env().Config.HTTPServer.Hostname + ":" + config.ServerBindPort,
145146
heartbeatCheckInterval: config.HeartbeatCheckInterval,
147+
heartbeatDisable: config.HeartbeatDisable,
146148
}
147149
}
148150

@@ -346,29 +348,31 @@ func (svr *GRPCServer) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
346348
return nil
347349
})
348350

349-
go func() {
350-
ticker := time.NewTicker(svr.heartbeatCheckInterval)
351-
defer ticker.Stop()
352-
353-
for {
354-
select {
355-
case <-ticker.C:
356-
heartbeat := &pbv1.CloudEvent{
357-
SpecVersion: "1.0",
358-
Id: uuid.New().String(),
359-
Type: types.HeartbeatCloudEventsType,
360-
}
351+
if !svr.heartbeatDisable {
352+
go func() {
353+
ticker := time.NewTicker(svr.heartbeatCheckInterval)
354+
defer ticker.Stop()
361355

356+
for {
362357
select {
363-
case heartbeatCh <- heartbeat:
364-
default:
365-
logger.Info("send channel is full, dropping heartbeat")
358+
case <-ticker.C:
359+
heartbeat := &pbv1.CloudEvent{
360+
SpecVersion: "1.0",
361+
Id: uuid.New().String(),
362+
Type: types.HeartbeatCloudEventsType,
363+
}
364+
365+
select {
366+
case heartbeatCh <- heartbeat:
367+
default:
368+
logger.Info("send channel is full, dropping heartbeat")
369+
}
370+
case <-ctx.Done():
371+
return
366372
}
367-
case <-ctx.Done():
368-
return
369373
}
370-
}
371-
}()
374+
}()
375+
}
372376

373377
select {
374378
case err := <-sendErrCh:

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ require (
2626
github.com/lib/pq v1.10.9
2727
github.com/mendsley/gojwk v0.0.0-20141217222730-4d5ec6e58103
2828
github.com/onsi/ginkgo/v2 v2.27.3
29-
github.com/onsi/gomega v1.38.3
29+
github.com/onsi/gomega v1.39.0
3030
github.com/openshift-online/ocm-common v0.0.34
3131
github.com/openshift-online/ocm-sdk-go v0.1.486
3232
github.com/openshift/library-go v0.0.0-20251120164824-14a789e09884
@@ -56,8 +56,8 @@ require (
5656
k8s.io/klog/v2 v2.130.1
5757
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397
5858
open-cluster-management.io/api v1.1.1-0.20260108015315-68cef17a0643
59-
open-cluster-management.io/ocm v1.1.1-0.20260108015314-635b0ff7e9cf
60-
open-cluster-management.io/sdk-go v1.1.1-0.20260108080638-c607eaaa5d12
59+
open-cluster-management.io/ocm v1.1.1-0.20260112054407-532b09a44026
60+
open-cluster-management.io/sdk-go v1.1.1-0.20260112054941-b6c1a665df1b
6161
sigs.k8s.io/yaml v1.6.0
6262
)
6363

go.sum

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -397,8 +397,8 @@ github.com/onsi/ginkgo/v2 v2.27.3 h1:ICsZJ8JoYafeXFFlFAG75a7CxMsJHwgKwtO+82SE9L8
397397
github.com/onsi/ginkgo/v2 v2.27.3/go.mod h1:ArE1D/XhNXBXCBkKOLkbsb2c81dQHCRcF5zwn/ykDRo=
398398
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
399399
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
400-
github.com/onsi/gomega v1.38.3 h1:eTX+W6dobAYfFeGC2PV6RwXRu/MyT+cQguijutvkpSM=
401-
github.com/onsi/gomega v1.38.3/go.mod h1:ZCU1pkQcXDO5Sl9/VVEGlDyp+zm0m1cmeG5TOzLgdh4=
400+
github.com/onsi/gomega v1.39.0 h1:y2ROC3hKFmQZJNFeGAMeHZKkjBL65mIZcvrLQBF9k6Q=
401+
github.com/onsi/gomega v1.39.0/go.mod h1:ZCU1pkQcXDO5Sl9/VVEGlDyp+zm0m1cmeG5TOzLgdh4=
402402
github.com/openshift-online/ocm-api-model/clientapi v0.0.440 h1:BGWikczo8UuSvzEkoTf6q9iodg/pC7ibfvn5bvuD5c0=
403403
github.com/openshift-online/ocm-api-model/clientapi v0.0.440/go.mod h1:fZwy5HY2URG9nrExvQeXrDU/08TGqZ16f8oymVEN5lo=
404404
github.com/openshift-online/ocm-api-model/model v0.0.440 h1:sfi+fEw3ORh32keJdkE7ZA0g1uCBf457dRg6Qs8yJ6s=
@@ -828,10 +828,10 @@ k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 h1:hwvWFiBzdWw1FhfY1FooPn3kzWuJ8
828828
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
829829
open-cluster-management.io/api v1.1.1-0.20260108015315-68cef17a0643 h1:eA/8UpvFuWr79O7/aAT4bcx/tVG9kkl7+4u9o9dRShM=
830830
open-cluster-management.io/api v1.1.1-0.20260108015315-68cef17a0643/go.mod h1:YcmA6SpGEekIMxdoeVIIyOaBhMA6ImWRLXP4g8n8T+4=
831-
open-cluster-management.io/ocm v1.1.1-0.20260108015314-635b0ff7e9cf h1:J7t6GhW++DkOfpHEdqQQLP8EVdgDh4cg3248YzcmPPo=
832-
open-cluster-management.io/ocm v1.1.1-0.20260108015314-635b0ff7e9cf/go.mod h1:dIjBV+mixDX9Ce1zFvRoOTYT3p1IJ5C9BT7qSSaWI+M=
833-
open-cluster-management.io/sdk-go v1.1.1-0.20260108080638-c607eaaa5d12 h1:WZrNXgDKxAneVjiiNf0b3ApOyjuEgBcz2akw6vYVxJ8=
834-
open-cluster-management.io/sdk-go v1.1.1-0.20260108080638-c607eaaa5d12/go.mod h1:4haPv/uuKqQ3gxi62/PPknlrUFi132ga0KYLwj5tpx0=
831+
open-cluster-management.io/ocm v1.1.1-0.20260112054407-532b09a44026 h1:/iEIzl+t3B+5CCm/DKM7rI/TGKhhv+K8KJYCC+LByJQ=
832+
open-cluster-management.io/ocm v1.1.1-0.20260112054407-532b09a44026/go.mod h1:2ToFX//FL/tQuKPdPTegx6D0PYhjyU38AxRdwow+dWs=
833+
open-cluster-management.io/sdk-go v1.1.1-0.20260112054941-b6c1a665df1b h1:r5U3cDh6kuBmzKnAUqeoYPwwVU/VS9udvpcDEkxh6g4=
834+
open-cluster-management.io/sdk-go v1.1.1-0.20260112054941-b6c1a665df1b/go.mod h1:4haPv/uuKqQ3gxi62/PPknlrUFi132ga0KYLwj5tpx0=
835835
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 h1:jpcvIRr3GLoUoEKRkHKSmGjxb6lWwrBlJsXc+eUYQHM=
836836
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw=
837837
sigs.k8s.io/controller-runtime v0.22.4 h1:GEjV7KV3TY8e+tJ2LCTxUTanW4z/FmNB7l327UfMq9A=

pkg/config/grpc_server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type GRPCServerConfig struct {
3232
ServerPingTimeout time.Duration `json:"server_ping_timeout"`
3333
PermitPingWithoutStream bool `json:"permit_ping_without_stream"`
3434
HeartbeatCheckInterval time.Duration `json:"heartbeatCheckInterval"`
35+
HeartbeatDisable bool `json:"heartbeat_disable"`
3536
}
3637

3738
func NewGRPCServerConfig() *GRPCServerConfig {
@@ -64,4 +65,5 @@ func (s *GRPCServerConfig) AddFlags(fs *pflag.FlagSet) {
6465
fs.StringVar(&s.ClientCAFile, "grpc-client-ca-file", "", "The path to the client ca file, must specify if using mtls authentication type")
6566
fs.StringVar(&s.BrokerClientCAFile, "grpc-broker-client-ca-file", "", "The path to the broker client ca file")
6667
fs.DurationVar(&s.HeartbeatCheckInterval, "heartbeat-check-interval", 10*time.Second, "Duration the server send heartbeat messages")
68+
fs.BoolVar(&s.HeartbeatDisable, "heartbeat-disable", false, "Disable heartbeat messages from server to clients")
6769
}

0 commit comments

Comments
 (0)