Skip to content

Commit c9eedbd

Browse files
committed
add pubsub support for cloudevents transport.
Signed-off-by: morvencao <[email protected]> rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED
1 parent b492d8b commit c9eedbd

File tree

781 files changed

+162541
-5075
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

781 files changed

+162541
-5075
lines changed

go.mod

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module open-cluster-management.io/sdk-go
33
go 1.24.0
44

55
require (
6+
cloud.google.com/go/pubsub/v2 v2.0.0
67
github.com/bwmarrin/snowflake v0.3.0
78
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20250922144431-372892d7c84d
89
github.com/cloudevents/sdk-go/v2 v2.16.2
@@ -24,9 +25,10 @@ require (
2425
github.com/prometheus/client_model v0.6.1
2526
github.com/spf13/pflag v1.0.5
2627
github.com/stretchr/testify v1.11.1
27-
golang.org/x/oauth2 v0.27.0
28-
google.golang.org/grpc v1.68.1
29-
google.golang.org/protobuf v1.36.5
28+
golang.org/x/oauth2 v0.30.0
29+
google.golang.org/api v0.247.0
30+
google.golang.org/grpc v1.74.2
31+
google.golang.org/protobuf v1.36.7
3032
gopkg.in/yaml.v2 v2.4.0
3133
k8s.io/api v0.33.2
3234
k8s.io/apimachinery v0.33.2
@@ -41,21 +43,30 @@ require (
4143
)
4244

4345
require (
44-
cel.dev/expr v0.23.1 // indirect
45-
cloud.google.com/go/compute/metadata v0.5.0 // indirect
46+
cel.dev/expr v0.24.0 // indirect
47+
cloud.google.com/go v0.121.6 // indirect
48+
cloud.google.com/go/auth v0.16.4 // indirect
49+
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
50+
cloud.google.com/go/compute/metadata v0.8.0 // indirect
51+
cloud.google.com/go/iam v1.5.2 // indirect
4652
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
4753
github.com/beorn7/perks v1.0.1 // indirect
4854
github.com/blang/semver/v4 v4.0.0 // indirect
4955
github.com/cespare/xxhash/v2 v2.3.0 // indirect
5056
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
57+
github.com/felixge/httpsnoop v1.0.4 // indirect
5158
github.com/fsnotify/fsnotify v1.7.0 // indirect
5259
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
53-
github.com/go-logr/logr v1.4.2 // indirect
60+
github.com/go-logr/logr v1.4.3 // indirect
61+
github.com/go-logr/stdr v1.2.2 // indirect
5462
github.com/go-openapi/jsonpointer v0.21.0 // indirect
5563
github.com/go-openapi/jsonreference v0.20.2 // indirect
5664
github.com/go-openapi/swag v0.23.0 // indirect
5765
github.com/gogo/protobuf v1.3.2 // indirect
5866
github.com/google/gnostic-models v0.6.9 // indirect
67+
github.com/google/s2a-go v0.1.9 // indirect
68+
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
69+
github.com/googleapis/gax-go/v2 v2.15.0 // indirect
5970
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
6071
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect
6172
github.com/josharian/intern v1.0.0 // indirect
@@ -71,20 +82,30 @@ require (
7182
github.com/prometheus/common v0.62.0 // indirect
7283
github.com/prometheus/procfs v0.15.1 // indirect
7384
github.com/rs/xid v1.4.0 // indirect
74-
github.com/stoewer/go-strcase v1.3.0 // indirect
85+
github.com/stoewer/go-strcase v1.3.1 // indirect
7586
github.com/x448/float16 v0.8.4 // indirect
76-
go.opentelemetry.io/otel v1.33.0 // indirect
77-
go.opentelemetry.io/otel/trace v1.33.0 // indirect
87+
go.einride.tech/aip v0.73.0 // indirect
88+
go.opencensus.io v0.24.0 // indirect
89+
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
90+
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 // indirect
91+
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect
92+
go.opentelemetry.io/otel v1.36.0 // indirect
93+
go.opentelemetry.io/otel/metric v1.36.0 // indirect
94+
go.opentelemetry.io/otel/sdk v1.36.0 // indirect
95+
go.opentelemetry.io/otel/trace v1.36.0 // indirect
7896
go.uber.org/multierr v1.11.0 // indirect
7997
go.uber.org/zap v1.27.0 // indirect
98+
golang.org/x/crypto v0.41.0 // indirect
8099
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
81100
golang.org/x/net v0.43.0 // indirect
101+
golang.org/x/sync v0.16.0 // indirect
82102
golang.org/x/sys v0.35.0 // indirect
83103
golang.org/x/term v0.34.0 // indirect
84104
golang.org/x/text v0.28.0 // indirect
85105
golang.org/x/time v0.12.0 // indirect
86-
google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 // indirect
87-
google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 // indirect
106+
google.golang.org/genproto v0.0.0-20250603155806-513f23925822 // indirect
107+
google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c // indirect
108+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250811230008-5f3141c8851a // indirect
88109
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
89110
gopkg.in/inf.v0 v0.9.1 // indirect
90111
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect

go.sum

Lines changed: 83 additions & 22 deletions
Large diffs are not rendered by default.

pkg/cloudevents/clients/addon/client_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"testing"
77
"time"
88

9-
"github.com/cloudevents/sdk-go/v2/protocol/gochan"
109
jsonpatch "github.com/evanphx/json-patch/v5"
1110

1211
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -64,11 +63,14 @@ func TestPatch(t *testing.T) {
6463

6564
for _, c := range cases {
6665
t.Run(c.name, func(t *testing.T) {
66+
ctx, cancel := context.WithCancel(context.Background())
67+
defer cancel()
68+
6769
watcherStore := store.NewAgentInformerWatcherStore[*addonapiv1alpha1.ManagedClusterAddOn]()
6870

69-
ceClientOpt := fake.NewAgentOptions(gochan.New(), nil, c.clusterName, c.clusterName+"agent")
71+
ceClientOpt := fake.NewAgentOptions(fake.NewEventChan(), c.clusterName, c.clusterName+"agent")
7072
ceClient, err := clients.NewCloudEventAgentClient(
71-
context.Background(),
73+
ctx,
7274
ceClientOpt,
7375
store.NewAgentWatcherStoreLister(watcherStore),
7476
statushash.StatusHash,
@@ -89,7 +91,7 @@ func TestPatch(t *testing.T) {
8991
watcherStore.SetInformer(informer)
9092

9193
if _, err = addonClientSet.AddonV1alpha1().ManagedClusterAddOns(c.clusterName).Patch(
92-
context.Background(),
94+
ctx,
9395
c.addon.Name,
9496
types.MergePatchType,
9597
c.patch,

pkg/cloudevents/clients/cluster/client_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"testing"
77
"time"
88

9-
"github.com/cloudevents/sdk-go/v2/protocol/gochan"
109
jsonpatch "github.com/evanphx/json-patch/v5"
1110

1211
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -57,7 +56,7 @@ func TestCreate(t *testing.T) {
5756
defer cancel()
5857

5958
watcherStore := store.NewAgentInformerWatcherStore[*clusterv1.ManagedCluster]()
60-
ceClientOpt := fake.NewAgentOptions(gochan.New(), nil, "cluster1", "cluster1-agent")
59+
ceClientOpt := fake.NewAgentOptions(fake.NewEventChan(), "cluster1", "cluster1-agent")
6160
ceClient, err := clients.NewCloudEventAgentClient(
6261
ctx,
6362
ceClientOpt,
@@ -116,7 +115,7 @@ func TestPatch(t *testing.T) {
116115
defer cancel()
117116

118117
watcherStore := store.NewAgentInformerWatcherStore[*clusterv1.ManagedCluster]()
119-
ceClientOpt := fake.NewAgentOptions(gochan.New(), nil, "cluster1", "test-agent")
118+
ceClientOpt := fake.NewAgentOptions(fake.NewEventChan(), "cluster1", "test-agent")
120119
ceClient, err := clients.NewCloudEventAgentClient(
121120
ctx,
122121
ceClientOpt,

pkg/cloudevents/clients/csr/client_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import (
77

88
v1 "open-cluster-management.io/api/cluster/v1"
99

10-
"github.com/cloudevents/sdk-go/v2/protocol/gochan"
11-
1210
certificatev1 "k8s.io/api/certificates/v1"
1311
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1412
"k8s.io/apimachinery/pkg/runtime"
@@ -61,7 +59,7 @@ func TestCreate(t *testing.T) {
6159
defer cancel()
6260

6361
watcherStore := store.NewAgentInformerWatcherStore[*certificatev1.CertificateSigningRequest]()
64-
ceClientOpt := fake.NewAgentOptions(gochan.New(), nil, "cluster1", "cluster1-agent")
62+
ceClientOpt := fake.NewAgentOptions(fake.NewEventChan(), "cluster1", "cluster1-agent")
6563
ceClient, err := clients.NewCloudEventAgentClient(
6664
ctx,
6765
ceClientOpt,

pkg/cloudevents/clients/event/client_test.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"encoding/json"
66
"testing"
77

8-
"github.com/cloudevents/sdk-go/v2/protocol/gochan"
9-
108
eventsv1 "k8s.io/api/events/v1"
119
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1210
"k8s.io/apimachinery/pkg/types"
@@ -34,9 +32,12 @@ func TestCreate(t *testing.T) {
3432

3533
for _, c := range cases {
3634
t.Run(c.name, func(t *testing.T) {
35+
ctx, cancel := context.WithCancel(context.Background())
36+
defer cancel()
37+
3738
ceClient, err := clients.NewCloudEventAgentClient(
38-
context.Background(),
39-
fake.NewAgentOptions(gochan.New(), nil, c.clusterName, c.clusterName+"agent"),
39+
ctx,
40+
fake.NewAgentOptions(fake.NewEventChan(), c.clusterName, c.clusterName+"agent"),
4041
nil,
4142
statushash.StatusHash,
4243
NewEventCodec())
@@ -89,9 +90,12 @@ func TestPatch(t *testing.T) {
8990

9091
for _, c := range cases {
9192
t.Run(c.name, func(t *testing.T) {
93+
ctx, cancel := context.WithCancel(context.Background())
94+
defer cancel()
95+
9296
ceClient, err := clients.NewCloudEventAgentClient(
93-
context.Background(),
94-
fake.NewAgentOptions(gochan.New(), nil, c.clusterName, c.clusterName+"agent"),
97+
ctx,
98+
fake.NewAgentOptions(fake.NewEventChan(), c.clusterName, c.clusterName+"agent"),
9599
nil,
96100
statushash.StatusHash,
97101
NewEventCodec())
@@ -101,7 +105,7 @@ func TestPatch(t *testing.T) {
101105

102106
evtClient := NewEventClient(ceClient).WithNamespace(c.clusterName)
103107

104-
if _, err := evtClient.Patch(context.Background(),
108+
if _, err := evtClient.Patch(ctx,
105109
c.event.Name,
106110
types.StrategicMergePatchType,
107111
c.patch,

pkg/cloudevents/clients/lease/client_test.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"context"
55
"testing"
66

7-
"github.com/cloudevents/sdk-go/v2/protocol/gochan"
8-
97
coordv1 "k8s.io/api/coordination/v1"
108
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
119

@@ -35,10 +33,13 @@ func TestUpdate(t *testing.T) {
3533

3634
for _, c := range cases {
3735
t.Run(c.name, func(t *testing.T) {
36+
ctx, cancel := context.WithCancel(context.Background())
37+
defer cancel()
38+
3839
leaseWatchStore := store.NewSimpleStore[*coordv1.Lease]()
3940
ceClient, err := clients.NewCloudEventAgentClient(
40-
context.Background(),
41-
fake.NewAgentOptions(gochan.New(), nil, c.clusterName, c.clusterName+"agent"),
41+
ctx,
42+
fake.NewAgentOptions(fake.NewEventChan(), c.clusterName, c.clusterName+"agent"),
4243
store.NewAgentWatcherStoreLister(leaseWatchStore),
4344
statushash.StatusHash,
4445
NewLeaseCodec())
@@ -81,6 +82,9 @@ func TestGet(t *testing.T) {
8182

8283
for _, c := range cases {
8384
t.Run(c.name, func(t *testing.T) {
85+
ctx, cancel := context.WithCancel(context.Background())
86+
defer cancel()
87+
8488
leaseWatchStore := store.NewSimpleStore[*coordv1.Lease]()
8589
for _, lease := range c.leases {
8690
if err := leaseWatchStore.Add(lease); err != nil {
@@ -89,8 +93,8 @@ func TestGet(t *testing.T) {
8993
}
9094

9195
ceClient, err := clients.NewCloudEventAgentClient(
92-
context.Background(),
93-
fake.NewAgentOptions(gochan.New(), nil, "cluster1", "cluster1-agent"),
96+
ctx,
97+
fake.NewAgentOptions(fake.NewEventChan(), "cluster1", "cluster1-agent"),
9498
store.NewAgentWatcherStoreLister(leaseWatchStore),
9599
statushash.StatusHash,
96100
NewLeaseCodec())

pkg/cloudevents/clients/options/generic.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (o *GenericClientOptions[T]) AgentClient(ctx context.Context) (generic.Clou
111111
o.watcherStore = store.NewAgentInformerWatcherStore[T]()
112112
}
113113

114-
options, err := builder.BuildCloudEventsAgentOptions(o.config, o.clusterName, o.clientID)
114+
options, err := builder.BuildCloudEventsAgentOptions(o.config, o.clusterName, o.clientID, o.codec.EventDataType())
115115
if err != nil {
116116
return nil, err
117117
}
@@ -182,7 +182,7 @@ func (o *GenericClientOptions[T]) SourceClient(ctx context.Context) (generic.Clo
182182
return nil, fmt.Errorf("a watcher store is required")
183183
}
184184

185-
options, err := builder.BuildCloudEventsSourceOptions(o.config, o.clientID, o.sourceID)
185+
options, err := builder.BuildCloudEventsSourceOptions(o.config, o.clientID, o.sourceID, o.codec.EventDataType())
186186
if err != nil {
187187
return nil, err
188188
}
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package constants
22

33
const (
4-
ConfigTypeMQTT = "mqtt"
5-
ConfigTypeGRPC = "grpc"
4+
ConfigTypeMQTT = "mqtt"
5+
ConfigTypeGRPC = "grpc"
6+
ConfigTypePubSub = "pubsub"
67
)

pkg/cloudevents/generic/clients/agentclient.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,9 @@ func NewCloudEventAgentClient[T generic.ResourceObject](
4848
) (generic.CloudEventsClient[T], error) {
4949
baseClient := &baseClient{
5050
clientID: agentOptions.AgentID,
51-
cloudEventsOptions: agentOptions.CloudEventsOptions,
51+
transport: agentOptions.CloudEventsTransport,
5252
cloudEventsRateLimiter: utils.NewRateLimiter(agentOptions.EventRateLimit),
5353
reconnectedChan: make(chan struct{}),
54-
dataType: codec.EventDataType(),
5554
}
5655

5756
if err := baseClient.connect(ctx); err != nil {

0 commit comments

Comments
 (0)