Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/maestro/server/grpc_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,10 @@ func NewGRPCBroker(ctx context.Context, eventBroadcaster *event.EventBroadcaster

// TODO after the sdk go support source grpc server
grpcServer := grpc.NewServer(grpcServerOptions...)
eventServer := servergrpc.NewGRPCBroker()
eventServer := servergrpc.NewGRPCBroker(&servergrpc.BrokerOptions{
HeartbeatDisabled: config.HeartbeatDisable,
HeartbeatCheckInterval: config.HeartbeatCheckInterval,
})
pbv1.RegisterCloudEventServiceServer(grpcServer, eventServer)
svc := NewGRPCBrokerService(resourceService, statusEventService)
eventServer.RegisterService(context.Background(), workpayload.ManifestBundleEventDataType, svc)
Expand Down
42 changes: 23 additions & 19 deletions cmd/maestro/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type GRPCServer struct {
grpcAuthorizer grpcauthorizer.GRPCAuthorizer
bindAddress string
heartbeatCheckInterval time.Duration
heartbeatDisable bool
}

// NewGRPCServer creates a new GRPCServer
Expand Down Expand Up @@ -143,6 +144,7 @@ func NewGRPCServer(
grpcAuthorizer: grpcAuthorizer,
bindAddress: env().Config.HTTPServer.Hostname + ":" + config.ServerBindPort,
heartbeatCheckInterval: config.HeartbeatCheckInterval,
heartbeatDisable: config.HeartbeatDisable,
}
}

Expand Down Expand Up @@ -346,29 +348,31 @@ func (svr *GRPCServer) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
return nil
})

go func() {
ticker := time.NewTicker(svr.heartbeatCheckInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
heartbeat := &pbv1.CloudEvent{
SpecVersion: "1.0",
Id: uuid.New().String(),
Type: types.HeartbeatCloudEventsType,
}
if !svr.heartbeatDisable {
go func() {
ticker := time.NewTicker(svr.heartbeatCheckInterval)
defer ticker.Stop()

for {
select {
case heartbeatCh <- heartbeat:
default:
logger.Info("send channel is full, dropping heartbeat")
case <-ticker.C:
heartbeat := &pbv1.CloudEvent{
SpecVersion: "1.0",
Id: uuid.New().String(),
Type: types.HeartbeatCloudEventsType,
}

select {
case heartbeatCh <- heartbeat:
default:
logger.Info("send channel is full, dropping heartbeat")
}
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
}()
}

select {
case err := <-sendErrCh:
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/lib/pq v1.10.9
github.com/mendsley/gojwk v0.0.0-20141217222730-4d5ec6e58103
github.com/onsi/ginkgo/v2 v2.27.3
github.com/onsi/gomega v1.38.3
github.com/onsi/gomega v1.39.0
github.com/openshift-online/ocm-common v0.0.34
github.com/openshift-online/ocm-sdk-go v0.1.486
github.com/openshift/library-go v0.0.0-20251120164824-14a789e09884
Expand Down Expand Up @@ -56,8 +56,8 @@ require (
k8s.io/klog/v2 v2.130.1
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397
open-cluster-management.io/api v1.1.1-0.20260108015315-68cef17a0643
open-cluster-management.io/ocm v1.1.1-0.20260108015314-635b0ff7e9cf
open-cluster-management.io/sdk-go v1.1.1-0.20260108080638-c607eaaa5d12
open-cluster-management.io/ocm v1.1.1-0.20260112054407-532b09a44026
open-cluster-management.io/sdk-go v1.1.1-0.20260112054941-b6c1a665df1b
sigs.k8s.io/yaml v1.6.0
)

Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,8 @@ github.com/onsi/ginkgo/v2 v2.27.3 h1:ICsZJ8JoYafeXFFlFAG75a7CxMsJHwgKwtO+82SE9L8
github.com/onsi/ginkgo/v2 v2.27.3/go.mod h1:ArE1D/XhNXBXCBkKOLkbsb2c81dQHCRcF5zwn/ykDRo=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.38.3 h1:eTX+W6dobAYfFeGC2PV6RwXRu/MyT+cQguijutvkpSM=
github.com/onsi/gomega v1.38.3/go.mod h1:ZCU1pkQcXDO5Sl9/VVEGlDyp+zm0m1cmeG5TOzLgdh4=
github.com/onsi/gomega v1.39.0 h1:y2ROC3hKFmQZJNFeGAMeHZKkjBL65mIZcvrLQBF9k6Q=
github.com/onsi/gomega v1.39.0/go.mod h1:ZCU1pkQcXDO5Sl9/VVEGlDyp+zm0m1cmeG5TOzLgdh4=
github.com/openshift-online/ocm-api-model/clientapi v0.0.440 h1:BGWikczo8UuSvzEkoTf6q9iodg/pC7ibfvn5bvuD5c0=
github.com/openshift-online/ocm-api-model/clientapi v0.0.440/go.mod h1:fZwy5HY2URG9nrExvQeXrDU/08TGqZ16f8oymVEN5lo=
github.com/openshift-online/ocm-api-model/model v0.0.440 h1:sfi+fEw3ORh32keJdkE7ZA0g1uCBf457dRg6Qs8yJ6s=
Expand Down Expand Up @@ -828,10 +828,10 @@ k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 h1:hwvWFiBzdWw1FhfY1FooPn3kzWuJ8
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
open-cluster-management.io/api v1.1.1-0.20260108015315-68cef17a0643 h1:eA/8UpvFuWr79O7/aAT4bcx/tVG9kkl7+4u9o9dRShM=
open-cluster-management.io/api v1.1.1-0.20260108015315-68cef17a0643/go.mod h1:YcmA6SpGEekIMxdoeVIIyOaBhMA6ImWRLXP4g8n8T+4=
open-cluster-management.io/ocm v1.1.1-0.20260108015314-635b0ff7e9cf h1:J7t6GhW++DkOfpHEdqQQLP8EVdgDh4cg3248YzcmPPo=
open-cluster-management.io/ocm v1.1.1-0.20260108015314-635b0ff7e9cf/go.mod h1:dIjBV+mixDX9Ce1zFvRoOTYT3p1IJ5C9BT7qSSaWI+M=
open-cluster-management.io/sdk-go v1.1.1-0.20260108080638-c607eaaa5d12 h1:WZrNXgDKxAneVjiiNf0b3ApOyjuEgBcz2akw6vYVxJ8=
open-cluster-management.io/sdk-go v1.1.1-0.20260108080638-c607eaaa5d12/go.mod h1:4haPv/uuKqQ3gxi62/PPknlrUFi132ga0KYLwj5tpx0=
open-cluster-management.io/ocm v1.1.1-0.20260112054407-532b09a44026 h1:/iEIzl+t3B+5CCm/DKM7rI/TGKhhv+K8KJYCC+LByJQ=
open-cluster-management.io/ocm v1.1.1-0.20260112054407-532b09a44026/go.mod h1:2ToFX//FL/tQuKPdPTegx6D0PYhjyU38AxRdwow+dWs=
open-cluster-management.io/sdk-go v1.1.1-0.20260112054941-b6c1a665df1b h1:r5U3cDh6kuBmzKnAUqeoYPwwVU/VS9udvpcDEkxh6g4=
open-cluster-management.io/sdk-go v1.1.1-0.20260112054941-b6c1a665df1b/go.mod h1:4haPv/uuKqQ3gxi62/PPknlrUFi132ga0KYLwj5tpx0=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 h1:jpcvIRr3GLoUoEKRkHKSmGjxb6lWwrBlJsXc+eUYQHM=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw=
sigs.k8s.io/controller-runtime v0.22.4 h1:GEjV7KV3TY8e+tJ2LCTxUTanW4z/FmNB7l327UfMq9A=
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type GRPCServerConfig struct {
ServerPingTimeout time.Duration `json:"server_ping_timeout"`
PermitPingWithoutStream bool `json:"permit_ping_without_stream"`
HeartbeatCheckInterval time.Duration `json:"heartbeatCheckInterval"`
HeartbeatDisable bool `json:"heartbeat_disable"`
}

func NewGRPCServerConfig() *GRPCServerConfig {
Expand Down Expand Up @@ -64,4 +65,5 @@ func (s *GRPCServerConfig) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.ClientCAFile, "grpc-client-ca-file", "", "The path to the client ca file, must specify if using mtls authentication type")
fs.StringVar(&s.BrokerClientCAFile, "grpc-broker-client-ca-file", "", "The path to the broker client ca file")
fs.DurationVar(&s.HeartbeatCheckInterval, "heartbeat-check-interval", 10*time.Second, "Duration the server send heartbeat messages")
fs.BoolVar(&s.HeartbeatDisable, "heartbeat-disable", false, "Disable heartbeat messages from server to clients")
}
Loading