Skip to content

Commit a38bf0e

Browse files
committed
add pubsub protocol support for cloudevents transport.
Signed-off-by: morvencao <[email protected]>
1 parent b1be73e commit a38bf0e

File tree

822 files changed

+181069
-5715
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

822 files changed

+181069
-5715
lines changed

go.mod

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module open-cluster-management.io/sdk-go
33
go 1.24.0
44

55
require (
6+
cloud.google.com/go/pubsub/v2 v2.3.0
67
github.com/bwmarrin/snowflake v0.3.0
78
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20250922144431-372892d7c84d
89
github.com/cloudevents/sdk-go/v2 v2.16.2
@@ -24,9 +25,10 @@ require (
2425
github.com/prometheus/client_model v0.6.1
2526
github.com/spf13/pflag v1.0.5
2627
github.com/stretchr/testify v1.11.1
27-
golang.org/x/oauth2 v0.27.0
28-
google.golang.org/grpc v1.68.1
29-
google.golang.org/protobuf v1.36.5
28+
golang.org/x/oauth2 v0.32.0
29+
google.golang.org/api v0.255.0
30+
google.golang.org/grpc v1.76.0
31+
google.golang.org/protobuf v1.36.10
3032
gopkg.in/yaml.v2 v2.4.0
3133
k8s.io/api v0.33.2
3234
k8s.io/apimachinery v0.33.2
@@ -41,21 +43,30 @@ require (
4143
)
4244

4345
require (
44-
cel.dev/expr v0.23.1 // indirect
45-
cloud.google.com/go/compute/metadata v0.5.0 // indirect
46+
cel.dev/expr v0.24.0 // indirect
47+
cloud.google.com/go v0.121.6 // indirect
48+
cloud.google.com/go/auth v0.17.0 // indirect
49+
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
50+
cloud.google.com/go/compute/metadata v0.9.0 // indirect
51+
cloud.google.com/go/iam v1.5.2 // indirect
4652
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
4753
github.com/beorn7/perks v1.0.1 // indirect
4854
github.com/blang/semver/v4 v4.0.0 // indirect
4955
github.com/cespare/xxhash/v2 v2.3.0 // indirect
5056
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
57+
github.com/felixge/httpsnoop v1.0.4 // indirect
5158
github.com/fsnotify/fsnotify v1.7.0 // indirect
5259
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
53-
github.com/go-logr/logr v1.4.2 // indirect
60+
github.com/go-logr/logr v1.4.3 // indirect
61+
github.com/go-logr/stdr v1.2.2 // indirect
5462
github.com/go-openapi/jsonpointer v0.21.0 // indirect
5563
github.com/go-openapi/jsonreference v0.20.2 // indirect
5664
github.com/go-openapi/swag v0.23.0 // indirect
5765
github.com/gogo/protobuf v1.3.2 // indirect
5866
github.com/google/gnostic-models v0.6.9 // indirect
67+
github.com/google/s2a-go v0.1.9 // indirect
68+
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
69+
github.com/googleapis/gax-go/v2 v2.15.0 // indirect
5970
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
6071
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect
6172
github.com/josharian/intern v1.0.0 // indirect
@@ -71,20 +82,30 @@ require (
7182
github.com/prometheus/common v0.62.0 // indirect
7283
github.com/prometheus/procfs v0.15.1 // indirect
7384
github.com/rs/xid v1.4.0 // indirect
74-
github.com/stoewer/go-strcase v1.3.0 // indirect
85+
github.com/stoewer/go-strcase v1.3.1 // indirect
7586
github.com/x448/float16 v0.8.4 // indirect
76-
go.opentelemetry.io/otel v1.33.0 // indirect
77-
go.opentelemetry.io/otel/trace v1.33.0 // indirect
87+
go.einride.tech/aip v0.73.0 // indirect
88+
go.opencensus.io v0.24.0 // indirect
89+
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
90+
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 // indirect
91+
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect
92+
go.opentelemetry.io/otel v1.37.0 // indirect
93+
go.opentelemetry.io/otel/metric v1.37.0 // indirect
94+
go.opentelemetry.io/otel/sdk v1.37.0 // indirect
95+
go.opentelemetry.io/otel/trace v1.37.0 // indirect
7896
go.uber.org/multierr v1.11.0 // indirect
7997
go.uber.org/zap v1.27.0 // indirect
98+
golang.org/x/crypto v0.43.0 // indirect
8099
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
81-
golang.org/x/net v0.43.0 // indirect
82-
golang.org/x/sys v0.35.0 // indirect
83-
golang.org/x/term v0.34.0 // indirect
84-
golang.org/x/text v0.28.0 // indirect
85-
golang.org/x/time v0.12.0 // indirect
86-
google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 // indirect
87-
google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 // indirect
100+
golang.org/x/net v0.46.0 // indirect
101+
golang.org/x/sync v0.17.0 // indirect
102+
golang.org/x/sys v0.37.0 // indirect
103+
golang.org/x/term v0.36.0 // indirect
104+
golang.org/x/text v0.30.0 // indirect
105+
golang.org/x/time v0.14.0 // indirect
106+
google.golang.org/genproto v0.0.0-20250603155806-513f23925822 // indirect
107+
google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c // indirect
108+
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect
88109
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
89110
gopkg.in/inf.v0 v0.9.1 // indirect
90111
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect

go.sum

Lines changed: 106 additions & 34 deletions
Large diffs are not rendered by default.

pkg/cloudevents/README.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ Currently, the CloudEvents options supports the following protocols/drivers:
116116

117117
- [MQTT Protocol/Driver](./generic/options/mqtt)
118118
- [gRPC Protocol/Driver](./generic/options/grpc)
119+
- [Pub/Sub Protocol/Driver](./generic/options/pubsub)
119120

120121
To create CloudEvents source/agent options for these supported protocols/drivers, developers need to provide configuration specific to the protocol/driver. The configuration format resembles the kubeconfig for the Kubernetes client-go but has a different schema.
121122

@@ -147,6 +148,47 @@ clientKeyFile: /certs/client.key
147148
148149
For detailed configuration options for the gRPC driver, refer to the [gRPC driver options package](https://github.com/open-cluster-management-io/sdk-go/blob/00a94671ced1c17d2ca2b5fad2f4baab282a7d3c/pkg/cloudevents/generic/options/grpc/options.go#L30-L40).
149150
151+
### Pub/Sub Protocol/Driver
152+
153+
Here's an example of a YAML configuration for the Google Cloud Pub/Sub protocol for a source:
154+
155+
```yaml
156+
projectID: my-project
157+
endpoint: https://pubsub.us-east1.googleapis.com # optional, leave empty for global, or set a regional URL.
158+
credentialsFile: /path/to/credentials.json
159+
topics:
160+
sourceEvents: projects/my-project/topics/sourceevents
161+
sourceBroadcast: projects/my-project/topics/sourcebroadcast
162+
subscriptions:
163+
agentEvents: projects/my-project/subscriptions/agentevents-source1
164+
agentBroadcast: projects/my-project/subscriptions/agentbroadcast-source1
165+
```
166+
167+
And here's an example configuration for an agent:
168+
169+
```yaml
170+
projectID: my-project
171+
endpoint: https://pubsub.us-east1.googleapis.com # optional, leave empty for global, or set a regional URL.
172+
credentialsFile: /path/to/credentials.json
173+
topics:
174+
agentEvents: projects/my-project/topics/agentevents
175+
agentBroadcast: projects/my-project/topics/agentbroadcast
176+
subscriptions:
177+
sourceEvents: projects/my-project/subscriptions/sourceevents-cluster1
178+
sourceBroadcast: projects/my-project/subscriptions/sourcebroadcast-cluster1
179+
```
180+
181+
**Note**: The Pub/Sub protocol uses separate topics and subscriptions for different event types:
182+
- **Source** uses `sourceEvents`/`sourceBroadcast` topics to publish events and `agentEvents`/`agentBroadcast` subscriptions to receive events from agents
183+
- **Agent** uses `agentEvents`/`agentBroadcast` topics to publish events and `sourceEvents`/`sourceBroadcast` subscriptions to receive events from sources
184+
- Both `sourceBroadcast` and `agentBroadcast` channels are used for resync requests
185+
- **All topics and subscriptions must be created before running the source and agent**
186+
- The subscription for `agentEvents` is a filtered subscription with filter `attributes."ce-clustername"="<clustername>"`
187+
- The subscription for `sourceEvents` is a filtered subscription with filter `attributes."ce-originalsource"="<sourceID>"`
188+
- The subscriptions for `agentBroadcast` and `sourceBroadcast` are subscriptions without filters, allowing broadcast messages to reach all subscribers for resync events.
189+
190+
For detailed configuration options for the Pub/Sub driver, refer to the [Pub/Sub driver options package](./generic/options/pubsub).
191+
150192
## Work Clients
151193

152194
### Building a ManifestWorkSourceClient on the hub cluster with SourceLocalWatcherStore

pkg/cloudevents/clients/options/generic.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ type GenericClientOptions[T generic.ResourceObject] struct {
3333
//
3434
// GRPCOptions (*grpc.GRPCOptions): builds a generic cloudevents client with GRPC
3535
//
36+
// PubSubOptions (*pubsub.PubSubOptions): builds a generic cloudevents client with PubSub
37+
//
3638
// - codec, the codec for resource
3739
//
3840
// - clientID, the client ID for generic cloudevents client.
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package constants
22

33
const (
4-
ConfigTypeMQTT = "mqtt"
5-
ConfigTypeGRPC = "grpc"
4+
ConfigTypeMQTT = "mqtt"
5+
ConfigTypeGRPC = "grpc"
6+
ConfigTypePubSub = "pubsub"
67
)

pkg/cloudevents/doc/design.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ The [`CloudEventsOptions`](../generic/options/options.go) interface defines an o
1616
For each event protocol, it needs to implement the `CloudEventsSourceOptions` and `CloudEventsAgentOptions`, the
1717
`CloudEventsSourceOptions` will be used for building cloudevents source client and the `CloudEventsAgentOptions` will be used for building cloudevents agent client.
1818

19-
Currently, the MQTT and gRPC are implemented.
19+
Currently, the MQTT, gRPC and PubSub are implemented.
2020

2121
```mermaid
2222
classDiagram
@@ -44,6 +44,9 @@ CloudEventsAgentOptions <|.. mqttAgentOptions
4444
4545
CloudEventsSourceOptions <|.. grpcSourceOptions
4646
CloudEventsAgentOptions <|.. grpcAgentOptions
47+
48+
CloudEventsSourceOptions <|.. pubsubSourceOptions
49+
CloudEventsAgentOptions <|.. pubsubAgentOptions
4750
```
4851

4952
### CloudEventsClient Interface

pkg/cloudevents/generic/options/builder/optionsbuilder.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
88
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
99
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
10+
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/pubsub"
1011
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
1112
)
1213

@@ -21,6 +22,7 @@ type ConfigLoader struct {
2122
// Available configuration types:
2223
// - mqtt
2324
// - grpc
25+
// - pubsub
2426
func NewConfigLoader(configType, configPath string) *ConfigLoader {
2527
return &ConfigLoader{
2628
configType: configType,
@@ -45,6 +47,13 @@ func (l *ConfigLoader) LoadConfig() (string, any, error) {
4547
}
4648

4749
return grpcOptions.Dialer.URL, grpcOptions, nil
50+
case constants.ConfigTypePubSub:
51+
pubsubOptions, err := pubsub.BuildPubSubOptionsFromFlags(l.configPath)
52+
if err != nil {
53+
return "", nil, err
54+
}
55+
56+
return "", pubsubOptions, nil
4857
}
4958

5059
return "", nil, fmt.Errorf("unsupported config type %s", l.configType)
@@ -58,6 +67,8 @@ func BuildCloudEventsSourceOptions(config any,
5867
return mqtt.NewSourceOptions(config, clientId, sourceId), nil
5968
case *grpc.GRPCOptions:
6069
return grpc.NewSourceOptions(config, sourceId, dataType), nil
70+
case *pubsub.PubSubOptions:
71+
return pubsub.NewSourceOptions(config, sourceId), nil
6172
default:
6273
return nil, fmt.Errorf("unsupported client configuration type %T", config)
6374
}
@@ -71,6 +82,8 @@ func BuildCloudEventsAgentOptions(config any,
7182
return mqtt.NewAgentOptions(config, clusterName, clientId), nil
7283
case *grpc.GRPCOptions:
7384
return grpc.NewAgentOptions(config, clusterName, clientId, dataType), nil
85+
case *pubsub.PubSubOptions:
86+
return pubsub.NewAgentOptions(config, clusterName, clientId), nil
7487
default:
7588
return nil, fmt.Errorf("unsupported client configuration type %T", config)
7689
}

0 commit comments

Comments
 (0)