Skip to content

Commit 87053df

Browse files
committed
address comments.
Signed-off-by: morvencao <[email protected]>
1 parent a38bf0e commit 87053df

File tree

13 files changed

+735
-383
lines changed

13 files changed

+735
-383
lines changed

pkg/cloudevents/generic/options/builder/optionsbuilder_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ func TestBuildCloudEventsSourceOptions(t *testing.T) {
106106
AgentEvents: "projects/test-project/subscriptions/agentevents-source1",
107107
AgentBroadcast: "projects/test-project/subscriptions/agentbroadcast-source1",
108108
},
109+
KeepaliveSettings: &pubsub.KeepaliveSettings{
110+
Time: 5 * time.Minute,
111+
Timeout: 20 * time.Second,
112+
PermitWithoutStream: false,
113+
},
109114
},
110115
},
111116
}
@@ -167,6 +172,11 @@ func TestBuildCloudEventsAgentOptions(t *testing.T) {
167172
SourceEvents: "projects/test-project/subscriptions/sourceevents-cluster1",
168173
SourceBroadcast: "projects/test-project/subscriptions/sourcebroadcast-cluster1",
169174
},
175+
KeepaliveSettings: &pubsub.KeepaliveSettings{
176+
Time: 5 * time.Minute,
177+
Timeout: 20 * time.Second,
178+
PermitWithoutStream: false,
179+
},
170180
},
171181
},
172182
}
Lines changed: 1 addition & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,14 @@
11
package pubsub
22

33
import (
4-
"context"
5-
"fmt"
6-
7-
"cloud.google.com/go/pubsub/v2"
8-
cloudevents "github.com/cloudevents/sdk-go/v2"
9-
"google.golang.org/api/option"
10-
"google.golang.org/grpc"
11-
"google.golang.org/grpc/credentials/insecure"
12-
"k8s.io/klog/v2"
13-
144
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
15-
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
165
)
176

18-
var _ options.CloudEventTransport = &pubsubAgentTransport{}
19-
20-
// pubsubAgentTransport is a CloudEventTransport implementation for Pub/Sub for agents.
21-
type pubsubAgentTransport struct {
22-
PubSubOptions
23-
clusterName string
24-
client *pubsub.Client
25-
// Publisher for status updates
26-
publisher *pubsub.Publisher
27-
// Publisher for resync broadcasts
28-
resyncPublisher *pubsub.Publisher
29-
// Subscriber for spec updates
30-
subscriber *pubsub.Subscriber
31-
// Subscriber for resync broadcasts
32-
resyncSubscriber *pubsub.Subscriber
33-
// TODO: handle error channel
34-
errorChan chan error
35-
}
36-
377
// NewAgentOptions creates a new CloudEventsAgentOptions for Pub/Sub.
388
func NewAgentOptions(pubsubOptions *PubSubOptions,
399
clusterName, agentID string) *options.CloudEventsAgentOptions {
4010
return &options.CloudEventsAgentOptions{
41-
CloudEventsTransport: &pubsubAgentTransport{
11+
CloudEventsTransport: &pubsubTransport{
4212
PubSubOptions: *pubsubOptions,
4313
clusterName: clusterName,
4414
errorChan: make(chan error),
@@ -47,119 +17,3 @@ func NewAgentOptions(pubsubOptions *PubSubOptions,
4717
ClusterName: clusterName,
4818
}
4919
}
50-
51-
func (o *pubsubAgentTransport) Connect(ctx context.Context) error {
52-
options := []option.ClientOption{}
53-
if o.CredentialsFile != "" {
54-
options = append(options, option.WithCredentialsFile(o.CredentialsFile))
55-
}
56-
if o.Endpoint != "" {
57-
options = append(options, option.WithEndpoint(o.Endpoint))
58-
if o.CredentialsFile == "" {
59-
// use the insecure connection for local development/testing when no credentials are provided
60-
pubsubConn, err := grpc.NewClient(o.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
61-
if err != nil {
62-
return err
63-
}
64-
options = append(options, option.WithGRPCConn(pubsubConn))
65-
}
66-
}
67-
68-
client, err := pubsub.NewClient(ctx, o.ProjectID, options...)
69-
if err != nil {
70-
return err
71-
}
72-
73-
// initialize pubsub client, publisher and subscriber
74-
o.client = client
75-
o.publisher = client.Publisher(o.Topics.AgentEvents)
76-
o.resyncPublisher = client.Publisher(o.Topics.AgentBroadcast)
77-
o.subscriber = client.Subscriber(o.Subscriptions.SourceEvents)
78-
o.resyncSubscriber = client.Subscriber(o.Subscriptions.SourceBroadcast)
79-
80-
return nil
81-
}
82-
83-
func (o *pubsubAgentTransport) Send(ctx context.Context, evt cloudevents.Event) error {
84-
msg, err := Encode(evt)
85-
if err != nil {
86-
return err
87-
}
88-
89-
eventType, err := types.ParseCloudEventsType(evt.Context.GetType())
90-
if err != nil {
91-
return fmt.Errorf("unsupported event type %s, %v", eventType, err)
92-
}
93-
94-
// determine publisher based on event type
95-
var result *pubsub.PublishResult
96-
if eventType.Action == types.ResyncRequestAction {
97-
result = o.resyncPublisher.Publish(ctx, msg)
98-
} else {
99-
result = o.publisher.Publish(ctx, msg)
100-
}
101-
102-
// block until the result is returned
103-
_, err = result.Get(ctx)
104-
return err
105-
}
106-
107-
func (o *pubsubAgentTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
108-
// create error channels for both subscribers
109-
subscriberErrChan := make(chan error, 1)
110-
resyncSubscriberErrChan := make(chan error, 1)
111-
112-
// start the subscriber for spec updates
113-
go func() {
114-
err := o.subscriber.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
115-
evt, err := Decode(msg)
116-
if err != nil {
117-
// also send ACK on decode error since redelivery won't fix it.
118-
klog.Errorf("failed to decode pubsub message to resource spec event: %v", err)
119-
} else {
120-
fn(evt)
121-
}
122-
// send ACK after all receiver handlers complete.
123-
msg.Ack()
124-
})
125-
if err != nil {
126-
subscriberErrChan <- fmt.Errorf("subscriber is interrupted by error: %w", err)
127-
}
128-
}()
129-
130-
// start the resync subscriber for broadcast messages
131-
go func() {
132-
err := o.resyncSubscriber.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
133-
evt, err := Decode(msg)
134-
if err != nil {
135-
// also send ACK on decode error since redelivery won't fix it.
136-
klog.Errorf("failed to decode pubsub message to resource statusresync event: %v", err)
137-
} else {
138-
fn(evt)
139-
}
140-
// send ACK after all receiver handlers complete.
141-
msg.Ack()
142-
})
143-
if err != nil {
144-
resyncSubscriberErrChan <- fmt.Errorf("resync subscriber is interrupted by error: %w", err)
145-
}
146-
}()
147-
148-
// wait for either subscriber to error or context cancellation
149-
select {
150-
case err := <-subscriberErrChan:
151-
return err
152-
case err := <-resyncSubscriberErrChan:
153-
return err
154-
case <-ctx.Done():
155-
return ctx.Err()
156-
}
157-
}
158-
159-
func (o *pubsubAgentTransport) Close(ctx context.Context) error {
160-
return o.client.Close()
161-
}
162-
163-
func (o *pubsubAgentTransport) ErrorChan() <-chan error {
164-
return o.errorChan
165-
}

pkg/cloudevents/generic/options/pubsub/agentoptions_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ func TestNewAgentOptions(t *testing.T) {
7070
t.Fatal("expected CloudEventsTransport to be set, got nil")
7171
}
7272

73-
transport, ok := opts.CloudEventsTransport.(*pubsubAgentTransport)
73+
transport, ok := opts.CloudEventsTransport.(*pubsubTransport)
7474
if !ok {
75-
t.Fatalf("expected transport to be *pubsubAgentTransport, got %T", opts.CloudEventsTransport)
75+
t.Fatalf("expected transport to be *pubsubTransport, got %T", opts.CloudEventsTransport)
7676
}
7777

7878
if transport.clusterName != c.clusterName {
@@ -112,7 +112,7 @@ func TestPubsubAgentTransport_Structure(t *testing.T) {
112112
}
113113

114114
opts := NewAgentOptions(pubsubOpts, "test-cluster", "test-agent")
115-
transport := opts.CloudEventsTransport.(*pubsubAgentTransport)
115+
transport := opts.CloudEventsTransport.(*pubsubTransport)
116116

117117
// Verify topics are set correctly for agent
118118
if transport.Topics.AgentEvents != pubsubOpts.Topics.AgentEvents {

pkg/cloudevents/generic/options/pubsub/codec.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ func Decode(msg *pubsub.Message) (cloudevents.Event, error) {
112112
// Get the spec version to access attributes
113113
version := specs.Version(specVersion)
114114

115+
var dataContentType string
115116
// Set all CloudEvent context attributes from message attributes
116117
for _, attr := range version.Attributes() {
117118
var value string
@@ -120,6 +121,7 @@ func Decode(msg *pubsub.Message) (cloudevents.Event, error) {
120121
// Special handling for datacontenttype - use "Content-Type" without "ce-" prefix
121122
if attr.Kind() == spec.DataContentType {
122123
value, ok = msg.Attributes[contentType]
124+
dataContentType = value
123125
} else {
124126
value, ok = msg.Attributes[attr.PrefixedName()]
125127
}
@@ -152,8 +154,12 @@ func Decode(msg *pubsub.Message) (cloudevents.Event, error) {
152154
}
153155
}
154156

157+
if dataContentType == "" {
158+
// default data content type be "application/JSON"
159+
dataContentType = cloudevents.ApplicationJSON
160+
}
155161
// Set the data from the message
156-
if err := evt.SetData(cloudevents.ApplicationJSON, msg.Data); err != nil {
162+
if err := evt.SetData(dataContentType, msg.Data); err != nil {
157163
return cloudevents.Event{}, fmt.Errorf("failed to set event data: %v", err)
158164
}
159165

pkg/cloudevents/generic/options/pubsub/codec_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package pubsub
22

33
import (
4+
"strings"
45
"testing"
56
"time"
67

@@ -341,7 +342,7 @@ func TestDecode(t *testing.T) {
341342
if c.expectedErr {
342343
if err == nil {
343344
t.Errorf("expected error, but got none")
344-
} else if c.errorContains != "" && !contains(err.Error(), c.errorContains) {
345+
} else if c.errorContains != "" && !strings.Contains(err.Error(), c.errorContains) {
345346
t.Errorf("expected error to contain %q, got %q", c.errorContains, err.Error())
346347
}
347348
return

pkg/cloudevents/generic/options/pubsub/options.go

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"os"
66
"regexp"
77
"strings"
8+
"time"
89

910
"gopkg.in/yaml.v2"
1011

@@ -14,11 +15,13 @@ import (
1415

1516
// PubSubOptions holds the options that are used to build Pub/Sub client.
1617
type PubSubOptions struct {
17-
Endpoint string
18-
ProjectID string
19-
CredentialsFile string
20-
Topics types.Topics
21-
Subscriptions types.Subscriptions
18+
Endpoint string
19+
ProjectID string
20+
CredentialsFile string
21+
Topics types.Topics
22+
Subscriptions types.Subscriptions
23+
KeepaliveSettings *KeepaliveSettings
24+
ReceiveSettings *ReceiveSettings
2225
}
2326

2427
// PubSubConfig holds the information needed to connect to Google Cloud Pub/Sub.
@@ -44,7 +47,45 @@ type PubSubConfig struct {
4447
// Required: must be provided to specify the subscriptions to receive events from.
4548
Subscriptions *types.Subscriptions `json:"subscriptions,omitempty" yaml:"subscriptions,omitempty"`
4649

47-
// TODO: Add support for additional Pub/Sub settings (e.g., message ordering, receive options).
50+
// (Optional) KeepaliveSettings configures the keepalive parameters for Pub/Sub client.
51+
KeepaliveSettings *KeepaliveSettings `json:"keepaliveSettings,omitempty" yaml:"keepaliveSettings,omitempty"`
52+
53+
// (Optional) ReceiveSettings configures the pubsub subscriber's receive settings.
54+
ReceiveSettings *ReceiveSettings `json:"receiveSettings,omitempty" yaml:"receiveSettings,omitempty"`
55+
}
56+
57+
// KeepaliveSettings defines gRPC keepalive options for the Pub/Sub client.
58+
type KeepaliveSettings struct {
59+
// Time between pings when there’s no activity, minimum is 10s, default: 5m.
60+
Time time.Duration `json:"time,omitempty" yaml:"time,omitempty"`
61+
62+
// Wait time for a ping response before closing the connection, default: 20s.
63+
Timeout time.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`
64+
65+
// If true, send pings even when no RPCs are active, default: false.
66+
PermitWithoutStream bool `json:"permitWithoutStream,omitempty" yaml:"permitWithoutStream,omitempty"`
67+
}
68+
69+
// ReceiveSettings defines how the Pub/Sub subscriber receives and processes messages.
70+
type ReceiveSettings struct {
71+
// MaxExtension is the maximum period for which the Subscriber should
72+
// automatically extend the ack deadline for each message.
73+
MaxExtension time.Duration `json:"maxExtension,omitempty" yaml:"maxExtension,omitempty"`
74+
75+
// MaxDurationPerAckExtension is the maximum duration per lease extension.
76+
MaxDurationPerAckExtension time.Duration `json:"maxDurationPerAckExtension,omitempty" yaml:"maxDurationPerAckExtension,omitempty"`
77+
78+
// MinDurationPerAckExtension is the minimum duration per lease extension.
79+
MinDurationPerAckExtension time.Duration `json:"minDurationPerAckExtension,omitempty" yaml:"minDurationPerAckExtension,omitempty"`
80+
81+
// MaxOutstandingMessages is the maximum number of unprocessed messages.
82+
MaxOutstandingMessages int `json:"maxOutstandingMessages,omitempty" yaml:"maxOutstandingMessages,omitempty"`
83+
84+
// MaxOutstandingBytes is the maximum size of unprocessed messages.
85+
MaxOutstandingBytes int `json:"maxOutstandingBytes,omitempty" yaml:"maxOutstandingBytes,omitempty"`
86+
87+
// NumGoroutines is the number of StreamingPull streams to pull messages from the subscription.
88+
NumGoroutines int `json:"numGoroutines,omitempty" yaml:"numGoroutines,omitempty"`
4889
}
4990

5091
// LoadConfig loads the Pub/Sub configuration from a file.
@@ -79,13 +120,29 @@ func BuildPubSubOptionsFromFlags(configPath string) (*PubSubOptions, error) {
79120
return nil, err
80121
}
81122

82-
return &PubSubOptions{
123+
options := &PubSubOptions{
83124
Endpoint: config.Endpoint,
84125
ProjectID: config.ProjectID,
85126
CredentialsFile: config.CredentialsFile,
86127
Topics: *config.Topics,
87128
Subscriptions: *config.Subscriptions,
88-
}, nil
129+
// enable keepalive by default
130+
KeepaliveSettings: &KeepaliveSettings{
131+
Time: 5 * time.Minute,
132+
Timeout: 20 * time.Second,
133+
PermitWithoutStream: false,
134+
},
135+
}
136+
137+
if config.KeepaliveSettings != nil {
138+
options.KeepaliveSettings = config.KeepaliveSettings
139+
}
140+
141+
if config.ReceiveSettings != nil {
142+
options.ReceiveSettings = config.ReceiveSettings
143+
}
144+
145+
return options, nil
89146
}
90147

91148
func validateTopicsAndSubscriptions(topics *types.Topics, subscriptions *types.Subscriptions, projectID string) error {

0 commit comments

Comments
 (0)