Skip to content

Commit dc2674a

Browse files
committed
address comments.
Signed-off-by: morvencao <[email protected]>
1 parent a38bf0e commit dc2674a

File tree

9 files changed

+611
-118
lines changed

9 files changed

+611
-118
lines changed

pkg/cloudevents/generic/options/builder/optionsbuilder_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ func TestBuildCloudEventsSourceOptions(t *testing.T) {
106106
AgentEvents: "projects/test-project/subscriptions/agentevents-source1",
107107
AgentBroadcast: "projects/test-project/subscriptions/agentbroadcast-source1",
108108
},
109+
KeepaliveSettings: &pubsub.KeepaliveSettings{
110+
Time: 5 * time.Minute,
111+
Timeout: 20 * time.Second,
112+
PermitWithoutStream: false,
113+
},
109114
},
110115
},
111116
}
@@ -167,6 +172,11 @@ func TestBuildCloudEventsAgentOptions(t *testing.T) {
167172
SourceEvents: "projects/test-project/subscriptions/sourceevents-cluster1",
168173
SourceBroadcast: "projects/test-project/subscriptions/sourcebroadcast-cluster1",
169174
},
175+
KeepaliveSettings: &pubsub.KeepaliveSettings{
176+
Time: 5 * time.Minute,
177+
Timeout: 20 * time.Second,
178+
PermitWithoutStream: false,
179+
},
170180
},
171181
},
172182
}

pkg/cloudevents/generic/options/pubsub/agentoptions.go

Lines changed: 82 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"google.golang.org/api/option"
1010
"google.golang.org/grpc"
1111
"google.golang.org/grpc/credentials/insecure"
12+
"google.golang.org/grpc/keepalive"
1213
"k8s.io/klog/v2"
1314

1415
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
@@ -30,8 +31,7 @@ type pubsubAgentTransport struct {
3031
subscriber *pubsub.Subscriber
3132
// Subscriber for resync broadcasts
3233
resyncSubscriber *pubsub.Subscriber
33-
// TODO: handle error channel
34-
errorChan chan error
34+
errorChan chan error
3535
}
3636

3737
// NewAgentOptions creates a new CloudEventsAgentOptions for Pub/Sub.
@@ -49,23 +49,28 @@ func NewAgentOptions(pubsubOptions *PubSubOptions,
4949
}
5050

5151
func (o *pubsubAgentTransport) Connect(ctx context.Context) error {
52-
options := []option.ClientOption{}
52+
clientOptions := []option.ClientOption{}
5353
if o.CredentialsFile != "" {
54-
options = append(options, option.WithCredentialsFile(o.CredentialsFile))
54+
clientOptions = append(clientOptions, option.WithCredentialsFile(o.CredentialsFile))
5555
}
5656
if o.Endpoint != "" {
57-
options = append(options, option.WithEndpoint(o.Endpoint))
57+
clientOptions = append(clientOptions, option.WithEndpoint(o.Endpoint))
5858
if o.CredentialsFile == "" {
5959
// use the insecure connection for local development/testing when no credentials are provided
6060
pubsubConn, err := grpc.NewClient(o.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
6161
if err != nil {
6262
return err
6363
}
64-
options = append(options, option.WithGRPCConn(pubsubConn))
64+
clientOptions = append(clientOptions, option.WithGRPCConn(pubsubConn))
6565
}
6666
}
6767

68-
client, err := pubsub.NewClient(ctx, o.ProjectID, options...)
68+
if o.KeepaliveSettings != nil {
69+
// config keepalive parameters for pubsub client
70+
clientOptions = append(clientOptions, option.WithGRPCDialOption(grpc.WithKeepaliveParams(toGRPCKeepaliveParamater(o.KeepaliveSettings))))
71+
}
72+
73+
client, err := pubsub.NewClient(ctx, o.ProjectID, clientOptions...)
6974
if err != nil {
7075
return err
7176
}
@@ -77,6 +82,13 @@ func (o *pubsubAgentTransport) Connect(ctx context.Context) error {
7782
o.subscriber = client.Subscriber(o.Subscriptions.SourceEvents)
7883
o.resyncSubscriber = client.Subscriber(o.Subscriptions.SourceBroadcast)
7984

85+
// configure receive settings if provided
86+
if o.ReceiveSettings != nil {
87+
receiveSettings := toPubSubReceiveSettings(o.ReceiveSettings)
88+
o.subscriber.ReceiveSettings = receiveSettings
89+
o.resyncSubscriber.ReceiveSettings = receiveSettings
90+
}
91+
8092
return nil
8193
}
8294

@@ -88,7 +100,7 @@ func (o *pubsubAgentTransport) Send(ctx context.Context, evt cloudevents.Event)
88100

89101
eventType, err := types.ParseCloudEventsType(evt.Context.GetType())
90102
if err != nil {
91-
return fmt.Errorf("unsupported event type %s, %v", eventType, err)
103+
return fmt.Errorf("unsupported event type %s, %v", evt.Context.GetType(), err)
92104
}
93105

94106
// determine publisher based on event type
@@ -105,10 +117,6 @@ func (o *pubsubAgentTransport) Send(ctx context.Context, evt cloudevents.Event)
105117
}
106118

107119
func (o *pubsubAgentTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
108-
// create error channels for both subscribers
109-
subscriberErrChan := make(chan error, 1)
110-
resyncSubscriberErrChan := make(chan error, 1)
111-
112120
// start the subscriber for spec updates
113121
go func() {
114122
err := o.subscriber.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
@@ -123,7 +131,15 @@ func (o *pubsubAgentTransport) Receive(ctx context.Context, fn options.ReceiveHa
123131
msg.Ack()
124132
})
125133
if err != nil {
126-
subscriberErrChan <- fmt.Errorf("subscriber is interrupted by error: %w", err)
134+
// When keepalive is enabled, the Pub/Sub client's Receive call automatically retries on retryable errors.
135+
// See: https://github.com/googleapis/google-cloud-go/blob/b8e70aa0056a3e126bc36cb7bf242d987f32c0bd/pubsub/service.go#L51
136+
// If Receive returns an error, it’s usually due to a non-retryable issue or service outage.
137+
// In such cases, we send the error to the error channel to signal the source/agent client to reconnect later.
138+
select {
139+
case o.errorChan <- fmt.Errorf("subscriber is interrupted by error: %w", err):
140+
default:
141+
klog.Warningf("error channel full, dropping subscriber error: %v", err)
142+
}
127143
}
128144
}()
129145

@@ -141,19 +157,21 @@ func (o *pubsubAgentTransport) Receive(ctx context.Context, fn options.ReceiveHa
141157
msg.Ack()
142158
})
143159
if err != nil {
144-
resyncSubscriberErrChan <- fmt.Errorf("resync subscriber is interrupted by error: %w", err)
160+
// When keepalive is enabled, the Pub/Sub client's Receive call automatically retries on retryable errors.
161+
// See: https://github.com/googleapis/google-cloud-go/blob/b8e70aa0056a3e126bc36cb7bf242d987f32c0bd/pubsub/service.go#L51
162+
// If Receive returns an error, it’s usually due to a non-retryable issue or service outage.
163+
// In such cases, we send the error to the error channel to signal the source/agent client to reconnect later.
164+
select {
165+
case o.errorChan <- fmt.Errorf("resync subscriber is interrupted by error: %w", err):
166+
default:
167+
klog.Warningf("error channel full, dropping resync subscriber error: %v", err)
168+
}
145169
}
146170
}()
147171

148-
// wait for either subscriber to error or context cancellation
149-
select {
150-
case err := <-subscriberErrChan:
151-
return err
152-
case err := <-resyncSubscriberErrChan:
153-
return err
154-
case <-ctx.Done():
155-
return ctx.Err()
156-
}
172+
// wait for context cancellation or timeout
173+
<-ctx.Done()
174+
return ctx.Err()
157175
}
158176

159177
func (o *pubsubAgentTransport) Close(ctx context.Context) error {
@@ -163,3 +181,44 @@ func (o *pubsubAgentTransport) Close(ctx context.Context) error {
163181
func (o *pubsubAgentTransport) ErrorChan() <-chan error {
164182
return o.errorChan
165183
}
184+
185+
// toGRPCKeepaliveParamater converts our KeepaliveSettings to GRPC ClientParameters.
186+
func toGRPCKeepaliveParamater(settings *KeepaliveSettings) keepalive.ClientParameters {
187+
keepaliveParamater := keepalive.ClientParameters{
188+
PermitWithoutStream: settings.PermitWithoutStream,
189+
}
190+
if settings.Time > 0 {
191+
keepaliveParamater.Time = settings.Time
192+
}
193+
if settings.Timeout > 0 {
194+
keepaliveParamater.Timeout = settings.Timeout
195+
}
196+
197+
return keepaliveParamater
198+
}
199+
200+
// toPubSubReceiveSettings converts our ReceiveSettings to Pub/Sub ReceiveSettings.
201+
func toPubSubReceiveSettings(settings *ReceiveSettings) pubsub.ReceiveSettings {
202+
receiveSettings := pubsub.ReceiveSettings{}
203+
204+
if settings.MaxExtension > 0 {
205+
receiveSettings.MaxExtension = settings.MaxExtension
206+
}
207+
if settings.MaxDurationPerAckExtension > 0 {
208+
receiveSettings.MaxDurationPerAckExtension = settings.MaxDurationPerAckExtension
209+
}
210+
if settings.MinDurationPerAckExtension > 0 {
211+
receiveSettings.MinDurationPerAckExtension = settings.MinDurationPerAckExtension
212+
}
213+
if settings.MaxOutstandingMessages > 0 {
214+
receiveSettings.MaxOutstandingMessages = settings.MaxOutstandingMessages
215+
}
216+
if settings.MaxOutstandingBytes > 0 {
217+
receiveSettings.MaxOutstandingBytes = settings.MaxOutstandingBytes
218+
}
219+
if settings.NumGoroutines > 0 {
220+
receiveSettings.NumGoroutines = settings.NumGoroutines
221+
}
222+
223+
return receiveSettings
224+
}

pkg/cloudevents/generic/options/pubsub/agentoptions_test.go

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ package pubsub
22

33
import (
44
"testing"
5+
"time"
56

7+
"cloud.google.com/go/pubsub/v2"
8+
"google.golang.org/grpc/keepalive"
69
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
710
)
811

@@ -151,3 +154,170 @@ func TestPubsubAgentTransport_Structure(t *testing.T) {
151154
t.Error("expected resyncSubscriber to be nil before Connect is called")
152155
}
153156
}
157+
158+
func TestToGRPCKeepaliveParameter(t *testing.T) {
159+
cases := []struct {
160+
name string
161+
settings *KeepaliveSettings
162+
expected keepalive.ClientParameters
163+
}{
164+
{
165+
name: "all settings provided",
166+
settings: &KeepaliveSettings{
167+
Time: 10 * time.Minute,
168+
Timeout: 30 * time.Second,
169+
PermitWithoutStream: true,
170+
},
171+
expected: keepalive.ClientParameters{
172+
Time: 10 * time.Minute,
173+
Timeout: 30 * time.Second,
174+
PermitWithoutStream: true,
175+
},
176+
},
177+
{
178+
name: "only time provided",
179+
settings: &KeepaliveSettings{
180+
Time: 5 * time.Minute,
181+
},
182+
expected: keepalive.ClientParameters{
183+
Time: 5 * time.Minute,
184+
Timeout: 0,
185+
PermitWithoutStream: false,
186+
},
187+
},
188+
{
189+
name: "only timeout provided",
190+
settings: &KeepaliveSettings{
191+
Timeout: 20 * time.Second,
192+
},
193+
expected: keepalive.ClientParameters{
194+
Time: 0,
195+
Timeout: 20 * time.Second,
196+
PermitWithoutStream: false,
197+
},
198+
},
199+
{
200+
name: "zero values",
201+
settings: &KeepaliveSettings{
202+
Time: 0,
203+
Timeout: 0,
204+
PermitWithoutStream: false,
205+
},
206+
expected: keepalive.ClientParameters{
207+
Time: 0,
208+
Timeout: 0,
209+
PermitWithoutStream: false,
210+
},
211+
},
212+
}
213+
214+
for _, c := range cases {
215+
t.Run(c.name, func(t *testing.T) {
216+
result := toGRPCKeepaliveParamater(c.settings)
217+
218+
if result.Time != c.expected.Time {
219+
t.Errorf("expected Time to be %v, got %v", c.expected.Time, result.Time)
220+
}
221+
222+
if result.Timeout != c.expected.Timeout {
223+
t.Errorf("expected Timeout to be %v, got %v", c.expected.Timeout, result.Timeout)
224+
}
225+
226+
if result.PermitWithoutStream != c.expected.PermitWithoutStream {
227+
t.Errorf("expected PermitWithoutStream to be %v, got %v", c.expected.PermitWithoutStream, result.PermitWithoutStream)
228+
}
229+
})
230+
}
231+
}
232+
233+
func TestToPubSubReceiveSettings(t *testing.T) {
234+
cases := []struct {
235+
name string
236+
settings *ReceiveSettings
237+
expected pubsub.ReceiveSettings
238+
}{
239+
{
240+
name: "all settings provided",
241+
settings: &ReceiveSettings{
242+
MaxExtension: 600 * time.Second,
243+
MaxDurationPerAckExtension: 10 * time.Second,
244+
MinDurationPerAckExtension: 1 * time.Second,
245+
MaxOutstandingMessages: 1000,
246+
MaxOutstandingBytes: 1000000000,
247+
NumGoroutines: 10,
248+
},
249+
expected: pubsub.ReceiveSettings{
250+
MaxExtension: 600 * time.Second,
251+
MaxDurationPerAckExtension: 10 * time.Second,
252+
MinDurationPerAckExtension: 1 * time.Second,
253+
MaxOutstandingMessages: 1000,
254+
MaxOutstandingBytes: 1000000000,
255+
NumGoroutines: 10,
256+
},
257+
},
258+
{
259+
name: "partial settings",
260+
settings: &ReceiveSettings{
261+
MaxExtension: 300 * time.Second,
262+
MaxOutstandingMessages: 500,
263+
},
264+
expected: pubsub.ReceiveSettings{
265+
MaxExtension: 300 * time.Second,
266+
MaxDurationPerAckExtension: 0,
267+
MinDurationPerAckExtension: 0,
268+
MaxOutstandingMessages: 500,
269+
MaxOutstandingBytes: 0,
270+
NumGoroutines: 0,
271+
},
272+
},
273+
{
274+
name: "zero values",
275+
settings: &ReceiveSettings{
276+
MaxExtension: 0,
277+
MaxDurationPerAckExtension: 0,
278+
MinDurationPerAckExtension: 0,
279+
MaxOutstandingMessages: 0,
280+
MaxOutstandingBytes: 0,
281+
NumGoroutines: 0,
282+
},
283+
expected: pubsub.ReceiveSettings{
284+
MaxExtension: 0,
285+
MaxDurationPerAckExtension: 0,
286+
MinDurationPerAckExtension: 0,
287+
MaxOutstandingMessages: 0,
288+
MaxOutstandingBytes: 0,
289+
NumGoroutines: 0,
290+
},
291+
},
292+
}
293+
294+
for _, c := range cases {
295+
t.Run(c.name, func(t *testing.T) {
296+
result := toPubSubReceiveSettings(c.settings)
297+
298+
if result.MaxExtension != c.expected.MaxExtension {
299+
t.Errorf("expected MaxExtension to be %v, got %v", c.expected.MaxExtension, result.MaxExtension)
300+
}
301+
302+
if result.MaxDurationPerAckExtension != c.expected.MaxDurationPerAckExtension {
303+
t.Errorf("expected MaxDurationPerAckExtension to be %v, got %v", c.expected.MaxDurationPerAckExtension, result.MaxDurationPerAckExtension)
304+
}
305+
306+
if result.MinDurationPerAckExtension != c.expected.MinDurationPerAckExtension {
307+
t.Errorf("expected MinDurationPerAckExtension to be %v, got %v", c.expected.MinDurationPerAckExtension, result.MinDurationPerAckExtension)
308+
}
309+
310+
if result.MaxOutstandingMessages != c.expected.MaxOutstandingMessages {
311+
t.Errorf("expected MaxOutstandingMessages to be %v, got %v", c.expected.MaxOutstandingMessages, result.MaxOutstandingMessages)
312+
}
313+
314+
if result.MaxOutstandingBytes != c.expected.MaxOutstandingBytes {
315+
t.Errorf("expected MaxOutstandingBytes to be %v, got %v", c.expected.MaxOutstandingBytes, result.MaxOutstandingBytes)
316+
}
317+
318+
if result.NumGoroutines != c.expected.NumGoroutines {
319+
t.Errorf("expected NumGoroutines to be %v, got %v", c.expected.NumGoroutines, result.NumGoroutines)
320+
}
321+
})
322+
}
323+
}

0 commit comments

Comments
 (0)