Skip to content

Commit 6a7ab88

Browse files
committed
add pubsub protocol support for cloudevents transport.
Signed-off-by: morvencao <[email protected]>
1 parent b1be73e commit 6a7ab88

File tree

814 files changed

+179436
-5687
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

814 files changed

+179436
-5687
lines changed

go.mod

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module open-cluster-management.io/sdk-go
33
go 1.24.0
44

55
require (
6+
cloud.google.com/go/pubsub/v2 v2.3.0
67
github.com/bwmarrin/snowflake v0.3.0
78
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20250922144431-372892d7c84d
89
github.com/cloudevents/sdk-go/v2 v2.16.2
@@ -24,9 +25,10 @@ require (
2425
github.com/prometheus/client_model v0.6.1
2526
github.com/spf13/pflag v1.0.5
2627
github.com/stretchr/testify v1.11.1
27-
golang.org/x/oauth2 v0.27.0
28-
google.golang.org/grpc v1.68.1
29-
google.golang.org/protobuf v1.36.5
28+
golang.org/x/oauth2 v0.32.0
29+
google.golang.org/api v0.255.0
30+
google.golang.org/grpc v1.76.0
31+
google.golang.org/protobuf v1.36.10
3032
gopkg.in/yaml.v2 v2.4.0
3133
k8s.io/api v0.33.2
3234
k8s.io/apimachinery v0.33.2
@@ -41,21 +43,30 @@ require (
4143
)
4244

4345
require (
44-
cel.dev/expr v0.23.1 // indirect
45-
cloud.google.com/go/compute/metadata v0.5.0 // indirect
46+
cel.dev/expr v0.24.0 // indirect
47+
cloud.google.com/go v0.121.6 // indirect
48+
cloud.google.com/go/auth v0.17.0 // indirect
49+
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
50+
cloud.google.com/go/compute/metadata v0.9.0 // indirect
51+
cloud.google.com/go/iam v1.5.2 // indirect
4652
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
4753
github.com/beorn7/perks v1.0.1 // indirect
4854
github.com/blang/semver/v4 v4.0.0 // indirect
4955
github.com/cespare/xxhash/v2 v2.3.0 // indirect
5056
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
57+
github.com/felixge/httpsnoop v1.0.4 // indirect
5158
github.com/fsnotify/fsnotify v1.7.0 // indirect
5259
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
53-
github.com/go-logr/logr v1.4.2 // indirect
60+
github.com/go-logr/logr v1.4.3 // indirect
61+
github.com/go-logr/stdr v1.2.2 // indirect
5462
github.com/go-openapi/jsonpointer v0.21.0 // indirect
5563
github.com/go-openapi/jsonreference v0.20.2 // indirect
5664
github.com/go-openapi/swag v0.23.0 // indirect
5765
github.com/gogo/protobuf v1.3.2 // indirect
5866
github.com/google/gnostic-models v0.6.9 // indirect
67+
github.com/google/s2a-go v0.1.9 // indirect
68+
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
69+
github.com/googleapis/gax-go/v2 v2.15.0 // indirect
5970
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
6071
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect
6172
github.com/josharian/intern v1.0.0 // indirect
@@ -71,20 +82,30 @@ require (
7182
github.com/prometheus/common v0.62.0 // indirect
7283
github.com/prometheus/procfs v0.15.1 // indirect
7384
github.com/rs/xid v1.4.0 // indirect
74-
github.com/stoewer/go-strcase v1.3.0 // indirect
85+
github.com/stoewer/go-strcase v1.3.1 // indirect
7586
github.com/x448/float16 v0.8.4 // indirect
76-
go.opentelemetry.io/otel v1.33.0 // indirect
77-
go.opentelemetry.io/otel/trace v1.33.0 // indirect
87+
go.einride.tech/aip v0.73.0 // indirect
88+
go.opencensus.io v0.24.0 // indirect
89+
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
90+
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 // indirect
91+
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect
92+
go.opentelemetry.io/otel v1.37.0 // indirect
93+
go.opentelemetry.io/otel/metric v1.37.0 // indirect
94+
go.opentelemetry.io/otel/sdk v1.37.0 // indirect
95+
go.opentelemetry.io/otel/trace v1.37.0 // indirect
7896
go.uber.org/multierr v1.11.0 // indirect
7997
go.uber.org/zap v1.27.0 // indirect
98+
golang.org/x/crypto v0.43.0 // indirect
8099
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
81-
golang.org/x/net v0.43.0 // indirect
82-
golang.org/x/sys v0.35.0 // indirect
83-
golang.org/x/term v0.34.0 // indirect
84-
golang.org/x/text v0.28.0 // indirect
85-
golang.org/x/time v0.12.0 // indirect
86-
google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 // indirect
87-
google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 // indirect
100+
golang.org/x/net v0.46.0 // indirect
101+
golang.org/x/sync v0.17.0 // indirect
102+
golang.org/x/sys v0.37.0 // indirect
103+
golang.org/x/term v0.36.0 // indirect
104+
golang.org/x/text v0.30.0 // indirect
105+
golang.org/x/time v0.14.0 // indirect
106+
google.golang.org/genproto v0.0.0-20250603155806-513f23925822 // indirect
107+
google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c // indirect
108+
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect
88109
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
89110
gopkg.in/inf.v0 v0.9.1 // indirect
90111
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect

go.sum

Lines changed: 106 additions & 34 deletions
Large diffs are not rendered by default.
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package constants
22

33
const (
4-
ConfigTypeMQTT = "mqtt"
5-
ConfigTypeGRPC = "grpc"
4+
ConfigTypeMQTT = "mqtt"
5+
ConfigTypeGRPC = "grpc"
6+
ConfigTypePubSub = "pubsub"
67
)

pkg/cloudevents/generic/options/builder/optionsbuilder.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
88
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
99
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
10+
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/pubsub"
1011
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
1112
)
1213

@@ -21,6 +22,7 @@ type ConfigLoader struct {
2122
// Available configuration types:
2223
// - mqtt
2324
// - grpc
25+
// - pubsub
2426
func NewConfigLoader(configType, configPath string) *ConfigLoader {
2527
return &ConfigLoader{
2628
configType: configType,
@@ -45,6 +47,13 @@ func (l *ConfigLoader) LoadConfig() (string, any, error) {
4547
}
4648

4749
return grpcOptions.Dialer.URL, grpcOptions, nil
50+
case constants.ConfigTypePubSub:
51+
pubsubOptions, err := pubsub.BuildPubSubOptionsFromFlags(l.configPath)
52+
if err != nil {
53+
return "", nil, err
54+
}
55+
56+
return "", pubsubOptions, nil
4857
}
4958

5059
return "", nil, fmt.Errorf("unsupported config type %s", l.configType)
@@ -58,6 +67,8 @@ func BuildCloudEventsSourceOptions(config any,
5867
return mqtt.NewSourceOptions(config, clientId, sourceId), nil
5968
case *grpc.GRPCOptions:
6069
return grpc.NewSourceOptions(config, sourceId, dataType), nil
70+
case *pubsub.PubSubOptions:
71+
return pubsub.NewSourceOptions(config, sourceId), nil
6172
default:
6273
return nil, fmt.Errorf("unsupported client configuration type %T", config)
6374
}
@@ -71,6 +82,8 @@ func BuildCloudEventsAgentOptions(config any,
7182
return mqtt.NewAgentOptions(config, clusterName, clientId), nil
7283
case *grpc.GRPCOptions:
7384
return grpc.NewAgentOptions(config, clusterName, clientId, dataType), nil
85+
case *pubsub.PubSubOptions:
86+
return pubsub.NewAgentOptions(config, clusterName, clientId), nil
7487
default:
7588
return nil, fmt.Errorf("unsupported client configuration type %T", config)
7689
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package pubsub
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"cloud.google.com/go/pubsub/v2"
8+
cloudevents "github.com/cloudevents/sdk-go/v2"
9+
"google.golang.org/api/option"
10+
"google.golang.org/grpc"
11+
"google.golang.org/grpc/credentials/insecure"
12+
"k8s.io/klog/v2"
13+
14+
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
15+
)
16+
17+
type pubsubAgentTransport struct {
18+
PubSubOptions
19+
clusterName string
20+
client *pubsub.Client
21+
publisher *pubsub.Publisher
22+
subscriber *pubsub.Subscriber
23+
errorChan chan error
24+
}
25+
26+
// NewAgentOptions creates a new CloudEventsAgentOptions for Pub/Sub.
27+
func NewAgentOptions(pubsubOptions *PubSubOptions,
28+
clusterName, agentID string) *options.CloudEventsAgentOptions {
29+
return &options.CloudEventsAgentOptions{
30+
CloudEventsTransport: &pubsubAgentTransport{
31+
PubSubOptions: *pubsubOptions,
32+
clusterName: clusterName,
33+
errorChan: make(chan error),
34+
},
35+
AgentID: agentID,
36+
ClusterName: clusterName,
37+
}
38+
}
39+
40+
func (o *pubsubAgentTransport) Connect(ctx context.Context) error {
41+
options := []option.ClientOption{}
42+
if o.CredentialsFile != "" {
43+
options = append(options, option.WithCredentialsFile(o.CredentialsFile))
44+
}
45+
if o.Endpoint != "" {
46+
options = append(options, option.WithEndpoint(o.Endpoint))
47+
if o.CredentialsFile == "" {
48+
pubsubConn, err := grpc.NewClient(o.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
49+
if err != nil {
50+
return err
51+
}
52+
options = append(options, option.WithGRPCConn(pubsubConn))
53+
}
54+
}
55+
client, err := pubsub.NewClient(ctx, o.ProjectID, options...)
56+
if err != nil {
57+
return err
58+
}
59+
60+
o.client = client
61+
o.publisher = client.Publisher(o.Topics.AgentEvents)
62+
o.subscriber = client.Subscriber(o.Subscriptions.SourceEvents)
63+
64+
return nil
65+
}
66+
67+
func (o *pubsubAgentTransport) Send(ctx context.Context, evt cloudevents.Event) error {
68+
msg, err := Encode(evt)
69+
if err != nil {
70+
return err
71+
}
72+
73+
// Publish the message
74+
result := o.publisher.Publish(ctx, msg)
75+
76+
// Block until the result is returned and a server-generated
77+
// ID is returned for the published message.
78+
id, err := result.Get(ctx)
79+
if err != nil {
80+
return fmt.Errorf("pubsub: result.Get: %w", err)
81+
}
82+
klog.V(4).Infof("Published a message; msg ID: %v\n", id)
83+
return nil
84+
}
85+
86+
func (o *pubsubAgentTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
87+
err := o.subscriber.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
88+
evt, err := Decode(msg)
89+
if err != nil {
90+
klog.Warningf("failed to decode Pub/Sub message: %v", err)
91+
} else {
92+
// Handle the event
93+
fn(evt)
94+
}
95+
// TODO: decide when to Ack
96+
msg.Ack()
97+
})
98+
99+
if err != nil {
100+
return fmt.Errorf("pubsub: subscriber.Receive: %w", err)
101+
}
102+
103+
return nil
104+
}
105+
106+
func (o *pubsubAgentTransport) Close(ctx context.Context) error {
107+
return o.client.Close()
108+
}
109+
110+
func (o *pubsubAgentTransport) ErrorChan() <-chan error {
111+
return o.errorChan
112+
}

0 commit comments

Comments
 (0)