Skip to content

add integration test for pubsub.#455

Merged
clyang82 merged 4 commits intoopenshift-online:mainfrom
morvencao:br_add_pubsub_integration
Jan 13, 2026
Merged

add integration test for pubsub.#455
clyang82 merged 4 commits intoopenshift-online:mainfrom
morvencao:br_add_pubsub_integration

Conversation

@morvencao
Copy link
Contributor

No description provided.

@coderabbitai
Copy link

coderabbitai bot commented Dec 26, 2025

Walkthrough

Adds Google Cloud Pub/Sub support and integration tests: Makefile variables/targets and CI pipeline changes for emulator lifecycle, per-consumer Pub/Sub config generation and subscription creation in tests, go.mod dependency updates, a new emulator init script, and small test concurrency/broker-selection adjustments.

Changes

Cohort / File(s) Summary
Gitignore
\.gitignore
Add ignore entry for secrets/pubsub.config.
Makefile & targets
\Makefile``
Add Pub/Sub variables (pubsub_host, pubsub_port, pubsub_project_id, pubsub_config_file, pubsub_image, pubsub_integration_test_json_output, mqtt_image), extend MESSAGE_DRIVER_TYPE to include pubsub, add PHONY targets pubsub/setup, pubsub/init, pubsub/teardown, test-integration-pubsub, and wire Pub/Sub into run/test flows and help text.
Documentation
\README.md``
Document Pub/Sub emulator setup and usage (pubsub/setup, pubsub/init) and running service with MESSAGE_DRIVER_TYPE=pubsub; adjust run instructions and formatting.
Go modules
\go.mod``
Promote cloud.google.com/go/pubsub/v2 and google.golang.org/api from indirect to direct requires.
Test factories
\test/factories.go``
Add pubSubConfig type, parsePubSubConfig, and createPubSubSubscriptions to create topics/subscriptions and produce per-consumer Pub/Sub agent config files; add Pub/Sub v2 client usage and imports.
Test helper & wiring
\test/helper.go``
Replace BROKER with MESSAGE_DRIVER_TYPE/MESSAGE_DRIVER_CONFIG, export many Helper fields, add consumerPubSubConfigMap, getConsumerPubSubConfig, cleanupTempFiles, branch start flows for mqtt/pubsub/grpc, and adjust client construction calls.
Integration tests
\test/integration/status_dispatcher_test.go`, `test/integration/controller_test.go`, `test/integration/resource_test.go``
Change broker-skip logic to require MQTT for StatusDispatcher; make Maestro instance creation transactional (GORM tx + OnConflict) to avoid races; add a short sleep in racing update test to allow DB locks to release.
CI pipeline
\.tekton/integration-test.yaml``
Add pubsub_test_output_file, generate secrets/pubsub.config, add Pub/Sub emulator sidecar, wait/init emulator, run pubsub integration tests and aggregate mqtt/pubsub/grpc outputs.
Init script
\hack/init-pubsub-emulator.sh``
New script to initialize the Pub/Sub emulator: wait for emulator readiness, create topics and subscriptions (including optional per-consumer subscriptions with filters), and surface errors on failure.

Sequence Diagram(s)

sequenceDiagram
  participant Dev as Developer
  participant Make as Makefile
  participant Secrets as "secrets/pubsub.config"
  participant PubSub as "Pub/Sub Emulator (container)"
  participant Init as "hack/init-pubsub-emulator.sh"
  participant Maestro as "Maestro server / test runner"

  Dev->>Make: run pubsub/setup
  Make->>Secrets: write secrets/pubsub.config
  Make->>PubSub: start emulator container (pubsub_image)
  Make->>Init: run pubsub/init
  Init->>PubSub: create topics & subscriptions via emulator API
  Dev->>Make: run test-integration (includes pubsub)
  Make->>Maestro: start with MESSAGE_DRIVER_TYPE=pubsub and config file
  Maestro->>PubSub: connect, publish/subscribe during tests
  Maestro->>Make: emit test results (pubsub_integration_test_json_output)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 54.55% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Description check ❓ Inconclusive No pull request description was provided by the author, making it impossible to evaluate whether the description relates to the changeset. Add a description explaining the purpose and scope of the Pub/Sub integration test changes, including how to run the new tests and any new dependencies required.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed The title 'add integration test for pubsub' accurately summarizes the main change in the changeset, which adds Pub/Sub integration test support across multiple files including the Makefile, test helpers, and configuration scripts.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
test/helper.go (1)

370-376: Standardize generic type parameter usage for NewCloudEventSourceClient.

The function call at line 370 omits the explicit generic type parameter [*api.Resource], relying on type inference from the assignment to helper.GRPCSourceClient. While this compiles correctly, the codebase is inconsistent: pkg/client/cloudevents/source_client.go explicitly specifies [*api.Resource] for the same function. Either standardize the type parameter across all call sites for consistency, or document why type inference is intentionally used here.

🧹 Nitpick comments (4)
hack/init-pubsub-emulator.py (1)

19-66: Consider catching more specific exception types.

While broad exception catching is flagged by the linter, it may be acceptable for an initialization script. However, you could improve specificity by catching google.api_core.exceptions.GoogleAPICallError instead of the base Exception type.

🔎 More specific exception handling
 from google.cloud import pubsub_v1
-from google.api_core import exceptions
+from google.api_core import exceptions as gcp_exceptions

         except exceptions.AlreadyExists:
             print(f"  - Topic already exists: {topic_name}")
-        except Exception as e:
+        except gcp_exceptions.GoogleAPICallError as e:
             print(f"  ✗ Error creating topic {topic_name}: {e}", file=sys.stderr)
             return False
test/factories.go (2)

525-525: Use helper.Ctx instead of context.Background() for proper cancellation.

The function should respect the test helper's context for proper cleanup and cancellation handling.

🔎 Proposed fix
-	ctx := context.Background()
+	ctx := helper.Ctx

582-588: Replace string-based error checking with proper error type assertion.

Checking for "already exists" in the error string is fragile and language-dependent.

🔎 Proposed fix
+import (
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)

 		_, err = client.SubscriptionAdminClient.CreateSubscription(ctx, subConfig)
 		if err != nil {
-			// Ignore already exists errors
-			if !strings.Contains(err.Error(), "already exists") {
+			// Ignore already exists errors  
+			if st, ok := status.FromError(err); !ok || st.Code() != codes.AlreadyExists {
 				return "", fmt.Errorf("failed to create subscription %s: %w", sub.name, err)
 			}
 		}
Makefile (1)

449-463: Consider documenting Python dependencies for pubsub/init.

The pubsub/init target requires Python 3 and the google-cloud-pubsub package to be installed. While the README mentions this, the Makefile could also check for these dependencies or provide a clearer error message if they're missing.

🔎 Add dependency check
 .PHONY: pubsub/init
 pubsub/init:
+	@command -v python3 >/dev/null 2>&1 || { echo "Error: python3 is required but not installed."; exit 1; }
+	@python3 -c "import google.cloud.pubsub_v1" 2>/dev/null || { echo "Error: google-cloud-pubsub package not found. Install with: pip3 install google-cloud-pubsub"; exit 1; }
 	@echo "Initializing Pub/Sub emulator topics and subscriptions..."
 	@PUBSUB_EMULATOR_HOST=localhost:$(pubsub_port) PUBSUB_PROJECT_ID=$(pubsub_project_id) python3 hack/init-pubsub-emulator.py
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to data retention organization setting

Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between 0adb891 and f561473.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (8)
  • .gitignore
  • Makefile
  • README.md
  • go.mod
  • hack/init-pubsub-emulator.py
  • test/factories.go
  • test/helper.go
  • test/integration/status_dispatcher_test.go
🧰 Additional context used
🧬 Code graph analysis (1)
test/factories.go (4)
pkg/config/config.go (1)
  • ReadFile (110-135)
test/helper.go (1)
  • Helper (72-96)
cmd/maestro/environments/types.go (2)
  • Env (14-25)
  • MessageBroker (35-36)
pkg/config/message_broker.go (1)
  • MessageBrokerConfig (9-16)
🪛 Ruff (0.14.10)
hack/init-pubsub-emulator.py

35-35: Do not catch blind exception: Exception

(BLE001)


62-62: Do not catch blind exception: Exception

(BLE001)


113-113: Do not catch blind exception: Exception

(BLE001)


145-145: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Red Hat Konflux / maestro-on-pull-request
  • GitHub Check: Red Hat Konflux / maestro-e2e-on-pull-request
  • GitHub Check: e2e-with-istio
  • GitHub Check: e2e-broadcast-subscription
  • GitHub Check: e2e
  • GitHub Check: upgrade
  • GitHub Check: e2e-grpc-broker
🔇 Additional comments (7)
README.md (1)

45-79: LGTM! Clear documentation for Pub/Sub emulator setup.

The documentation updates clearly explain the Pub/Sub emulator option alongside the existing MQTT broker setup, with helpful notes about dependencies.

.gitignore (1)

50-50: LGTM! Correctly ignores runtime Pub/Sub configuration.

This follows the established pattern for MQTT configuration files.

test/factories.go (1)

294-306: LGTM! Appropriate error handling for test setup.

The code properly handles Pub/Sub subscription creation failures by logging a warning without failing the consumer creation, which is suitable for test infrastructure.

Makefile (2)

65-69: LGTM! Pub/Sub configuration variables follow established patterns.

The variable naming and defaults are consistent with the existing MQTT configuration.


258-261: LGTM! Pub/Sub integration test target is well-structured.

The target follows the same pattern as the MQTT integration tests and correctly configures the environment.

test/helper.go (1)

151-172: LGTM! Broker initialization correctly handles multiple broker types.

The switch statement appropriately configures different dispatchers for each broker type, with NoopDispatcher for Pub/Sub aligning with the test that skips StatusDispatcher for non-MQTT brokers.

go.mod (1)

6-6: Both Pub/Sub dependencies are current and secure with no known direct vulnerabilities.

  • cloud.google.com/go/pubsub/v2 v2.3.0: Current with zero direct vulnerabilities (verified via Snyk).
  • google.golang.org/api v0.255.0: No direct CVEs; v0.256.0 is available but v0.255.0 remains secure.
  • Transitive dependencies (gRPC v1.77.0, protobuf v1.36.10) are patched for known CVEs.

@morvencao morvencao force-pushed the br_add_pubsub_integration branch from f561473 to 3b439fd Compare December 26, 2025 04:33
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
test/factories.go (1)

502-514: Consider adding validation for required config fields.

The function parses the JSON config but doesn't validate that required fields (ProjectID, Endpoint) are non-empty. This could lead to unclear errors later when the client is created.

🔎 Proposed validation
 	var config pubSubConfig
 	if err := json.Unmarshal(data, &config); err != nil {
 		return nil, fmt.Errorf("failed to parse pubsub config: %w", err)
 	}
+
+	if config.ProjectID == "" {
+		return nil, fmt.Errorf("projectID is required in pubsub config")
+	}
+	if config.Endpoint == "" {
+		return nil, fmt.Errorf("endpoint is required in pubsub config")
+	}
 
 	return &config, nil
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to data retention organization setting

Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between f561473 and 3b439fd.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (8)
  • .gitignore
  • Makefile
  • README.md
  • go.mod
  • hack/init-pubsub-emulator.py
  • test/factories.go
  • test/helper.go
  • test/integration/status_dispatcher_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • .gitignore
  • go.mod
🧰 Additional context used
🧬 Code graph analysis (1)
test/helper.go (2)
test/store.go (1)
  • MemoryStore (13-16)
pkg/config/message_broker.go (1)
  • MessageBrokerConfig (9-16)
🪛 Ruff (0.14.10)
hack/init-pubsub-emulator.py

35-35: Do not catch blind exception: Exception

(BLE001)


62-62: Do not catch blind exception: Exception

(BLE001)


113-113: Do not catch blind exception: Exception

(BLE001)


145-145: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: e2e-with-istio
  • GitHub Check: upgrade
  • GitHub Check: e2e-grpc-broker
  • GitHub Check: e2e
  • GitHub Check: e2e-broadcast-subscription
  • GitHub Check: Red Hat Konflux / maestro-on-pull-request
  • GitHub Check: Red Hat Konflux / maestro-e2e-on-pull-request
🔇 Additional comments (9)
README.md (1)

45-79: LGTM! Clear documentation for dual broker support.

The documentation effectively presents parallel workflows for both MQTT and Pub/Sub brokers, making it easy for users to choose their preferred message broker during setup and execution.

Also applies to: 114-121

test/factories.go (1)

294-306: LGTM! Graceful error handling for Pub/Sub setup.

The code appropriately logs a warning but continues when Pub/Sub subscription creation fails, allowing manual setup if needed. This is suitable for test environments where subscriptions might already exist or require manual configuration.

hack/init-pubsub-emulator.py (1)

1-147: LGTM! Well-structured initialization script.

The script appropriately handles errors with clear user feedback and graceful degradation (e.g., skipping already-existing resources). The broad exception handlers flagged by static analysis are acceptable in this CLI tool context, as they ensure user-friendly error messages rather than stack traces.

Makefile (1)

65-69: LGTM! Well-integrated Pub/Sub test infrastructure.

The Makefile additions cleanly integrate Pub/Sub testing alongside existing MQTT and gRPC paths, with consistent patterns for setup, teardown, and test execution. The explicit MESSAGE_BROKER_TYPE environment variable settings ensure test isolation.

Also applies to: 101-101, 250-261, 449-464

test/integration/status_dispatcher_test.go (1)

23-25: LGTM! Skip condition correctly reflects MQTT-only support.

The inverted condition and updated message accurately communicate that this test runs only with the MQTT broker and is skipped for all other brokers (gRPC and Pub/Sub). This addresses the concern from the past review comment.

test/helper.go (4)

117-125: LGTM! Clean environment-driven configuration.

The environment variable handling properly allows test control over broker type and config, with fallback to the application's default configuration. This pattern enables the separate test-integration-* targets in the Makefile.


163-168: LGTM! Pub/Sub uses NoopDispatcher appropriately.

Using NoopDispatcher for Pub/Sub is correct, as Pub/Sub's native filtering and subscriptions handle message routing (unlike MQTT which requires hash-based dispatch). The structure parallels the MQTT path for consistency.


201-205: LGTM! Clean accessor for consumer-specific config.

The helper method provides safe access to the consumer Pub/Sub config map with existence checking, enabling appropriate error handling at the call site.


326-335: Verify consumerPubSubConfigMap is populated before StartWorkAgent is called.

The code fatally errors if a consumer's Pub/Sub config isn't found in consumerPubSubConfigMap. Ensure that CreateConsumerWithLabels is called before StartWorkAgent for the same consumer when using Pub/Sub broker, as the map is only populated during consumer creation.

@morvencao morvencao force-pushed the br_add_pubsub_integration branch from 3b439fd to 9717fd8 Compare December 29, 2025 02:08
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
test/helper.go (1)

371-389: Add the generic type parameter [*api.Resource] to the NewCloudEventSourceClient call at line 371.

The ceclients.NewCloudEventSourceClient function requires a generic type parameter. Line 371 is missing [*api.Resource], which is required to match the Helper.GRPCSourceClient field type declaration (*ceclients.CloudEventSourceClient[*api.Resource]). This is confirmed by the correct usage in pkg/client/cloudevents/source_client.go which includes the type parameter.

Update line 371 to:

sourceClient, err := ceclients.NewCloudEventSourceClient[*api.Resource](
	helper.Ctx,
	grpcoptions.NewSourceOptions(grpcOptions, source, workpayload.ManifestBundleEventDataType),
	resourceStore,
	resourceStatusHashGetter,
	cloudevents.NewCodec(source),
)
🧹 Nitpick comments (1)
test/factories.go (1)

563-589: Consider using gRPC status codes for error checking.

The code at line 585 uses strings.Contains(err.Error(), "already exists") to detect if a subscription already exists. While this works, it's fragile and could break if error message formatting changes. Using proper gRPC status codes would be more robust:

🔎 Suggested improvement
+import (
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
 	for _, sub := range subscriptions {
 		// Check if subscription already exists
 		_, err := client.SubscriptionAdminClient.GetSubscription(ctx, &pubsubpb.GetSubscriptionRequest{
 			Subscription: sub.name,
 		})
 		if err == nil {
 			// Subscription already exists, skip
 			continue
 		}
 
 		// Create the subscription
 		subConfig := &pubsubpb.Subscription{
 			Name:  sub.name,
 			Topic: sub.topic,
 		}
 		if sub.filter != "" {
 			subConfig.Filter = sub.filter
 		}
 
 		_, err = client.SubscriptionAdminClient.CreateSubscription(ctx, subConfig)
 		if err != nil {
-			// Ignore already exists errors
-			if !strings.Contains(err.Error(), "already exists") {
+			// Ignore already exists errors using proper gRPC status codes
+			if st, ok := status.FromError(err); !ok || st.Code() != codes.AlreadyExists {
 				return "", fmt.Errorf("failed to create subscription %s: %w", sub.name, err)
 			}
 		}
 	}
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to data retention organization setting

Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between 3b439fd and 9717fd8.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (8)
  • .gitignore
  • Makefile
  • README.md
  • go.mod
  • hack/init-pubsub-emulator.py
  • test/factories.go
  • test/helper.go
  • test/integration/status_dispatcher_test.go
🚧 Files skipped from review as they are similar to previous changes (3)
  • go.mod
  • README.md
  • test/integration/status_dispatcher_test.go
🧰 Additional context used
🧬 Code graph analysis (2)
test/factories.go (3)
pkg/config/config.go (1)
  • ReadFile (110-135)
test/helper.go (1)
  • Helper (72-96)
pkg/config/message_broker.go (1)
  • MessageBrokerConfig (9-16)
test/helper.go (1)
test/store.go (1)
  • MemoryStore (13-16)
🪛 Ruff (0.14.10)
hack/init-pubsub-emulator.py

35-35: Do not catch blind exception: Exception

(BLE001)


62-62: Do not catch blind exception: Exception

(BLE001)


113-113: Do not catch blind exception: Exception

(BLE001)


145-145: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Red Hat Konflux / maestro-e2e-on-pull-request
  • GitHub Check: Red Hat Konflux / maestro-on-pull-request
  • GitHub Check: e2e
  • GitHub Check: e2e-broadcast-subscription
  • GitHub Check: upgrade
  • GitHub Check: e2e-grpc-broker
  • GitHub Check: e2e-with-istio
🔇 Additional comments (20)
.gitignore (1)

50-50: LGTM!

The addition correctly excludes the runtime-generated Pub/Sub configuration file from version control, following the same pattern as the MQTT config on line 49.

test/factories.go (4)

11-11: LGTM!

The new imports correctly support the Pub/Sub v2 integration for test consumer setup. All library versions align with the declared dependencies.

Also applies to: 14-15, 21-21, 24-24


294-307: LGTM!

The Pub/Sub subscription creation logic is well-integrated into the consumer creation flow. The error handling is appropriate for a test environment—logging warnings without failing consumer creation allows tests to proceed even if subscription setup encounters issues.


494-514: LGTM!

The configuration parsing logic is straightforward and properly handles errors. The struct fields match the config file format created by the Makefile's pubsub/setup target.


591-623: LGTM!

The agent config file creation and temporary file handling are correctly implemented. The temporary files are properly cleaned up via the cleanupTempFiles function in test/helper.go (lines 427-434), which was added to address earlier review feedback.

hack/init-pubsub-emulator.py (3)

19-66: LGTM!

The server topics and subscriptions initialization logic is well-structured with clear user feedback. The code properly handles AlreadyExists exceptions and provides informative output.

Regarding the static analysis warning about catching bare Exception at line 35: for a CLI initialization script that needs to report all errors to users and exit gracefully, this pattern is acceptable. The function handles the specific AlreadyExists case separately and uses the catch-all for truly unexpected errors.


69-117: LGTM!

The agent subscription initialization correctly creates per-consumer subscriptions with appropriate filters. The filter expression at line 79 matches the filter used in test/factories.go (line 554), ensuring consistency between the Python initialization script and the Go test code.


120-147: LGTM!

The main function flow is well-organized, with clear separation between server and optional agent initialization. Environment variable handling with sensible defaults makes the script flexible for different deployment scenarios. The top-level exception handler ensures any unexpected errors are reported to the user with appropriate exit codes.

Makefile (5)

65-69: LGTM!

The Pub/Sub configuration variables follow the established pattern for MQTT variables and provide sensible defaults for local development with the emulator.


250-261: LGTM with a note!

The Pub/Sub integration test target correctly follows the established pattern for MQTT and gRPC tests. Note that users must manually run make pubsub/setup and make pubsub/init before running make test-integration-pubsub, as the test target doesn't declare these as dependencies (consistent with the MQTT pattern).


449-453: LGTM!

The Pub/Sub emulator setup correctly creates the configuration file and starts the emulator container. The JSON structure in the config file matches the pubSubConfig struct in test/factories.go (lines 494-499).


455-458: LGTM!

The teardown target properly stops and removes the Pub/Sub emulator container, following the same pattern as the MQTT teardown.


460-463: LGTM!

The initialization target correctly invokes the Python script with the appropriate environment variables. Separating initialization from setup allows for re-initializing topics and subscriptions without restarting the emulator container.

test/helper.go (7)

33-33: LGTM!

The new Pub/Sub options import is correctly added and used in the StartWorkAgent function at line 332.


76-95: LGTM!

The Helper struct correctly exports its fields and adds the consumerPubSubConfigMap to track per-consumer Pub/Sub configuration file paths. This map is populated by the consumer creation logic in test/factories.go and cleaned up by the cleanupTempFiles function.


117-125: LGTM!

Reading the message broker configuration from environment variables enables flexible test execution with different broker types, as demonstrated by the test-integration-pubsub target in the Makefile (line 259).


151-172: LGTM!

The Pub/Sub broker case is correctly integrated into the helper initialization, using the NoopDispatcher which is appropriate for Pub/Sub's push/pull model. The cleanup function registration at line 181 ensures temporary config files are properly removed during teardown.


202-206: LGTM!

The accessor method follows the standard Go pattern for safe map access with existence checking, used by the StartWorkAgent function at line 327.


326-336: LGTM!

The Pub/Sub case correctly retrieves the per-consumer config path and builds the Pub/Sub options. The call to pubsub.BuildPubSubOptionsFromFlags at line 332 was confirmed to exist in the SDK by the developer in previous review feedback.


427-434: LGTM!

The cleanup function properly removes temporary Pub/Sub config files during teardown, correctly handling the case where files may have already been deleted with os.IsNotExist check.

@morvencao morvencao force-pushed the br_add_pubsub_integration branch from 9717fd8 to 7e146cb Compare December 29, 2025 02:43
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
test/factories.go (1)

583-588: Use gRPC status codes instead of string matching for error detection.

Line 585 uses strings.Contains(err.Error(), "already exists") to detect AlreadyExists errors. This string matching is fragile and language-dependent. Use gRPC status codes for robust error detection.

🔎 Proposed fix using gRPC status codes
+import (
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
 		_, err = client.SubscriptionAdminClient.CreateSubscription(ctx, subConfig)
 		if err != nil {
-			// Ignore already exists errors
-			if !strings.Contains(err.Error(), "already exists") {
+			// Ignore if subscription already exists
+			if st, ok := status.FromError(err); !ok || st.Code() != codes.AlreadyExists {
 				return "", fmt.Errorf("failed to create subscription %s: %w", sub.name, err)
 			}
 		}
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to data retention organization setting

Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between 9717fd8 and 7e146cb.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (8)
  • .gitignore
  • Makefile
  • README.md
  • go.mod
  • hack/init-pubsub-emulator.py
  • test/factories.go
  • test/helper.go
  • test/integration/status_dispatcher_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • .gitignore
🧰 Additional context used
🧬 Code graph analysis (2)
test/factories.go (3)
pkg/config/config.go (1)
  • ReadFile (110-135)
test/helper.go (1)
  • Helper (72-96)
pkg/config/message_broker.go (1)
  • MessageBrokerConfig (9-16)
test/helper.go (2)
test/store.go (1)
  • MemoryStore (13-16)
pkg/config/message_broker.go (1)
  • MessageBrokerConfig (9-16)
🪛 Ruff (0.14.10)
hack/init-pubsub-emulator.py

35-35: Do not catch blind exception: Exception

(BLE001)


62-62: Do not catch blind exception: Exception

(BLE001)


113-113: Do not catch blind exception: Exception

(BLE001)


145-145: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Red Hat Konflux / maestro-e2e-on-pull-request
  • GitHub Check: Red Hat Konflux / maestro-on-pull-request
  • GitHub Check: e2e-with-istio
  • GitHub Check: e2e
  • GitHub Check: e2e-grpc-broker
  • GitHub Check: e2e-broadcast-subscription
  • GitHub Check: upgrade
🔇 Additional comments (12)
test/integration/status_dispatcher_test.go (1)

23-24: LGTM! Skip condition correctly updated.

The test now correctly skips for any non-MQTT broker, and the skip message accurately reflects this. This aligns with the broader multi-broker support being added in this PR.

README.md (1)

45-79: LGTM! Clear documentation for Pub/Sub integration.

The README updates provide clear instructions for setting up and running with the Pub/Sub emulator alongside MQTT. The parallel structure (3a for MQTT, 3b for Pub/Sub) makes it easy for users to choose their preferred broker.

Also applies to: 114-121

test/factories.go (2)

294-306: LGTM! Pub/Sub integration properly integrated.

The Pub/Sub subscription creation and config file tracking is well-integrated into the consumer creation flow. The error handling logs warnings without failing consumer creation, which is appropriate for test scaffolding.


494-514: LGTM! Clean configuration parsing.

The pubSubConfig type and parsePubSubConfig helper provide clean abstraction for reading and validating Pub/Sub configuration files.

hack/init-pubsub-emulator.py (1)

1-147: LGTM! Well-structured Pub/Sub emulator initialization script.

The script provides clear error messages, handles AlreadyExists gracefully, and supports both server-side and per-consumer subscription initialization. The broad exception catches flagged by static analysis (lines 35, 62, 113, 145) are appropriate for this test/dev initialization script context, as they wrap specific operations and provide informative error messages to users.

Makefile (1)

65-70: LGTM! Comprehensive Pub/Sub integration in Makefile.

The Pub/Sub emulator support is well-integrated with:

  • Clear configuration variables matching existing patterns
  • Lifecycle targets (setup/teardown/init) mirroring MQTT structure
  • Dedicated integration test target with appropriate environment variables
  • Updated help text documenting new targets

The implementation is consistent with the existing MQTT and gRPC patterns.

Also applies to: 101-101, 144-146, 250-250, 258-261, 449-464

test/helper.go (5)

117-125: LGTM! Environment variable handling for message broker configuration.

The code properly reads MESSAGE_BROKER_TYPE and MESSAGE_BROKER_CONFIG environment variables and applies them to the configuration, enabling flexible broker selection in tests.


151-172: LGTM! Clean broker-specific initialization.

The switch statement cleanly separates initialization logic for MQTT, Pub/Sub, and gRPC brokers. The Pub/Sub path uses NoopDispatcher appropriately since Pub/Sub handles message filtering server-side via subscription filters.


326-342: LGTM! Proper per-consumer Pub/Sub configuration.

The StartWorkAgent function correctly retrieves per-consumer Pub/Sub config paths using the internal getConsumerPubSubConfig accessor and builds Pub/Sub options accordingly. The error handling appropriately fails fast if config is missing.


427-434: LGTM! Temp file cleanup properly implemented.

The cleanupTempFiles function iterates through the consumerPubSubConfigMap and removes temporary Pub/Sub config files, handling missing files gracefully. This addresses the cleanup concern raised in past reviews.


371-371: Removal of explicit generic type parameter is intentional and valid.

The call to ceclients.NewCloudEventSourceClient(...) on line 371 omits the explicit [*api.Resource] type parameter because Go 1.21+ supports generic type parameter inference. The type is correctly inferred from the assignment target helper.GRPCSourceClient (line 386), which is declared as *ceclients.CloudEventSourceClient[*api.Resource]. This pattern is consistent with other usage in the codebase (e.g., pkg/client/cloudevents/grpcsource/client.go), while other locations explicitly specify the type parameter for clarity. No action required.

go.mod (1)

6-6: The dependency versions specified are current and secure. cloud.google.com/go/pubsub/v2 v2.3.0 is among the latest releases (v2 GA released July 2025, with v2.3.0 being a recent stable patch). google.golang.org/api v0.255.0 is also current (v0.252.0 released Nov 25, 2025). No known CVEs exist for either library, and the direct transitive dependencies (google.golang.org/grpc v1.77.0 and google.golang.org/protobuf v1.36.10) are patched against historical vulnerabilities (CVE-2023-44487 and CVE-2024-24786).

@morvencao morvencao force-pushed the br_add_pubsub_integration branch from 7e146cb to 1a627a2 Compare January 7, 2026 13:52
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @Makefile:
- Around line 253-261: The test-integration-mqtt Makefile target does not pass
MESSAGE_BROKER_CONFIG so MQTT integration tests won't see the generated config;
update the test-integration-mqtt target to include
MESSAGE_BROKER_CONFIG=$(mqtt_config_file) alongside MESSAGE_BROKER_TYPE=mqtt
(mirroring test-integration-pubsub) so the test process receives the config path
used by test/helper.go.
🧹 Nitpick comments (1)
test/factories.go (1)

563-589: Consider using gRPC status codes for error checking.

The error handling at lines 584-588 uses string matching (strings.Contains(err.Error(), "already exists")) to detect duplicate subscriptions. For better reliability and type safety, consider using gRPC status codes.

♻️ Refactor to use gRPC status codes

First, add the gRPC status and codes imports at the top of the file if not already present:

import (
    // ... existing imports ...
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

Then update the error handling:

 		_, err = client.SubscriptionAdminClient.CreateSubscription(ctx, subConfig)
 		if err != nil {
-			// Ignore already exists errors
-			if !strings.Contains(err.Error(), "already exists") {
+			// Ignore if subscription already exists
+			if st, ok := status.FromError(err); !ok || st.Code() != codes.AlreadyExists {
 				return "", fmt.Errorf("failed to create subscription %s: %w", sub.name, err)
 			}
 		}
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to data retention organization setting

Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between 7e146cb and 1a627a2.

📒 Files selected for processing (8)
  • .gitignore
  • Makefile
  • README.md
  • go.mod
  • hack/init-pubsub-emulator.py
  • test/factories.go
  • test/helper.go
  • test/integration/status_dispatcher_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • test/integration/status_dispatcher_test.go
  • go.mod
🧰 Additional context used
🧬 Code graph analysis (2)
test/factories.go (4)
pkg/config/config.go (1)
  • ReadFile (110-135)
test/helper.go (1)
  • Helper (72-96)
cmd/maestro/environments/types.go (2)
  • Env (14-25)
  • MessageBroker (35-36)
pkg/config/message_broker.go (1)
  • MessageBrokerConfig (9-16)
test/helper.go (3)
test/store.go (1)
  • MemoryStore (13-16)
pkg/dispatcher/dispatcher.go (1)
  • Dispatcher (11-16)
pkg/config/message_broker.go (1)
  • MessageBrokerConfig (9-16)
🪛 Ruff (0.14.10)
hack/init-pubsub-emulator.py

35-35: Do not catch blind exception: Exception

(BLE001)


62-62: Do not catch blind exception: Exception

(BLE001)


113-113: Do not catch blind exception: Exception

(BLE001)


145-145: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Red Hat Konflux / maestro-on-pull-request
  • GitHub Check: Red Hat Konflux / maestro-e2e-on-pull-request
  • GitHub Check: e2e-broadcast-subscription
  • GitHub Check: upgrade
  • GitHub Check: e2e-with-istio
  • GitHub Check: e2e-grpc-broker
  • GitHub Check: e2e
🔇 Additional comments (23)
.gitignore (1)

50-50: LGTM!

The addition of secrets/pubsub.config to .gitignore is consistent with other broker configuration files already being ignored (mqtt.config, mqtt.password). This ensures PubSub configuration files are not accidentally committed to version control.

test/factories.go (4)

14-15: LGTM!

The new imports for PubSub v2 client libraries are correctly added to support the PubSub broker integration.

Also applies to: 21-21, 24-24


294-307: LGTM!

The PubSub subscription creation is properly integrated into the consumer creation flow. The error handling logs warnings without failing consumer creation, which is appropriate for test setup. The config path is correctly stored in consumerPubSubConfigMap for later cleanup.


494-514: LGTM!

The pubSubConfig type and parsePubSubConfig function properly read and parse the PubSub configuration file with clear error messages.


611-623: LGTM!

The temporary agent config file is created properly, and the path is returned to be stored in consumerPubSubConfigMap. The cleanup is handled by helper.cleanupTempFiles() in test/helper.go, which was added as part of this PR (lines 427-434).

hack/init-pubsub-emulator.py (1)

1-147: LGTM!

The PubSub emulator initialization script is well-structured with:

  • Clear separation between server and agent subscription initialization
  • Proper error handling with user-friendly output (✓, ✗, - symbols)
  • Graceful handling of AlreadyExists exceptions
  • Environment variable configuration with sensible defaults

Regarding the static analysis hints about broad exception catching: this is acceptable for an initialization script that needs to catch and report various failure modes (network errors, permission issues, etc.) in a user-friendly way. The script properly logs errors to stderr and exits with appropriate status codes.

test/helper.go (6)

76-96: LGTM!

Making the Helper struct fields public provides better access for tests. The addition of Broker field and consumerPubSubConfigMap properly supports the new PubSub broker integration.


117-125: LGTM!

Reading MESSAGE_BROKER_TYPE and MESSAGE_BROKER_CONFIG from environment variables allows tests to configure the broker type dynamically. This aligns with the Makefile targets for PubSub integration tests.


151-172: LGTM!

The broker-specific initialization properly routes MQTT, PubSub, and default (gRPC) brokers to their appropriate dispatchers and event servers. The PubSub case uses NoopDispatcher, which is appropriate since PubSub handles message distribution differently than MQTT.


202-206: LGTM!

The getConsumerPubSubConfig method provides thread-safe read access to the consumer PubSub config map, returning both the config path and an existence flag for proper error handling.


318-342: LGTM!

The StartWorkAgent method properly routes broker configuration based on broker type. The PubSub case retrieves the per-consumer config path from the map and builds options using the SDK's BuildPubSubOptionsFromFlags function (confirmed to exist in the SDK).


427-434: LGTM!

The cleanupTempFiles function properly removes temporary PubSub config files during teardown. It correctly handles the case where files may not exist (os.IsNotExist) and logs errors without failing the entire teardown process.

README.md (2)

45-79: LGTM!

The documentation clearly explains how to set up both MQTT and PubSub brokers, with step-by-step instructions. The note about requiring the google-cloud-pubsub Python package is helpful for users running the PubSub emulator initialization.


114-121: LGTM!

The instructions for running the service with PubSub emulator are clear and accurate, showing the required command-line flags (--message-broker-type and --message-broker-config-file) that align with the environment variable configuration in the test helper.

Makefile (9)

65-70: LGTM!

The Pub/Sub configuration variables follow the same pattern as the existing MQTT configuration and use appropriate defaults for local development.


88-89: LGTM!

Comment accurately reflects the new Pub/Sub message driver type option.


101-101: LGTM!

Test output file variable follows the established naming pattern.


140-146: LGTM!

Help text properly documents the new Pub/Sub targets. Note that users should run pubsub/setup before pubsub/init (the emulator must be running before initialization).


250-251: LGTM!

Comprehensive integration testing across all message broker types (MQTT, Pub/Sub, gRPC). Note that this will run all integration tests three times with different brokers, which may increase CI execution time.


263-266: LGTM!

The gRPC integration test target correctly uses --count=1 to disable test caching, which is appropriate for gRPC's stateful nature.


449-454: LGTM!

The Pub/Sub emulator setup is correctly configured for local development. The "insecure":true flag is appropriate for the local emulator environment.


455-459: LGTM!

Teardown target follows the established pattern for container cleanup.


460-464: Python dependencies for pubsub/init are already documented in README.

The google-cloud-pubsub package requirement is explicitly documented in README.md immediately before the make pubsub/init command instruction: "Note: Requires google-cloud-pubsub Python package (pip3 install google-cloud-pubsub)". No additional documentation is needed.

Likely an incorrect or invalid review comment.

@morvencao morvencao force-pushed the br_add_pubsub_integration branch from 1a627a2 to 094202b Compare January 7, 2026 14:05
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
test/factories.go (1)

563-589: Prefer gRPC status codes over string matching for error checking.

Line 585 uses strings.Contains(err.Error(), "already exists") to detect duplicate subscriptions, which is brittle and language-dependent. Use proper gRPC status codes instead.

♻️ Proposed refactor using gRPC status codes

Add these imports at the top of the file:

 import (
 	"context"
 	"crypto/tls"
 	"crypto/x509"
 	"encoding/json"
 	"fmt"
 	"os"
 	"slices"
 	"strings"
 	"time"

 	pubsubv2 "cloud.google.com/go/pubsub/v2"
 	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
 	cloudevents "github.com/cloudevents/sdk-go/v2"
 	"github.com/google/uuid"
 	"github.com/openshift-online/maestro/pkg/api"
 	"github.com/openshift-online/maestro/pkg/db"
 	"golang.org/x/oauth2"
 	"google.golang.org/api/option"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/credentials/oauth"
+	"google.golang.org/grpc/status"
 	"gorm.io/datatypes"

Then update the error handling:

 		_, err = client.SubscriptionAdminClient.CreateSubscription(ctx, subConfig)
 		if err != nil {
-			// Ignore already exists errors
-			if !strings.Contains(err.Error(), "already exists") {
+			// Ignore if subscription already exists
+			if st, ok := status.FromError(err); !ok || st.Code() != codes.AlreadyExists {
 				return "", fmt.Errorf("failed to create subscription %s: %w", sub.name, err)
 			}
 		}
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to data retention organization setting

Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between 1a627a2 and 094202b.

📒 Files selected for processing (8)
  • .gitignore
  • Makefile
  • README.md
  • go.mod
  • hack/init-pubsub-emulator.py
  • test/factories.go
  • test/helper.go
  • test/integration/status_dispatcher_test.go
✅ Files skipped from review due to trivial changes (1)
  • .gitignore
🚧 Files skipped from review as they are similar to previous changes (3)
  • go.mod
  • README.md
  • test/integration/status_dispatcher_test.go
🧰 Additional context used
🧬 Code graph analysis (1)
test/factories.go (2)
test/helper.go (1)
  • Helper (72-96)
pkg/config/message_broker.go (1)
  • MessageBrokerConfig (9-16)
🪛 Ruff (0.14.10)
hack/init-pubsub-emulator.py

35-35: Do not catch blind exception: Exception

(BLE001)


62-62: Do not catch blind exception: Exception

(BLE001)


113-113: Do not catch blind exception: Exception

(BLE001)


145-145: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Red Hat Konflux / maestro-on-pull-request
  • GitHub Check: Red Hat Konflux / maestro-e2e-on-pull-request
  • GitHub Check: e2e-broadcast-subscription
  • GitHub Check: e2e
  • GitHub Check: e2e-grpc-broker
  • GitHub Check: upgrade
  • GitHub Check: e2e-with-istio
🔇 Additional comments (14)
test/helper.go (5)

151-172: LGTM! Broker-specific initialization is well structured.

The switch statement cleanly handles broker-specific dispatcher and event server initialization. Using NoopDispatcher for Pub/Sub is appropriate given the architectural differences from MQTT's hash-based dispatching.


202-206: LGTM! Clean accessor method.

The unexported accessor follows Go conventions and provides a clean interface for retrieving per-consumer Pub/Sub config paths.


318-342: LGTM! Broker-specific work agent configuration is correct.

The switch statement properly handles MQTT, Pub/Sub, and gRPC broker configurations. Using log.Fatalf for missing Pub/Sub config is consistent with other fatal errors during test setup (lines 323, 334, 351).


371-377: LGTM! CloudEvent client creation updated.

The removal of the type parameter aligns with the SDK signature change noted in the AI summary. The call is otherwise correct.


427-434: LGTM! Cleanup implementation is correct.

The cleanup function properly removes temporary Pub/Sub config files, handling both non-existent files and other errors appropriately. This addresses the cleanup concern raised in past review comments.

test/factories.go (3)

294-306: LGTM! Graceful Pub/Sub integration in consumer creation.

The Pub/Sub subscription creation is appropriately non-blocking—errors are logged but don't prevent consumer creation. The config path is correctly stored for later cleanup by cleanupTempFiles.


494-514: LGTM! Clean config parsing.

The pubSubConfig type and parsePubSubConfig function are straightforward with appropriate error handling.


591-622: LGTM! Agent config file creation is well implemented.

The function correctly generates agent configuration JSON, creates a temporary file with a descriptive pattern, and returns the path for storage in the cleanup map. Error handling is appropriate throughout.

hack/init-pubsub-emulator.py (3)

19-66: Script logic is correct; broad exception handling acceptable for utility script.

The function properly initializes server topics and subscriptions with appropriate AlreadyExists handling. Static analysis flags the broad Exception catches (lines 35, 62), which would be a concern in production code. For this test utility script, the current approach is acceptable given the clear error messages and proper exit codes, though catching specific google.api_core.exceptions types would be more precise.


69-117: LGTM! Agent subscription initialization is consistent.

The function correctly creates per-consumer subscriptions with appropriate filtering. The subscription naming and filtering align with the test code in test/factories.go. Same comment about broad exception handling applies (line 113).


120-147: LGTM! Main function flow is well structured.

The initialization sequence is clear: required server setup followed by optional agent setup. The top-level exception handler ensures a clean exit with proper error reporting on unexpected failures.

Makefile (3)

65-69: LGTM! Pub/Sub configuration variables are well defined.

The variable naming follows the established mqtt_* pattern, and the defaults are sensible. Storing the config file in secrets/ maintains consistency with MQTT configuration.


253-266: LGTM! Integration test targets are consistent.

All three broker types (MQTT, Pub/Sub, gRPC) follow a consistent pattern for setting MESSAGE_BROKER_TYPE and passing the appropriate config. The test execution flags are uniform across targets.


449-464: LGTM! Pub/Sub lifecycle targets are well implemented.

The three targets (pubsub/setup, pubsub/init, pubsub/teardown) provide a complete lifecycle for the emulator. The configuration JSON in pubsub/setup includes all necessary fields, and the initialization script is properly invoked with environment variables.

README.md Outdated
# OR run with Pub/Sub emulator
# First, ensure the Pub/Sub emulator is running and configured
$ ./maestro migration
$ ./maestro server --message-broker-type=pubsub --message-broker-config-file=./secrets/pubsub.config
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe put it into Makefile and then run MESSAGE_BROKER_TYPE=pubsub make run.

Copy link
Contributor Author

@morvencao morvencao Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code updated.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In @Makefile:
- Line 279: The run target assumes ./secrets/$(MESSAGE_DRIVER_TYPE).config
exists but there is no grpc/setup to create ./secrets/grpc.config; add a
grpc/setup target (mirroring mqtt/setup and pubsub/setup) that generates
./secrets/grpc.config, and change the run target to depend on
$(MESSAGE_DRIVER_TYPE)/setup (or add a conditional branch for
MESSAGE_DRIVER_TYPE == grpc to use ./secrets/grpc.config) so running with
MESSAGE_DRIVER_TYPE=grpc succeeds without a missing file error.
🧹 Nitpick comments (1)
Makefile (1)

449-453: Consider using jq for more readable config generation.

The JSON config string on Line 452 is quite long and hard to read. While functional, using jq could improve maintainability.

📝 Optional: Use jq for formatted config
 .PHONY: pubsub/setup
 pubsub/setup:
 	@mkdir -p ${PWD}/secrets
-	@echo '{"projectID":"$(pubsub_project_id)","endpoint":"localhost:$(pubsub_port)","insecure":true,"topics":{"sourceEvents":"projects/$(pubsub_project_id)/topics/sourceevents","sourceBroadcast":"projects/$(pubsub_project_id)/topics/sourcebroadcast"},"subscriptions":{"agentEvents":"projects/$(pubsub_project_id)/subscriptions/agentevents-maestro","agentBroadcast":"projects/$(pubsub_project_id)/subscriptions/agentbroadcast-maestro"}}' > $(pubsub_config_file)
+	@echo '{"projectID":"$(pubsub_project_id)","endpoint":"localhost:$(pubsub_port)","insecure":true,"topics":{"sourceEvents":"projects/$(pubsub_project_id)/topics/sourceevents","sourceBroadcast":"projects/$(pubsub_project_id)/topics/sourcebroadcast"},"subscriptions":{"agentEvents":"projects/$(pubsub_project_id)/subscriptions/agentevents-maestro","agentBroadcast":"projects/$(pubsub_project_id)/subscriptions/agentbroadcast-maestro"}}' | jq '.' > $(pubsub_config_file)
 	$(container_tool) run --name pubsub-maestro -p $(pubsub_port):8085 -e PUBSUB_PROJECT_ID=$(pubsub_project_id) -d gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 --project=$(pubsub_project_id)

Note: This requires jq to be installed, so only apply if it's already a project dependency.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to data retention organization setting

Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between 094202b and 2055849.

📒 Files selected for processing (2)
  • Makefile
  • README.md
🚧 Files skipped from review as they are similar to previous changes (1)
  • README.md
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Red Hat Konflux / maestro-e2e-on-pull-request
  • GitHub Check: Red Hat Konflux / maestro-on-pull-request
  • GitHub Check: upgrade
  • GitHub Check: e2e-broadcast-subscription
  • GitHub Check: e2e-with-istio
  • GitHub Check: e2e
  • GitHub Check: e2e-grpc-broker
🔇 Additional comments (2)
Makefile (2)

65-70: LGTM!

The Pub/Sub emulator configuration variables follow the same pattern as the MQTT configuration and use appropriate defaults for local development.


263-266: No issue here. gRPC configuration is intentionally different from MQTT and Pub/Sub.

The test-integration-grpc target correctly omits MESSAGE_BROKER_CONFIG. Unlike MQTT and Pub/Sub (which are external brokers requiring config files), gRPC uses the application's built-in configuration: helper.Env().Config.HTTPServer.Hostname and helper.Env().Config.GRPCServer.BrokerBindPort. This design is intentional and correct per the test helper logic.

@morvencao morvencao force-pushed the br_add_pubsub_integration branch 2 times, most recently from 4bea99a to 7f81be0 Compare January 8, 2026 02:13
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
Makefile (1)

272-272: Consider using config file variables for maintainability.

The test targets construct config file paths directly ($(PWD)/secrets/mqtt.config and $(PWD)/secrets/pubsub.config) instead of using the defined variables $(mqtt_config_file) and $(pubsub_config_file). Using the variables would make the code more maintainable if the paths need to change.

♻️ Suggested refactor
 test-integration-mqtt:
-	MESSAGE_BROKER_TYPE=mqtt MESSAGE_BROKER_CONFIG=$(PWD)/secrets/mqtt.config MAESTRO_ENV=testing gotestsum --jsonfile-timing-events=$(mqtt_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
+	MESSAGE_BROKER_TYPE=mqtt MESSAGE_BROKER_CONFIG=$(mqtt_config_file) MAESTRO_ENV=testing gotestsum --jsonfile-timing-events=$(mqtt_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
 			./test/integration
 test-integration-pubsub:
-	MESSAGE_BROKER_TYPE=pubsub MESSAGE_BROKER_CONFIG=$(PWD)/secrets/pubsub.config MAESTRO_ENV=testing gotestsum --jsonfile-timing-events=$(pubsub_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
+	MESSAGE_BROKER_TYPE=pubsub MESSAGE_BROKER_CONFIG=$(pubsub_config_file) MAESTRO_ENV=testing gotestsum --jsonfile-timing-events=$(pubsub_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
 			./test/integration

Also applies to: 277-277

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to data retention organization setting

Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between 4bea99a and 7f81be0.

📒 Files selected for processing (3)
  • Makefile
  • README.md
  • test/e2e/pkg/suite_test.go
💤 Files with no reviewable changes (1)
  • test/e2e/pkg/suite_test.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: e2e
  • GitHub Check: e2e-with-istio
  • GitHub Check: upgrade
  • GitHub Check: e2e-grpc-broker
  • GitHub Check: e2e-broadcast-subscription
  • GitHub Check: Red Hat Konflux / maestro-e2e-on-pull-request
  • GitHub Check: Red Hat Konflux / maestro-on-pull-request
🔇 Additional comments (5)
README.md (2)

45-79: LGTM! Clear documentation of Pub/Sub setup.

The documentation clearly separates MQTT (3a) and Pub/Sub (3b) setup paths. The note about the Python dependency (line 77) is helpful for users who may not have google-cloud-pubsub installed.


114-120: LGTM! Clear instructions for running with different brokers.

The documentation correctly shows that MQTT is the default and demonstrates how to override with MESSAGE_DRIVER_TYPE=pubsub, which aligns with the Makefile implementation.

Makefile (3)

69-74: LGTM! Pub/Sub configuration variables are well-defined.

The variables follow the same pattern as the MQTT configuration variables and use appropriate defaults for the Pub/Sub emulator (port 8085, project ID maestro-test).


297-301: LGTM! Conditional logic correctly handles different broker types.

The conditional logic properly handles the gRPC case (which doesn't require a config file) separately from MQTT and Pub/Sub. This addresses the previous review concern about missing gRPC config file handling.


471-475: LGTM! Pub/Sub setup target is well-structured.

The target creates the necessary directory, generates a config file with appropriate Pub/Sub paths, and starts the emulator container using the official Google Cloud CLI emulator image.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In @.tekton/integration-test.yaml:
- Around line 148-157: The pubsub-test sidecar is using the floating image tag
"google-cloud-cli:emulators" which hinders reproducibility; update the image
reference for the pubsub-test container to a specific versioned emulator tag
(for example replace the :emulators tag with a concrete version like
:489.0.0-emulators or another supported numeric-emulators tag) so the sidecar is
pinned consistently with the other services (e.g., postgres and
eclipse-mosquitto); ensure the change targets the pubsub-test image entry and
run a quick pipeline to verify the emulator starts with the chosen tag.
- Around line 97-99: The pip install line currently installs google-cloud-pubsub
without a pinned version, causing non-deterministic test runs; change the pip3
install invocation (the command that installs google-cloud-pubsub in
.tekton/integration-test.yaml) to pin a specific, tested release (e.g., use
google-cloud-pubsub==<stable-version>) or reference a constraints/requirements
file to lock versions so CI installs a reproducible package set.
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to data retention organization setting

Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between 7f81be0 and fc7790f.

📒 Files selected for processing (1)
  • .tekton/integration-test.yaml
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Red Hat Konflux / maestro-e2e-on-pull-request
  • GitHub Check: Red Hat Konflux / maestro-on-pull-request
  • GitHub Check: upgrade
  • GitHub Check: e2e-grpc-broker
  • GitHub Check: e2e-with-istio
  • GitHub Check: e2e-broadcast-subscription
  • GitHub Check: e2e
🔇 Additional comments (3)
.tekton/integration-test.yaml (3)

87-88: LGTM: Pub/Sub config generation mirrors MQTT pattern.

The JSON configuration is well-structured and appropriate for the local emulator setup. The insecure:true flag is correct for testing against the emulator.


93-95: LGTM: Emulator readiness check follows established pattern.

The health check appropriately waits for the Pub/Sub emulator to be available before proceeding with initialization.


101-104: LGTM: Pub/Sub initialization configuration is consistent.

The environment variables correctly match the configuration file and sidecar settings, ensuring proper connectivity to the emulator.

@morvencao morvencao force-pushed the br_add_pubsub_integration branch 2 times, most recently from a8450a1 to 4b24ae6 Compare January 9, 2026 08:03
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @hack/init-pubsub-emulator.sh:
- Line 126: The Pub/Sub filter uses an unquoted attribute key with a hyphen;
update the call to create_subscription (the invocation for
"agentevents-maestro") to quote the attribute key by changing the filter string
from attributes.ce-originalsource="maestro" to use a quoted key (e.g.,
attributes."ce-originalsource"="maestro") so the subscription filter is valid
for keys with hyphens.
🧹 Nitpick comments (4)
hack/init-pubsub-emulator.sh (1)

24-42: Consider verifying API readiness rather than just connectivity.

The current check on line 29 only verifies that the emulator responds to HTTP requests, but doesn't confirm the Pub/Sub API is functional. While the 2-second sleep on line 31 may handle this, a more robust approach would be to make an actual API call (e.g., listing topics).

♻️ Optional: More robust readiness check
 while [ $attempt -lt $max_attempts ]; do
-    if curl -s --connect-timeout 2 --max-time 5 "http://${EMULATOR_HOST}" > /dev/null 2>&1; then
+    if curl -s --connect-timeout 2 --max-time 5 "${BASE_URL}/projects/${PROJECT_ID}/topics" > /dev/null 2>&1; then
         echo "Emulator responded! Waiting for API to be fully ready..."
-        sleep 2
         echo "Emulator is ready!"
         break
     fi
.tekton/integration-test.yaml (3)

87-88: Consider improving config generation readability.

The JSON configuration on line 88 is very long (300+ characters) and difficult to read or maintain. A heredoc or multi-line approach would improve clarity and make typos easier to spot.

♻️ Improved readability with heredoc
-            # Generate pubsub config file
-            echo '{"projectID":"maestro-test","endpoint":"localhost:8085","insecure":true,"topics":{"sourceEvents":"projects/maestro-test/topics/sourceevents","sourceBroadcast":"projects/maestro-test/topics/sourcebroadcast"},"subscriptions":{"agentEvents":"projects/maestro-test/subscriptions/agentevents-maestro","agentBroadcast":"projects/maestro-test/subscriptions/agentbroadcast-maestro"}}' > secrets/pubsub.config
+            # Generate pubsub config file
+            cat > secrets/pubsub.config << 'EOF'
+{
+  "projectID": "maestro-test",
+  "endpoint": "localhost:8085",
+  "insecure": true,
+  "topics": {
+    "sourceEvents": "projects/maestro-test/topics/sourceevents",
+    "sourceBroadcast": "projects/maestro-test/topics/sourcebroadcast"
+  },
+  "subscriptions": {
+    "agentEvents": "projects/maestro-test/subscriptions/agentevents-maestro",
+    "agentBroadcast": "projects/maestro-test/subscriptions/agentbroadcast-maestro"
+  }
+}
+EOF

93-95: Basic connectivity check may not ensure API readiness.

Similar to the check in the init script, line 95 only verifies HTTP connectivity without confirming the Pub/Sub API is functional. The init script has its own retry logic which may mitigate this, but consider using a more robust check.

♻️ Alternative: More robust API check
             # Wait for the pubsub emulator to be ready
             echo "[INFO] Waiting for pubsub emulator to be ready for connection"
-            timeout 5m bash -c 'until curl -s http://localhost:8085 > /dev/null 2>&1; do sleep 2s; done'
+            timeout 5m bash -c 'until curl -s http://localhost:8085/v1/projects/maestro-test/topics > /dev/null 2>&1; do sleep 2s; done'

144-153: Consider adding resource limits to the sidecar.

The pubsub-test sidecar lacks resource limits, which could lead to resource contention in constrained environments. While sidecars in Tekton have different lifecycle management than regular containers, defining limits is still a best practice.

♻️ Add resource limits
           - image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators
             name: pubsub-test
             command:
               - gcloud
               - beta
               - emulators
               - pubsub
               - start
               - --host-port=0.0.0.0:8085
               - --project=maestro-test
+            resources:
+              requests:
+                memory: "256Mi"
+                cpu: "100m"
+              limits:
+                memory: "512Mi"
+                cpu: "500m"
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to data retention organization setting

Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between a8450a1 and 4b24ae6.

📒 Files selected for processing (3)
  • .tekton/integration-test.yaml
  • Makefile
  • hack/init-pubsub-emulator.sh
🚧 Files skipped from review as they are similar to previous changes (1)
  • Makefile
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Red Hat Konflux / maestro-e2e-on-pull-request
  • GitHub Check: Red Hat Konflux / maestro-on-pull-request
  • GitHub Check: e2e
  • GitHub Check: e2e-grpc-broker
  • GitHub Check: e2e-with-istio
  • GitHub Check: e2e-broadcast-subscription
  • GitHub Check: upgrade
🔇 Additional comments (6)
hack/init-pubsub-emulator.sh (6)

1-23: LGTM!

The script header, environment variable setup, and initialization logging are well-structured with sensible defaults.


44-64: LGTM with minor observation.

The topic creation logic properly handles success, conflict, and error cases. The response capture on lines 49-50 assumes curl succeeds; if curl fails, the http_code extraction could behave unexpectedly, though set -e and the explicit error checking should catch most issues.


66-110: Excellent use of jq for JSON payload construction.

The subscription creation logic properly handles optional filter expressions and uses jq to safely construct JSON payloads, preventing injection issues and escaping problems.


112-121: LGTM!

The topic creation loop is clean and properly handles failures with immediate exit.


138-154: LGTM with filter syntax caveat.

The conditional agent subscription creation logic is correct. The filter expression on line 143 properly interpolates CONSUMER_NAME and should work if the filter syntax is valid (see comment on line 126).


156-157: LGTM!

Clear success message.

@morvencao morvencao force-pushed the br_add_pubsub_integration branch from 4b24ae6 to 6ccf559 Compare January 9, 2026 12:06
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
Makefile (2)

281-284: Minor inconsistency: --count=1 flag only present in gRPC tests.

The --count=1 flag (which disables test caching) is only used in test-integration-grpc but not in test-integration-mqtt or test-integration-pubsub. If this is intentional (e.g., gRPC tests require fresh runs), consider adding a comment. Otherwise, consider aligning the flags across all integration test targets.


482-484: The pubsub/init dependency on pubsub/setup may cause failures if the container already exists.

The pubsub/init target depends on pubsub/setup, which runs $(container_tool) run --name pubsub-maestro .... If the container already exists (e.g., from a previous setup), this will fail. Consider either:

  1. Removing the dependency so users can reinitialize without recreating the container
  2. Adding --rm or checking for existing containers in pubsub/setup

For comparison, the MQTT target mqtt/setup has a similar pattern but doesn't have a separate mqtt/init target.

♻️ Suggested approach: decouple init from setup
 .PHONY: pubsub/init
-pubsub/init: pubsub/setup
+pubsub/init:
 	@PUBSUB_EMULATOR_HOST=localhost:$(pubsub_port) PUBSUB_PROJECT_ID=$(pubsub_project_id) bash hack/init-pubsub-emulator.sh

Then document that users should run make pubsub/setup before make pubsub/init, or create a combined target if needed.

.tekton/integration-test.yaml (1)

144-153: Consider pinning the emulator image to a specific version for reproducibility.

The gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators tag is a floating tag that changes when new gcloud releases are published, potentially causing inconsistent test behavior across runs. For reproducibility, use a versioned tag (e.g., 489.0.0-emulators) or an image digest.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to data retention organization setting

Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between 4b24ae6 and 6ccf559.

📒 Files selected for processing (3)
  • .tekton/integration-test.yaml
  • Makefile
  • hack/init-pubsub-emulator.sh
🚧 Files skipped from review as they are similar to previous changes (1)
  • hack/init-pubsub-emulator.sh
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Red Hat Konflux / maestro-on-pull-request
  • GitHub Check: Red Hat Konflux / maestro-e2e-on-pull-request
  • GitHub Check: e2e
  • GitHub Check: e2e-grpc-broker
  • GitHub Check: upgrade
  • GitHub Check: e2e-with-istio
  • GitHub Check: e2e-broadcast-subscription
🔇 Additional comments (9)
.tekton/integration-test.yaml (2)

87-88: LGTM!

The Pub/Sub config file generation is correctly structured with the necessary fields (projectID, endpoint, insecure mode, topics, and subscriptions). The config aligns with the local emulator setup.


93-100: LGTM!

The wait logic using curl is appropriate for the HTTP-based Pub/Sub emulator endpoint. The environment variable setup and initialization script invocation follow the established pattern.

Makefile (7)

69-73: LGTM!

The Pub/Sub configuration variables follow the established patterns for other services (database, MQTT) and use sensible defaults for the emulator.


103-106: LGTM!

The test output file variable follows the established naming convention.


144-150: LGTM!

The help text additions are well-formatted and consistent with existing entries.


268-274: LGTM!

The test orchestration correctly chains all broker integration tests and properly sets environment variables for each broker type.


276-279: LGTM!

The Pub/Sub integration test target correctly sets all required environment variables including PUBSUB_EMULATOR_HOST which is essential for the Google Cloud Pub/Sub client to connect to the emulator.


471-480: LGTM!

The pubsub/setup and pubsub/teardown targets follow the established patterns for other service lifecycle management (database, MQTT). The configuration generation and container management are correct.


297-301: LGTM!

The conditional logic correctly distinguishes gRPC (no config file) from message broker types (MQTT, Pub/Sub) that require configuration files. The dynamic config file path ./secrets/$(MESSAGE_DRIVER_TYPE).config aligns with the defined variables in the Makefile (mqtt_config_file and pubsub_config_file at lines 64 and 73), confirming the naming convention is properly maintained.

@morvencao morvencao force-pushed the br_add_pubsub_integration branch from 6ccf559 to 2cb06b9 Compare January 9, 2026 12:46
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
test/integration/resource_test.go (1)

166-188: Fix data race on conflictRequests (incremented from goroutines).

conflictRequests = conflictRequests + 1 is not safe across goroutines; this can flake and will be reported by go test -race.

Proposed fix (use atomic)
@@
-    conflictRequests := 0
+    var conflictRequests int32
@@
-            if err != nil && strings.Contains(err.Error(), fmt.Sprintf("%d", errors.ErrorConflict)) {
-                conflictRequests = conflictRequests + 1
-            }
+            if err != nil && strings.Contains(err.Error(), fmt.Sprintf("%d", errors.ErrorConflict)) {
+                atomic.AddInt32(&conflictRequests, 1)
+            }
         }()
@@
-    Expect(conflictRequests).To(Equal(threads - 1))
+    Expect(int(conflictRequests)).To(Equal(threads - 1))

(Requires adding sync/atomic import.)

🤖 Fix all issues with AI agents
In @.tekton/integration-test.yaml:
- Around line 87-104: The Tekton task passes integration_test_json_output but
the Makefile's test-integration target actually invokes test-integration-mqtt,
test-integration-pubsub, and test-integration-grpc which each honor
mqtt_integration_test_json_output, pubsub_integration_test_json_output, and
grpc_integration_test_json_output respectively, so your override is ignored; fix
by wiring a single integration_test_json_output through the Makefile: update the
test-integration target to propagate integration_test_json_output into the three
subtargets (or have test-integration concatenate their outputs) by setting
mqtt_integration_test_json_output=$(integration_test_json_output),
pubsub_integration_test_json_output=$(integration_test_json_output), and
grpc_integration_test_json_output=$(integration_test_json_output) when invoking
test-integration-mqtt, test-integration-pubsub, and test-integration-grpc (or
implement aggregation logic to merge the three broker files into the expected
path read by Tekton).

In @Makefile:
- Around line 268-285: The Makefile targets test-integration-mqtt,
test-integration-pubsub and test-integration-grpc use distinct variables
mqtt_integration_test_json_output, pubsub_integration_test_json_output and
grpc_integration_test_json_output so overriding integration_test_json_output
from Tekton has no effect; to fix, consolidate or wire the variables so Tekton’s
single integration_test_json_output is respected — e.g., set
mqtt_integration_test_json_output, pubsub_integration_test_json_output and
grpc_integration_test_json_output to reference integration_test_json_output (or
change each gotestsum invocation to use $(integration_test_json_output)), or
alternatively update Tekton to pass all three specific variables; update the
Makefile references in the test-integration-* targets accordingly so output
paths are controlled by integration_test_json_output.

In @test/integration/controller_test.go:
- Around line 420-443: The transaction is still vulnerable to a race where two
sessions both see no record and one Create returns a duplicate-key error;
replace the manual Where+First and conditional tx.Create in the Transaction
callback with an idempotent operation such as
tx.Clauses(clause.OnConflict{DoNothing: true}).Create(&api.ServerInstance{Meta:
api.Meta{ID: "maestro"}, Ready: true, LastHeartbeat: time.Now()}) or use
tx.FirstOrCreate(&instance, api.ServerInstance{Meta: api.Meta{ID: "maestro"}})
(import gorm.io/gorm/clause if using OnConflict) so the insert is safe under
concurrent runs instead of unguarded tx.Create(newInstance).
🧹 Nitpick comments (3)
test/integration/resource_test.go (1)

202-204: Avoid fixed Sleep here; rely on Eventually / polling instead.

This introduces a hard 3s tax and is still potentially flaky on slower CI. Since you already have an Eventually block for lock release, consider removing the fixed sleep and (if needed) increase the Eventually timeout or add a short initial poll delay there.

Makefile (1)

471-485: Make test-integration-pubsub more self-contained (setup/init dependency).

Right now local runs require manual pubsub/setup + pubsub/init before test-integration-pubsub. Consider making the test target depend on init (or at least document it inline).

Example: chain init into the pubsub test target
@@
-test-integration-pubsub:
+test-integration-pubsub: pubsub/setup pubsub/init
 	MESSAGE_BROKER_TYPE=pubsub MESSAGE_BROKER_CONFIG=$(PWD)/secrets/pubsub.config PUBSUB_EMULATOR_HOST=localhost:$(pubsub_port) MAESTRO_ENV=testing gotestsum --jsonfile-timing-events=$(pubsub_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -count=1 -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
 			./test/integration

Also applies to: 276-279

.tekton/integration-test.yaml (1)

144-153: Pin the Pub/Sub emulator image by digest for deterministic CI.

The gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators tag is floating and may pull different image versions unexpectedly. Pin by digest instead: gcr.io/google.com/cloudsdktool/google-cloud-cli@sha256:<DIGEST> to guarantee reproducible CI runs. Alternatively, pin to a specific version tag like gcr.io/google.com/cloudsdktool/google-cloud-cli:507.0.0-emulators (then resolve and pin its digest for maximum safety).

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to data retention organization setting

Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between 6ccf559 and 2cb06b9.

📒 Files selected for processing (5)
  • .tekton/integration-test.yaml
  • Makefile
  • hack/init-pubsub-emulator.sh
  • test/integration/controller_test.go
  • test/integration/resource_test.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Red Hat Konflux / maestro-on-pull-request
  • GitHub Check: Red Hat Konflux / maestro-e2e-on-pull-request
  • GitHub Check: e2e-broadcast-subscription
  • GitHub Check: e2e-grpc-broker
  • GitHub Check: upgrade
  • GitHub Check: e2e
  • GitHub Check: e2e-with-istio

@morvencao morvencao force-pushed the br_add_pubsub_integration branch from 2cb06b9 to d677497 Compare January 9, 2026 13:16
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
test/helper.go (1)

422-429: Consider returning accumulated errors from cleanup.

The function logs errors via T.Errorf but always returns nil. While this prevents teardown failure from blocking other cleanup, consider whether accumulated cleanup errors should be reported as the return value.

♻️ Optional: Return cleanup errors
 func (helper *Helper) cleanupTempFiles() error {
+	var errs []error
 	for _, pubsubConfigFile := range helper.consumerPubSubConfigMap {
 		if err := os.Remove(pubsubConfigFile); err != nil && !os.IsNotExist(err) {
-			helper.T.Errorf("unable to remove temp pubsub config file %s: %s", pubsubConfigFile, err)
+			errs = append(errs, fmt.Errorf("unable to remove temp pubsub config file %s: %w", pubsubConfigFile, err))
 		}
 	}
+	if len(errs) > 0 {
+		return errors.Join(errs...)
+	}
 	return nil
 }
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to data retention organization setting

Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between 2cb06b9 and d677497.

📒 Files selected for processing (6)
  • .tekton/integration-test.yaml
  • Makefile
  • hack/init-pubsub-emulator.sh
  • test/helper.go
  • test/integration/controller_test.go
  • test/integration/resource_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • test/integration/resource_test.go
🧰 Additional context used
🧬 Code graph analysis (2)
test/integration/controller_test.go (4)
cmd/maestro/environments/types.go (2)
  • Env (14-25)
  • Database (31-33)
pkg/db/session.go (1)
  • SessionFactory (13-21)
pkg/db/migrations/202412171429_add_last_heartbeat_and_ready_column_in_server_instances_tables.go (1)
  • ServerInstance (11-14)
pkg/api/metadata_types.go (1)
  • Meta (51-56)
test/helper.go (5)
test/store.go (1)
  • MemoryStore (14-17)
pkg/config/config.go (1)
  • ApplicationConfig (15-25)
pkg/dispatcher/dispatcher.go (1)
  • Dispatcher (11-16)
pkg/config/message_broker.go (1)
  • MessageBrokerConfig (9-16)
pkg/dispatcher/noop_dispatcher.go (1)
  • NewNoopDispatcher (27-33)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Red Hat Konflux / maestro-on-pull-request
  • GitHub Check: Red Hat Konflux / maestro-e2e-on-pull-request
  • GitHub Check: e2e-with-istio
  • GitHub Check: upgrade
  • GitHub Check: e2e-grpc-broker
  • GitHub Check: e2e
  • GitHub Check: e2e-broadcast-subscription
🔇 Additional comments (31)
test/integration/controller_test.go (2)

13-13: LGTM!

The import of gorm.io/gorm/clause is correctly added to support the OnConflict{DoNothing: true} clause used in the transactional instance creation below.


421-445: Well-structured transactional check-and-create pattern.

The transactional approach correctly avoids race conditions when multiple test instances might try to create the "maestro" ServerInstance concurrently. The use of OnConflict{DoNothing: true} ensures idempotent insertion - if another transaction wins the race, this one silently succeeds without error.

hack/init-pubsub-emulator.sh (6)

1-23: LGTM!

Good script setup with proper shebang, strict error handling via set -e, clear documentation, and sensible default values for environment variables.


24-42: LGTM!

The retry logic with 30 attempts and appropriate timeouts is well-implemented. The additional 2-second sleep after initial connectivity ensures the API is fully ready before proceeding.


44-64: LGTM!

The create_topic function correctly handles HTTP responses: 200/201 for success, 409 for idempotent "already exists" case, and proper error reporting for other status codes.


66-110: LGTM!

The create_subscription function properly uses jq for JSON payload construction, ensuring correct escaping of special characters. The optional filter expression handling is clean and the HTTP response handling is consistent with create_topic.


112-136: LGTM!

Topic and server subscription creation is well-structured with proper error handling. The filter expression attributes."ce-originalsource"="maestro" follows the correct Pub/Sub filter syntax for CloudEvents attributes.


138-157: LGTM!

The optional agent subscription creation is properly gated by CONSUMER_NAME presence. The filter attributes."ce-clustername"="${CONSUMER_NAME}" correctly routes events to the specific consumer.

test/helper.go (9)

32-32: LGTM!

The import path open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/pubsub is correct for the v2 Pub/Sub options API.


71-91: LGTM!

The struct fields are now exported for external access, and the new consumerPubSubConfigMap provides proper tracking of per-consumer Pub/Sub configuration files for later cleanup.


112-120: LGTM!

The environment variable handling correctly overrides configuration only when the variables are explicitly set, preserving default behavior otherwise.


129-137: LGTM!

The helper initialization correctly creates an empty map for consumerPubSubConfigMap, avoiding potential nil map panics during subsequent operations.


146-167: Verify the dispatcher choice for Pub/Sub.

The pubsub case uses NoopDispatcher while mqtt uses HashDispatcher. This implies Pub/Sub relies on its native filtering mechanism for message distribution rather than client-side hash-based dispatching.

Please confirm this is the intended design - that Pub/Sub subscriptions with filters handle message routing natively, eliminating the need for HashDispatcher.


176-176: LGTM!

Adding cleanupTempFiles to the teardown chain ensures proper cleanup of temporary Pub/Sub configuration files after test execution.


197-201: LGTM!

The getConsumerPubSubConfig helper follows idiomatic Go patterns for map lookups with existence checks.


366-372: LGTM!

Removing the explicit generic type parameter allows Go's type inference to determine the type from the function arguments, which is cleaner when the types are unambiguous.


311-337: Config map population order is correctly sequenced.

The consumerPubSubConfigMap is properly populated in CreateConsumerWithLabels (test/factories.go) when the broker is set to "pubsub", and this happens before StartWorkAgent is called in all test cases. The code flow is correct: CreateConsumer → map population → StartWorkAgent.

.tekton/integration-test.yaml (6)

65-67: LGTM!

Separate output files for each broker type provide clear test result isolation for debugging and analysis.


89-91: LGTM!

The Pub/Sub configuration correctly targets the emulator with insecure:true and references the topics/subscriptions that will be created by init-pubsub-emulator.sh.


95-102: LGTM!

The Pub/Sub emulator readiness check follows the same pattern as the database wait, and the environment variables are correctly set before running the initialization script.


105-108: LGTM!

The make test-integration invocation correctly passes all three broker-specific output file paths as parameters.


112-120: LGTM!

The results aggregation correctly extracts pass/fail counts from each broker's JSON output and combines them into a single result summary.


156-165: LGTM!

The Pub/Sub emulator sidecar uses the official Google Cloud SDK emulators image with correct host binding (0.0.0.0:8085) for inter-container communication, and the project ID is consistent across all configurations.

Makefile (8)

69-73: LGTM!

The Pub/Sub configuration variables follow the established pattern from MQTT configuration, with sensible defaults including the standard emulator port (8085).


92-93: LGTM!

The comment now correctly documents all three supported message driver types.


105-106: LGTM!

The new Pub/Sub test output variable follows the established naming convention.


144-150: LGTM!

The help text is updated with the new Pub/Sub lifecycle targets, maintaining consistent formatting.


268-284: LGTM!

The test integration targets are well-structured with consistent patterns. The PUBSUB_EMULATOR_HOST environment variable in test-integration-pubsub is correctly set to direct the SDK to the emulator.


297-301: LGTM!

The conditional logic elegantly handles the gRPC case (no config file needed) versus MQTT/Pub/Sub (config file required), with dynamic path construction based on driver type.


471-475: LGTM!

The pubsub/setup target correctly creates the configuration file and starts the emulator container, following the same pattern as mqtt/setup.


477-484: LGTM!

The pubsub/teardown and pubsub/init targets complete the lifecycle management with proper environment variable setup and script invocation.

@morvencao morvencao force-pushed the br_add_pubsub_integration branch from d677497 to 1387b96 Compare January 13, 2026 03:40
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In @test/factories.go:
- Around line 564-572: The current loop calling
client.SubscriptionAdminClient.GetSubscription(ctx,
&pubsubpb.GetSubscriptionRequest{Subscription: sub.name}) treats any error as
"not found"; change the error handling to distinguish NotFound from other errors
by using status.Code(err) == codes.NotFound (import
"google.golang.org/grpc/status" and "google.golang.org/grpc/codes"); only
proceed to create the subscription when the code is NotFound, and for other
non-nil errors return or log the error (with context including sub.name) so
transient/permission errors are not silently ignored.
- Around line 295-307: The nil-panic note is incorrect—consumerPubSubConfigMap
is initialized in NewHelper, so remove any review/comment suggesting a nil
access; instead address the swallowed error from createPubSubSubscriptions:
decide whether PubSub subscription creation is critical for tests and either (A)
propagate the error from the consumer creation path so failures surface to tests
(return the error from the function that calls
helper.createPubSubSubscriptions), or (B) keep the current non-failing behavior
but add a clear code comment and update test setup docs stating that PubSub
subscription creation is optional and failures are only logged; reference
helper.createPubSubSubscriptions, helper.consumerPubSubConfigMap, and NewHelper
to locate where to change behavior or add documentation.
🧹 Nitpick comments (6)
test/integration/resource_test.go (1)

202-203: Consider if the fixed sleep is necessary given the Eventually block.

The added 3-second sleep is a workaround for timing issues with advisory lock release. While this works, fixed sleeps in tests can be fragile. The Eventually block on lines 212-223 already provides a 20-second timeout with 1-second polling intervals, which should handle the timing variability.

If the sleep is genuinely necessary due to a race condition between transaction commit and lock release visibility, consider adding a brief comment explaining why the Eventually block alone isn't sufficient.

hack/init-pubsub-emulator.sh (2)

1-16: Consider adding a dependency check for jq.

The script uses jq for JSON payload construction (lines 78-86) but doesn't verify it's installed. If jq is missing, the script will fail with a confusing error.

🔧 Suggested improvement
 PROJECT_ID="${PUBSUB_PROJECT_ID:-maestro-test}"
 EMULATOR_HOST="${PUBSUB_EMULATOR_HOST:-localhost:8085}"
 CONSUMER_NAME="${CONSUMER_NAME:-}"
+
+# Check for required dependencies
+if ! command -v jq &> /dev/null; then
+    echo "Error: jq is required but not installed." >&2
+    exit 1
+fi

28-42: The connectivity check may pass before the API is ready.

The curl check on line 29 tests basic connectivity but the emulator may respond before its API endpoints are ready. The fixed 2-second sleep on line 31 is a workaround. Consider probing an actual API endpoint (like listing topics) to confirm readiness.

🔧 Alternative readiness check
 while [ $attempt -lt $max_attempts ]; do
-    if curl -s --connect-timeout 2 --max-time 5 "http://${EMULATOR_HOST}" > /dev/null 2>&1; then
-        echo "Emulator responded! Waiting for API to be fully ready..."
-        sleep 2
-        echo "Emulator is ready!"
-        break
+    # Probe the API endpoint to verify it's ready
+    if curl -s --connect-timeout 2 --max-time 5 "${BASE_URL}/projects/${PROJECT_ID}/topics" > /dev/null 2>&1; then
+        echo "Emulator API is ready!"
+        break
     fi
Makefile (1)

471-484: Consider breaking the long JSON config into a multi-line format.

Line 474 contains a very long JSON string that's difficult to read and maintain. Consider using a heredoc or multiple lines for better readability.

🔧 Suggested improvement for readability
 .PHONY: pubsub/setup
 pubsub/setup:
 	@mkdir -p ${PWD}/secrets
-	@echo '{"projectID":"$(pubsub_project_id)","endpoint":"localhost:$(pubsub_port)","insecure":true,"topics":{"sourceEvents":"projects/$(pubsub_project_id)/topics/sourceevents","sourceBroadcast":"projects/$(pubsub_project_id)/topics/sourcebroadcast"},"subscriptions":{"agentEvents":"projects/$(pubsub_project_id)/subscriptions/agentevents-maestro","agentBroadcast":"projects/$(pubsub_project_id)/subscriptions/agentbroadcast-maestro"}}' > $(pubsub_config_file)
+	@echo '{\
+	"projectID":"$(pubsub_project_id)",\
+	"endpoint":"localhost:$(pubsub_port)",\
+	"insecure":true,\
+	"topics":{"sourceEvents":"projects/$(pubsub_project_id)/topics/sourceevents","sourceBroadcast":"projects/$(pubsub_project_id)/topics/sourcebroadcast"},\
+	"subscriptions":{"agentEvents":"projects/$(pubsub_project_id)/subscriptions/agentevents-maestro","agentBroadcast":"projects/$(pubsub_project_id)/subscriptions/agentbroadcast-maestro"}\
+	}' > $(pubsub_config_file)
 	$(container_tool) run --name pubsub-maestro -p $(pubsub_port):8085 -e PUBSUB_PROJECT_ID=$(pubsub_project_id) -d gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 --project=$(pubsub_project_id)
test/factories.go (1)

583-589: Redundant "already exists" check after GetSubscription logic.

The code checks for "already exists" in the error string after CreateSubscription, but line 569-572 already skips creation if GetSubscription succeeds (subscription exists). This string-based check is fragile and suggests a TOCTOU race condition is expected. Consider using proper gRPC status codes instead of string matching.

Suggested improvement using gRPC status codes
 		_, err = client.SubscriptionAdminClient.CreateSubscription(ctx, subConfig)
 		if err != nil {
-			// Ignore already exists errors
-			if !strings.Contains(err.Error(), "already exists") {
+			// Ignore AlreadyExists errors (race condition with another process)
+			if status.Code(err) != codes.AlreadyExists {
 				return "", fmt.Errorf("failed to create subscription %s: %w", sub.name, err)
 			}
 		}
test/integration/controller_test.go (1)

446-456: Instance creation for test fixtures could also benefit from idempotent inserts.

The instances "i1", "i2", "i3" are created without conflict handling. If this test runs concurrently or after a failed run that left stale data, these creates could fail. Consider using the same OnConflict{DoNothing: true} pattern for consistency, or ensure cleanup runs before these creates.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to data retention organization setting

Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between d677497 and 1387b96.

📒 Files selected for processing (11)
  • .gitignore
  • .tekton/integration-test.yaml
  • Makefile
  • README.md
  • go.mod
  • hack/init-pubsub-emulator.sh
  • test/factories.go
  • test/helper.go
  • test/integration/controller_test.go
  • test/integration/resource_test.go
  • test/integration/status_dispatcher_test.go
🚧 Files skipped from review as they are similar to previous changes (3)
  • go.mod
  • .tekton/integration-test.yaml
  • .gitignore
🧰 Additional context used
🧬 Code graph analysis (1)
test/integration/controller_test.go (2)
pkg/db/session.go (1)
  • SessionFactory (13-21)
pkg/db/migrations/202412171429_add_last_heartbeat_and_ready_column_in_server_instances_tables.go (1)
  • ServerInstance (11-14)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Red Hat Konflux / maestro-on-pull-request
  • GitHub Check: Red Hat Konflux / maestro-e2e-on-pull-request
  • GitHub Check: upgrade
  • GitHub Check: verify
  • GitHub Check: e2e-with-istio
  • GitHub Check: e2e
  • GitHub Check: e2e-grpc-broker
  • GitHub Check: e2e-broadcast-subscription
🔇 Additional comments (21)
README.md (2)

45-79: LGTM!

The documentation clearly explains both MQTT and Pub/Sub setup paths with appropriate step numbering (3a/3b) and includes a helpful note about the Python package dependency for pubsub/init.


114-120: LGTM!

Clear instructions for running with different broker types using the MESSAGE_DRIVER_TYPE environment variable.

hack/init-pubsub-emulator.sh (3)

44-64: LGTM!

The create_topic function has proper error handling with HTTP status code checks and clear logging for success, already-exists, and error cases.


66-110: LGTM!

The create_subscription function properly handles JSON payload construction with jq and includes appropriate error handling for various HTTP response codes.


112-157: LGTM!

The main logic properly creates topics and subscriptions with appropriate filters. Error handling with early exits on failure is well implemented.

Makefile (3)

69-73: LGTM!

New Pub/Sub configuration variables follow the existing pattern for MQTT configuration.


268-284: LGTM!

Integration test targets properly wired with environment variables for each broker type. The test-integration target now runs all three broker tests (MQTT, Pub/Sub, gRPC).


296-301: LGTM!

The run target correctly handles the gRPC case (no config file needed) separately from MQTT/Pub/Sub (which require config files).

test/helper.go (8)

32-32: LGTM!

Import for Pub/Sub options from the Open Cluster Management SDK.


67-91: LGTM!

Helper struct properly exports fields and adds the consumerPubSubConfigMap for managing per-consumer Pub/Sub configuration.


112-120: LGTM!

Environment-driven broker configuration setup reads MESSAGE_DRIVER_TYPE and MESSAGE_DRIVER_CONFIG from environment variables, aligning with the Makefile integration test targets.


146-167: LGTM!

The switch on broker type correctly initializes the appropriate dispatcher and event server for each broker. The pubsub case using NoopDispatcher is appropriate since Pub/Sub handles message filtering via subscription filters.


366-372: LGTM!

Updated to use the non-generic NewCloudEventSourceClient function call pattern.


422-429: LGTM!

The cleanupTempFiles function properly cleans up temporary Pub/Sub config files and handles os.IsNotExist errors gracefully.


197-201: The consumerPubSubConfigMap is populated through the CreateConsumer factory method in test/factories.go (line 304), which stores the config file path after successful PubSub subscription creation. The getter method works correctly with this pattern—consumers are created first (which populates the map), then StartWorkAgent is called with the consumer's name. This is idiomatic Go for test helpers and requires no separate setter method.

Likely an incorrect or invalid review comment.


311-337: Config population is already handled correctly, but verify that createPubSubSubscriptions doesn't fail silently.

The pubsub case in StartWorkAgent (lines 321–331) will fatally exit if the config isn't in consumerPubSubConfigMap. The pattern in test code is correct—CreateConsumer is called before StartWorkAgent, and CreateConsumer populates the map when the broker is pubsub (test/factories.go:304). However, if createPubSubSubscriptions fails (test/factories.go:298), only a warning is logged and the map entry is never added, which will cause StartWorkAgent to fatally exit later. Ensure that if createPubSubSubscriptions fails in tests using the pubsub broker, the test explicitly fails or the subscriptions are manually created.

test/factories.go (3)

612-623: Temporary file is not cleaned up on subsequent errors.

If tmpFile.Write fails (line 619), the temporary file remains on disk. While this is test code and likely acceptable, for completeness consider cleaning up the file on error paths.

Also, the defer tmpFile.Close() on line 617 means the file is closed before the function returns, which is correct. However, the caller receives the file path and the file content has been written, so this is fine.


496-515: LGTM on config parsing.

The pubSubConfig struct and parsePubSubConfig function properly handle file reading and JSON unmarshaling with appropriate error wrapping.


526-540: Client creation and cleanup looks correct.

The Pub/Sub client is created with appropriate options for insecure connections when configured, and defer client.Close() ensures proper cleanup.

test/integration/status_dispatcher_test.go (1)

21-25: LGTM - Skip condition correctly restricts test to MQTT broker.

The updated condition properly skips this test for non-MQTT brokers (gRPC and the new PubSub), with an accurate skip message explaining the limitation.

test/integration/controller_test.go (1)

421-445: Transaction usage is correct but has a subtle issue with error handling.

The transactional pattern correctly handles the check-and-create race condition using OnConflict{DoNothing: true}. However, there's a minor issue: when DoNothing is used and a conflict occurs, GORM returns success (no error), but RowsAffected will be 0. The current code correctly doesn't check RowsAffected, which is appropriate since you just want to ensure the record exists.

One consideration: the transaction isolation level may affect behavior. By default, PostgreSQL uses "Read Committed", which should be sufficient here since you're using OnConflict.

Copy link
Contributor

@clyang82 clyang82 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Signed-off-by: Morven Cao <lcao@redhat.com>
Signed-off-by: Morven Cao <lcao@redhat.com>
@morvencao morvencao force-pushed the br_add_pubsub_integration branch from 1387b96 to cea1451 Compare January 13, 2026 10:23
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
test/factories.go (2)

295-307: Consider failing fast on Pub/Sub subscription creation errors.

The current approach logs a warning but allows consumer creation to proceed even if Pub/Sub subscription creation fails. In a test context, this could lead to confusing test failures later when the agent can't receive messages.

Consider failing the test immediately if subscription creation fails, since tests relying on Pub/Sub will fail anyway without proper subscriptions.

Proposed change
 	// If using PubSub message broker, create the corresponding subscriptions and config file
 	if helper.Broker == "pubsub" {
 		configPath, err := helper.createPubSubSubscriptions(name)
 		if err != nil {
-			// Log the error but don't fail consumer creation
-			// The subscriptions can be created manually if needed
-			helper.T.Logf("Warning: failed to create PubSub subscriptions for consumer %s: %v\n", name, err)
+			helper.T.Fatalf("Failed to create PubSub subscriptions for consumer %s: %v", name, err)
 		} else {
 			// Store the config file path in the map
 			helper.consumerPubSubConfigMap[name] = configPath
 			helper.T.Logf("Created PubSub agent config for consumer %s at: %s\n", name, configPath)
 		}
 	}

564-590: Improve error handling for subscription existence check.

Two issues in the subscription creation loop:

  1. Line 569: When GetSubscription returns an error, the code assumes it means "not found" and proceeds to create. However, the error could be a network failure or permission issue, which would then cause a confusing error on CreateSubscription.

  2. Line 586: String matching "already exists" is fragile. Use gRPC status codes for reliability.

Proposed fix using gRPC status codes
+import (
+    "google.golang.org/grpc/codes"
+    "google.golang.org/grpc/status"
+)

 	for _, sub := range subscriptions {
 		// Check if subscription already exists
 		_, err := client.SubscriptionAdminClient.GetSubscription(ctx, &pubsubpb.GetSubscriptionRequest{
 			Subscription: sub.name,
 		})
 		if err == nil {
 			// Subscription already exists, skip
 			continue
 		}
+		// Only proceed to create if the error is NotFound
+		if status.Code(err) != codes.NotFound {
+			return "", fmt.Errorf("failed to check subscription %s: %w", sub.name, err)
+		}

 		// Create the subscription
 		subConfig := &pubsubpb.Subscription{
 			Name:  sub.name,
 			Topic: sub.topic,
 		}
 		if sub.filter != "" {
 			subConfig.Filter = sub.filter
 		}

 		_, err = client.SubscriptionAdminClient.CreateSubscription(ctx, subConfig)
 		if err != nil {
-			// Ignore already exists errors
-			if !strings.Contains(err.Error(), "already exists") {
+			// Ignore AlreadyExists errors (race condition with another test)
+			if status.Code(err) != codes.AlreadyExists {
 				return "", fmt.Errorf("failed to create subscription %s: %w", sub.name, err)
 			}
 		}
 	}
test/helper.go (1)

670-694: Minor: Unreachable code after log.Fatalf.

log.Fatalf calls os.Exit(1) and never returns, making lines 674 and 682 unreachable. While harmless, this could be cleaned up.

Proposed cleanup
 func getProjectRootDir() string {
 	curr, err := os.Getwd()
 	if err != nil {
 		log.Fatalf("Unable to get working directory: %v", err.Error())
-		return ""
 	}
 	root := curr
 	for {
 		anchor := filepath.Join(curr, ".git")
 		_, err = os.Stat(anchor)
 		if err != nil && !os.IsNotExist(err) {
 			log.Fatalf("Unable to check if directory '%s' exists", anchor)
-			break
 		}
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to data retention organization setting

Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between 1387b96 and cea1451.

📒 Files selected for processing (11)
  • .gitignore
  • .tekton/integration-test.yaml
  • Makefile
  • README.md
  • go.mod
  • hack/init-pubsub-emulator.sh
  • test/factories.go
  • test/helper.go
  • test/integration/controller_test.go
  • test/integration/resource_test.go
  • test/integration/status_dispatcher_test.go
🚧 Files skipped from review as they are similar to previous changes (4)
  • README.md
  • .gitignore
  • test/integration/resource_test.go
  • go.mod
🧰 Additional context used
🧬 Code graph analysis (2)
test/factories.go (4)
pkg/config/config.go (1)
  • ReadFile (110-135)
test/helper.go (1)
  • Helper (67-91)
cmd/maestro/environments/types.go (2)
  • Env (14-25)
  • MessageBroker (35-36)
pkg/config/message_broker.go (1)
  • MessageBrokerConfig (9-16)
test/integration/controller_test.go (4)
pkg/db/session.go (1)
  • SessionFactory (13-21)
pkg/db/migrations/202401151014_add_server_instances.go (1)
  • ServerInstance (9-11)
pkg/db/migrations/202412171429_add_last_heartbeat_and_ready_column_in_server_instances_tables.go (1)
  • ServerInstance (11-14)
pkg/api/metadata_types.go (1)
  • Meta (51-56)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Red Hat Konflux / maestro-e2e-on-pull-request
  • GitHub Check: Red Hat Konflux / maestro-on-pull-request
  • GitHub Check: e2e-grpc-broker
  • GitHub Check: upgrade
  • GitHub Check: e2e-broadcast-subscription
  • GitHub Check: verify
  • GitHub Check: e2e-with-istio
  • GitHub Check: e2e
🔇 Additional comments (24)
test/integration/status_dispatcher_test.go (1)

22-25: LGTM!

The updated skip condition correctly restricts the StatusDispatcher test to MQTT only, which aligns with the broader broker support expansion (adding Pub/Sub alongside GRPC). The skip message is clear.

test/integration/controller_test.go (1)

421-445: LGTM! Transaction-based instance creation prevents race conditions.

The transactional check-and-create pattern with OnConflict{DoNothing: true} is a robust approach to handle concurrent test execution. This ensures only one "maestro" instance is created even when multiple tests might run in parallel.

test/factories.go (3)

14-22: LGTM!

The imports for Pub/Sub v2 client and related gRPC options are correctly added for the new subscription creation functionality.


496-515: LGTM!

The pubSubConfig struct captures the essential fields needed for Pub/Sub client creation. The error handling with %w formatting preserves the error chain correctly.


612-623: LGTM! Temporary file handling is correct.

The temp file creation and writing pattern is appropriate. The cleanup is handled by helper.cleanupTempFiles() which is registered in teardowns.

hack/init-pubsub-emulator.sh (5)

1-23: LGTM!

Good documentation of environment variables with sensible defaults. The set -e ensures the script fails fast on errors.


24-42: LGTM!

The retry logic with 30 attempts and appropriate timeouts is reasonable for waiting on the emulator to start.


44-64: LGTM!

The create_topic function has proper error handling, correctly distinguishes between success (200/201), already exists (409), and errors, with helpful output for debugging.


66-110: LGTM!

Using jq for JSON payload construction ensures proper escaping of filter expressions. The function maintains consistency with create_topic in error handling.


112-157: LGTM!

The topic and subscription names align with the Go implementation in factories.go and the Makefile configuration. The conditional agent subscription creation based on CONSUMER_NAME is a good design for flexibility.

Makefile (4)

63-68: LGTM!

The Pub/Sub configuration variables are well-defined with sensible defaults that match the standard emulator configuration.


226-242: LGTM!

The test integration targets correctly set up environment variables for each broker type. The PUBSUB_EMULATOR_HOST environment variable is properly included for the Pub/Sub tests.


253-260: LGTM!

The conditional logic correctly distinguishes gRPC (no config file) from MQTT/Pub/Sub (requires config file) when starting the server.


362-376: LGTM!

The Pub/Sub lifecycle targets follow the same pattern as the existing MQTT targets. The emulator container setup is correct with appropriate port mapping and project ID configuration.

.tekton/integration-test.yaml (4)

65-67: LGTM!

Separate test output files for each broker type will make debugging easier when a specific broker's tests fail.


95-102: LGTM!

The Pub/Sub emulator readiness check and initialization follow the same pattern as the database setup, ensuring proper sequencing.


112-120: LGTM!

The result aggregation correctly combines test outcomes from all three broker tests using arithmetic addition.


156-165: LGTM!

The Pub/Sub emulator sidecar configuration matches the Makefile setup and uses the official Google Cloud SDK emulators image.

test/helper.go (6)

32-32: LGTM!

The Pub/Sub options import from open-cluster-management.io/sdk-go is correctly added for v2 API support.


112-120: LGTM!

Environment-driven broker configuration allows flexible test execution across different broker types.


146-167: LGTM!

The broker-specific initialization correctly uses NoopDispatcher for Pub/Sub, which aligns with the StatusDispatcher test being skipped for non-MQTT brokers. The event server and filter configurations are appropriate for each broker type.


366-372: LGTM!

The call to NewCloudEventSourceClient correctly uses type inference from the arguments.


422-429: LGTM!

The cleanup function correctly iterates over all consumer Pub/Sub config files and removes them, gracefully handling already-deleted files.


311-337: LGTM! Per-consumer Pub/Sub configuration is correctly implemented.

The work agent correctly retrieves per-consumer Pub/Sub config from the map populated during consumer creation, enabling isolated subscriptions for each consumer. Both mqtt.BuildMQTTOptionsFromFlags and pubsub.BuildPubSubOptionsFromFlags are available in the pinned sdk-go dependency, and proper error handling is in place for all broker options initialization.

Signed-off-by: Morven Cao <lcao@redhat.com>
@morvencao morvencao force-pushed the br_add_pubsub_integration branch from cea1451 to 8012dfd Compare January 13, 2026 10:33
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
test/helper.go (1)

313-337: Typo: "initilize" should be "initialize".

Minor spelling issue in comments at lines 315 and 326.

Proposed fix
 	case "mqtt":
-		// initilize the mqtt options
+		// initialize the mqtt options
 		mqttOptions, err := mqtt.BuildMQTTOptionsFromFlags(helper.Env().Config.MessageBroker.MessageBrokerConfig)
 	case "pubsub":
 		configPath, exists := helper.getConsumerPubSubConfig(clusterName)
 		if !exists {
 			log.Fatalf("Unable to find pubsub config for consumer %s", clusterName)
 		}
-		// initilize the pubsub options
+		// initialize the pubsub options
 		pubsubOptions, err := pubsub.BuildPubSubOptionsFromFlags(configPath)
🧹 Nitpick comments (1)
test/helper.go (1)

422-429: Consider logging cleanup errors at a higher severity or returning them.

The function logs errors using helper.T.Errorf but always returns nil. This swallows cleanup errors during teardown. While this prevents teardown from failing on cleanup issues, consider whether these errors should propagate or at least be logged more visibly.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to data retention organization setting

Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between cea1451 and 8012dfd.

📒 Files selected for processing (6)
  • .tekton/integration-test.yaml
  • Makefile
  • hack/init-pubsub-emulator.sh
  • test/helper.go
  • test/integration/controller_test.go
  • test/integration/resource_test.go
✅ Files skipped from review due to trivial changes (1)
  • Makefile
🚧 Files skipped from review as they are similar to previous changes (2)
  • test/integration/resource_test.go
  • hack/init-pubsub-emulator.sh
🧰 Additional context used
🧬 Code graph analysis (2)
test/integration/controller_test.go (4)
cmd/maestro/environments/types.go (2)
  • Env (14-25)
  • Database (31-33)
pkg/db/session.go (1)
  • SessionFactory (13-21)
pkg/db/migrations/202412171429_add_last_heartbeat_and_ready_column_in_server_instances_tables.go (1)
  • ServerInstance (11-14)
pkg/api/metadata_types.go (1)
  • Meta (51-56)
test/helper.go (5)
test/store.go (1)
  • MemoryStore (15-18)
pkg/dispatcher/dispatcher.go (1)
  • Dispatcher (11-16)
cmd/maestro/server/event_server.go (1)
  • EventServer (26-44)
pkg/config/message_broker.go (1)
  • MessageBrokerConfig (9-16)
pkg/dispatcher/noop_dispatcher.go (1)
  • NewNoopDispatcher (27-33)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Red Hat Konflux / maestro-e2e-on-pull-request
  • GitHub Check: Red Hat Konflux / maestro-on-pull-request
  • GitHub Check: upgrade
  • GitHub Check: e2e-grpc-broker
  • GitHub Check: e2e-broadcast-subscription
  • GitHub Check: e2e
  • GitHub Check: e2e-with-istio
🔇 Additional comments (17)
test/integration/controller_test.go (2)

13-13: LGTM!

Import is correctly added for the clause.OnConflict usage below.


421-445: LGTM! Race-safe upsert pattern correctly implemented.

The OnConflict{DoNothing: true} clause is the key mechanism preventing duplicate inserts when concurrent tests race to create the "maestro" instance. The transaction wrapper ensures atomicity of the check-and-create sequence.

Note: The "maestro" instance isn't cleaned up at the end of this test (unlike "i1", "i2", "i3"). If this is a shared/persistent instance, that's fine—just wanted to confirm this is intentional.

.tekton/integration-test.yaml (6)

65-67: LGTM!

Good refactoring to separate test output files by broker type, enabling proper result aggregation.


89-91: LGTM!

The Pub/Sub config file generation aligns with the emulator setup on port 8085 with project maestro-test.


95-102: LGTM!

Proper readiness wait using HTTP check before initializing topics/subscriptions via the init script.


105-108: LGTM!

Clean invocation passing separate output files for each broker type.


112-120: LGTM!

Result aggregation correctly combines failures and successes from all three broker tests using arithmetic expansion.


156-165: Pub/Sub sidecar lacks volumeMount unlike other sidecars.

The PostgreSQL and MQTT sidecars have volume mounts for pgdata and mqttdata. The Pub/Sub emulator sidecar has no volume mount. This is likely fine since the emulator stores data in memory, but verify this is intentional.

test/helper.go (9)

32-32: LGTM!

Import for Pub/Sub options correctly added.


71-91: LGTM!

Struct fields appropriately exported for test access. The consumerPubSubConfigMap provides per-consumer config path management needed for Pub/Sub testing.


112-120: LGTM!

Environment variable configuration for message driver type and config path correctly wired into the environment config.


129-137: LGTM!

Proper initialization of the consumerPubSubConfigMap with make(map[string]string).


146-167: LGTM!

Broker switch correctly handles mqtt, pubsub, and default (grpc) cases. The pubsub case appropriately uses NoopDispatcher since Pub/Sub handles its own dispatch semantics.


176-176: LGTM!

Cleanup function properly registered in teardowns to ensure temporary config files are removed.


197-201: LGTM!

Standard Go pattern for map lookup with existence check.


366-372: LGTM!

Updated call to NewCloudEventSourceClient without explicit type parameter aligns with Go's type inference.


673-674: LGTM!

Proper use of log.Fatalf with format specifiers for error logging.

Also applies to: 681-682

Signed-off-by: Morven Cao <lcao@redhat.com>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
test/helper.go (2)

422-429: Consider returning errors instead of only logging them.

The function logs errors via helper.T.Errorf but always returns nil. This is inconsistent with the function signature and the teardown pattern used elsewhere. If an error occurs, the caller won't know about it from the return value.

♻️ Suggested improvement
 func (helper *Helper) cleanupTempFiles() error {
+	var errs []error
 	for _, pubsubConfigFile := range helper.consumerPubSubConfigMap {
 		if err := os.Remove(pubsubConfigFile); err != nil && !os.IsNotExist(err) {
-			helper.T.Errorf("unable to remove temp pubsub config file %s: %s", pubsubConfigFile, err)
+			errs = append(errs, fmt.Errorf("unable to remove temp pubsub config file %s: %w", pubsubConfigFile, err))
 		}
 	}
+	if len(errs) > 0 {
+		return fmt.Errorf("cleanup errors: %v", errs)
+	}
 	return nil
 }

673-682: Unreachable code after log.Fatalf.

log.Fatalf calls os.Exit(1) after logging, making the subsequent return "" and break statements unreachable. While this doesn't affect functionality, it's cleaner to remove the unreachable code.

♻️ Suggested fix
 func getProjectRootDir() string {
 	curr, err := os.Getwd()
 	if err != nil {
 		log.Fatalf("Unable to get working directory: %v", err.Error())
-		return ""
 	}
 	root := curr
 	for {
 		anchor := filepath.Join(curr, ".git")
 		_, err = os.Stat(anchor)
 		if err != nil && !os.IsNotExist(err) {
 			log.Fatalf("Unable to check if directory '%s' exists", anchor)
-			break
 		}
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to data retention organization setting

Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between 8012dfd and f840a81.

📒 Files selected for processing (1)
  • test/helper.go
🧰 Additional context used
🧬 Code graph analysis (1)
test/helper.go (6)
pkg/event/event.go (2)
  • EventBroadcaster (23-31)
  • NewEventBroadcaster (34-39)
test/store.go (1)
  • MemoryStore (15-18)
pkg/dispatcher/dispatcher.go (1)
  • Dispatcher (11-16)
cmd/maestro/server/event_server.go (2)
  • EventServer (26-44)
  • NewMessageQueueEventServer (63-75)
cmd/maestro/server/controllers.go (1)
  • ControllersServer (51-56)
pkg/config/message_broker.go (1)
  • MessageBrokerConfig (9-16)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Red Hat Konflux / maestro-e2e-on-pull-request
  • GitHub Check: Red Hat Konflux / maestro-on-pull-request
  • GitHub Check: e2e
  • GitHub Check: e2e-grpc-broker
  • GitHub Check: upgrade
  • GitHub Check: e2e-broadcast-subscription
  • GitHub Check: e2e-with-istio
🔇 Additional comments (7)
test/helper.go (7)

112-137: LGTM!

The environment variable handling for MESSAGE_DRIVER_TYPE and MESSAGE_DRIVER_CONFIG is straightforward. The initialization of consumerPubSubConfigMap as an empty map is correct.


157-163: LGTM!

The pubsub broker case correctly follows the established pattern from the mqtt case. Using NoopDispatcher for pubsub aligns with the different dispatch semantics for Pub/Sub messaging.


171-177: LGTM!

The cleanupTempFiles is properly added to the teardown sequence and positioned appropriately at the end.


197-201: LGTM!

Clean implementation using Go's comma-ok idiom for map lookup. Thread-safety was addressed in the earlier comment about the struct.


321-331: LGTM!

The pubsub broker handling correctly retrieves the per-consumer config using the cluster name and builds options accordingly. Error handling is consistent with the existing mqtt case.


366-372: LGTM!

The updated call to NewCloudEventSourceClient without an explicit type parameter aligns with the SDK version update. Type inference from the arguments should work correctly.


67-91: The thread-safety concern for consumerPubSubConfigMap is not valid based on actual usage patterns.

The map is accessed only sequentially within each test:

  1. CreateConsumer writes to the map (factories.go:304)
  2. StartWorkAgent immediately reads from the map (helper.go:322) in the same test execution
  3. The goroutine spawned in StartWorkAgent (line 352) does not access the map
  4. cleanupTempFiles reads the map during teardown, after test execution completes

All operations occur in the test's execution context without concurrent reads and writes. The exported fields change is a separate API consideration but does not require synchronization for the consumerPubSubConfigMap based on actual code paths.

Likely an incorrect or invalid review comment.

@morvencao morvencao force-pushed the br_add_pubsub_integration branch from f840a81 to 6cac4b4 Compare January 13, 2026 10:51
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
test/helper.go (1)

670-694: Unreachable code after log.Fatalf.

The return "" statements on lines 674 and 682 are unreachable since log.Fatalf terminates the program. While Go requires them for compilation (the compiler doesn't know Fatalf doesn't return), this pattern differs from the typical test helper approach of returning errors to callers.

Consider whether failing fast is the desired behavior here, as it will terminate the entire test process rather than allowing graceful test failure.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to data retention organization setting

Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between f840a81 and 6cac4b4.

📒 Files selected for processing (1)
  • test/helper.go
🧰 Additional context used
🧬 Code graph analysis (1)
test/helper.go (5)
test/store.go (1)
  • MemoryStore (15-18)
pkg/config/config.go (1)
  • ApplicationConfig (15-25)
cmd/maestro/server/healthcheck_server.go (1)
  • HealthCheckServer (21-28)
pkg/dispatcher/dispatcher.go (1)
  • Dispatcher (11-16)
pkg/config/message_broker.go (1)
  • MessageBrokerConfig (9-16)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Red Hat Konflux / maestro-e2e-on-pull-request
  • GitHub Check: Red Hat Konflux / maestro-on-pull-request
  • GitHub Check: upgrade
  • GitHub Check: e2e-grpc-broker
  • GitHub Check: e2e-broadcast-subscription
  • GitHub Check: e2e
  • GitHub Check: e2e-with-istio
🔇 Additional comments (8)
test/helper.go (8)

67-91: LGTM!

The struct field exports and the new consumerPubSubConfigMap field are well-defined. The map is properly initialized in NewHelper at line 137.


112-120: LGTM!

The environment variable handling for MESSAGE_DRIVER_TYPE and MESSAGE_DRIVER_CONFIG is clean and allows flexible test configuration via the CI pipeline.


146-167: LGTM!

The broker-selection logic correctly differentiates between mqtt, pubsub, and gRPC (default) configurations. The pubsub case using NoopDispatcher is appropriate as Pub/Sub handles message routing differently than MQTT's hash-based dispatch.


171-177: LGTM!

Adding cleanupTempFiles to the teardown chain ensures proper cleanup of temporary Pub/Sub config files.


197-201: LGTM!

The getConsumerPubSubConfig helper correctly retrieves per-consumer Pub/Sub config paths using the standard Go map lookup idiom.


361-384: LGTM!

The updated NewCloudEventSourceClient call aligns with the sdk-go library's API. Type inference handles the generic parameter correctly based on the resourceStore and cloudevents.NewCodec arguments.


422-429: LGTM!

The cleanupTempFiles function correctly iterates over temporary Pub/Sub config files, handles the "file not found" case gracefully, and logs errors without failing the teardown chain.


311-337: LGTM!

The StartWorkAgent function correctly handles the three broker types. The pubsub case appropriately retrieves the per-consumer config path via getConsumerPubSubConfig and uses pubsub.BuildPubSubOptionsFromFlags to initialize the options. Error handling with log.Fatalf is consistent with the existing patterns in this function.

@clyang82 clyang82 merged commit 3119dc4 into openshift-online:main Jan 13, 2026
12 checks passed
@morvencao morvencao deleted the br_add_pubsub_integration branch January 13, 2026 13:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants