Skip to content

Commit 941e1f1

Browse files
committed
address comments.
Signed-off-by: morvencao <[email protected]>
1 parent a38bf0e commit 941e1f1

File tree

4 files changed

+112
-93
lines changed

4 files changed

+112
-93
lines changed

pkg/cloudevents/generic/options/pubsub/agentoptions.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,6 @@ func (o *pubsubAgentTransport) Send(ctx context.Context, evt cloudevents.Event)
105105
}
106106

107107
func (o *pubsubAgentTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
108-
// create error channels for both subscribers
109-
subscriberErrChan := make(chan error, 1)
110-
resyncSubscriberErrChan := make(chan error, 1)
111-
112108
// start the subscriber for spec updates
113109
go func() {
114110
err := o.subscriber.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
@@ -123,7 +119,11 @@ func (o *pubsubAgentTransport) Receive(ctx context.Context, fn options.ReceiveHa
123119
msg.Ack()
124120
})
125121
if err != nil {
126-
subscriberErrChan <- fmt.Errorf("subscriber is interrupted by error: %w", err)
122+
// When keepalive is enabled, the Pub/Sub client's Receive call automatically retries on retryable errors.
123+
// See: https://github.com/googleapis/google-cloud-go/blob/b8e70aa0056a3e126bc36cb7bf242d987f32c0bd/pubsub/service.go#L51
124+
// If Receive returns an error, it’s usually due to a non-retryable issue or service outage.
125+
// In such cases, we send the error to the error channel to signal the source/agent client to reconnect later.
126+
o.errorChan <- fmt.Errorf("subscriber is interrupted by error: %w", err)
127127
}
128128
}()
129129

@@ -141,19 +141,17 @@ func (o *pubsubAgentTransport) Receive(ctx context.Context, fn options.ReceiveHa
141141
msg.Ack()
142142
})
143143
if err != nil {
144-
resyncSubscriberErrChan <- fmt.Errorf("resync subscriber is interrupted by error: %w", err)
144+
// When keepalive is enabled, the Pub/Sub client's Receive call automatically retries on retryable errors.
145+
// See: https://github.com/googleapis/google-cloud-go/blob/b8e70aa0056a3e126bc36cb7bf242d987f32c0bd/pubsub/service.go#L51
146+
// If Receive returns an error, it’s usually due to a non-retryable issue or service outage.
147+
// In such cases, we send the error to the error channel to signal the source/agent client to reconnect later.
148+
o.errorChan <- fmt.Errorf("resync subscriber is interrupted by error: %w", err)
145149
}
146150
}()
147151

148-
// wait for either subscriber to error or context cancellation
149-
select {
150-
case err := <-subscriberErrChan:
151-
return err
152-
case err := <-resyncSubscriberErrChan:
153-
return err
154-
case <-ctx.Done():
155-
return ctx.Err()
156-
}
152+
// wait for context cancellation or timeout
153+
<-ctx.Done()
154+
return ctx.Err()
157155
}
158156

159157
func (o *pubsubAgentTransport) Close(ctx context.Context) error {

pkg/cloudevents/generic/options/pubsub/sourceoptions.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,6 @@ func (o *pubsubSourceTransport) Send(ctx context.Context, evt cloudevents.Event)
104104
}
105105

106106
func (o *pubsubSourceTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
107-
// create error channels for both subscribers
108-
subscriberErrChan := make(chan error, 1)
109-
resyncSubscriberErrChan := make(chan error, 1)
110-
111107
// start the subscriber for status updates
112108
go func() {
113109
err := o.subscriber.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
@@ -122,7 +118,11 @@ func (o *pubsubSourceTransport) Receive(ctx context.Context, fn options.ReceiveH
122118
msg.Ack()
123119
})
124120
if err != nil {
125-
subscriberErrChan <- fmt.Errorf("subscriber is interrupted by error: %w", err)
121+
// When keepalive is enabled, the Pub/Sub client's Receive call automatically retries on retryable errors.
122+
// See: https://github.com/googleapis/google-cloud-go/blob/b8e70aa0056a3e126bc36cb7bf242d987f32c0bd/pubsub/service.go#L51
123+
// If Receive returns an error, it’s usually due to a non-retryable issue or service outage.
124+
// In such cases, we send the error to the error channel to signal the source/agent client to reconnect later.
125+
o.errorChan <- fmt.Errorf("subscriber is interrupted by error: %w", err)
126126
}
127127
}()
128128

@@ -140,19 +140,17 @@ func (o *pubsubSourceTransport) Receive(ctx context.Context, fn options.ReceiveH
140140
msg.Ack()
141141
})
142142
if err != nil {
143-
resyncSubscriberErrChan <- fmt.Errorf("resync subscriber is interrupted by error: %w", err)
143+
// When keepalive is enabled, the Pub/Sub client's Receive call automatically retries on retryable errors.
144+
// See: https://github.com/googleapis/google-cloud-go/blob/b8e70aa0056a3e126bc36cb7bf242d987f32c0bd/pubsub/service.go#L51
145+
// If Receive returns an error, it’s usually due to a non-retryable issue or service outage.
146+
// In such cases, we send the error to the error channel to signal the source/agent client to reconnect later.
147+
o.errorChan <- fmt.Errorf("resync subscriber is interrupted by error: %w", err)
144148
}
145149
}()
146150

147-
// wait for either subscriber to error or context cancellation
148-
select {
149-
case err := <-subscriberErrChan:
150-
return err
151-
case err := <-resyncSubscriberErrChan:
152-
return err
153-
case <-ctx.Done():
154-
return ctx.Err()
155-
}
151+
// wait for context cancellation or timeout
152+
<-ctx.Done()
153+
return ctx.Err()
156154
}
157155

158156
func (o *pubsubSourceTransport) Close(ctx context.Context) error {

test/integration/cloudevents/cloudevents_test.go

Lines changed: 82 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -85,69 +85,8 @@ func runCloudeventsClientPubSubTest(getSourceOptionsFn GetSourceOptionsFn) func(
8585
agentOptions = util.NewGRPCAgentOptions(certPool, grpcBrokerHost, tokenFile)
8686
case constants.ConfigTypePubSub:
8787
agentOptions = util.NewPubSubAgentOptions(pubsubServer.Addr, pubsubProjectID, clusterName)
88-
// prepare topics and subscriptions
89-
pubsubConn, err := grpc.NewClient(pubsubServer.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
90-
gomega.Expect(err).ToNot(gomega.HaveOccurred())
91-
defer pubsubConn.Close()
92-
93-
pubsubClient, err := pubsubv2.NewClient(ctx, pubsubProjectID, option.WithGRPCConn(pubsubConn))
94-
gomega.Expect(err).ToNot(gomega.HaveOccurred())
95-
defer pubsubClient.Close()
96-
97-
sourceEventsTopic := fmt.Sprintf("projects/%s/topics/sourceevents", pubsubProjectID)
98-
if _, err = pubsubClient.TopicAdminClient.GetTopic(ctx, &pubsubpb.GetTopicRequest{Topic: sourceEventsTopic}); err != nil {
99-
_, err = pubsubClient.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{Name: sourceEventsTopic})
100-
gomega.Expect(err).ToNot(gomega.HaveOccurred())
101-
}
102-
sourceEventsSubscription := fmt.Sprintf("projects/%s/subscriptions/sourceevents-%s", pubsubProjectID, clusterName)
103-
if _, err = pubsubClient.SubscriptionAdminClient.GetSubscription(ctx, &pubsubpb.GetSubscriptionRequest{Subscription: sourceEventsSubscription}); err != nil {
104-
_, err = pubsubClient.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
105-
Name: sourceEventsSubscription,
106-
Topic: sourceEventsTopic,
107-
Filter: fmt.Sprintf("attributes.\"ce-clustername\"=\"%s\"", clusterName),
108-
})
109-
gomega.Expect(err).ToNot(gomega.HaveOccurred())
110-
}
111-
sourceBroadcastTopic := fmt.Sprintf("projects/%s/topics/sourcebroadcast", pubsubProjectID)
112-
if _, err = pubsubClient.TopicAdminClient.GetTopic(ctx, &pubsubpb.GetTopicRequest{Topic: sourceBroadcastTopic}); err != nil {
113-
_, err = pubsubClient.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{Name: sourceBroadcastTopic})
114-
gomega.Expect(err).ToNot(gomega.HaveOccurred())
115-
}
116-
sourceBroadcastSubscription := fmt.Sprintf("projects/%s/subscriptions/sourcebroadcast-%s", pubsubProjectID, clusterName)
117-
if _, err = pubsubClient.SubscriptionAdminClient.GetSubscription(ctx, &pubsubpb.GetSubscriptionRequest{Subscription: sourceBroadcastSubscription}); err != nil {
118-
_, err = pubsubClient.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
119-
Name: sourceBroadcastSubscription,
120-
Topic: sourceBroadcastTopic,
121-
})
122-
gomega.Expect(err).ToNot(gomega.HaveOccurred())
123-
}
124-
agentEventsTopic := fmt.Sprintf("projects/%s/topics/agentevents", pubsubProjectID)
125-
if _, err = pubsubClient.TopicAdminClient.GetTopic(ctx, &pubsubpb.GetTopicRequest{Topic: agentEventsTopic}); err != nil {
126-
_, err = pubsubClient.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{Name: agentEventsTopic})
127-
gomega.Expect(err).ToNot(gomega.HaveOccurred())
128-
}
129-
agentEventsSubscription := fmt.Sprintf("projects/%s/subscriptions/agentevents-%s", pubsubProjectID, sourceID)
130-
if _, err = pubsubClient.SubscriptionAdminClient.GetSubscription(ctx, &pubsubpb.GetSubscriptionRequest{Subscription: agentEventsSubscription}); err != nil {
131-
_, err = pubsubClient.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
132-
Name: agentEventsSubscription,
133-
Topic: agentEventsTopic,
134-
Filter: fmt.Sprintf("attributes.\"ce-originalsource\"=\"%s\"", sourceID),
135-
})
136-
gomega.Expect(err).ToNot(gomega.HaveOccurred())
137-
}
138-
agentBroadcastTopic := fmt.Sprintf("projects/%s/topics/agentbroadcast", pubsubProjectID)
139-
if _, err = pubsubClient.TopicAdminClient.GetTopic(ctx, &pubsubpb.GetTopicRequest{Topic: agentBroadcastTopic}); err != nil {
140-
_, err = pubsubClient.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{Name: agentBroadcastTopic})
141-
gomega.Expect(err).ToNot(gomega.HaveOccurred())
142-
}
143-
agentBroadcastSubscription := fmt.Sprintf("projects/%s/subscriptions/agentbroadcast-%s", pubsubProjectID, sourceID)
144-
if _, err = pubsubClient.SubscriptionAdminClient.GetSubscription(ctx, &pubsubpb.GetSubscriptionRequest{Subscription: agentBroadcastSubscription}); err != nil {
145-
_, err = pubsubClient.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
146-
Name: agentBroadcastSubscription,
147-
Topic: agentBroadcastTopic,
148-
})
149-
gomega.Expect(err).ToNot(gomega.HaveOccurred())
150-
}
88+
// setup topics and subscriptions
89+
gomega.Expect(setupTopicsAndSubscriptions(ctx, clusterName, sourceID)).ToNot(gomega.HaveOccurred())
15190
}
15291

15392
sourceCloudEventsClient, err = source.StartResourceSourceClient(
@@ -316,3 +255,83 @@ func crudResource(
316255
return nil
317256
}, 10*time.Second, 1*time.Second).Should(gomega.Succeed())
318257
}
258+
259+
func setupTopicsAndSubscriptions(ctx context.Context, clusterName, sourceID string) error {
260+
// prepare topics and subscriptions
261+
pubsubConn, err := grpc.NewClient(pubsubServer.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
262+
if err != nil {
263+
return err
264+
}
265+
defer pubsubConn.Close()
266+
267+
pubsubClient, err := pubsubv2.NewClient(ctx, pubsubProjectID, option.WithGRPCConn(pubsubConn))
268+
if err != nil {
269+
return err
270+
}
271+
defer pubsubClient.Close()
272+
273+
sourceEventsTopic := fmt.Sprintf("projects/%s/topics/sourceevents", pubsubProjectID)
274+
if _, err = pubsubClient.TopicAdminClient.GetTopic(ctx, &pubsubpb.GetTopicRequest{Topic: sourceEventsTopic}); err != nil {
275+
if _, err = pubsubClient.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{Name: sourceEventsTopic}); err != nil {
276+
return err
277+
}
278+
}
279+
sourceEventsSubscription := fmt.Sprintf("projects/%s/subscriptions/sourceevents-%s", pubsubProjectID, clusterName)
280+
if _, err = pubsubClient.SubscriptionAdminClient.GetSubscription(ctx, &pubsubpb.GetSubscriptionRequest{Subscription: sourceEventsSubscription}); err != nil {
281+
if _, err = pubsubClient.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
282+
Name: sourceEventsSubscription,
283+
Topic: sourceEventsTopic,
284+
Filter: fmt.Sprintf("attributes.\"ce-clustername\"=\"%s\"", clusterName),
285+
}); err != nil {
286+
return err
287+
}
288+
}
289+
sourceBroadcastTopic := fmt.Sprintf("projects/%s/topics/sourcebroadcast", pubsubProjectID)
290+
if _, err = pubsubClient.TopicAdminClient.GetTopic(ctx, &pubsubpb.GetTopicRequest{Topic: sourceBroadcastTopic}); err != nil {
291+
if _, err = pubsubClient.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{Name: sourceBroadcastTopic}); err != nil {
292+
return err
293+
}
294+
}
295+
sourceBroadcastSubscription := fmt.Sprintf("projects/%s/subscriptions/sourcebroadcast-%s", pubsubProjectID, clusterName)
296+
if _, err = pubsubClient.SubscriptionAdminClient.GetSubscription(ctx, &pubsubpb.GetSubscriptionRequest{Subscription: sourceBroadcastSubscription}); err != nil {
297+
if _, err = pubsubClient.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
298+
Name: sourceBroadcastSubscription,
299+
Topic: sourceBroadcastTopic,
300+
}); err != nil {
301+
return err
302+
}
303+
}
304+
agentEventsTopic := fmt.Sprintf("projects/%s/topics/agentevents", pubsubProjectID)
305+
if _, err = pubsubClient.TopicAdminClient.GetTopic(ctx, &pubsubpb.GetTopicRequest{Topic: agentEventsTopic}); err != nil {
306+
if _, err = pubsubClient.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{Name: agentEventsTopic}); err != nil {
307+
return err
308+
}
309+
}
310+
agentEventsSubscription := fmt.Sprintf("projects/%s/subscriptions/agentevents-%s", pubsubProjectID, sourceID)
311+
if _, err = pubsubClient.SubscriptionAdminClient.GetSubscription(ctx, &pubsubpb.GetSubscriptionRequest{Subscription: agentEventsSubscription}); err != nil {
312+
if _, err = pubsubClient.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
313+
Name: agentEventsSubscription,
314+
Topic: agentEventsTopic,
315+
Filter: fmt.Sprintf("attributes.\"ce-originalsource\"=\"%s\"", sourceID),
316+
}); err != nil {
317+
return err
318+
}
319+
}
320+
agentBroadcastTopic := fmt.Sprintf("projects/%s/topics/agentbroadcast", pubsubProjectID)
321+
if _, err = pubsubClient.TopicAdminClient.GetTopic(ctx, &pubsubpb.GetTopicRequest{Topic: agentBroadcastTopic}); err != nil {
322+
if _, err = pubsubClient.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{Name: agentBroadcastTopic}); err != nil {
323+
return err
324+
}
325+
}
326+
agentBroadcastSubscription := fmt.Sprintf("projects/%s/subscriptions/agentbroadcast-%s", pubsubProjectID, sourceID)
327+
if _, err = pubsubClient.SubscriptionAdminClient.GetSubscription(ctx, &pubsubpb.GetSubscriptionRequest{Subscription: agentBroadcastSubscription}); err != nil {
328+
if _, err = pubsubClient.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
329+
Name: agentBroadcastSubscription,
330+
Topic: agentBroadcastTopic,
331+
}); err != nil {
332+
return err
333+
}
334+
}
335+
336+
return nil
337+
}

test/integration/cloudevents/suite_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,10 @@ var _ = ginkgo.AfterSuite(func() {
177177
err := mqttBroker.Close()
178178
gomega.Expect(err).ToNot(gomega.HaveOccurred())
179179

180+
// close the pubsub test server
181+
err = pubsubServer.Close()
182+
gomega.Expect(err).ToNot(gomega.HaveOccurred())
183+
180184
// remove the temp files
181185
err = clienttesting.RemoveTempFile(caFile)
182186
gomega.Expect(err).ToNot(gomega.HaveOccurred())

0 commit comments

Comments
 (0)