Skip to content

Conversation

@morvencao
Copy link
Member

@morvencao morvencao commented Nov 4, 2025

Signed-off-by: morvencao [email protected]

rh-pre-commit.version: 2.3.2
rh-pre-commit.check-secrets: ENABLED

Summary

Related issue(s)

Fixes #

Summary by CodeRabbit

  • New Features

    • Added Google Cloud Pub/Sub support as a CloudEvents transport for sources and agents (publish/subscribe lifecycle, encoding/decoding, send/receive, error channel, configurable keepalive/receive).
  • Documentation

    • Added Pub/Sub configuration docs with YAML examples, semantics, and validation guidance; updated design docs to list Pub/Sub.
  • Tests

    • Added unit and integration tests and helpers covering Pub/Sub options, codec, transport, validation, and end-to-end flows.
  • Chores

    • Upgraded multiple dependencies and related metadata.

@coderabbitai
Copy link

coderabbitai bot commented Nov 4, 2025

Walkthrough

Adds Google Cloud Pub/Sub as a CloudEvents transport: new PubSub config/types, YAML loader and validation, codec (Encode/Decode), transport implementation (Connect/Send/Receive/Close/ErrorChan), builder wiring and constructors for source/agent, tests, docs updates, MQTT topic-pattern renames, and dependency upgrades.

Changes

Cohort / File(s) Summary
Dependency Management
go.mod
Added cloud.google.com/go/pubsub/v2 and upgraded numerous google/golang.org, golang.org/x, OpenTelemetry, and other indirect dependencies.
Constants & Core Types
pkg/cloudevents/constants/constants.go, pkg/cloudevents/generic/types/types.go
Added ConfigTypePubSub; introduced MQTT-specific topic pattern constants; added PubSubTopicPattern, PubSubSubscriptionPattern, and new Subscriptions struct.
Documentation
pkg/cloudevents/README.md, pkg/cloudevents/doc/design.md, pkg/cloudevents/generic/options/options.go
Documented Pub/Sub support, added YAML configuration examples, updated design diagram and comments to include Pub/Sub.
Options Builder Integration
pkg/cloudevents/generic/options/builder/optionsbuilder.go, pkg/cloudevents/generic/options/builder/optionsbuilder_test.go
Extended builder to handle pubsub config type; added Pub/Sub test cases and refactored test helpers for source/agent assertions.
Pub/Sub Configuration & Validation
pkg/cloudevents/generic/options/pubsub/options.go, pkg/cloudevents/generic/options/pubsub/options_test.go
New PubSubOptions/PubSubConfig types, YAML load function, BuildPubSubOptionsFromFlags, topic/subscription validation against project-derived regex, keepalive/receive settings and extensive tests.
Pub/Sub Codec
pkg/cloudevents/generic/options/pubsub/codec.go, pkg/cloudevents/generic/options/pubsub/codec_test.go
Added Encode(evt) -> *pubsub.Message and Decode(msg) -> cloudevents.Event mapping using ce- attributes and Content-Type handling, with unit tests including round-trip coverage.
Pub/Sub Transport & Constructors
pkg/cloudevents/generic/options/pubsub/transport.go, pkg/cloudevents/generic/options/pubsub/sourceoptions.go, pkg/cloudevents/generic/options/pubsub/agentoptions.go, pkg/cloudevents/generic/options/pubsub/*_test.go
Implemented unexported pubsubTransport with Connect/Send/Receive/Close/ErrorChan; added NewSourceOptions and NewAgentOptions; added transport and structure unit tests.
MQTT Pattern Refactor
pkg/cloudevents/generic/options/mqtt/options.go
Switched topic validation and parsing to MQTT-specific pattern constants.
Integration Tests & Utilities
test/integration/cloudevents/suite_test.go, test/integration/cloudevents/cloudevents_test.go, test/integration/cloudevents/cloudevetns_pubsub_test.go, test/integration/cloudevents/util/pubsub.go
Added PSTest server lifecycle (setup/teardown), helpers to create topics/subscriptions with filters, Pub/Sub integration test file and helpers NewPubSubSourceOptions / NewPubSubAgentOptions.
Docs / Readme Additions
pkg/cloudevents/README.md
Added Pub/Sub protocol/driver documentation and YAML examples under Supported Protocols and Drivers (and duplicated under gRPC section).

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Areas needing extra attention:
    • pkg/cloudevents/generic/options/pubsub/codec.go — CE attribute <-> pubsub attribute mapping, specversion defaulting, extensions, Content-Type handling, and validation/error semantics.
    • pkg/cloudevents/generic/options/pubsub/transport.go — concurrency in Receive, publish/ack semantics, publisher/subscriber initialization, GRPC keepalive and credential handling, and non-blocking error channel usage.
    • pkg/cloudevents/generic/options/pubsub/options.go — regex construction for project-scoped names, topic/subscription validation logic, and aggregated error messages.
    • Integration tests — PSTest server lifecycle, topic/subscription creation and filters, and test isolation.

Possibly related PRs

Suggested labels

approved, lgtm

Suggested reviewers

  • qiujian16
  • deads2k
  • skeeey

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)
Check name Status Explanation Resolution
Description check ⚠️ Warning The PR description is largely incomplete, containing only the repository template with empty Summary and Related issue fields and no meaningful implementation details. Complete the Summary section with details about the Pub/Sub support implementation, configuration options, and testing approach. Link any related issues in the 'Related issue(s)' section.
Docstring Coverage ⚠️ Warning Docstring coverage is 25.81% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly describes the main change: adding Pub/Sub support for CloudEvents transport, which aligns with the comprehensive modifications across multiple files.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@morvencao morvencao closed this Nov 5, 2025
@morvencao morvencao force-pushed the br_add_pubsub_support branch from c9eedbd to b1be73e Compare November 5, 2025 07:21
@morvencao morvencao reopened this Nov 5, 2025
@morvencao morvencao force-pushed the br_add_pubsub_support branch from 6a7ab88 to 72d32c6 Compare November 5, 2025 15:05
@morvencao morvencao changed the title [WIP] ✨ add pubsub support for cloudevents transport. ✨ add pubsub support for cloudevents transport. Nov 5, 2025
@morvencao morvencao marked this pull request as ready for review November 5, 2025 15:05
@openshift-ci openshift-ci bot requested review from deads2k and qiujian16 November 5, 2025 15:05
@morvencao morvencao force-pushed the br_add_pubsub_support branch from 72d32c6 to a38bf0e Compare November 5, 2025 15:15
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (6)
test/integration/cloudevents/cloudevents_test.go (1)

97-150: Topic and subscription setup looks correct.

The topic and subscription creation logic follows the correct pattern: check if exists, create if not. The filter attributes for subscriptions correctly use the CloudEvents attribute format.

The repetitive pattern could be extracted into a helper function to improve maintainability:

func ensureTopic(ctx context.Context, client *pubsubv2.Client, topicPath string) error {
	if _, err := client.TopicAdminClient.GetTopic(ctx, &pubsubpb.GetTopicRequest{Topic: topicPath}); err != nil {
		_, err = client.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{Name: topicPath})
		return err
	}
	return nil
}

func ensureSubscription(ctx context.Context, client *pubsubv2.Client, subscription *pubsubpb.Subscription) error {
	if _, err := client.SubscriptionAdminClient.GetSubscription(ctx, &pubsubpb.GetSubscriptionRequest{Subscription: subscription.Name}); err != nil {
		_, err = client.SubscriptionAdminClient.CreateSubscription(ctx, subscription)
		return err
	}
	return nil
}
pkg/cloudevents/generic/options/pubsub/options_test.go (1)

462-473: Simplify the contains helper function.

The current implementation is overly complex with redundant checks. The function can be simplified for better readability.

Apply this diff:

 func contains(s, substr string) bool {
-	return len(s) >= len(substr) && (s == substr || len(substr) == 0 || (len(s) > 0 && len(substr) > 0 && stringContains(s, substr)))
+	return stringContains(s, substr)
 }
 
 func stringContains(s, substr string) bool {
+	if len(substr) == 0 {
+		return true
+	}
 	for i := 0; i <= len(s)-len(substr); i++ {
 		if s[i:i+len(substr)] == substr {
 			return true
 		}
 	}
 	return false
 }

Or better yet, use the standard library:

-func contains(s, substr string) bool {
-	return len(s) >= len(substr) && (s == substr || len(substr) == 0 || (len(s) > 0 && len(substr) > 0 && stringContains(s, substr)))
-}
-
-func stringContains(s, substr string) bool {
-	for i := 0; i <= len(s)-len(substr); i++ {
-		if s[i:i+len(substr)] == substr {
-			return true
-		}
-	}
-	return false
-}
+import "strings"
+
+func contains(s, substr string) bool {
+	return strings.Contains(s, substr)
+}
pkg/cloudevents/generic/options/pubsub/options.go (1)

91-170: Consider caching compiled regex patterns.

The validation logic is correct, but it recompiles the same regex pattern multiple times (lines 120, 126, 132, 138, 146, 152, 158, 164). Consider compiling the patterns once at the start of the function to improve performance.

Apply this diff to optimize regex compilation:

 func validateTopicsAndSubscriptions(topics *types.Topics, subscriptions *types.Subscriptions, projectID string) error {
 	if topics == nil {
 		return fmt.Errorf("the topics must be set")
 	}
 
 	if subscriptions == nil {
 		return fmt.Errorf("the subscriptions must be set")
 	}
+
+	// Compile regex patterns once
+	topicPattern := strings.ReplaceAll(types.PubSubTopicPattern, "PROJECT_ID", regexp.QuoteMeta(projectID))
+	topicRegex := regexp.MustCompile(topicPattern)
+	subscriptionPattern := strings.ReplaceAll(types.PubSubSubscriptionPattern, "PROJECT_ID", regexp.QuoteMeta(projectID))
+	subscriptionRegex := regexp.MustCompile(subscriptionPattern)
 
 	// validate topics and subscription for source
 	isValidForSource := len(topics.SourceEvents) != 0 &&
 		len(topics.SourceBroadcast) != 0 &&
 		len(subscriptions.AgentEvents) != 0 &&
 		len(subscriptions.AgentBroadcast) != 0
 	// validate topics and subscription for agent
 	isValidForAgent := len(topics.AgentEvents) != 0 &&
 		len(topics.AgentBroadcast) != 0 &&
 		len(subscriptions.SourceEvents) != 0 &&
 		len(subscriptions.SourceBroadcast) != 0
 
 	var errs []error
 	if !isValidForSource && !isValidForAgent {
 		errs = append(errs, fmt.Errorf("invalid topic/subscription combination: "+
 			"for source, required topics: 'sourceEvents', 'sourceBroadcast'; required subscriptions: 'agentEvents', 'agentBroadcast'; "+
 			"for agent, required topics: 'agentEvents', 'agentBroadcast'; required subscriptions: 'sourceEvents', 'sourceBroadcast'"))
 	}
 
-	topicPattern := strings.ReplaceAll(types.PubSubTopicPattern, "PROJECT_ID", regexp.QuoteMeta(projectID))
 	if len(topics.SourceEvents) != 0 {
-		if !regexp.MustCompile(topicPattern).MatchString(topics.SourceEvents) {
+		if !topicRegex.MatchString(topics.SourceEvents) {
 			errs = append(errs, fmt.Errorf("invalid source events topic %q, it should match `%s`",
 				topics.SourceEvents, topicPattern))
 		}
 	}

(Apply similar changes for all other pattern checks)

pkg/cloudevents/generic/options/pubsub/sourceoptions.go (1)

106-156: Consider coordinated shutdown of subscriber goroutines.

When one subscriber encounters an error (lines 125-126, 143-144), the function returns immediately, but the other subscriber goroutine continues running until context cancellation. Consider using a derived context with cancel to ensure both goroutines are promptly terminated when the first error occurs.

Apply this pattern for coordinated shutdown:

 func (o *pubsubSourceTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
+	// Create a cancellable context to coordinate goroutine shutdown
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	// create error channels for both subscribers
 	subscriberErrChan := make(chan error, 1)
 	resyncSubscriberErrChan := make(chan error, 1)
 
 	// start the subscriber for status updates
 	go func() {
 		err := o.subscriber.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
 			evt, err := Decode(msg)
 			if err != nil {
 				// also send ACK on decode error since redelivery won't fix it.
 				klog.Errorf("failed to decode pubsub message to resource status event: %v", err)
 			} else {
 				fn(evt)
 			}
 			// send ACK after all receiver handlers complete.
 			msg.Ack()
 		})
 		if err != nil {
 			subscriberErrChan <- fmt.Errorf("subscriber is interrupted by error: %w", err)
+			cancel() // Trigger cancellation for other goroutines
 		}
 	}()
 
 	// start the resync subscriber for broadcast messages
 	go func() {
 		err := o.resyncSubscriber.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
 			evt, err := Decode(msg)
 			if err != nil {
 				// also send ACK on decode error since redelivery won't fix it.
 				klog.Errorf("failed to decode pubsub message to resource specresync event: %v", err)
 			} else {
 				fn(evt)
 			}
 			// send ACK after all receiver handlers complete.
 			msg.Ack()
 		})
 		if err != nil {
 			resyncSubscriberErrChan <- fmt.Errorf("resync subscriber is interrupted by error: %w", err)
+			cancel() // Trigger cancellation for other goroutines
 		}
 	}()
 
 	// wait for either subscriber to error or context cancellation
 	select {
 	case err := <-subscriberErrChan:
 		return err
 	case err := <-resyncSubscriberErrChan:
 		return err
 	case <-ctx.Done():
 		return ctx.Err()
 	}
 }
pkg/cloudevents/generic/options/pubsub/agentoptions.go (2)

18-35: Error channel is initialized but never used.

Same as sourceoptions.go, the errorChan is created but never receives errors. The TODO on line 33 acknowledges this.

This is the same pattern as in sourceoptions.go. Consider implementing error channel handling consistently across both transport types, or documenting why it's deferred.


107-157: Same goroutine coordination issue as sourceoptions.go.

This Receive method has the same issue with uncoordinated goroutine shutdown as noted in sourceoptions.go. When one subscriber errors, the other continues running until context cancellation. Apply the same context cancellation pattern suggested for sourceoptions.go.

Additionally, line 136 has a minor typo: "statusresync" should be "status resync" (with space) for consistency.

Apply the same coordinated shutdown pattern as suggested for sourceoptions.go, and fix the typo:

-				klog.Errorf("failed to decode pubsub message to resource statusresync event: %v", err)
+				klog.Errorf("failed to decode pubsub message to resource status resync event: %v", err)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b1be73e and a38bf0e.

⛔ Files ignored due to path filters (278)
  • go.sum is excluded by !**/*.sum
  • vendor/cel.dev/expr/eval.pb.go is excluded by !**/*.pb.go, !vendor/**
  • vendor/cloud.google.com/go/LICENSE is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/CHANGES.md is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/LICENSE is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/README.md is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/auth.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/credentials/compute.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/credentials/detect.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/credentials/doc.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/credentials/filetypes.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/credentials/internal/externalaccount/aws_provider.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/credentials/internal/externalaccount/executable_provider.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/credentials/internal/externalaccount/externalaccount.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/credentials/internal/externalaccount/file_provider.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/credentials/internal/externalaccount/info.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/credentials/internal/externalaccount/programmatic_provider.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/credentials/internal/externalaccount/url_provider.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/credentials/internal/externalaccount/x509_provider.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/credentials/internal/externalaccountuser/externalaccountuser.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/credentials/internal/gdch/gdch.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/credentials/internal/impersonate/idtoken.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/credentials/internal/impersonate/impersonate.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/credentials/internal/stsexchange/sts_exchange.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/credentials/selfsignedjwt.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/grpctransport/dial_socketopt.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/grpctransport/directpath.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/grpctransport/grpctransport.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/grpctransport/pool.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/httptransport/httptransport.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/httptransport/transport.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/compute/compute.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/compute/manufacturer.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/compute/manufacturer_linux.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/compute/manufacturer_windows.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/credsfile/credsfile.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/credsfile/filetype.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/credsfile/parse.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/internal.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/jwt/jwt.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/retry/retry.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/transport/cba.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/transport/cert/default_cert.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/transport/cert/enterprise_cert.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/transport/cert/secureconnect_cert.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/transport/cert/workload_cert.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/transport/headers/headers.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/transport/s2a.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/transport/transport.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/trustboundary/external_accounts_config_providers.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/trustboundary/trust_boundary.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/internal/version.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/oauth2adapt/CHANGES.md is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/oauth2adapt/LICENSE is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/oauth2adapt/oauth2adapt.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/auth/threelegged.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/compute/metadata/CHANGES.md is excluded by !vendor/**
  • vendor/cloud.google.com/go/compute/metadata/log.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/compute/metadata/metadata.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/compute/metadata/retry.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/compute/metadata/retry_linux.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/compute/metadata/syscheck.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/compute/metadata/syscheck_linux.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/compute/metadata/syscheck_windows.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/iam/LICENSE is excluded by !vendor/**
  • vendor/cloud.google.com/go/iam/apiv1/iampb/iam_policy.pb.go is excluded by !**/*.pb.go, !vendor/**
  • vendor/cloud.google.com/go/iam/apiv1/iampb/options.pb.go is excluded by !**/*.pb.go, !vendor/**
  • vendor/cloud.google.com/go/iam/apiv1/iampb/policy.pb.go is excluded by !**/*.pb.go, !vendor/**
  • vendor/cloud.google.com/go/iam/apiv1/iampb/resource_policy_member.pb.go is excluded by !**/*.pb.go, !vendor/**
  • vendor/cloud.google.com/go/internal/detect/detect.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/internal/pubsub/message.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/internal/pubsub/publish.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/internal/testutil/cmp.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/internal/testutil/context.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/internal/testutil/headers_enforcer.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/internal/testutil/rand.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/internal/testutil/retry.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/internal/testutil/server.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/internal/testutil/trace_otel.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/CHANGES.md is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/LICENSE is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/apiv1/auxiliary.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/apiv1/auxiliary_go123.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/apiv1/doc.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/apiv1/helpers.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/apiv1/info.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/apiv1/pubsubpb/pubsub.pb.go is excluded by !**/*.pb.go, !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/apiv1/pubsubpb/pubsub_grpc.pb.go is excluded by !**/*.pb.go, !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/apiv1/pubsubpb/schema.pb.go is excluded by !**/*.pb.go, !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/apiv1/pubsubpb/schema_grpc.pb.go is excluded by !**/*.pb.go, !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/apiv1/schema_client.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/apiv1/subscription_admin_client.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/apiv1/topic_admin_client.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/apiv1/version.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/debug.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/doc.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/flow_controller.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/internal/distribution/distribution.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/internal/scheduler/publish_scheduler.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/internal/scheduler/receive_scheduler.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/internal/version.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/iterator.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/message.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/nodebug.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/pstest/fake.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/pstest/filtering.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/publisher.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/pubsub.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/pullstream.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/service.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/shutdown.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/subscriber.go is excluded by !vendor/**
  • vendor/cloud.google.com/go/pubsub/v2/trace.go is excluded by !vendor/**
  • vendor/github.com/felixge/httpsnoop/.gitignore is excluded by !vendor/**
  • vendor/github.com/felixge/httpsnoop/LICENSE.txt is excluded by !vendor/**
  • vendor/github.com/felixge/httpsnoop/Makefile is excluded by !vendor/**
  • vendor/github.com/felixge/httpsnoop/README.md is excluded by !vendor/**
  • vendor/github.com/felixge/httpsnoop/capture_metrics.go is excluded by !vendor/**
  • vendor/github.com/felixge/httpsnoop/docs.go is excluded by !vendor/**
  • vendor/github.com/felixge/httpsnoop/wrap_generated_gteq_1.8.go is excluded by !vendor/**
  • vendor/github.com/felixge/httpsnoop/wrap_generated_lt_1.8.go is excluded by !vendor/**
  • vendor/github.com/go-logr/logr/.golangci.yaml is excluded by !vendor/**
  • vendor/github.com/go-logr/logr/funcr/funcr.go is excluded by !vendor/**
  • vendor/github.com/go-logr/logr/funcr/slogsink.go is excluded by !vendor/**
  • vendor/github.com/go-logr/stdr/LICENSE is excluded by !vendor/**
  • vendor/github.com/go-logr/stdr/README.md is excluded by !vendor/**
  • vendor/github.com/go-logr/stdr/stdr.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/.gitignore is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/CODE_OF_CONDUCT.md is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/CONTRIBUTING.md is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/LICENSE.md is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/README.md is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/fallback/s2a_fallback.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/internal/authinfo/authinfo.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/internal/handshaker/handshaker.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/internal/handshaker/service/service.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/internal/proto/common_go_proto/common.pb.go is excluded by !**/*.pb.go, !vendor/**
  • vendor/github.com/google/s2a-go/internal/proto/s2a_context_go_proto/s2a_context.pb.go is excluded by !**/*.pb.go, !vendor/**
  • vendor/github.com/google/s2a-go/internal/proto/s2a_go_proto/s2a.pb.go is excluded by !**/*.pb.go, !vendor/**
  • vendor/github.com/google/s2a-go/internal/proto/s2a_go_proto/s2a_grpc.pb.go is excluded by !**/*.pb.go, !vendor/**
  • vendor/github.com/google/s2a-go/internal/proto/v2/common_go_proto/common.pb.go is excluded by !**/*.pb.go, !vendor/**
  • vendor/github.com/google/s2a-go/internal/proto/v2/s2a_context_go_proto/s2a_context.pb.go is excluded by !**/*.pb.go, !vendor/**
  • vendor/github.com/google/s2a-go/internal/proto/v2/s2a_go_proto/s2a.pb.go is excluded by !**/*.pb.go, !vendor/**
  • vendor/github.com/google/s2a-go/internal/proto/v2/s2a_go_proto/s2a_grpc.pb.go is excluded by !**/*.pb.go, !vendor/**
  • vendor/github.com/google/s2a-go/internal/record/internal/aeadcrypter/aeadcrypter.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/internal/record/internal/aeadcrypter/aesgcm.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/internal/record/internal/aeadcrypter/chachapoly.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/internal/record/internal/aeadcrypter/common.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/internal/record/internal/halfconn/ciphersuite.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/internal/record/internal/halfconn/counter.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/internal/record/internal/halfconn/expander.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/internal/record/internal/halfconn/halfconn.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/internal/record/record.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/internal/record/ticketsender.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/internal/tokenmanager/tokenmanager.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/internal/v2/README.md is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/internal/v2/certverifier/certverifier.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/internal/v2/remotesigner/remotesigner.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/internal/v2/s2av2.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/internal/v2/tlsconfigstore/tlsconfigstore.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/retry/retry.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/s2a.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/s2a_options.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/s2a_utils.go is excluded by !vendor/**
  • vendor/github.com/google/s2a-go/stream/s2a_stream.go is excluded by !vendor/**
  • vendor/github.com/googleapis/enterprise-certificate-proxy/LICENSE is excluded by !vendor/**
  • vendor/github.com/googleapis/enterprise-certificate-proxy/client/client.go is excluded by !vendor/**
  • vendor/github.com/googleapis/enterprise-certificate-proxy/client/util/util.go is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/.release-please-manifest.json is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/CHANGES.md is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/LICENSE is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/apierror/apierror.go is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/apierror/internal/proto/README.md is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/apierror/internal/proto/custom_error.pb.go is excluded by !**/*.pb.go, !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/apierror/internal/proto/custom_error.proto is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/apierror/internal/proto/error.pb.go is excluded by !**/*.pb.go, !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/apierror/internal/proto/error.proto is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/call_option.go is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/callctx/callctx.go is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/content_type.go is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/gax.go is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/header.go is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/internal/version.go is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/internallog/grpclog/grpclog.go is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/internallog/internal/internal.go is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/internallog/internallog.go is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/invoke.go is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/iterator/iterator.go is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/proto_json_stream.go is excluded by !vendor/**
  • vendor/github.com/googleapis/gax-go/v2/release-please-config.json is excluded by !vendor/**
  • vendor/github.com/stoewer/go-strcase/.golangci.yml is excluded by !vendor/**
  • vendor/github.com/stoewer/go-strcase/camel.go is excluded by !vendor/**
  • vendor/github.com/stoewer/go-strcase/helper.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/LICENSE is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/checker.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/declarations.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/doc.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/errors.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/expr.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/exprs/doc.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/exprs/match.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/filter.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/functions.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/lexer.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/macro.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/parsedexpr.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/parser.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/position.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/request.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/token.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/tokentype.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/types.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/unescape.go is excluded by !vendor/**
  • vendor/go.einride.tech/aip/filtering/walk.go is excluded by !vendor/**
  • vendor/go.opencensus.io/AUTHORS is excluded by !vendor/**
  • vendor/go.opencensus.io/LICENSE is excluded by !vendor/**
  • vendor/go.opencensus.io/internal/tagencoding/tagencoding.go is excluded by !vendor/**
  • vendor/go.opencensus.io/metric/metricdata/doc.go is excluded by !vendor/**
  • vendor/go.opencensus.io/metric/metricdata/exemplar.go is excluded by !vendor/**
  • vendor/go.opencensus.io/metric/metricdata/label.go is excluded by !vendor/**
  • vendor/go.opencensus.io/metric/metricdata/metric.go is excluded by !vendor/**
  • vendor/go.opencensus.io/metric/metricdata/point.go is excluded by !vendor/**
  • vendor/go.opencensus.io/metric/metricdata/type_string.go is excluded by !vendor/**
  • vendor/go.opencensus.io/metric/metricdata/unit.go is excluded by !vendor/**
  • vendor/go.opencensus.io/metric/metricproducer/manager.go is excluded by !vendor/**
  • vendor/go.opencensus.io/metric/metricproducer/producer.go is excluded by !vendor/**
  • vendor/go.opencensus.io/resource/resource.go is excluded by !vendor/**
  • vendor/go.opencensus.io/stats/doc.go is excluded by !vendor/**
  • vendor/go.opencensus.io/stats/internal/record.go is excluded by !vendor/**
  • vendor/go.opencensus.io/stats/measure.go is excluded by !vendor/**
  • vendor/go.opencensus.io/stats/measure_float64.go is excluded by !vendor/**
  • vendor/go.opencensus.io/stats/measure_int64.go is excluded by !vendor/**
  • vendor/go.opencensus.io/stats/record.go is excluded by !vendor/**
  • vendor/go.opencensus.io/stats/units.go is excluded by !vendor/**
  • vendor/go.opencensus.io/stats/view/aggregation.go is excluded by !vendor/**
  • vendor/go.opencensus.io/stats/view/aggregation_data.go is excluded by !vendor/**
  • vendor/go.opencensus.io/stats/view/collector.go is excluded by !vendor/**
  • vendor/go.opencensus.io/stats/view/doc.go is excluded by !vendor/**
  • vendor/go.opencensus.io/stats/view/export.go is excluded by !vendor/**
  • vendor/go.opencensus.io/stats/view/view.go is excluded by !vendor/**
  • vendor/go.opencensus.io/stats/view/view_to_metric.go is excluded by !vendor/**
  • vendor/go.opencensus.io/stats/view/worker.go is excluded by !vendor/**
  • vendor/go.opencensus.io/stats/view/worker_commands.go is excluded by !vendor/**
  • vendor/go.opencensus.io/tag/context.go is excluded by !vendor/**
  • vendor/go.opencensus.io/tag/doc.go is excluded by !vendor/**
  • vendor/go.opencensus.io/tag/key.go is excluded by !vendor/**
  • vendor/go.opencensus.io/tag/map.go is excluded by !vendor/**
  • vendor/go.opencensus.io/tag/map_codec.go is excluded by !vendor/**
  • vendor/go.opencensus.io/tag/metadata.go is excluded by !vendor/**
  • vendor/go.opencensus.io/tag/profile_19.go is excluded by !vendor/**
  • vendor/go.opencensus.io/tag/profile_not19.go is excluded by !vendor/**
  • vendor/go.opencensus.io/tag/validate.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/auto/sdk/CONTRIBUTING.md is excluded by !vendor/**
  • vendor/go.opentelemetry.io/auto/sdk/LICENSE is excluded by !vendor/**
  • vendor/go.opentelemetry.io/auto/sdk/VERSIONING.md is excluded by !vendor/**
  • vendor/go.opentelemetry.io/auto/sdk/doc.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/auto/sdk/internal/telemetry/attr.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/auto/sdk/internal/telemetry/doc.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/auto/sdk/internal/telemetry/id.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/auto/sdk/internal/telemetry/number.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/auto/sdk/internal/telemetry/resource.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/auto/sdk/internal/telemetry/scope.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/auto/sdk/internal/telemetry/span.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/auto/sdk/internal/telemetry/status.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/auto/sdk/internal/telemetry/traces.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/auto/sdk/internal/telemetry/value.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/auto/sdk/limit.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/auto/sdk/span.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/auto/sdk/tracer.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/auto/sdk/tracer_provider.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/LICENSE is excluded by !vendor/**
  • vendor/go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/config.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/doc.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/interceptorinfo.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal/parse.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/metadata_supplier.go is excluded by !vendor/**
  • vendor/go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go is excluded by !vendor/**
📒 Files selected for processing (22)
  • go.mod (4 hunks)
  • pkg/cloudevents/README.md (2 hunks)
  • pkg/cloudevents/clients/options/generic.go (1 hunks)
  • pkg/cloudevents/constants/constants.go (1 hunks)
  • pkg/cloudevents/doc/design.md (2 hunks)
  • pkg/cloudevents/generic/options/builder/optionsbuilder.go (5 hunks)
  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go (5 hunks)
  • pkg/cloudevents/generic/options/mqtt/options.go (3 hunks)
  • pkg/cloudevents/generic/options/options.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/agentoptions.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/agentoptions_test.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/codec.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/codec_test.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/options.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/options_test.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/sourceoptions.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/sourceoptions_test.go (1 hunks)
  • pkg/cloudevents/generic/types/types.go (2 hunks)
  • test/integration/cloudevents/cloudevents_test.go (2 hunks)
  • test/integration/cloudevents/cloudevetns_pubsub_test.go (1 hunks)
  • test/integration/cloudevents/suite_test.go (3 hunks)
  • test/integration/cloudevents/util/pubsub.go (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • pkg/cloudevents/generic/options/pubsub/sourceoptions.go
  • pkg/cloudevents/generic/options/pubsub/agentoptions.go
📚 Learning: 2025-09-17T13:29:00.675Z
Learnt from: qiujian16
Repo: open-cluster-management-io/sdk-go PR: 147
File: pkg/cloudevents/generic/options/grpc/protocol/heartbeat_integration_test.go:66-68
Timestamp: 2025-09-17T13:29:00.675Z
Learning: In the pkg/cloudevents/generic/options/grpc/protocol package, bufSize is already defined as a constant in protocal_test.go and is accessible to other test files in the same package, including heartbeat_integration_test.go.

Applied to files:

  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go
🧬 Code graph analysis (5)
test/integration/cloudevents/cloudevents_test.go (1)
pkg/cloudevents/constants/constants.go (1)
  • ConfigTypePubSub (6-6)
pkg/cloudevents/generic/options/mqtt/options.go (1)
pkg/cloudevents/generic/types/types.go (5)
  • MQTTSourceEventsTopicPattern (105-105)
  • MQTTAgentEventsTopicPattern (106-106)
  • MQTTSourceBroadcastTopicPattern (107-107)
  • MQTTAgentBroadcastTopicPattern (108-108)
  • MQTTEventsTopicPattern (104-104)
test/integration/cloudevents/suite_test.go (2)
vendor/github.com/mochi-mqtt/server/v2/server.go (1)
  • Server (136-147)
test/integration/cloudevents/server/grpc.go (1)
  • GRPCServer (34-45)
pkg/cloudevents/generic/options/builder/optionsbuilder.go (5)
pkg/cloudevents/constants/constants.go (1)
  • ConfigTypePubSub (6-6)
pkg/cloudevents/generic/options/grpc/sourceoptions.go (1)
  • NewSourceOptions (23-34)
pkg/cloudevents/generic/options/mqtt/sourceoptions.go (1)
  • NewSourceOptions (26-38)
pkg/cloudevents/generic/options/grpc/agentoptions.go (1)
  • NewAgentOptions (23-35)
pkg/cloudevents/generic/options/mqtt/agentoptions.go (1)
  • NewAgentOptions (27-40)
pkg/cloudevents/generic/options/builder/optionsbuilder_test.go (4)
pkg/cloudevents/generic/types/types.go (2)
  • Topics (114-158)
  • CloudEventsDataType (210-214)
pkg/cloudevents/generic/options/mqtt/options.go (2)
  • MQTTOptions (68-77)
  • MQTTDialer (30-36)
pkg/cloudevents/generic/options/grpc/options.go (3)
  • GRPCOptions (114-118)
  • GRPCDialer (30-37)
  • KeepAliveOptions (40-45)
pkg/cloudevents/generic/options/builder/optionsbuilder.go (3)
  • NewConfigLoader (26-31)
  • BuildCloudEventsSourceOptions (63-75)
  • BuildCloudEventsAgentOptions (78-90)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: integration
  • GitHub Check: verify
🔇 Additional comments (48)
go.mod (3)

57-112: Confirm Pub/Sub library is integrated into application code.

The go.mod file alone does not show whether cloud.google.com/go/pubsub/v2 is actually used in the codebase. The PR summary mentions new Pub/Sub types, codec, transports, and tests, but only the go.mod changes are provided for review. Ensure that:

  1. The Pub/Sub dependency is actually imported and used in source code (not just a transitive pull).
  2. Unused indirect dependencies (if any) are cleaned up (run go mod tidy if not already done).

28-31: Security advisories verified—all versions are safe and patched.

The dependency updates pull in versions that either have no known vulnerabilities or are above the patched versions for disclosed advisories:

  • golang.org/x/oauth2 v0.32.0 is safe (CVE-2025-22868 fixed in 0.27.0)
  • google.golang.org/grpc v1.76.0 is safe (HTTP/2 Rapid Reset vulnerabilities fixed in earlier patch ranges)
  • google.golang.org/protobuf v1.36.10 is safe (known DoS and infinite loop issues fixed in earlier versions)
  • google.golang.org/api v0.255.0 has no advisories
  • go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 is safe (DoS vulnerability fixed in 0.46.0)

All OTEL modules are free of security advisories. Integration testing is recommended as general best practice to ensure compatibility across your stack.


6-6: cloud.google.com/go/pubsub/v2 v2.3.0 is valid and secure.

The version is documented in the official cloud.google.com Go reference, and no public security advisory exists for this version. The dependency is appropriate for use.

pkg/cloudevents/README.md (1)

119-191: LGTM! Comprehensive Pub/Sub documentation added.

The Pub/Sub protocol documentation is well-structured with clear YAML examples for both source and agent configurations, along with helpful notes about topic/subscription semantics and requirements.

pkg/cloudevents/clients/options/generic.go (1)

36-37: LGTM! Documentation updated correctly.

The PubSubOptions addition to the godoc is consistent with existing MQTT and gRPC entries.

pkg/cloudevents/constants/constants.go (1)

3-7: LGTM! Pub/Sub config type constant added.

The new ConfigTypePubSub constant follows the existing naming convention and the formatting alignment improves readability.

pkg/cloudevents/generic/options/options.go (1)

20-20: LGTM! CloudEventTransport documentation updated.

The PubSub addition to the available implementations list is consistent with the new transport support.

pkg/cloudevents/doc/design.md (2)

19-19: LGTM! Design doc updated to include PubSub.

The documentation correctly reflects the addition of PubSub as a supported protocol.


48-49: LGTM! Class diagram updated for Pub/Sub.

The class diagram additions correctly show the Pub/Sub source and agent options implementing the CloudEvents options interfaces.

test/integration/cloudevents/suite_test.go (1)

49-50: LGTM! Pub/Sub test server setup is appropriate.

The test suite correctly initializes a Pub/Sub test server and generates a unique project ID for testing, enabling integration tests for the new Pub/Sub transport.

Also applies to: 84-86

pkg/cloudevents/generic/options/pubsub/agentoptions_test.go (2)

9-99: LGTM! Comprehensive agent options tests.

The tests thoroughly validate the NewAgentOptions constructor, covering both basic and credentials-based configurations, and verifying all expected fields are set correctly.


101-153: LGTM! Transport structure validation is thorough.

The structure test effectively verifies that topics, subscriptions, and internal client/publisher/subscriber fields are initialized correctly before Connect is called.

pkg/cloudevents/generic/options/builder/optionsbuilder.go (4)

10-10: LGTM! Pub/Sub package import and documentation added.

The import and documentation addition are consistent with the existing MQTT and gRPC patterns.

Also applies to: 25-25


50-56: LGTM! LoadConfig handles Pub/Sub correctly.

The Pub/Sub configuration loading follows the existing pattern. Returning an empty string for the broker host is appropriate since Pub/Sub uses a project ID and endpoint model rather than a traditional broker host.


70-71: LGTM! BuildCloudEventsSourceOptions handles Pub/Sub.

The Pub/Sub source options builder is consistent with existing MQTT and gRPC implementations.


85-86: LGTM! BuildCloudEventsAgentOptions handles Pub/Sub.

The Pub/Sub agent options builder is consistent with existing MQTT and gRPC implementations.

test/integration/cloudevents/cloudevetns_pubsub_test.go (1)

1-21: LGTM!

The Pub/Sub test integration follows the established pattern for other transports. The implementation correctly constructs CloudEvents source options for Pub/Sub using the helper utilities.

pkg/cloudevents/generic/options/mqtt/options.go (3)

248-272: LGTM! Pattern constant updates are consistent.

The refactoring correctly replaces generic topic pattern constants with MQTT-specific ones, improving clarity and enabling differentiation between MQTT and Pub/Sub transports.


276-318: LGTM! Validation logic correctly updated.

The getSourceFromEventsTopic function properly uses the MQTT-specific pattern constant, maintaining backward compatibility while supporting the new Pub/Sub transport.


320-340: LGTM! Pub topic validation updated correctly.

Both getSourcePubTopic and getAgentPubTopic functions now use MQTT-specific pattern constants, ensuring proper validation for the MQTT transport.

pkg/cloudevents/generic/options/pubsub/sourceoptions_test.go (2)

9-92: LGTM! Comprehensive test coverage for source options initialization.

The test cases validate both basic and credential-based configurations, correctly asserting that NewSourceOptions initializes all required fields including the transport structure and error channel.


94-146: LGTM! Transport structure validation is thorough.

The test correctly verifies that topics and subscriptions are properly set for the source, and confirms that client/publisher/subscriber fields remain nil until Connect is called.

pkg/cloudevents/generic/options/builder/optionsbuilder_test.go (4)

32-49: LGTM! Pub/Sub test configurations are well-structured.

The YAML configurations for both source and agent Pub/Sub options follow the correct format and align with the Pub/Sub topic/subscription naming conventions.


59-118: LGTM! Test refactoring improves clarity.

The addition of Pub/Sub test cases and the introduction of dedicated assertSourceOptions helper function improve test organization and readability.


120-179: LGTM! New agent options test provides proper coverage.

The new TestBuildCloudEventsAgentOptions function correctly tests agent-specific configuration for all three transports (MQTT, gRPC, Pub/Sub).


190-236: LGTM! Helper functions properly separated.

The split into assertSourceOptions and assertAgentOptions correctly reflects the distinction between source and agent option builders (BuildCloudEventsSourceOptions vs BuildCloudEventsAgentOptions).

pkg/cloudevents/generic/options/pubsub/options_test.go (3)

60-199: LGTM! Comprehensive validation test coverage.

The test cases cover all critical scenarios: missing required fields, invalid formats, and valid configurations for both source and agent setups. The error message validation ensures clear feedback to users.


201-280: LGTM! Config loading tests are well-structured.

The tests properly validate both successful config loading and error handling for missing files, covering YAML and JSON formats.


282-460: LGTM! Thorough validation test coverage.

The 17 test cases comprehensively cover validation scenarios including nil checks, missing fields, invalid formats, project ID mismatches, and edge cases like UUIDs and special characters.

test/integration/cloudevents/util/pubsub.go (1)

10-37: LGTM! Clean helper functions for Pub/Sub test setup.

The utilities correctly construct Pub/Sub options with proper topic/subscription mappings for bidirectional source-agent communication. The naming conventions follow Google Cloud Pub/Sub standards.

pkg/cloudevents/generic/options/pubsub/codec_test.go (3)

11-165: LGTM! Comprehensive encode test coverage.

The test cases thoroughly validate encoding of CloudEvents to Pub/Sub messages, including required/optional attributes, extensions, and edge cases. The use of validation callbacks provides clear, focused assertions.


167-360: LGTM! Thorough decode test coverage.

The decode tests properly validate:

  • Required attribute enforcement with clear error messages
  • Default specversion fallback behavior
  • Extension attribute handling
  • Filtering of non-CloudEvents attributes

The error validation with substring checks ensures robust error handling.


362-418: LGTM! Round-trip test ensures codec integrity.

The encode-decode round-trip test validates that all CloudEvents attributes and extensions are preserved through the codec transformation. The time truncation on line 370 is a good practice to avoid precision-related test flakiness.

pkg/cloudevents/generic/types/types.go (3)

104-111: LGTM: Clean separation of MQTT and Pub/Sub patterns.

The renamed MQTT constants and new Pub/Sub patterns provide clear separation between transport types. The PROJECT_ID placeholder in Pub/Sub patterns is correctly handled during validation in options.go.


120-157: LGTM: Comprehensive documentation for dual transport support.

The updated documentation clearly describes the topic formats for both MQTT and Pub/Sub, including the pre-creation requirement for Pub/Sub topics.


160-193: Documentation is appropriate; no code changes needed.

The documentation correctly describes a deployment pre-requisite, not a code responsibility. In Google Cloud Pub/Sub, subscription filters are configured at subscription creation time (by infrastructure/deployment tooling) and enforced server-side by the Pub/Sub service. Application code does not and should not create or validate these filters—it only references subscription names. The integration tests demonstrate this pattern: subscriptions are created with filters as part of test setup, and the application code simply uses the subscription names provided.

pkg/cloudevents/generic/options/pubsub/options.go (3)

15-48: LGTM: Well-structured configuration types.

The struct definitions properly use pointers for Topics/Subscriptions to enable nil-checking during validation, and include appropriate JSON/YAML tags for configuration parsing.


50-63: LGTM: Clean configuration loading.

The function correctly reads and parses YAML configuration with appropriate error handling.


65-89: LGTM: Options builder correctly validates before dereferencing.

The function properly validates that Topics and Subscriptions are non-nil before dereferencing them on lines 86-87, ensuring safe construction of PubSubOptions.

pkg/cloudevents/generic/options/pubsub/codec.go (1)

38-79: LGTM: Correct CloudEvents to Pub/Sub encoding.

The encoding logic properly maps CloudEvents attributes to Pub/Sub message attributes with appropriate prefix handling, and correctly treats DataContentType as Content-Type per CloudEvents HTTP protocol binding convention.

pkg/cloudevents/generic/options/pubsub/sourceoptions.go (4)

37-48: LGTM: Clean constructor pattern.

The constructor properly initializes the transport with all required fields and returns a well-structured CloudEventsSourceOptions.


50-80: LGTM: Flexible connection setup for production and testing.

The Connect method properly handles both authenticated connections (with credentials) and insecure connections (for local testing/emulator), and correctly initializes all publishers and subscribers.


82-104: LGTM: Correct message routing and synchronous publish.

The Send method properly routes messages to the appropriate publisher based on event type and waits for publish confirmation, ensuring reliable message delivery.


158-164: LGTM: Clean resource cleanup.

The Close method properly releases the Pub/Sub client resources, and ErrorChan provides the expected interface method.

pkg/cloudevents/generic/options/pubsub/agentoptions.go (4)

37-49: LGTM: Clean constructor pattern.

The constructor properly initializes the agent transport with all required fields including both AgentID and ClusterName.


51-81: LGTM: Correct agent-side initialization.

The Connect method properly initializes publishers for agent-side topics (AgentEvents, AgentBroadcast) and subscribers for source-side topics (SourceEvents, SourceBroadcast), matching the agent's role in the system.


83-105: LGTM: Correct agent-side message routing.

The Send method properly routes agent messages to AgentBroadcast for resync requests and AgentEvents for other events.


159-165: LGTM: Clean resource cleanup.

The Close and ErrorChan methods are correctly implemented, matching the pattern from sourceoptions.go.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
pkg/cloudevents/generic/options/pubsub/sourceoptions.go (1)

106-154: Use non-blocking sends to errorChan to prevent goroutine blocking.

The subscriber goroutines send errors to errorChan using blocking sends (lines 125, 147). If the consumer of this channel is slow or not actively reading, these goroutines will block indefinitely. The past review comment recommended using non-blocking sends with a select statement to avoid this issue.

Apply this pattern to both error sends:

 		if err != nil {
-			o.errorChan <- fmt.Errorf("subscriber is interrupted by error: %w", err)
+			select {
+			case o.errorChan <- fmt.Errorf("subscriber is interrupted by error: %w", err):
+			default:
+				klog.Warningf("error channel full, dropping subscriber error: %v", err)
+			}
 		}

Similarly for the resync subscriber error on line 147.

pkg/cloudevents/generic/options/pubsub/agentoptions.go (1)

107-155: Use non-blocking sends to errorChan to prevent goroutine blocking.

The subscriber goroutines use blocking sends to errorChan (lines 126, 148). If the channel consumer is slow or unavailable, these goroutines will block. Use non-blocking sends with a select statement as recommended in the previous review to prevent this issue.

Apply this pattern to both error sends:

 		if err != nil {
-			o.errorChan <- fmt.Errorf("subscriber is interrupted by error: %w", err)
+			select {
+			case o.errorChan <- fmt.Errorf("subscriber is interrupted by error: %w", err):
+			default:
+				klog.Warningf("error channel full, dropping subscriber error: %v", err)
+			}
 		}

Similarly for the resync subscriber error on line 148.

🧹 Nitpick comments (2)
pkg/cloudevents/generic/options/pubsub/sourceoptions.go (1)

33-34: Remove outdated TODO comment.

The error channel is now actively used in the Receive method (lines 125, 147) to forward subscriber errors, so this TODO is no longer accurate.

Apply this diff to remove the outdated comment:

 	// Subscriber for resync broadcasts
 	resyncSubscriber *pubsub.Subscriber
-	// TODO: handle error channel
 	errorChan chan error
pkg/cloudevents/generic/options/pubsub/agentoptions.go (1)

33-34: Remove outdated TODO comment.

The error channel is now actively used in the Receive method (lines 126, 148) to forward subscriber errors, making this TODO comment obsolete.

Apply this diff:

 	// Subscriber for resync broadcasts
 	resyncSubscriber *pubsub.Subscriber
-	// TODO: handle error channel
 	errorChan chan error
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a38bf0e and 941e1f1.

📒 Files selected for processing (4)
  • pkg/cloudevents/generic/options/pubsub/agentoptions.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/sourceoptions.go (1 hunks)
  • test/integration/cloudevents/cloudevents_test.go (3 hunks)
  • test/integration/cloudevents/suite_test.go (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • test/integration/cloudevents/suite_test.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • test/integration/cloudevents/cloudevents_test.go
  • pkg/cloudevents/generic/options/pubsub/agentoptions.go
  • pkg/cloudevents/generic/options/pubsub/sourceoptions.go
🧬 Code graph analysis (3)
test/integration/cloudevents/cloudevents_test.go (1)
pkg/cloudevents/constants/constants.go (1)
  • ConfigTypePubSub (6-6)
pkg/cloudevents/generic/options/pubsub/agentoptions.go (2)
pkg/cloudevents/generic/options/options.go (3)
  • CloudEventTransport (21-56)
  • CloudEventsAgentOptions (73-86)
  • ReceiveHandlerFn (13-13)
pkg/cloudevents/generic/types/types.go (3)
  • Topics (114-158)
  • ParseCloudEventsType (254-275)
  • ResyncRequestAction (41-41)
pkg/cloudevents/generic/options/pubsub/sourceoptions.go (2)
pkg/cloudevents/generic/options/options.go (3)
  • CloudEventTransport (21-56)
  • CloudEventsSourceOptions (59-70)
  • ReceiveHandlerFn (13-13)
pkg/cloudevents/generic/types/types.go (3)
  • Topics (114-158)
  • ParseCloudEventsType (254-275)
  • ResyncRequestAction (41-41)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: integration
  • GitHub Check: unit
  • GitHub Check: verify
🔇 Additional comments (1)
test/integration/cloudevents/cloudevents_test.go (1)

259-337: LGTM! Proper defer usage and idempotent topic/subscription setup.

The defer statements on lines 265 and 271 are correctly scoped to the setupTopicsAndSubscriptions function, ensuring connections are closed after setup completes. The Get-then-Create pattern for topics and subscriptions makes this function idempotent, which is appropriate for test initialization. The CloudEvents attribute filters (ce-clustername, ce-originalsource) properly route messages to the correct subscribers.

@morvencao morvencao force-pushed the br_add_pubsub_support branch from 941e1f1 to bf8e164 Compare November 6, 2025 03:46
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
pkg/cloudevents/generic/options/pubsub/codec.go (1)

157-164: Minor: Fix comment grammar.

The implementation correctly extracts and uses the content type from Pub/Sub message attributes (line 123-124) and only defaults to ApplicationJSON when absent, which properly addresses the concern raised in the past review. However, the comment on line 158 has awkward phrasing.

Apply this diff to improve the comment:

 	if dataContentType == "" {
-		// default data content type be "application/JSON"
+		// default data content type to "application/json"
 		dataContentType = cloudevents.ApplicationJSON
 	}
pkg/cloudevents/generic/options/pubsub/agentoptions.go (1)

164-166: Consider closing the error channel on cleanup.

While client.Close() stops publishers and subscribers, the errorChan is never closed. Depending on how consumers use this channel, they may not be able to detect that no more errors will be sent.

If consumers need to detect channel closure, apply this diff:

 func (o *pubsubAgentTransport) Close(ctx context.Context) error {
-	return o.client.Close()
+	defer close(o.errorChan)
+	return o.client.Close()
 }

Note: Only apply this if consumers range over the channel or need closure detection. If consumed via select statements, the current implementation is sufficient.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 941e1f1 and bf8e164.

📒 Files selected for processing (5)
  • pkg/cloudevents/generic/options/pubsub/agentoptions.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/codec.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/sourceoptions.go (1 hunks)
  • test/integration/cloudevents/cloudevents_test.go (3 hunks)
  • test/integration/cloudevents/suite_test.go (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • test/integration/cloudevents/suite_test.go
  • test/integration/cloudevents/cloudevents_test.go
  • pkg/cloudevents/generic/options/pubsub/sourceoptions.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • pkg/cloudevents/generic/options/pubsub/agentoptions.go
🧬 Code graph analysis (2)
pkg/cloudevents/generic/options/pubsub/codec.go (2)
vendor/github.com/cloudevents/sdk-go/v2/alias.go (2)
  • VersionV1 (69-69)
  • NewEvent (109-109)
vendor/github.com/cloudevents/sdk-go/v2/types/value.go (1)
  • IsZero (305-314)
pkg/cloudevents/generic/options/pubsub/agentoptions.go (3)
pkg/cloudevents/generic/options/options.go (3)
  • CloudEventTransport (21-56)
  • CloudEventsAgentOptions (73-86)
  • ReceiveHandlerFn (13-13)
pkg/cloudevents/generic/types/types.go (3)
  • Topics (114-158)
  • ParseCloudEventsType (254-275)
  • ResyncRequestAction (41-41)
pkg/cloudevents/generic/options/pubsub/codec.go (2)
  • Encode (38-79)
  • Decode (96-177)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: verify
  • GitHub Check: integration
  • GitHub Check: unit
🔇 Additional comments (6)
pkg/cloudevents/generic/options/pubsub/codec.go (2)

1-17: LGTM!

Package structure, imports, and constant definitions are clean and appropriate for the Pub/Sub CloudEvents codec.


38-78: LGTM!

The encoding logic correctly maps CloudEvent attributes to Pub/Sub message attributes with proper prefixing, handles the Content-Type special case per HTTP protocol binding conventions, and includes appropriate error handling for attribute formatting.

pkg/cloudevents/generic/options/pubsub/agentoptions.go (4)

1-34: LGTM!

Package structure and transport type definition are well-organized. The interface assertion and separation of regular vs. resync publishers/subscribers provide clear intent.


36-48: LGTM!

Constructor correctly initializes the agent transport with appropriate configuration and an error channel. The unbuffered channel is safe given the non-blocking sends with default cases in the Receive method.


106-162: LGTM!

The Receive implementation correctly:

  • Starts concurrent subscribers for both regular and resync messages
  • Respects context cancellation for graceful shutdown
  • Uses non-blocking error channel sends to prevent deadlock
  • ACKs messages even on decode errors (since redelivery won't fix them)
  • Follows the established error handling pattern where errors signal the client to reconnect

168-170: LGTM!

Correctly returns a read-only error channel for consumers to monitor transport errors.

@morvencao morvencao force-pushed the br_add_pubsub_support branch from bf8e164 to ba50aa1 Compare November 6, 2025 10:03
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (4)
test/integration/cloudevents/cloudevents_test.go (1)

259-337: Consider reducing duplication in topic/subscription setup.

The function has repetitive patterns for creating topics and subscriptions (lines 273-334). While the current implementation is clear and functional, you could optionally extract helper functions to reduce the ~80 lines of similar code.

Example refactor pattern:

func setupTopicsAndSubscriptions(ctx context.Context, clusterName, sourceID string) error {
	pubsubConn, err := grpc.NewClient(pubsubServer.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return err
	}
	defer pubsubConn.Close()

	pubsubClient, err := pubsubv2.NewClient(ctx, pubsubProjectID, option.WithGRPCConn(pubsubConn))
	if err != nil {
		return err
	}
	defer pubsubClient.Close()

	// Helper to create topic if not exists
	ensureTopic := func(topicName string) error {
		topicPath := fmt.Sprintf("projects/%s/topics/%s", pubsubProjectID, topicName)
		if _, err := pubsubClient.TopicAdminClient.GetTopic(ctx, &pubsubpb.GetTopicRequest{Topic: topicPath}); err != nil {
			_, err = pubsubClient.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{Name: topicPath})
			return err
		}
		return nil
	}

	// Helper to create subscription if not exists
	ensureSubscription := func(subName, topicName, filter string) error {
		subPath := fmt.Sprintf("projects/%s/subscriptions/%s", pubsubProjectID, subName)
		topicPath := fmt.Sprintf("projects/%s/topics/%s", pubsubProjectID, topicName)
		if _, err := pubsubClient.SubscriptionAdminClient.GetSubscription(ctx, &pubsubpb.GetSubscriptionRequest{Subscription: subPath}); err != nil {
			sub := &pubsubpb.Subscription{Name: subPath, Topic: topicPath}
			if filter != "" {
				sub.Filter = filter
			}
			_, err = pubsubClient.SubscriptionAdminClient.CreateSubscription(ctx, sub)
			return err
		}
		return nil
	}

	// Create topics
	for _, topic := range []string{"sourceevents", "sourcebroadcast", "agentevents", "agentbroadcast"} {
		if err := ensureTopic(topic); err != nil {
			return err
		}
	}

	// Create subscriptions with filters
	subs := []struct{ name, topic, filter string }{
		{fmt.Sprintf("sourceevents-%s", clusterName), "sourceevents", fmt.Sprintf("attributes.\"ce-clustername\"=\"%s\"", clusterName)},
		{fmt.Sprintf("sourcebroadcast-%s", clusterName), "sourcebroadcast", ""},
		{fmt.Sprintf("agentevents-%s", sourceID), "agentevents", fmt.Sprintf("attributes.\"ce-originalsource\"=\"%s\"", sourceID)},
		{fmt.Sprintf("agentbroadcast-%s", sourceID), "agentbroadcast", ""},
	}
	for _, sub := range subs {
		if err := ensureSubscription(sub.name, sub.topic, sub.filter); err != nil {
			return err
		}
	}

	return nil
}
pkg/cloudevents/generic/options/pubsub/options_test.go (1)

613-624: Use standard library strings.Contains instead of custom implementation.

The custom contains and stringContains functions duplicate functionality available in the standard library.

Replace with:

+import (
+	"strings"
+	// ... other imports
+)

-func contains(s, substr string) bool {
-	return len(s) >= len(substr) && (s == substr || len(substr) == 0 || (len(s) > 0 && len(substr) > 0 && stringContains(s, substr)))
-}
-
-func stringContains(s, substr string) bool {
-	for i := 0; i <= len(s)-len(substr); i++ {
-		if s[i:i+len(substr)] == substr {
-			return true
-		}
-	}
-	return false
-}
+func contains(s, substr string) bool {
+	return strings.Contains(s, substr)
+}

Or directly use strings.Contains in the test assertions (lines 252, 333).

pkg/cloudevents/generic/options/pubsub/agentoptions.go (1)

185-198: Fix typo in function name.

The function name toGRPCKeepaliveParamater has a typo: "Paramater" should be "Parameter". Note that this function is also called from sourceoptions.go line 68, so both call sites need updating.

Apply this diff:

-// toGRPCKeepaliveParamater converts our KeepaliveSettings to GRPC ClientParameters.
-func toGRPCKeepaliveParamater(settings *KeepaliveSettings) keepalive.ClientParameters {
-	keepaliveParamater := keepalive.ClientParameters{
+// toGRPCKeepaliveParameter converts our KeepaliveSettings to GRPC ClientParameters.
+func toGRPCKeepaliveParameter(settings *KeepaliveSettings) keepalive.ClientParameters {
+	keepaliveParameter := keepalive.ClientParameters{
 		PermitWithoutStream: settings.PermitWithoutStream,
 	}
 	if settings.Time > 0 {
-		keepaliveParamater.Time = settings.Time
+		keepaliveParameter.Time = settings.Time
 	}
 	if settings.Timeout > 0 {
-		keepaliveParamater.Timeout = settings.Timeout
+		keepaliveParameter.Timeout = settings.Timeout
 	}
 
-	return keepaliveParamater
+	return keepaliveParameter
 }

Also update the call site in sourceoptions.go:

// Line 68 in sourceoptions.go
clientOptions = append(clientOptions, option.WithGRPCDialOption(grpc.WithKeepaliveParams(toGRPCKeepaliveParameter(o.KeepaliveSettings))))
pkg/cloudevents/generic/options/pubsub/options.go (1)

148-227: Consider caching compiled regex patterns.

The validation function compiles the same regex pattern multiple times (lines 175-199 for topics, 201-225 for subscriptions). While this is only called once at startup, caching the compiled patterns would be more efficient.

Example refactor:

func validateTopicsAndSubscriptions(topics *types.Topics, subscriptions *types.Subscriptions, projectID string) error {
	// ... nil checks and combination validation ...

	// Compile patterns once
	topicPattern := strings.ReplaceAll(types.PubSubTopicPattern, "PROJECT_ID", regexp.QuoteMeta(projectID))
	topicRegex := regexp.MustCompile(topicPattern)
	
	subscriptionPattern := strings.ReplaceAll(types.PubSubSubscriptionPattern, "PROJECT_ID", regexp.QuoteMeta(projectID))
	subscriptionRegex := regexp.MustCompile(subscriptionPattern)

	// Use cached regex for all checks
	if len(topics.SourceEvents) != 0 {
		if !topicRegex.MatchString(topics.SourceEvents) {
			errs = append(errs, fmt.Errorf("invalid source events topic %q, it should match `%s`",
				topics.SourceEvents, topicPattern))
		}
	}
	// ... continue with other validations using topicRegex and subscriptionRegex ...
}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bf8e164 and ba50aa1.

📒 Files selected for processing (8)
  • pkg/cloudevents/generic/options/pubsub/agentoptions.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/agentoptions_test.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/codec.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/options.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/options_test.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/sourceoptions.go (1 hunks)
  • test/integration/cloudevents/cloudevents_test.go (3 hunks)
  • test/integration/cloudevents/suite_test.go (4 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • test/integration/cloudevents/cloudevents_test.go
  • pkg/cloudevents/generic/options/pubsub/sourceoptions.go
🧬 Code graph analysis (7)
test/integration/cloudevents/cloudevents_test.go (1)
pkg/cloudevents/constants/constants.go (1)
  • ConfigTypePubSub (6-6)
pkg/cloudevents/generic/options/pubsub/options_test.go (3)
pkg/cloudevents/generic/options/pubsub/options.go (6)
  • PubSubOptions (17-25)
  • KeepaliveSettings (58-67)
  • ReceiveSettings (70-89)
  • BuildPubSubOptionsFromFlags (107-146)
  • PubSubConfig (28-55)
  • LoadConfig (92-104)
pkg/cloudevents/generic/types/types.go (1)
  • Topics (114-158)
pkg/testing/util.go (1)
  • WriteToTempFile (10-21)
pkg/cloudevents/generic/options/pubsub/sourceoptions.go (4)
pkg/cloudevents/generic/options/options.go (3)
  • CloudEventTransport (21-56)
  • CloudEventsSourceOptions (59-70)
  • ReceiveHandlerFn (13-13)
pkg/cloudevents/generic/options/pubsub/options.go (3)
  • PubSubOptions (17-25)
  • KeepaliveSettings (58-67)
  • ReceiveSettings (70-89)
pkg/cloudevents/generic/types/types.go (3)
  • Topics (114-158)
  • ParseCloudEventsType (254-275)
  • ResyncRequestAction (41-41)
pkg/cloudevents/generic/options/pubsub/codec.go (2)
  • Encode (38-79)
  • Decode (96-177)
pkg/cloudevents/generic/options/pubsub/agentoptions_test.go (3)
pkg/cloudevents/generic/options/pubsub/options.go (3)
  • PubSubOptions (17-25)
  • KeepaliveSettings (58-67)
  • ReceiveSettings (70-89)
pkg/cloudevents/generic/types/types.go (1)
  • Topics (114-158)
pkg/cloudevents/generic/options/pubsub/agentoptions.go (1)
  • NewAgentOptions (38-49)
pkg/cloudevents/generic/options/pubsub/agentoptions.go (4)
pkg/cloudevents/generic/options/options.go (3)
  • CloudEventTransport (21-56)
  • CloudEventsAgentOptions (73-86)
  • ReceiveHandlerFn (13-13)
pkg/cloudevents/generic/options/pubsub/options.go (3)
  • PubSubOptions (17-25)
  • KeepaliveSettings (58-67)
  • ReceiveSettings (70-89)
pkg/cloudevents/generic/types/types.go (3)
  • Topics (114-158)
  • ParseCloudEventsType (254-275)
  • ResyncRequestAction (41-41)
pkg/cloudevents/generic/options/pubsub/codec.go (2)
  • Encode (38-79)
  • Decode (96-177)
pkg/cloudevents/generic/options/pubsub/codec.go (2)
vendor/github.com/cloudevents/sdk-go/v2/alias.go (2)
  • VersionV1 (69-69)
  • NewEvent (109-109)
vendor/github.com/cloudevents/sdk-go/v2/types/value.go (1)
  • IsZero (305-314)
pkg/cloudevents/generic/options/pubsub/options.go (1)
pkg/cloudevents/generic/types/types.go (3)
  • Topics (114-158)
  • PubSubTopicPattern (109-109)
  • PubSubSubscriptionPattern (110-110)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: integration
  • GitHub Check: unit
  • GitHub Check: verify
🔇 Additional comments (12)
test/integration/cloudevents/suite_test.go (2)

7-86: LGTM! Pub/Sub test server properly initialized.

The Pub/Sub test server setup is clean and well-integrated. The random project ID ensures test isolation, and the server is initialized early in BeforeSuite before other components.


180-182: LGTM! Cleanup properly implemented.

The Pub/Sub test server is correctly closed in AfterSuite with proper error checking, preventing resource leaks.

test/integration/cloudevents/cloudevents_test.go (2)

12-20: LGTM! Necessary imports for Pub/Sub integration.

All imports are properly used in the Pub/Sub setup and test flow.


86-89: LGTM! Pub/Sub test case properly integrated.

The Pub/Sub configuration case correctly initializes agent options and sets up topics/subscriptions before starting the test.

pkg/cloudevents/generic/options/pubsub/options_test.go (1)

81-529: LGTM! Comprehensive test coverage.

The test suite is well-structured with excellent coverage of valid configurations, error cases, and edge cases including:

  • Missing required fields
  • Invalid topic/subscription formats
  • Project ID mismatches
  • UUID-containing subscriptions
  • Custom settings overrides
pkg/cloudevents/generic/options/pubsub/agentoptions_test.go (1)

1-323: LGTM! Comprehensive agent options test coverage.

The test suite thoroughly validates:

  • Agent options construction with correct field assignments
  • Internal transport structure before connection
  • Proper conversion of keepalive settings to gRPC parameters
  • Correct mapping of receive settings to Pub/Sub configuration

All tests follow best practices with table-driven patterns and proper assertions.

pkg/cloudevents/generic/options/pubsub/codec.go (2)

38-79: LGTM! Encode function correctly implements CloudEvents Pub/Sub binding.

The encoding logic properly:

  • Maps CloudEvents attributes to Pub/Sub message attributes with ce- prefix
  • Handles datacontenttype specially as Content-Type without prefix (following HTTP protocol binding convention)
  • Includes extensions as prefixed attributes
  • Returns detailed error messages for formatting failures

96-177: LGTM! Decode function properly reconstructs CloudEvents.

The decoding logic correctly:

  • Validates input and handles nil messages
  • Defaults to CloudEvents V1 if specversion is missing
  • Maps Pub/Sub attributes back to CloudEvent context (with special handling for Content-Type)
  • Extracts extensions from ce- prefixed attributes
  • Defaults data content type to application/json when not specified (addressing the previous review comment)
  • Validates all required CloudEvent attributes
pkg/cloudevents/generic/options/pubsub/sourceoptions.go (1)

117-173: LGTM: Error handling correctly forwards to errorChan.

The Receive method properly implements error forwarding to errorChan using non-blocking select statements (lines 136-140, 162-166), consistent with the MQTT/GRPC transport pattern.

pkg/cloudevents/generic/options/pubsub/agentoptions.go (2)

51-93: LGTM: Variable shadowing issue resolved.

The Connect method correctly uses clientOptions instead of options, avoiding shadowing of the imported option package.


95-117: LGTM: Error message correctly references event type string.

Line 103 properly uses evt.Context.GetType() in the error message instead of the potentially nil eventType variable.

pkg/cloudevents/generic/options/pubsub/options.go (1)

106-146: LGTM: Options builder correctly applies defaults and validates config.

The function properly loads config, validates required fields, and applies sensible default keepalive settings (5m time, 20s timeout) while allowing config overrides.

@morvencao morvencao force-pushed the br_add_pubsub_support branch from ba50aa1 to dc2674a Compare November 6, 2025 10:50
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (1)
pkg/cloudevents/generic/options/pubsub/options.go (1)

148-227: LGTM with optional refactor suggestion.

The validation logic correctly ensures that either all source-related fields or all agent-related fields are populated, and validates the format of each topic and subscription against the expected patterns.

Optional: Pre-compile regex patterns for slight efficiency gain.

The regex patterns are compiled multiple times (once per field validation). While this function runs infrequently (typically once at startup), you could pre-compile the patterns once:

func validateTopicsAndSubscriptions(topics *types.Topics, subscriptions *types.Subscriptions, projectID string) error {
	// ... existing nil checks ...

	var errs []error
	if !isValidForSource && !isValidForAgent {
		// ... existing error ...
	}

	// Compile patterns once
	topicPattern := strings.ReplaceAll(types.PubSubTopicPattern, "PROJECT_ID", regexp.QuoteMeta(projectID))
	topicRegex := regexp.MustCompile(topicPattern)
	subscriptionPattern := strings.ReplaceAll(types.PubSubSubscriptionPattern, "PROJECT_ID", regexp.QuoteMeta(projectID))
	subscriptionRegex := regexp.MustCompile(subscriptionPattern)

	// Then use topicRegex and subscriptionRegex for all validations
	if len(topics.SourceEvents) != 0 {
		if !topicRegex.MatchString(topics.SourceEvents) {
			// ... error ...
		}
	}
	// ... rest of validations using the pre-compiled regex ...
}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ba50aa1 and dc2674a.

📒 Files selected for processing (9)
  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go (5 hunks)
  • pkg/cloudevents/generic/options/pubsub/agentoptions.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/agentoptions_test.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/codec.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/options.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/options_test.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/sourceoptions.go (1 hunks)
  • test/integration/cloudevents/cloudevents_test.go (3 hunks)
  • test/integration/cloudevents/suite_test.go (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • pkg/cloudevents/generic/options/pubsub/agentoptions_test.go
  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • pkg/cloudevents/generic/options/pubsub/sourceoptions.go
  • test/integration/cloudevents/cloudevents_test.go
🧬 Code graph analysis (6)
pkg/cloudevents/generic/options/pubsub/agentoptions.go (4)
pkg/cloudevents/generic/options/options.go (3)
  • CloudEventTransport (21-56)
  • CloudEventsAgentOptions (73-86)
  • ReceiveHandlerFn (13-13)
pkg/cloudevents/generic/options/pubsub/options.go (3)
  • PubSubOptions (17-25)
  • KeepaliveSettings (58-67)
  • ReceiveSettings (70-89)
pkg/cloudevents/generic/types/types.go (3)
  • Topics (114-158)
  • ParseCloudEventsType (254-275)
  • ResyncRequestAction (41-41)
pkg/cloudevents/generic/options/pubsub/codec.go (2)
  • Encode (38-79)
  • Decode (96-177)
pkg/cloudevents/generic/options/pubsub/sourceoptions.go (4)
pkg/cloudevents/generic/options/options.go (3)
  • CloudEventTransport (21-56)
  • CloudEventsSourceOptions (59-70)
  • ReceiveHandlerFn (13-13)
pkg/cloudevents/generic/options/pubsub/options.go (3)
  • PubSubOptions (17-25)
  • KeepaliveSettings (58-67)
  • ReceiveSettings (70-89)
pkg/cloudevents/generic/types/types.go (3)
  • Topics (114-158)
  • ParseCloudEventsType (254-275)
  • ResyncRequestAction (41-41)
pkg/cloudevents/generic/options/pubsub/codec.go (2)
  • Encode (38-79)
  • Decode (96-177)
pkg/cloudevents/generic/options/pubsub/options.go (1)
pkg/cloudevents/generic/types/types.go (3)
  • Topics (114-158)
  • PubSubTopicPattern (109-109)
  • PubSubSubscriptionPattern (110-110)
pkg/cloudevents/generic/options/pubsub/options_test.go (3)
pkg/cloudevents/generic/options/pubsub/options.go (6)
  • PubSubOptions (17-25)
  • KeepaliveSettings (58-67)
  • ReceiveSettings (70-89)
  • BuildPubSubOptionsFromFlags (107-146)
  • PubSubConfig (28-55)
  • LoadConfig (92-104)
pkg/cloudevents/generic/types/types.go (1)
  • Topics (114-158)
pkg/testing/util.go (1)
  • WriteToTempFile (10-21)
test/integration/cloudevents/cloudevents_test.go (1)
pkg/cloudevents/constants/constants.go (1)
  • ConfigTypePubSub (6-6)
pkg/cloudevents/generic/options/pubsub/codec.go (2)
vendor/github.com/cloudevents/sdk-go/v2/alias.go (2)
  • VersionV1 (69-69)
  • NewEvent (109-109)
vendor/github.com/cloudevents/sdk-go/v2/types/value.go (1)
  • IsZero (305-314)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: integration
  • GitHub Check: verify
  • GitHub Check: unit
🔇 Additional comments (9)
pkg/cloudevents/generic/options/pubsub/agentoptions.go (6)

51-93: LGTM!

The connection setup is well-structured, with proper handling of credentials, endpoints, keepalive settings, and receive settings. Previous shadowing concerns have been resolved.


95-117: LGTM!

The send logic correctly routes events to the appropriate publisher based on action type and properly blocks until the publish operation completes. Previous error message issues have been resolved.


119-175: LGTM!

The receive implementation properly starts both subscribers in separate goroutines, handles decode errors appropriately by ACKing (since redelivery won't fix them), and includes helpful comments about Pub/Sub's automatic retry behavior.


177-179: LGTM!

The close implementation is straightforward and correct.


181-183: LGTM!

The error channel accessor is correctly implemented with a read-only return type.


200-224: LGTM!

The conversion logic correctly maps all fields from the custom ReceiveSettings to Pub/Sub's ReceiveSettings.

pkg/cloudevents/generic/options/pubsub/options.go (3)

16-89: LGTM!

The configuration structures are well-designed with clear documentation and appropriate JSON/YAML tags for serialization.


91-104: LGTM!

The config loading function is simple and correct.


106-146: LGTM!

The option building function provides sensible defaults for keepalive settings and properly validates the configuration before creating the options object.

@morvencao morvencao force-pushed the br_add_pubsub_support branch from dc2674a to b6eb31d Compare November 6, 2025 11:10
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (5)
pkg/cloudevents/generic/options/pubsub/agentoptions.go (3)

44-44: Buffer the error channel to prevent immediate error loss.

The error channel is created unbuffered, but errors are sent with a non-blocking select/default in the Receive method (lines 138-142, 164-168). With an unbuffered channel, if no goroutine is reading when an error occurs, it's immediately dropped. Buffer the channel with at least one slot to allow errors to be queued briefly.

Apply this diff:

-		errorChan:     make(chan error),
+		errorChan:     make(chan error, 1),

60-64: Store and close the grpc connection to prevent resource leak.

The grpc connection created for the emulator/test path is never stored or closed. When you pass a connection via option.WithGRPCConn, the Pub/Sub client does not take ownership—the caller must close it. Add a grpcConn *grpc.ClientConn field to the transport struct, store the connection there, and close it in the Close() method after closing the Pub/Sub client.

Apply this diff to the struct:

 type pubsubAgentTransport struct {
 	PubSubOptions
 	clusterName string
 	client      *pubsub.Client
+	grpcConn    *grpc.ClientConn
 	// Publisher for status updates
 	publisher *pubsub.Publisher

Then in Connect:

 		if o.CredentialsFile == "" {
 			// use the insecure connection for local development/testing when no credentials are provided
 			pubsubConn, err := grpc.NewClient(o.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
 			if err != nil {
 				return err
 			}
+			o.grpcConn = pubsubConn
 			clientOptions = append(clientOptions, option.WithGRPCConn(pubsubConn))
 		}

And update Close accordingly (see comment on Close method).


177-179: Close the error channel to signal shutdown completion.

The CloudEventTransport interface contract specifies that ErrorChan() should be closed when Close() is called, but currently only the Pub/Sub client is closed. Consumers waiting on the error channel will hang indefinitely. Add close(o.errorChan) after closing the client and grpc connection to properly signal completion.

Apply this diff:

 func (o *pubsubAgentTransport) Close(ctx context.Context) error {
-	return o.client.Close()
+	var errs []error
+	if o.client != nil {
+		if err := o.client.Close(); err != nil {
+			errs = append(errs, err)
+		}
+	}
+	if o.grpcConn != nil {
+		if err := o.grpcConn.Close(); err != nil {
+			errs = append(errs, err)
+		}
+	}
+	close(o.errorChan)
+	if len(errs) > 0 {
+		return fmt.Errorf("errors closing transport: %v", errs)
+	}
+	return nil
 }
pkg/cloudevents/generic/options/pubsub/sourceoptions.go (2)

58-62: Store and close the grpc connection to prevent resource leak.

The grpc connection created for the emulator/test path is never stored or closed. When you pass a connection via option.WithGRPCConn, the Pub/Sub client does not take ownership—the caller must close it. Add a grpcConn *grpc.ClientConn field to the transport struct, store the connection there, and close it in the Close() method after closing the Pub/Sub client.

Apply this diff:

 type pubsubSourceTransport struct {
 	PubSubOptions
 	sourceID string
 	client   *pubsub.Client
+	grpcConn *grpc.ClientConn
 	// Publisher for spec updates
 	publisher *pubsub.Publisher

Then in Connect:

 		if o.CredentialsFile == "" {
 			// use the insecure connection for local development/testing when no credentials are provided
 			pubsubConn, err := grpc.NewClient(o.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
 			if err != nil {
 				return err
 			}
+			o.grpcConn = pubsubConn
 			clientOptions = append(clientOptions, option.WithGRPCConn(pubsubConn))
 		}

And in Close:

 func (o *pubsubSourceTransport) Close(ctx context.Context) error {
-	return o.client.Close()
+	var errs []error
+	if o.client != nil {
+		if err := o.client.Close(); err != nil {
+			errs = append(errs, err)
+		}
+	}
+	if o.grpcConn != nil {
+		if err := o.grpcConn.Close(); err != nil {
+			errs = append(errs, err)
+		}
+	}
+	if len(errs) > 0 {
+		return fmt.Errorf("errors closing transport: %v", errs)
+	}
+	return nil
 }

175-177: Close the error channel to signal shutdown completion.

The CloudEventTransport interface contract specifies that ErrorChan() should be closed when Close() is called, but currently only the Pub/Sub client is closed. Consumers waiting on the error channel will hang indefinitely. Add close(o.errorChan) after closing the client to properly signal completion.

Apply this diff:

 func (o *pubsubSourceTransport) Close(ctx context.Context) error {
-	return o.client.Close()
+	var errs []error
+	if o.client != nil {
+		if err := o.client.Close(); err != nil {
+			errs = append(errs, err)
+		}
+	}
+	if o.grpcConn != nil {
+		if err := o.grpcConn.Close(); err != nil {
+			errs = append(errs, err)
+		}
+	}
+	close(o.errorChan)
+	if len(errs) > 0 {
+		return fmt.Errorf("errors closing transport: %v", errs)
+	}
+	return nil
 }
🧹 Nitpick comments (1)
pkg/cloudevents/generic/options/pubsub/options.go (1)

175-225: Consider compiling regex patterns once for efficiency.

The topic and subscription validation patterns are compiled multiple times (lines 175-199 and 201-225), once for each field being validated. While this code runs during initialization and isn't performance-critical, compiling each pattern once at the function start and reusing it would be more efficient and follows the Go best practice of avoiding repeated regex compilation.

Example refactor:

 func validateTopicsAndSubscriptions(topics *types.Topics, subscriptions *types.Subscriptions, projectID string) error {
 	// ... validation checks ...
 
 	topicPattern := strings.ReplaceAll(types.PubSubTopicPattern, "PROJECT_ID", regexp.QuoteMeta(projectID))
+	topicRegex := regexp.MustCompile(topicPattern)
+	
 	if len(topics.SourceEvents) != 0 {
-		if !regexp.MustCompile(topicPattern).MatchString(topics.SourceEvents) {
+		if !topicRegex.MatchString(topics.SourceEvents) {
 			errs = append(errs, fmt.Errorf("invalid source events topic %q, it should match `%s`",
 				topics.SourceEvents, topicPattern))
 		}
 	}
 	// ... repeat for other topics ...
 
 	subscriptionPattern := strings.ReplaceAll(types.PubSubSubscriptionPattern, "PROJECT_ID", regexp.QuoteMeta(projectID))
+	subscriptionRegex := regexp.MustCompile(subscriptionPattern)
+	
 	if len(subscriptions.SourceEvents) != 0 {
-		if !regexp.MustCompile(subscriptionPattern).MatchString(subscriptions.SourceEvents) {
+		if !subscriptionRegex.MatchString(subscriptions.SourceEvents) {
 			errs = append(errs, fmt.Errorf("invalid source events subscription %q, it should match `%s`",
 				subscriptions.SourceEvents, subscriptionPattern))
 		}
 	}
 	// ... repeat for other subscriptions ...
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dc2674a and b6eb31d.

📒 Files selected for processing (9)
  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go (5 hunks)
  • pkg/cloudevents/generic/options/pubsub/agentoptions.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/agentoptions_test.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/codec.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/options.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/options_test.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/sourceoptions.go (1 hunks)
  • test/integration/cloudevents/cloudevents_test.go (3 hunks)
  • test/integration/cloudevents/suite_test.go (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • test/integration/cloudevents/suite_test.go
  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • test/integration/cloudevents/cloudevents_test.go
  • pkg/cloudevents/generic/options/pubsub/agentoptions.go
  • pkg/cloudevents/generic/options/pubsub/sourceoptions.go
📚 Learning: 2025-09-16T06:03:45.232Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/server/grpc/broker.go:169-194
Timestamp: 2025-09-16T06:03:45.232Z
Learning: In the Open Cluster Management gRPC CloudEvents implementation, client reconnections due to heartbeat timeout are considered acceptable behavior by the maintainer skeeey, rather than something that needs optimization to prevent.

Applied to files:

  • pkg/cloudevents/generic/options/pubsub/sourceoptions.go
🧬 Code graph analysis (7)
pkg/cloudevents/generic/options/pubsub/codec.go (2)
vendor/github.com/cloudevents/sdk-go/v2/alias.go (2)
  • VersionV1 (69-69)
  • NewEvent (109-109)
vendor/github.com/cloudevents/sdk-go/v2/types/value.go (1)
  • IsZero (305-314)
pkg/cloudevents/generic/options/pubsub/agentoptions_test.go (3)
pkg/cloudevents/generic/options/pubsub/options.go (3)
  • PubSubOptions (17-25)
  • KeepaliveSettings (58-67)
  • ReceiveSettings (70-89)
pkg/cloudevents/generic/types/types.go (1)
  • Topics (114-158)
pkg/cloudevents/generic/options/pubsub/agentoptions.go (1)
  • NewAgentOptions (38-49)
pkg/cloudevents/generic/options/pubsub/options.go (1)
pkg/cloudevents/generic/types/types.go (3)
  • Topics (114-158)
  • PubSubTopicPattern (109-109)
  • PubSubSubscriptionPattern (110-110)
pkg/cloudevents/generic/options/pubsub/options_test.go (3)
pkg/cloudevents/generic/options/pubsub/options.go (6)
  • PubSubOptions (17-25)
  • KeepaliveSettings (58-67)
  • ReceiveSettings (70-89)
  • BuildPubSubOptionsFromFlags (107-146)
  • PubSubConfig (28-55)
  • LoadConfig (92-104)
pkg/cloudevents/generic/types/types.go (1)
  • Topics (114-158)
pkg/testing/util.go (1)
  • WriteToTempFile (10-21)
test/integration/cloudevents/cloudevents_test.go (1)
pkg/cloudevents/constants/constants.go (1)
  • ConfigTypePubSub (6-6)
pkg/cloudevents/generic/options/pubsub/agentoptions.go (4)
pkg/cloudevents/generic/options/options.go (2)
  • CloudEventTransport (21-56)
  • ReceiveHandlerFn (13-13)
pkg/cloudevents/generic/options/pubsub/options.go (3)
  • PubSubOptions (17-25)
  • KeepaliveSettings (58-67)
  • ReceiveSettings (70-89)
pkg/cloudevents/generic/types/types.go (3)
  • Topics (114-158)
  • ParseCloudEventsType (254-275)
  • ResyncRequestAction (41-41)
pkg/cloudevents/generic/options/pubsub/codec.go (2)
  • Encode (38-79)
  • Decode (96-177)
pkg/cloudevents/generic/options/pubsub/sourceoptions.go (4)
pkg/cloudevents/generic/options/options.go (3)
  • CloudEventTransport (21-56)
  • CloudEventsSourceOptions (59-70)
  • ReceiveHandlerFn (13-13)
pkg/cloudevents/generic/options/pubsub/options.go (3)
  • PubSubOptions (17-25)
  • KeepaliveSettings (58-67)
  • ReceiveSettings (70-89)
pkg/cloudevents/generic/types/types.go (3)
  • Topics (114-158)
  • ParseCloudEventsType (254-275)
  • ResyncRequestAction (41-41)
pkg/cloudevents/generic/options/pubsub/codec.go (2)
  • Encode (38-79)
  • Decode (96-177)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: unit
  • GitHub Check: verify
  • GitHub Check: integration
🔇 Additional comments (6)
test/integration/cloudevents/cloudevents_test.go (2)

86-89: LGTM! PubSub test setup is correctly integrated.

The PubSub configuration case properly initializes agent options and sets up topics/subscriptions before test execution. The synchronous setup in BeforeEach ensures resources are ready before tests run.


259-337: LGTM! Helper function correctly scopes resource cleanup.

The setupTopicsAndSubscriptions helper properly manages the grpc connection and pubsub client lifecycle using defer statements. Since this function only performs setup and returns before test execution, the deferred cleanup executes at the right time. The idempotent topic/subscription creation pattern is appropriate for integration tests.

pkg/cloudevents/generic/options/pubsub/sourceoptions.go (1)

93-115: LGTM! Send method correctly routes events to appropriate publishers.

The logic properly encodes CloudEvents, parses the event type, and routes to the resync or normal publisher based on the action type. Error handling is appropriate.

pkg/cloudevents/generic/options/pubsub/codec.go (1)

157-160: LGTM! Content type handling is correct.

The code properly reads the content type from message attributes (lines 122-124) and only defaults to ApplicationJSON when no content type was provided. This is the correct fallback behavior.

pkg/cloudevents/generic/options/pubsub/agentoptions.go (1)

95-117: LGTM! Send method correctly routes events to appropriate publishers.

The implementation properly encodes CloudEvents, parses event types, and routes to the resync or normal publisher based on the action. Error handling is correct.

pkg/cloudevents/generic/options/pubsub/options.go (1)

106-146: LGTM! Clean configuration loading with sensible defaults.

The BuildPubSubOptionsFromFlags function properly loads configuration, validates required fields, and applies sensible keepalive defaults (5 minutes with 20 second timeout) while allowing user overrides. The structure is clear and maintainable.

@morvencao
Copy link
Member Author

/assign @qiujian16 @skeeey

}
}

func (o *pubsubSourceTransport) Connect(ctx context.Context) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to use one pubsubTranspor? it can support agent and source, I think the transport implement logic should be same, only with different args?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, done to use one pubsubTransport.


const (
// CloudEvents attribute prefix for Pub/Sub message attributes
ceAttrPrefix = "ce-"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this prefix is required by ce-sdk, for us do we still need this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm not completely sure.
per my understanding, if customers use events from another platform that already include the ce- prefix, then we should keep it. Otherwise, if that's not the case, we can drop the ce- prefix.
@qiujian16 comments?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could keep it, just to avoid attribute conflict.

@morvencao morvencao force-pushed the br_add_pubsub_support branch from b6eb31d to 043659b Compare November 11, 2025 03:20
@openshift-ci
Copy link

openshift-ci bot commented Nov 11, 2025

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: morvencao
Once this PR has been reviewed and has the lgtm label, please ask for approval from qiujian16. For more information see the Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
pkg/cloudevents/generic/options/pubsub/agentoptions.go (1)

7-19: Consider buffering the error channel to prevent error drops.

The error channel is created unbuffered (line 14), which may result in dropped errors if the non-blocking select/default pattern is used in the transport's error reporting and no reader is immediately available. Consider buffering with make(chan error, 1) to allow at least one error to be queued, matching the recommendation from past reviews.

Note: The same consideration applies to sourceoptions.go (line 14).

Apply this diff:

-		errorChan:     make(chan error),
+		errorChan:     make(chan error, 1),
pkg/cloudevents/generic/options/pubsub/options_test.go (1)

613-624: Simplify string containment check using stdlib.

The helper functions reimplement strings.Contains from the standard library. Consider using strings.Contains(s, substr) directly throughout the tests.

Example replacement:

if !strings.Contains(err.Error(), c.expectedErrorMsg) {
    t.Errorf("expected error to contain %q, got %q", c.expectedErrorMsg, err.Error())
}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b6eb31d and 043659b.

📒 Files selected for processing (12)
  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go (5 hunks)
  • pkg/cloudevents/generic/options/pubsub/agentoptions.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/agentoptions_test.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/codec.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/options.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/options_test.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/sourceoptions.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/sourceoptions_test.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/transport.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/transport_test.go (1 hunks)
  • test/integration/cloudevents/cloudevents_test.go (3 hunks)
  • test/integration/cloudevents/suite_test.go (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • pkg/cloudevents/generic/options/pubsub/codec.go
  • test/integration/cloudevents/cloudevents_test.go
  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • pkg/cloudevents/generic/options/pubsub/agentoptions.go
  • pkg/cloudevents/generic/options/pubsub/sourceoptions.go
  • pkg/cloudevents/generic/options/pubsub/transport.go
📚 Learning: 2025-09-16T06:03:45.232Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/server/grpc/broker.go:169-194
Timestamp: 2025-09-16T06:03:45.232Z
Learning: In the Open Cluster Management gRPC CloudEvents implementation, client reconnections due to heartbeat timeout are considered acceptable behavior by the maintainer skeeey, rather than something that needs optimization to prevent.

Applied to files:

  • pkg/cloudevents/generic/options/pubsub/sourceoptions.go
🧬 Code graph analysis (8)
pkg/cloudevents/generic/options/pubsub/agentoptions.go (2)
pkg/cloudevents/generic/options/pubsub/options.go (1)
  • PubSubOptions (17-25)
pkg/cloudevents/generic/options/options.go (1)
  • CloudEventsAgentOptions (73-86)
pkg/cloudevents/generic/options/pubsub/sourceoptions_test.go (3)
pkg/cloudevents/generic/options/pubsub/options.go (1)
  • PubSubOptions (17-25)
pkg/cloudevents/generic/types/types.go (1)
  • Topics (114-158)
pkg/cloudevents/generic/options/pubsub/sourceoptions.go (1)
  • NewSourceOptions (8-18)
pkg/cloudevents/generic/options/pubsub/sourceoptions.go (2)
pkg/cloudevents/generic/options/pubsub/options.go (1)
  • PubSubOptions (17-25)
pkg/cloudevents/generic/options/options.go (1)
  • CloudEventsSourceOptions (59-70)
pkg/cloudevents/generic/options/pubsub/agentoptions_test.go (3)
pkg/cloudevents/generic/options/pubsub/options.go (1)
  • PubSubOptions (17-25)
pkg/cloudevents/generic/types/types.go (1)
  • Topics (114-158)
pkg/cloudevents/generic/options/pubsub/agentoptions.go (1)
  • NewAgentOptions (8-19)
pkg/cloudevents/generic/options/pubsub/options.go (1)
pkg/cloudevents/generic/types/types.go (3)
  • Topics (114-158)
  • PubSubTopicPattern (109-109)
  • PubSubSubscriptionPattern (110-110)
pkg/cloudevents/generic/options/pubsub/transport_test.go (1)
pkg/cloudevents/generic/options/pubsub/options.go (2)
  • KeepaliveSettings (58-67)
  • ReceiveSettings (70-89)
pkg/cloudevents/generic/options/pubsub/options_test.go (3)
pkg/cloudevents/generic/options/pubsub/options.go (6)
  • PubSubOptions (17-25)
  • KeepaliveSettings (58-67)
  • ReceiveSettings (70-89)
  • BuildPubSubOptionsFromFlags (107-146)
  • PubSubConfig (28-55)
  • LoadConfig (92-104)
pkg/cloudevents/generic/types/types.go (1)
  • Topics (114-158)
pkg/testing/util.go (1)
  • WriteToTempFile (10-21)
pkg/cloudevents/generic/options/pubsub/transport.go (4)
pkg/cloudevents/generic/options/options.go (2)
  • CloudEventTransport (21-56)
  • ReceiveHandlerFn (13-13)
pkg/cloudevents/generic/options/pubsub/options.go (3)
  • PubSubOptions (17-25)
  • KeepaliveSettings (58-67)
  • ReceiveSettings (70-89)
pkg/cloudevents/generic/types/types.go (3)
  • Topics (114-158)
  • ParseCloudEventsType (254-275)
  • ResyncRequestAction (41-41)
pkg/cloudevents/generic/options/pubsub/codec.go (2)
  • Encode (38-79)
  • Decode (96-177)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: unit
  • GitHub Check: integration
  • GitHub Check: verify
🔇 Additional comments (8)
test/integration/cloudevents/suite_test.go (1)

84-86: LGTM! Pub/Sub test server lifecycle properly integrated.

The test server initialization and cleanup are correctly implemented, following the same pattern as the MQTT broker. The random project ID generation ensures test isolation.

Also applies to: 180-182

pkg/cloudevents/generic/options/pubsub/options.go (2)

106-146: LGTM! Comprehensive configuration loading with sensible defaults.

The function properly validates required fields, applies default keepalive settings (5m/20s), and allows config-based overrides. Error handling and validation flow are correct.


148-227: LGTM! Thorough topic and subscription validation.

The validation logic correctly enforces source/agent topic-subscription requirements and validates naming patterns against the project ID using regex. Error aggregation provides clear feedback for multiple validation failures.

pkg/cloudevents/generic/options/pubsub/sourceoptions.go (1)

7-18: LGTM! Source options constructor properly structured.

The constructor correctly initializes CloudEventsSourceOptions with a pubsubTransport embedding the provided configuration and sourceID.

pkg/cloudevents/generic/options/pubsub/agentoptions_test.go (1)

9-153: LGTM! Comprehensive test coverage for agent options.

The tests thoroughly validate the NewAgentOptions constructor and transport structure across multiple scenarios, including configurations with and without credentials. Assertions correctly verify initialization state before Connect.

pkg/cloudevents/generic/options/pubsub/options_test.go (1)

81-268: LGTM! Excellent test coverage for Pub/Sub options.

The tests comprehensively validate configuration loading, topic/subscription validation, and default/override behavior across multiple scenarios. Error cases are well-covered with specific assertions.

Also applies to: 270-349, 351-529, 531-611

pkg/cloudevents/generic/options/pubsub/transport_test.go (1)

11-84: LGTM! Thorough validation of conversion functions.

The tests comprehensively validate the keepalive and receive settings conversion functions across all scenarios (all fields, partial, zero values) with precise field-by-field assertions.

Also applies to: 86-176

pkg/cloudevents/generic/options/pubsub/sourceoptions_test.go (1)

9-146: LGTM! Comprehensive test coverage for source options.

The tests thoroughly validate the NewSourceOptions constructor and transport structure, mirroring the agent options test coverage. Assertions correctly verify configuration and initialization state.

Signed-off-by: morvencao <[email protected]>
@morvencao morvencao force-pushed the br_add_pubsub_support branch from 043659b to 87053df Compare November 11, 2025 03:51
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
pkg/cloudevents/generic/options/pubsub/transport.go (1)

185-187: Close must close errorChan to honor the transport contract.

The CloudEventTransport interface documentation states that "The channel is closed when Close() is called on the transport." Currently, only the Pub/Sub client is closed, leaving errorChan open. Consumers waiting for the channel to close (e.g., using range or checking if closed) will block indefinitely. This issue was previously flagged but remains unaddressed.

Apply this diff to close the error channel after client cleanup:

 func (o *pubsubTransport) Close(ctx context.Context) error {
-	return o.client.Close()
+	err := o.client.Close()
+	if o.errorChan != nil {
+		close(o.errorChan)
+	}
+	return err
 }

Based on learnings

🧹 Nitpick comments (2)
pkg/cloudevents/generic/options/pubsub/sourceoptions.go (1)

7-18: Consider buffering the error channel for consistency.

The error channel is created unbuffered (line 14), which means if errors are sent with a non-blocking select (as typically done in async handlers), they'll be dropped immediately if no reader is present. Based on past review discussion on agentoptions.go, buffering the channel would allow at least one error to be queued for the consumer.

Apply this diff:

 	return &options.CloudEventsSourceOptions{
 		CloudEventsTransport: &pubsubTransport{
 			PubSubOptions: *pubsubOptions,
 			sourceID:      sourceID,
-			errorChan:     make(chan error),
+			errorChan:     make(chan error, 1),
 		},
 		SourceID: sourceID,
 	}
pkg/cloudevents/generic/options/pubsub/agentoptions.go (1)

7-19: Consider buffering the error channel for consistency.

Similar to sourceoptions.go, the error channel is created unbuffered (line 14). If errors are sent with non-blocking select statements, they can be immediately dropped when no reader is present. Buffering with make(chan error, 1) would allow at least one error to be queued.

Apply this diff:

 	return &options.CloudEventsAgentOptions{
 		CloudEventsTransport: &pubsubTransport{
 			PubSubOptions: *pubsubOptions,
 			clusterName:   clusterName,
-			errorChan:     make(chan error),
+			errorChan:     make(chan error, 1),
 		},
 		AgentID:     agentID,
 		ClusterName: clusterName,
 	}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 043659b and 87053df.

📒 Files selected for processing (13)
  • pkg/cloudevents/generic/options/builder/optionsbuilder_test.go (5 hunks)
  • pkg/cloudevents/generic/options/pubsub/agentoptions.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/agentoptions_test.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/codec.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/codec_test.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/options.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/options_test.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/sourceoptions.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/sourceoptions_test.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/transport.go (1 hunks)
  • pkg/cloudevents/generic/options/pubsub/transport_test.go (1 hunks)
  • test/integration/cloudevents/cloudevents_test.go (3 hunks)
  • test/integration/cloudevents/suite_test.go (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • test/integration/cloudevents/suite_test.go
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • pkg/cloudevents/generic/options/pubsub/sourceoptions.go
  • pkg/cloudevents/generic/options/pubsub/agentoptions.go
  • test/integration/cloudevents/cloudevents_test.go
  • pkg/cloudevents/generic/options/pubsub/transport.go
📚 Learning: 2025-09-16T06:03:45.232Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/server/grpc/broker.go:169-194
Timestamp: 2025-09-16T06:03:45.232Z
Learning: In the Open Cluster Management gRPC CloudEvents implementation, client reconnections due to heartbeat timeout are considered acceptable behavior by the maintainer skeeey, rather than something that needs optimization to prevent.

Applied to files:

  • pkg/cloudevents/generic/options/pubsub/sourceoptions.go
📚 Learning: 2025-09-12T06:49:25.727Z
Learnt from: qiujian16
Repo: open-cluster-management-io/sdk-go PR: 140
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:186-197
Timestamp: 2025-09-12T06:49:25.727Z
Learning: In the gRPC health check implementation, Canceled/DeadlineExceeded errors are intentionally handled externally by connection management components. The health check component has a focused responsibility of only monitoring server reachability, not handling connection-level issues.

Applied to files:

  • pkg/cloudevents/generic/options/pubsub/transport.go
🧬 Code graph analysis (12)
pkg/cloudevents/generic/options/pubsub/sourceoptions.go (2)
pkg/cloudevents/generic/options/pubsub/options.go (1)
  • PubSubOptions (17-25)
pkg/cloudevents/generic/options/options.go (1)
  • CloudEventsSourceOptions (59-70)
pkg/cloudevents/generic/options/pubsub/codec.go (2)
vendor/github.com/cloudevents/sdk-go/v2/alias.go (2)
  • VersionV1 (69-69)
  • NewEvent (109-109)
vendor/github.com/cloudevents/sdk-go/v2/types/value.go (1)
  • IsZero (305-314)
pkg/cloudevents/generic/options/pubsub/agentoptions_test.go (3)
pkg/cloudevents/generic/options/pubsub/options.go (1)
  • PubSubOptions (17-25)
pkg/cloudevents/generic/types/types.go (1)
  • Topics (114-158)
pkg/cloudevents/generic/options/pubsub/agentoptions.go (1)
  • NewAgentOptions (8-19)
pkg/cloudevents/generic/options/builder/optionsbuilder_test.go (5)
pkg/cloudevents/generic/options/pubsub/options.go (2)
  • PubSubOptions (17-25)
  • KeepaliveSettings (58-67)
pkg/cloudevents/generic/types/types.go (2)
  • Topics (114-158)
  • CloudEventsDataType (210-214)
pkg/cloudevents/generic/options/mqtt/options.go (2)
  • MQTTOptions (68-77)
  • MQTTDialer (30-36)
pkg/cloudevents/generic/options/grpc/options.go (3)
  • GRPCOptions (114-118)
  • GRPCDialer (30-37)
  • KeepAliveOptions (40-45)
pkg/cloudevents/generic/options/builder/optionsbuilder.go (3)
  • NewConfigLoader (26-31)
  • BuildCloudEventsSourceOptions (63-75)
  • BuildCloudEventsAgentOptions (78-90)
pkg/cloudevents/generic/options/pubsub/options_test.go (3)
pkg/cloudevents/generic/options/pubsub/options.go (6)
  • PubSubOptions (17-25)
  • KeepaliveSettings (58-67)
  • ReceiveSettings (70-89)
  • BuildPubSubOptionsFromFlags (107-146)
  • PubSubConfig (28-55)
  • LoadConfig (92-104)
pkg/cloudevents/generic/types/types.go (1)
  • Topics (114-158)
pkg/testing/util.go (1)
  • WriteToTempFile (10-21)
pkg/cloudevents/generic/options/pubsub/agentoptions.go (2)
pkg/cloudevents/generic/options/pubsub/options.go (1)
  • PubSubOptions (17-25)
pkg/cloudevents/generic/options/options.go (1)
  • CloudEventsAgentOptions (73-86)
test/integration/cloudevents/cloudevents_test.go (1)
pkg/cloudevents/constants/constants.go (1)
  • ConfigTypePubSub (6-6)
pkg/cloudevents/generic/options/pubsub/sourceoptions_test.go (3)
pkg/cloudevents/generic/options/pubsub/options.go (1)
  • PubSubOptions (17-25)
pkg/cloudevents/generic/types/types.go (1)
  • Topics (114-158)
pkg/cloudevents/generic/options/pubsub/sourceoptions.go (1)
  • NewSourceOptions (8-18)
pkg/cloudevents/generic/options/pubsub/options.go (1)
pkg/cloudevents/generic/types/types.go (3)
  • Topics (114-158)
  • PubSubTopicPattern (109-109)
  • PubSubSubscriptionPattern (110-110)
pkg/cloudevents/generic/options/pubsub/transport.go (4)
pkg/cloudevents/generic/options/options.go (2)
  • CloudEventTransport (21-56)
  • ReceiveHandlerFn (13-13)
pkg/cloudevents/generic/options/pubsub/options.go (3)
  • PubSubOptions (17-25)
  • KeepaliveSettings (58-67)
  • ReceiveSettings (70-89)
pkg/cloudevents/generic/types/types.go (3)
  • Topics (114-158)
  • ParseCloudEventsType (254-275)
  • ResyncRequestAction (41-41)
pkg/cloudevents/generic/options/pubsub/codec.go (2)
  • Encode (38-79)
  • Decode (96-177)
pkg/cloudevents/generic/options/pubsub/transport_test.go (1)
pkg/cloudevents/generic/options/pubsub/options.go (2)
  • KeepaliveSettings (58-67)
  • ReceiveSettings (70-89)
pkg/cloudevents/generic/options/pubsub/codec_test.go (2)
pkg/cloudevents/generic/options/pubsub/codec.go (2)
  • Encode (38-79)
  • Decode (96-177)
vendor/github.com/cloudevents/sdk-go/v2/types/value.go (1)
  • IsZero (305-314)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: unit
  • GitHub Check: verify
  • GitHub Check: integration
🔇 Additional comments (27)
test/integration/cloudevents/cloudevents_test.go (2)

12-13: LGTM - Pub/Sub integration follows existing patterns.

The imports and test case structure are consistent with the existing MQTT and GRPC implementations. The synchronous topic/subscription setup before the test ensures proper initialization.

Also applies to: 18-21, 86-89


259-337: LGTM - Defer placement is correct for setup-only connections.

The past review concern about premature connection closure does not apply here. The defer statements (lines 265, 271) are scoped to setupTopicsAndSubscriptions, which completes execution before returning to BeforeEach. These connections are only needed to create topics and subscriptions during setup, not during test execution. The actual tests use separate connections initialized via agentOptions and sourceOptions.

The implementation correctly:

  • Checks for existing topics/subscriptions before creating
  • Uses appropriate filters for targeted subscriptions (ce-clustername for agents, ce-originalsource for sources)
  • Follows Pub/Sub naming conventions
pkg/cloudevents/generic/options/builder/optionsbuilder_test.go (2)

95-115: LGTM - Comprehensive test coverage for Pub/Sub options.

The test cases correctly validate:

  • Source configuration with source topics (publish) and agent subscriptions (receive)
  • Agent configuration with agent topics (publish) and source subscriptions (receive)
  • Default keepalive settings (5min time, 20s timeout, false permitWithoutStream)

The role-based topic/subscription separation is properly tested.

Also applies to: 161-181


200-246: Helper functions provide adequate validation.

The JSON marshaling and string containment approach effectively validates that the builder chain preserves configuration data through the option construction process. This integration-level verification complements the unit tests.

pkg/cloudevents/generic/options/pubsub/options.go (4)

16-89: Well-structured configuration types.

The type definitions appropriately:

  • Use pointer types for Topics/Subscriptions to enable nil detection during validation
  • Include comprehensive field documentation with defaults and requirements
  • Use time.Duration for duration fields, which correctly parses YAML/JSON time strings

91-104: LGTM - Standard configuration loading.

The function follows the standard pattern for loading YAML configuration files with appropriate error handling.


106-146: LGTM - Proper configuration building with sensible defaults.

The function correctly:

  • Validates required fields (ProjectID)
  • Applies reasonable keepalive defaults (5min time, 20s timeout)
  • Allows config overrides while preserving defaults
  • Delegates validation to a dedicated function

148-227: Robust validation with proper security handling.

The validation function demonstrates strong design:

  • Correctly distinguishes source vs agent topic/subscription requirements
  • Uses regexp.QuoteMeta to safely interpolate projectID into patterns, preventing regex injection
  • Aggregates multiple validation errors for better user experience
  • Validates naming conventions match Pub/Sub patterns
pkg/cloudevents/generic/options/pubsub/agentoptions_test.go (2)

9-99: Comprehensive test coverage for agent options constructor.

The test validates all critical aspects:

  • Proper field assignment (AgentID, ClusterName)
  • Transport type and initialization
  • PubSubOptions embedding
  • Error channel initialization

Test cases cover both minimal and full configuration scenarios.


101-153: LGTM - Validates transport initialization state.

The test correctly verifies:

  • Agent-specific topics and subscriptions are properly assigned
  • Transport starts in an unconnected state (nil client and publishers/subscribers)

This ensures the transport is ready for connection without premature initialization.

pkg/cloudevents/generic/options/pubsub/options_test.go (3)

82-269: Excellent test coverage for configuration building.

The test suite comprehensively validates:

  • Required field enforcement
  • Source vs agent configuration patterns
  • Format validation for topics and subscriptions
  • Both JSON and YAML parsing
  • Default value application
  • Error message specificity

The extensive test cases ensure robust configuration handling.


271-530: LGTM - Thorough validation edge case coverage.

The validation tests comprehensively cover:

  • Nil/missing required fields
  • Role-specific requirements (source vs agent)
  • Format validation with various project ID patterns
  • Special character handling (hyphens, UUIDs)

This ensures the validation logic is robust and handles real-world scenarios.


532-612: LGTM - Validates keepalive configuration behavior.

The tests correctly verify:

  • Default keepalive settings are applied when not specified
  • Custom settings properly override defaults
  • Each keepalive parameter is independently configurable
pkg/cloudevents/generic/options/pubsub/transport_test.go (2)

11-84: LGTM - Comprehensive keepalive conversion testing.

The test validates the conversion function with:

  • All fields populated
  • Partial field scenarios (only Time or Timeout)
  • Zero value handling

Field-by-field assertions ensure correct mapping between types.


86-176: LGTM - Complete receive settings conversion validation.

The test thoroughly validates:

  • All six receive settings fields
  • Partial configuration handling
  • Zero value behavior

Individual field assertions with descriptive messages ensure robustness.

pkg/cloudevents/generic/options/pubsub/sourceoptions_test.go (2)

9-92: LGTM! Comprehensive constructor tests.

The test cases thoroughly validate NewSourceOptions, covering both local development and production scenarios with credentials. All relevant fields are checked, including the transport structure and error channel initialization.


94-146: LGTM! Thorough pre-Connect state validation.

This test correctly verifies that the transport's topics, subscriptions, and internal fields are properly initialized before Connect is invoked. The nil checks for client/publishers/subscribers align with the expected transport lifecycle.

pkg/cloudevents/generic/options/pubsub/transport.go (5)

42-93: LGTM! Well-structured Connect implementation.

The method properly handles credentials, endpoint configuration, keepalive settings, and conditionally initializes publishers/subscribers based on whether this is a source or agent transport. The error handling and configuration flow are appropriate.


95-117: LGTM! Correct Send implementation.

The method properly encodes events, determines the appropriate publisher based on event type (resync vs. regular), and blocks until the publish result is available. Error handling is appropriate.


119-183: LGTM! Context cancellation is now handled correctly.

The implementation properly starts subscriber goroutines and handles both normal and resync events. The context cancellation checks on lines 135-136 and 165-166 correctly prevent sending expected shutdown errors to errorChan, addressing the concern from the previous review. The non-blocking error channel sends with fallback logging are appropriate.


189-191: LGTM! Correct ErrorChan implementation.

The method correctly returns the read-only error channel as specified by the CloudEventTransport interface.


193-232: LGTM! Clean helper functions.

Both toGRPCKeepaliveParameter and toPubSubReceiveSettings properly convert internal settings to their respective external types, applying values only when they are explicitly set (> 0).

pkg/cloudevents/generic/options/pubsub/codec_test.go (3)

12-166: LGTM! Comprehensive Encode test coverage.

The test cases thoroughly validate encoding across multiple scenarios: required attributes, optional attributes (subject, dataschema, time), extensions (clustername, resourceid, resourceversion), and events without data. Each case includes appropriate validation of both the data payload and CloudEvents attributes.


168-361: LGTM! Thorough Decode test coverage.

The test suite comprehensively validates decoding across valid messages (with required, optional, and extension attributes) and error scenarios (nil message, missing required attributes). The tests correctly verify that non-ce-prefixed attributes are ignored and that the spec version defaults to V1 when absent.


363-419: LGTM! Comprehensive round-trip validation.

The round-trip test ensures data integrity across the encode/decode cycle, validating all attributes, extensions, and data payload. The time truncation to avoid precision issues is a good practice. This test provides confidence that the codec preserves all CloudEvents information.

pkg/cloudevents/generic/options/pubsub/codec.go (2)

38-79: LGTM! Correct Encode implementation.

The function properly serializes CloudEvents to Pub/Sub messages, correctly handling required and optional attributes with the ce- prefix. The special handling of datacontenttype as Content-Type (without prefix) aligns with CloudEvents HTTP protocol binding conventions. Error handling and empty value filtering are appropriate.


96-177: LGTM! Correct Decode implementation.

The function properly reconstructs CloudEvents from Pub/Sub messages, handling nil input, spec version defaults, context attributes, and extensions. The default to ApplicationJSON when no content type is present is a reasonable fallback. Required attribute validation ensures message integrity. The special handling of Content-Type correctly mirrors the Encode behavior.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants