@qiujian16 qiujian16 commented Nov 13, 2025

This commit addresses two related issues:

  1. Adds newly created CSRs to the watcher store to ensure the gRPC driver can retrieve them when the lister is not yet synced
  2. Passes context through RegisterService and handler chain to enable contextual logging throughout the gRPC broker

🤖 Generated with Claude Code

Summary

Related issue(s)

Fixes #

Summary by CodeRabbit

  • Bug Fixes

    • CSR creation events are now persisted to the watcher store after publishing.
  • Improvements

    • Event handling now propagates context for richer logging, error reporting, and cancellation support.
    • Service and handler registration now accept a context, improving lifecycle and subscription management.
    • Subscription and delivery logs include contextual fields for clearer observability.
  • Tests

    • Tests updated to use context-aware registration and handler signatures.

coderabbitai bot commented Nov 13, 2025

Walkthrough

Propagates context.Context through CloudEvents broker APIs and handlers, updates tests and mocks to pass contexts, adds watcherStore.Add after publishing a CSR CloudEvent with error handling, and removes two intermediate Get+assert steps in an integration watch test.

Changes

| Cohort | File(s) | Summary |
| --- | --- | --- |
| Broker core (context propagation) | pkg/cloudevents/server/grpc/broker.go | Added context.Context to RegisterService, register, unregister, and resourceHandler; propagate ctx into handler invocations and protobuf write calls; switch to context-derived logging; use ctx when unregistering on errors/cancellation. |
| Public interfaces / service store | pkg/cloudevents/server/interface.go, pkg/cloudevents/server/store.go | Updated exported method signatures to accept context.Context as first parameter: AgentEventServer.RegisterService and Service.RegisterHandler. |
| Tests and test helpers | pkg/cloudevents/server/grpc/broker_heartbeat_test.go, pkg/cloudevents/server/grpc/broker_test.go, pkg/server/grpc/metrics/metrics_test.go, test/integration/cloudevents/suite_test.go | Updated RegisterService call sites to pass context.Background(); adjusted mock/test service RegisterHandler signatures to accept context.Context. |
| Integration service implementations | test/integration/cloudevents/broker/services/resource.go | RegisterHandler signature updated to accept a context parameter (unused _). |
| CSR client store update | pkg/cloudevents/clients/csr/client.go | After publishing a CSR CloudEvent, adds the CSR to watcherStore and returns an internal CloudEvents error if Add fails. |
| Integration watch test adjustments | test/integration/cloudevents/manifestworkclients_watch_test.go | Removed two intermediate Get calls and their associated error checks/logging; test flow proceeds to add finalizer and update work status without re-fetching object/resourceVersion logs. |

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Review context propagation correctness in pkg/cloudevents/server/grpc/broker.go (cancellation handling, goroutine lifecycle, and unregister on error).
  • Confirm all interface signature changes are consistently applied across callers, mocks, and tests.
  • Verify pkg/cloudevents/clients/csr/client.go watcherStore.Add semantics and CloudEvents error mapping.
  • Check integration test reduction of Get calls for potential race/window issues.

Possibly related PRs

Suggested labels

lgtm

Suggested reviewers

  • deads2k
  • skeeey

Pre-merge checks and finishing touches

❌ Failed checks (1 warning, 1 inconclusive)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 14.29% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |
| Description check | ❓ Inconclusive | The description covers the two main issues addressed but lacks a Summary section and Related issue(s) section as specified in the template. | Add a clear Summary section and fill in the Related issue(s) field (even if no issue number exists, acknowledge this in the description). |

✅ Passed checks (1 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title accurately describes both main changes: adding CSR to local store and passing context through the broker, aligning with the comprehensive changeset. |

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4524ee0 and 3f3fada.

📒 Files selected for processing (10)
  • pkg/cloudevents/clients/csr/client.go (1 hunks)
  • pkg/cloudevents/server/grpc/broker.go (12 hunks)
  • pkg/cloudevents/server/grpc/broker_heartbeat_test.go (4 hunks)
  • pkg/cloudevents/server/grpc/broker_test.go (2 hunks)
  • pkg/cloudevents/server/interface.go (1 hunks)
  • pkg/cloudevents/server/store.go (1 hunks)
  • pkg/server/grpc/metrics/metrics_test.go (2 hunks)
  • test/integration/cloudevents/broker/services/resource.go (1 hunks)
  • test/integration/cloudevents/manifestworkclients_watch_test.go (0 hunks)
  • test/integration/cloudevents/suite_test.go (1 hunks)
💤 Files with no reviewable changes (1)
  • test/integration/cloudevents/manifestworkclients_watch_test.go
🚧 Files skipped from review as they are similar to previous changes (4)
  • test/integration/cloudevents/suite_test.go
  • pkg/cloudevents/server/grpc/broker_test.go
  • pkg/cloudevents/clients/csr/client.go
  • test/integration/cloudevents/broker/services/resource.go
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-09-17T13:29:00.675Z
Learnt from: qiujian16
Repo: open-cluster-management-io/sdk-go PR: 147
File: pkg/cloudevents/generic/options/grpc/protocol/heartbeat_integration_test.go:66-68
Timestamp: 2025-09-17T13:29:00.675Z
Learning: In the pkg/cloudevents/generic/options/grpc/protocol package, bufSize is already defined as a constant in protocal_test.go and is accessible to other test files in the same package, including heartbeat_integration_test.go.

Applied to files:

  • pkg/cloudevents/server/grpc/broker_heartbeat_test.go
📚 Learning: 2025-11-11T13:27:36.331Z
Learnt from: morvencao
Repo: open-cluster-management-io/sdk-go PR: 162
File: pkg/cloudevents/generic/options/pubsub/options.go:157-174
Timestamp: 2025-11-11T13:27:36.331Z
Learning: For open-cluster-management PubSub transport in pkg/cloudevents/generic/options/pubsub: broadcast topics (SourceBroadcast, AgentBroadcast) and their corresponding subscriptions are always required, not optional. The omitempty tags on types.Topics broadcast fields exist because the struct is shared with MQTT (where broadcasts are optional), but PubSub requires all broadcast channels for resync functionality.

Applied to files:

  • pkg/cloudevents/server/grpc/broker.go
📚 Learning: 2025-09-16T02:22:20.929Z
Learnt from: skeeey
Repo: open-cluster-management-io/sdk-go PR: 144
File: pkg/cloudevents/generic/options/grpc/protocol/protocol.go:200-213
Timestamp: 2025-09-16T02:22:20.929Z
Learning: In the GRPC CloudEvents protocol implementation, when startEventsReceiver encounters a stream error, it sends the error to reconnectErrorChan. The consumer of this channel handles the error by calling Close() on the protocol, which triggers close(p.closeChan), causing OpenInbound to unblock and call cancel() to properly terminate both the events receiver and heartbeat watcher goroutines.

Applied to files:

  • pkg/cloudevents/server/grpc/broker.go
🧬 Code graph analysis (3)
pkg/cloudevents/server/store.go (1)
pkg/cloudevents/server/interface.go (1)
  • EventHandler (24-33)
pkg/cloudevents/server/grpc/broker.go (2)
pkg/cloudevents/server/store.go (1)
  • Service (13-25)
pkg/cloudevents/generic/options/grpc/protocol/write_message.go (1)
  • WritePBMessage (20-32)
pkg/cloudevents/server/interface.go (2)
pkg/cloudevents/generic/types/types.go (1)
  • CloudEventsDataType (165-169)
pkg/cloudevents/server/store.go (1)
  • Service (13-25)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: unit
  • GitHub Check: integration
  • GitHub Check: verify
🔇 Additional comments (2)
pkg/server/grpc/metrics/metrics_test.go (2)

47-47: LGTM! Context parameter added correctly.

The addition of context.Background() as the first parameter to RegisterService aligns with the PR's context propagation objective and is appropriate for test setup.


222-222: LGTM! Mock interface correctly updated.

The RegisterHandler method signature correctly adds the context.Context parameter to match the updated server.Service interface.


@openshift-ci openshift-ci bot requested a review from deads2k November 13, 2025 05:02
openshift-ci bot commented Nov 13, 2025

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: qiujian16

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
pkg/cloudevents/server/grpc/broker_test.go (1)

56-59: Consider passing a proper context instead of context.TODO().

The test helper method create uses context.TODO() when calling the handler. While acceptable for test code, consider accepting a context parameter and passing it through for better test fidelity.

Apply this diff if you'd like to improve context handling:

-func (s *testService) create(evt *cloudevents.Event) error {
+func (s *testService) create(ctx context.Context, evt *cloudevents.Event) error {
 	s.evts[evt.ID()] = evt
-	return s.handler.OnCreate(context.TODO(), dataType, evt.ID())
+	return s.handler.OnCreate(ctx, dataType, evt.ID())
 }

Then update the call site at line 122:

-	if err := svc.create(&evt2); err != nil {
+	if err := svc.create(ctx, &evt2); err != nil {
 		t.Fatal(err)
 	}
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 8266e17 and d39d4a0.

📒 Files selected for processing (9)
  • pkg/cloudevents/clients/csr/client.go (1 hunks)
  • pkg/cloudevents/server/grpc/broker.go (12 hunks)
  • pkg/cloudevents/server/grpc/broker_heartbeat_test.go (4 hunks)
  • pkg/cloudevents/server/grpc/broker_test.go (2 hunks)
  • pkg/cloudevents/server/interface.go (1 hunks)
  • pkg/cloudevents/server/store.go (1 hunks)
  • pkg/server/grpc/metrics/metrics_test.go (2 hunks)
  • test/integration/cloudevents/broker/services/resource.go (1 hunks)
  • test/integration/cloudevents/suite_test.go (1 hunks)
🧬 Code graph analysis (8)
test/integration/cloudevents/broker/services/resource.go (1)
pkg/cloudevents/server/interface.go (1)
  • EventHandler (24-33)
pkg/cloudevents/server/store.go (1)
pkg/cloudevents/server/interface.go (1)
  • EventHandler (24-33)
pkg/cloudevents/server/interface.go (2)
pkg/cloudevents/generic/types/types.go (1)
  • CloudEventsDataType (165-169)
pkg/cloudevents/server/store.go (1)
  • Service (13-25)
pkg/cloudevents/server/grpc/broker_test.go (1)
pkg/cloudevents/server/interface.go (1)
  • EventHandler (24-33)
pkg/server/grpc/metrics/metrics_test.go (2)
pkg/cloudevents/clients/work/payload/manifestbundle.go (1)
  • ManifestBundleEventDataType (11-15)
pkg/cloudevents/server/interface.go (1)
  • EventHandler (24-33)
pkg/cloudevents/clients/csr/client.go (2)
pkg/cloudevents/clients/errors/errors.go (1)
  • ToStatusError (18-30)
pkg/cloudevents/clients/common/common.go (1)
  • CSRGR (40-40)
test/integration/cloudevents/suite_test.go (1)
pkg/cloudevents/clients/work/payload/manifestbundle.go (1)
  • ManifestBundleEventDataType (11-15)
pkg/cloudevents/server/grpc/broker.go (4)
pkg/cloudevents/generic/types/types.go (1)
  • CloudEventsDataType (165-169)
pkg/cloudevents/server/store.go (1)
  • Service (13-25)
pkg/cloudevents/server/grpc/metrics/metrics.go (1)
  • IncGRPCCESubscribersMetric (305-307)
pkg/cloudevents/generic/options/grpc/protocol/write_message.go (1)
  • WritePBMessage (20-32)
🔇 Additional comments (15)
test/integration/cloudevents/broker/services/resource.go (1)

65-67: Context-aware handler registration looks good.

Signature matches the updated server.Service contract; the handler assignment stays untouched and remains correct.

pkg/cloudevents/server/grpc/broker_heartbeat_test.go (1)

107-107: Context propagation through test setup is consistent.

Each registration now supplies a context, keeping the heartbeat tests in step with the broker’s updated API.

Also applies to: 165-165, 199-199, 320-320

pkg/cloudevents/server/interface.go (1)

18-18: RegisterService contract update looks right.

The interface now exposes context, enabling downstream callers to propagate cancellation and logging metadata.

pkg/cloudevents/server/store.go (1)

24-24: Service interface context alignment confirmed.

Adding ctx keeps handler registration compatible with the broader context-aware flow.

test/integration/cloudevents/suite_test.go (1)

139-139: Integration suite registration updated correctly.

Passing context.Background() here satisfies the broker’s new signature without altering test behavior.

pkg/cloudevents/server/grpc/broker_test.go (2)

52-52: LGTM: Test service updated to match new interface.

The RegisterHandler signature correctly matches the updated server.Service interface with the context parameter.


70-70: LGTM: Service registration updated with context.

The RegisterService call correctly passes context.Background() as the first parameter, matching the updated API.

pkg/server/grpc/metrics/metrics_test.go (2)

47-47: LGTM: Broker registration updated with context.

The RegisterService call correctly passes context.Background() to match the updated API.


222-222: LGTM: Mock service updated to match interface.

The RegisterHandler signature correctly matches the updated server.Service interface. It's acceptable for the mock to not use the context parameter.

pkg/cloudevents/server/grpc/broker.go (6)

31-31: LGTM: Handler type updated for context propagation.

The resourceHandler function type now accepts a context parameter, enabling proper context propagation through the event handling chain.


63-65: LGTM: Service registration now context-aware.

The RegisterService method correctly accepts and propagates context to the service's RegisterHandler, enabling contextual logging throughout the service lifecycle.


125-143: LGTM: Context-aware subscriber registration with structured logging.

The register method properly:

  • Accepts context as the first parameter
  • Uses klog.FromContext(ctx) for context-aware logging
  • Enables structured logging with subscriber details

177-178: LGTM: Context enrichment improves observability.

Enriching the context with cluster name and storing it back enables all downstream logging to automatically include this contextual information, improving traceability.


222-239: LGTM: Handler registration properly propagates context.

The handler function correctly:

  • Receives context as the first parameter
  • Passes context to grpcprotocol.WritePBMessage (line 225)
  • Uses context-derived logger for structured logging (line 231)
  • Properly handles context cancellation (line 234-235)

392-392: LGTM: Handler invocation propagates context.

The subscriber handler is correctly called with the context parameter, enabling context propagation through the entire event handling pipeline.

Comment on lines 70 to 74
// we need to add to the store here since grpc driver may call this when it cannot
// get from lister.
if err := c.watcherStore.Add(csr); err != nil {
	return nil, cloudeventserrors.ToStatusError(common.CSRGR, csr.Name, err)
}
⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Use NewInternalError for local store errors and verify intended behavior on partial failure.

Line 73 uses cloudeventserrors.ToStatusError for a local store error, but this function is designed to handle CloudEvents/gRPC publish errors (as shown in the relevant snippet from errors/errors.go). All other store operations in this file (lines 52, 83, 96, 110) consistently use errors.NewInternalError for store errors.

Additionally, if Publish succeeds (line 66) but watcherStore.Add fails (line 72), the CSR is already published remotely but the caller receives an error. This might confuse callers into thinking the CSR wasn't created. Since the PR description indicates the store is eventually consistent via watch, consider whether store add failures should be logged rather than returned as errors.

Apply this diff to use the consistent error type:

 	if err := c.watcherStore.Add(csr); err != nil {
-		return nil, cloudeventserrors.ToStatusError(common.CSRGR, csr.Name, err)
+		return nil, errors.NewInternalError(err)
 	}

Please verify: what is the intended behavior if publishing succeeds but adding to the store fails? Should the operation be considered successful (since the CSR is created remotely and will eventually appear in the store via watch)?


🏁 Script executed:

#!/bin/bash
# Check the error handling functions and their usage patterns
echo "=== Checking error.go/errors.go for function definitions ==="
fd -e 'go' -path '*errors*' -type f | head -20

echo ""
echo "=== Searching for ToStatusError and NewInternalError definitions ==="
rg -n 'func.*ToStatusError|func.*NewInternalError' --type go

echo ""
echo "=== Checking lines 52, 83, 96, 110 in the CSR client file ==="
rg -n -B2 -A2 'watcherStore' pkg/cloudevents/clients/csr/client.go | head -100

echo ""
echo "=== Getting full context around lines 50-115 ==="
sed -n '50,115p' pkg/cloudevents/clients/csr/client.go

Length of output: 4311


🏁 Script executed:

#!/bin/bash
# Get ToStatusError function definition and comments
echo "=== ToStatusError function definition and context ==="
rg -n -B5 -A15 'func ToStatusError' pkg/cloudevents/clients/errors/errors.go

echo ""
echo "=== Imports in the CSR client to verify error packages ==="
head -30 pkg/cloudevents/clients/csr/client.go | grep -E 'import|errors'

Length of output: 1085


📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
 	// we need to add to the store here since grpc driver may call this when it cannot
 	// get from lister.
 	if err := c.watcherStore.Add(csr); err != nil {
-		return nil, cloudeventserrors.ToStatusError(common.CSRGR, csr.Name, err)
+		return nil, errors.NewInternalError(err)
 	}
🤖 Prompt for AI Agents
In pkg/cloudevents/clients/csr/client.go around lines 70 to 74, replace the call
to cloudeventserrors.ToStatusError for the local watcherStore.Add failure with
errors.NewInternalError to match other local store error handling, and decide
how to handle the partial-failure case where Publish succeeded but
watcherStore.Add failed: if the system is intended to be eventually consistent
(store will be populated via watch), log the Add error at warning/error level
and return nil so callers treat the operation as successful; otherwise, return
the new internal error to signal failure. Ensure the chosen behavior is
consistent with the rest of the file and add a concise comment explaining why
Add failures are logged (if you choose the log-and-succeed approach).

@qiujian16 qiujian16 force-pushed the fix-csr branch 3 times, most recently from 775d6fb to 35e55e4 Compare November 13, 2025 05:14

-	klog.V(10).Infof("unregister subscriber %s", id)
+	logger := klog.FromContext(ctx)
+	logger.V(10).Info("unregister subscriber", "id", id)
Suggested change
-	logger.V(10).Info("unregister subscriber", "id", id)
+	logger.V(4).Info("unregister subscriber", "id", id)
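
For readers less familiar with klog verbosity: a message logged at V(n) is emitted only when the configured verbosity is at least n, so moving this line from V(10) to V(4) makes it visible at much lower verbosity settings. The tiny sketch below models that rule with a hypothetical `verbosity` type; it is not klog itself.

```go
package main

import "fmt"

// verbosity mimics a -v style threshold; a hypothetical stand-in, not klog.
type verbosity int

// enabled reports whether a message logged at the given level is emitted.
func (v verbosity) enabled(level int) bool { return level <= int(v) }

func main() {
	configured := verbosity(4)
	for _, level := range []int{4, 10} {
		fmt.Printf("V(%d) emitted at -v=%d: %t\n", level, int(configured), configured.enabled(level))
	}
}
```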

This commit addresses two related issues:
1. Adds newly created CSRs to the watcher store to ensure the gRPC driver
   can retrieve them when the lister is not yet synced
2. Passes context through RegisterService and handler chain to enable
   contextual logging throughout the gRPC broker

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Signed-off-by: Jian Qiu <[email protected]>
skeeey commented Nov 13, 2025

/lgtm

@openshift-ci openshift-ci bot added the lgtm label Nov 13, 2025
@openshift-merge-bot openshift-merge-bot bot merged commit e18ff00 into open-cluster-management-io:main Nov 13, 2025
12 checks passed
@qiujian16 qiujian16 deleted the fix-csr branch November 13, 2025 06:40