Skip to content

Conversation

@dkorittki
Copy link
Contributor

@dkorittki dkorittki commented Oct 10, 2025

Summary by CodeRabbit

  • New Features
    • Cosmo Streams v1: hook-driven subscription lifecycle (on-start, inbound receive, outbound publish), provider-agnostic event model, mutable/immutable event support, and multi-event publish across NATS/Kafka/Redis.
  • Configuration
    • New handlers settings to tune receive concurrency and handler timeouts; runtime option to supply streams handler configuration.
  • Bug Fixes
    • Improved streams error mapping and subscription error propagation to GraphQL responses.
  • Tests
    • Extensive unit & integration coverage for hooks, cross-provider flows, concurrency, timeouts, error and panic recovery.
  • Documentation
    • ADR/RFC and examples for Streams v1, hook usage and event transformation.

✏️ Tip: You can customize this high-level summary in your review settings.

Checklist

  • I have discussed my proposed changes in an issue and have received approval to proceed.
  • I have followed the coding standards of the project.
  • Tests or benchmarks have been added or updated.
  • Documentation has been updated on https://github.com/wundergraph/cosmo-docs.
  • I have read the Contributors Guide.

Context

This pull request implements Cosmo Streams v1. It rebrands and extends EDFS by the ability to add hooks in form of new custom modules:

  • SubscriptionOnStart: A handler running before a subscription is established
  • OnReceiveEvents: A handler running whenever events are received from a message provider, before being passed to clients
  • OnPublishEvents: A handler running before events are getting sent to message providers

These custom modules extend EDFS´s capabilities by allowing custom authorization logic, event filtering, etc.

The preview docs explains this in more detail. It's a good place to see what the hooks are capable of and how to use them.
https://wundergraphinc-topics-streams-v1.mintlify.app/overview

Related pull requests

There are two other pull requests, which contribute to the initial release of Cosmo Streams.

Cosmo Docs: wundergraph/cosmo-docs#187
Engine: wundergraph/graphql-go-tools#1309

Important

This pull request must not be merged before the engine pull request has been merged. The router´s go.mod in this pull request currently points to a version of the engine from the Cosmo Streams branch and once we have an official version of the engine with these changes included, this pull requests gets updated to use that.

alepane21 and others added 2 commits October 10, 2025 12:13
Co-authored-by: Ludwig Bedacht <[email protected]>
Co-authored-by: StarpTech <[email protected]>
Co-authored-by: Dominik Korittki <[email protected]>
Co-authored-by: Ludwig Bedacht <[email protected]>
Co-authored-by: StarpTech <[email protected]>
Co-authored-by: Dominik Korittki <[email protected]>
@coderabbitai
Copy link

coderabbitai bot commented Oct 10, 2025

Walkthrough

Adds Cosmo Streams v1: new provider-agnostic event types (ProviderType, StreamEvent/MutableStreamEvent, StreamEvents), hook interfaces/contexts (SubscriptionOnStart, OnReceiveEvents, OnPublishEvents), wiring of hooks through loader/core/pubsub/provider layers, provider/adapter/engine-datasource API refactors, concurrency/timeouts/panic-recovery, and extensive tests.

Changes

Cohort / File(s) Summary
Docs
adr/cosmo-streams-v1.md, rfc/cosmo-streams-v1.md
New ADR/RFC describing Streams v1 design, types, hook interfaces/contexts, examples and compatibility notes.
Core wiring & GraphQL
router/core/*.go, router/core/subscriptions_modules.go, router/core/router_config.go, router/core/executor.go, router/core/factoryresolver.go, router/core/plan_generator.go, router/core/graph_server.go, router/core/graphql_handler.go, router/core/errors.go, router/core/supervisor_instance.go
Introduces subscriptionHooks types and wiring; exposes hook handler/context interfaces and factory adapters; threads hooks through loader/plan/executor/graph server; adds StreamHandlerError handling and WithStreamsHandlerConfiguration option.
Pub/Sub datasource core
router/pkg/pubsub/datasource/* (e.g., datasource.go, hooks.go, pubsubprovider.go, subscription_datasource.go, subscription_event_updater.go, planner.go, factory.go, mocks*.go)
Adds Hooks API types and container; PlannerConfig gains Providers/Hooks; provider SetHooks/Publish/Subscribe API expanded; PubSub provider applies publish hooks with panic recovery; generic PubSub subscription datasource; subscription event updater with per-subscription concurrency, timeout and panic recovery; mocks updated.
Provider adapters & engine datasources
router/pkg/pubsub/{kafka,nats,redis}/* (adapters, engine_datasource*, provider_builder*, engine_datasource_factory*, tests, mocks)
Adapters refactored to use datasource.Adapter and datasource.SubscriptionEventUpdater; added Event/MutableEvent types and Clone/SetData; Publish now accepts []StreamEvent; subscription/publish config shapes changed (provider/fieldName); UniqueRequestID plumbing added; extensive test and mock revisions.
PubSub integration & build
router/pkg/pubsub/pubsub.go, router/pkg/pubsub/pubsub_test.go
BuildProvidersAndDataSources extended to accept logger and hooks; providers built into a map, SetHooks called on providers, and providers/hooks passed into planner/factory.
Router config & schema
router/pkg/config/*, router/pkg/config/config.schema.json, router/pkg/config/fixtures/full.yaml, router/pkg/config/testdata/*
Adds StreamsHandlerConfiguration and OnReceiveEvents configuration (max_concurrent_handlers, handler_timeout), wires into EventsConfiguration and schema/defaults/fixtures.
Tests & test utilities
router-tests/events/*, router-tests/modules/*, router-tests/modules/*_test.go, router-tests/events/utils.go, router-tests/go.mod
Tests reorganized to external packages, test helpers refactored, new Kafka/NATS/Redis utilities (KafkaEnsureTopicExists, timed ProduceKafkaMessage, ReadKafkaMessages, ReadRedisMessages), and many new integration/unit tests for hooks and modules.
Demo changes
demo/pkg/subgraphs/.../schema.resolvers.go
Demo resolvers updated to publish using new PublishEventConfiguration and pass []datasource.StreamEvent (MutableEvent) slices.
Mocks & mock config
router/.mockery.yml, router/pkg/pubsub/*/mocks*.go
Mock generation config updated; mocks renamed/rewired to datasource types, new SubscriptionUpdater mocks added, old Adapter mocks removed where replaced.
Misc / go.mod updates
router/go.mod, router-tests/go.mod
Dependency version updates and formatting tweaks.

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Areas to focus on:

  • router/pkg/pubsub/datasource/subscription_event_updater.go — semaphore/goroutine lifecycle, deadline/timeout, per-subscription error/close semantics, panic recovery.
  • router/core/subscriptions_modules.go — hook context implementations, NewEvent/EmitEvent semantics, immutable vs mutable Clone/SetData behavior.
  • router/pkg/pubsub/datasource/pubsubprovider.go — applyPublishEventHooks chaining, panic recovery, nil-event filtering, eventBuilder usage.
  • router/pkg/pubsub/{kafka,nats,redis} — runtime type assertions, Publish multi-event loops, headers/keys mapping, UniqueRequestID and JSON template marshaling.
  • Mocks/tests — ensure mock signatures align with datasource interfaces and updated test helper semantics/timeouts.

Possibly related PRs

Pre-merge checks

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 12.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat(router): implement Cosmo Streams v1' accurately reflects the main objective of this large changeset, which introduces the complete Cosmo Streams v1 implementation including new hook-based subscription and event handling interfaces throughout the router package.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@dkorittki dkorittki changed the title Topic/streams v1 Cosmo Streams v1 Oct 10, 2025
Merge conflict was on go.mod files in router and router-tests,
because a new upstream engine version was used.
I go-getted the latest engine version of the topic branch,
which includes the latest engine changes + cosmo streams.
@github-actions
Copy link

github-actions bot commented Oct 17, 2025

Router-nonroot image scan passed

✅ No security vulnerabilities found in image:

ghcr.io/wundergraph/cosmo/router:sha-64eec426d60dd7eb14e9c7dd303da1f8ce904343-nonroot

@dkorittki dkorittki changed the title Cosmo Streams v1 feat(router): implement Cosmo Streams v1 Nov 14, 2025
@dkorittki dkorittki requested a review from jensneuse November 14, 2025 11:58
@dkorittki dkorittki marked this pull request as ready for review November 14, 2025 11:59
@dkorittki dkorittki requested a review from JivusAyrus November 14, 2025 11:59
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
router/core/errors.go (1)

38-38: Consider renaming for consistency with Cosmo Streams v1.

The constant uses errorTypeEDFSHookError naming while this PR implements Cosmo Streams v1 and checks for StreamHandlerError. Since the PR objectives indicate this is rebranding EDFS to Cosmo Streams, consider errorTypeStreamHookError for consistency.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d2946a1 and cdcbf93.

📒 Files selected for processing (2)
  • router-tests/modules/stream_receive_test.go (1 hunks)
  • router/core/errors.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • router-tests/modules/stream_receive_test.go
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
🧬 Code graph analysis (1)
router/core/errors.go (1)
router/core/subscriptions_modules.go (1)
  • StreamHandlerError (448-451)
🔇 Additional comments (1)
router/core/errors.go (1)

81-84: No action required—StreamHandlerError correctly implements the error interface.

Verification confirms that StreamHandlerError has the required Error() string method (router/core/subscriptions_modules.go:454-456), so the errors.As check will work as intended and properly classify hook errors.

This actually introduces a bug, where occasionally subscription clients will get disconnected after receiving messages when we deal with lots of connecting and disconnecting clients. It depends on two contexts being regularly checked inside the Kafka topic poller and if one of them cancels, the connections would get closed. These contexts are meant to handle the lifecycle of the Kafka adapter, not subscriptions. So we should not close the subscription updater when the poller is canceled.

After this commit it behaves like the router behaves before Cosmo Streams. All of this can be reevaluated, maybe there is room for improvement. But for now its better not to change this bevaviour.
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (6)
adr/cosmo-streams-v1.md (5)

23-26: Clarify language and naming terminology.

Several lines have minor wording issues that could be tightened:

  • Line 24: "prior to delivery" → suggest "before delivery" (more concise)
  • Line 25: "going to be sent" → suggest "will be sent" (more direct)
- `StreamReceiveEventHandler`: Triggered for each client/subscription when a batch of events is received from the provider, prior to delivery.
- `StreamPublishEventHandler`: Called each time a batch of events is going to be sent to the provider.
+ `StreamReceiveEventHandler`: Triggered for each client/subscription when a batch of events are received from the provider, before delivery.
+ `StreamPublishEventHandler`: Called each time a batch of events will be sent to the provider.

219-219: Apply hyphenation to compound adjectives.

Two instances where compound adjectives should be hyphenated for clarity:

  • Line 219: "to/from Federation compatible" → "Federation-compatible"
  • Line 225: "fully backwards compatible" → "fully backward-compatible"
- **Data mapping**: Transforming events data from the format that could be used by the external system to/from Federation compatible Router events
+ **Data mapping**: Transforming events data from the format that could be used by the external system to/from Federation-compatible Router events
- The new hooks can be integrated in the router in a fully backwards compatible way.
+ The new hooks can be integrated in the router in a fully backward-compatible way.

Also applies to: 225-225


235-235: Correct emphasis and heading formatting.

Line 235 uses underscores (__text__) for strong emphasis instead of asterisks (**text**), and treats strong text as a heading when it should be a proper Markdown heading (using ##).

- __All examples reflect the current implementation and match the actual API__
+ ## All examples reflect the current implementation and match the actual API

289-292: Replace hard tabs with spaces in example code blocks.

Additional hard tabs detected in the Go code examples (lines 289–292, 294, 428–431, 437–438, 445–446). Replace with consistent space indentation.

Also applies to: 294-294, 428-431, 437-438, 445-446


87-189: Replace hard tabs with spaces in interface definitions (lines 87–189).

Hard tabs detected in all interface method definitions and comments. Replace with spaces for consistency and portability across different editors and tools.

router/pkg/pubsub/kafka/adapter.go (1)

126-180: Consider explicitly closing the per-subscription Kafka consumer client

In Subscribe, a new kgo.Client is created per subscription and used only inside the poller goroutine. When topicPoller returns (either on shutdown or error), the goroutine just exits; the consumer client is never explicitly closed here.

To avoid potential lingering background goroutines or connections in the franz-go client, consider closing it when the poller ends:

go func() {

-   defer p.closeWg.Done()
+   defer p.closeWg.Done()
+   defer client.Close()

    err := p.topicPoller(ctx, client, updater, PollerOpts{providerId: conf.ProviderID()})
    if err != nil {
        // ...
    }
}()

If the consumer lifecycle is already managed elsewhere, this can be skipped, but as written this function appears to be the only owner of client.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cdcbf93 and 551ef84.

📒 Files selected for processing (2)
  • adr/cosmo-streams-v1.md (1 hunks)
  • router/pkg/pubsub/kafka/adapter.go (5 hunks)
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-11-16T11:52:34.064Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2273
File: router/pkg/pubsub/redis/engine_datasource.go:37-42
Timestamp: 2025-11-16T11:52:34.064Z
Learning: In router/pkg/pubsub/redis/engine_datasource.go (and similar files for kafka/nats), the MutableEvent.GetData() method intentionally returns the internal Data slice without cloning, while Event.GetData() clones the slice. This is by design: MutableEvent is designed to be mutable (so callers can modify the data), while Event is immutable (cloning prevents modification). This difference is part of the Cosmo Streams v1 hook interface design.

Applied to files:

  • router/pkg/pubsub/kafka/adapter.go
  • adr/cosmo-streams-v1.md
📚 Learning: 2025-11-13T10:10:47.680Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.

Applied to files:

  • router/pkg/pubsub/kafka/adapter.go
📚 Learning: 2025-08-28T09:17:49.477Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.

Applied to files:

  • adr/cosmo-streams-v1.md
🧬 Code graph analysis (1)
router/pkg/pubsub/kafka/adapter.go (5)
router/pkg/pubsub/datasource/provider.go (6)
  • Adapter (24-28)
  • ProviderType (52-52)
  • ProviderTypeKafka (56-56)
  • StreamEvent (86-91)
  • SubscriptionEventConfiguration (101-105)
  • PublishEventConfiguration (108-112)
router/pkg/pubsub/datasource/subscription_event_updater.go (1)
  • SubscriptionEventUpdater (21-26)
router/pkg/metric/stream_metric_store.go (3)
  • StreamsEvent (25-31)
  • ProviderType (16-16)
  • ProviderTypeKafka (19-19)
router/pkg/pubsub/kafka/engine_datasource.go (4)
  • Event (21-23)
  • MutableEvent (62-66)
  • SubscriptionEventConfiguration (91-95)
  • PublishEventConfiguration (188-192)
router/pkg/pubsub/datasource/error.go (2)
  • NewError (12-17)
  • Error (3-6)
🪛 LanguageTool
adr/cosmo-streams-v1.md

[style] ~24-~24: ‘prior to’ might be wordy. Consider a shorter alternative.
Context: ...f events is received from the provider, prior to delivery. - StreamPublishEventHandler...

(EN_WORDINESS_PREMIUM_PRIOR_TO)


[style] ~25-~25: Use ‘will’ instead of ‘going to’ if the following action is certain.
Context: ...er`: Called each time a batch of events is going to be sent to the provider. ```go // STRU...

(GOING_TO_WILL)


[grammar] ~219-~219: Use a hyphen to join words.
Context: ...y the external system to/from Federation compatible Router events - **Event filte...

(QB_NEW_EN_HYPHEN)


[uncategorized] ~225-~225: If this is a compound adjective that modifies the following noun, use a hyphen.
Context: ... be integrated in the router in a fully backwards compatible way. When the new module system will b...

(EN_COMPOUND_ADJECTIVE_INTERNAL)

🪛 markdownlint-cli2 (0.18.1)
adr/cosmo-streams-v1.md

87-87: Hard tabs
Column: 1

(MD010, no-hard-tabs)


88-88: Hard tabs
Column: 1

(MD010, no-hard-tabs)


89-89: Hard tabs
Column: 1

(MD010, no-hard-tabs)


90-90: Hard tabs
Column: 1

(MD010, no-hard-tabs)


91-91: Hard tabs
Column: 1

(MD010, no-hard-tabs)


92-92: Hard tabs
Column: 1

(MD010, no-hard-tabs)


93-93: Hard tabs
Column: 1

(MD010, no-hard-tabs)


94-94: Hard tabs
Column: 1

(MD010, no-hard-tabs)


95-95: Hard tabs
Column: 1

(MD010, no-hard-tabs)


96-96: Hard tabs
Column: 1

(MD010, no-hard-tabs)


97-97: Hard tabs
Column: 1

(MD010, no-hard-tabs)


98-98: Hard tabs
Column: 1

(MD010, no-hard-tabs)


99-99: Hard tabs
Column: 1

(MD010, no-hard-tabs)


100-100: Hard tabs
Column: 1

(MD010, no-hard-tabs)


101-101: Hard tabs
Column: 1

(MD010, no-hard-tabs)


102-102: Hard tabs
Column: 1

(MD010, no-hard-tabs)


103-103: Hard tabs
Column: 1

(MD010, no-hard-tabs)


104-104: Hard tabs
Column: 1

(MD010, no-hard-tabs)


105-105: Hard tabs
Column: 1

(MD010, no-hard-tabs)


106-106: Hard tabs
Column: 1

(MD010, no-hard-tabs)


107-107: Hard tabs
Column: 1

(MD010, no-hard-tabs)


108-108: Hard tabs
Column: 1

(MD010, no-hard-tabs)


109-109: Hard tabs
Column: 1

(MD010, no-hard-tabs)


110-110: Hard tabs
Column: 1

(MD010, no-hard-tabs)


111-111: Hard tabs
Column: 1

(MD010, no-hard-tabs)


112-112: Hard tabs
Column: 1

(MD010, no-hard-tabs)


113-113: Hard tabs
Column: 1

(MD010, no-hard-tabs)


114-114: Hard tabs
Column: 1

(MD010, no-hard-tabs)


115-115: Hard tabs
Column: 1

(MD010, no-hard-tabs)


116-116: Hard tabs
Column: 1

(MD010, no-hard-tabs)


117-117: Hard tabs
Column: 1

(MD010, no-hard-tabs)


118-118: Hard tabs
Column: 1

(MD010, no-hard-tabs)


119-119: Hard tabs
Column: 1

(MD010, no-hard-tabs)


120-120: Hard tabs
Column: 1

(MD010, no-hard-tabs)


121-121: Hard tabs
Column: 1

(MD010, no-hard-tabs)


122-122: Hard tabs
Column: 1

(MD010, no-hard-tabs)


123-123: Hard tabs
Column: 1

(MD010, no-hard-tabs)


133-133: Hard tabs
Column: 1

(MD010, no-hard-tabs)


134-134: Hard tabs
Column: 1

(MD010, no-hard-tabs)


135-135: Hard tabs
Column: 1

(MD010, no-hard-tabs)


136-136: Hard tabs
Column: 1

(MD010, no-hard-tabs)


137-137: Hard tabs
Column: 1

(MD010, no-hard-tabs)


138-138: Hard tabs
Column: 1

(MD010, no-hard-tabs)


139-139: Hard tabs
Column: 1

(MD010, no-hard-tabs)


140-140: Hard tabs
Column: 1

(MD010, no-hard-tabs)


141-141: Hard tabs
Column: 1

(MD010, no-hard-tabs)


142-142: Hard tabs
Column: 1

(MD010, no-hard-tabs)


143-143: Hard tabs
Column: 1

(MD010, no-hard-tabs)


144-144: Hard tabs
Column: 1

(MD010, no-hard-tabs)


145-145: Hard tabs
Column: 1

(MD010, no-hard-tabs)


146-146: Hard tabs
Column: 1

(MD010, no-hard-tabs)


147-147: Hard tabs
Column: 1

(MD010, no-hard-tabs)


148-148: Hard tabs
Column: 1

(MD010, no-hard-tabs)


149-149: Hard tabs
Column: 1

(MD010, no-hard-tabs)


150-150: Hard tabs
Column: 1

(MD010, no-hard-tabs)


151-151: Hard tabs
Column: 1

(MD010, no-hard-tabs)


152-152: Hard tabs
Column: 1

(MD010, no-hard-tabs)


153-153: Hard tabs
Column: 1

(MD010, no-hard-tabs)


154-154: Hard tabs
Column: 1

(MD010, no-hard-tabs)


170-170: Hard tabs
Column: 1

(MD010, no-hard-tabs)


171-171: Hard tabs
Column: 1

(MD010, no-hard-tabs)


172-172: Hard tabs
Column: 1

(MD010, no-hard-tabs)


173-173: Hard tabs
Column: 1

(MD010, no-hard-tabs)


174-174: Hard tabs
Column: 1

(MD010, no-hard-tabs)


175-175: Hard tabs
Column: 1

(MD010, no-hard-tabs)


176-176: Hard tabs
Column: 1

(MD010, no-hard-tabs)


177-177: Hard tabs
Column: 1

(MD010, no-hard-tabs)


178-178: Hard tabs
Column: 1

(MD010, no-hard-tabs)


179-179: Hard tabs
Column: 1

(MD010, no-hard-tabs)


180-180: Hard tabs
Column: 1

(MD010, no-hard-tabs)


181-181: Hard tabs
Column: 1

(MD010, no-hard-tabs)


182-182: Hard tabs
Column: 1

(MD010, no-hard-tabs)


183-183: Hard tabs
Column: 1

(MD010, no-hard-tabs)


184-184: Hard tabs
Column: 1

(MD010, no-hard-tabs)


185-185: Hard tabs
Column: 1

(MD010, no-hard-tabs)


186-186: Hard tabs
Column: 1

(MD010, no-hard-tabs)


187-187: Hard tabs
Column: 1

(MD010, no-hard-tabs)


188-188: Hard tabs
Column: 1

(MD010, no-hard-tabs)


189-189: Hard tabs
Column: 1

(MD010, no-hard-tabs)


235-235: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


235-235: Strong style
Expected: asterisk; Actual: underscore

(MD050, strong-style)


235-235: Strong style
Expected: asterisk; Actual: underscore

(MD050, strong-style)


289-289: Hard tabs
Column: 1

(MD010, no-hard-tabs)


290-290: Hard tabs
Column: 1

(MD010, no-hard-tabs)


291-291: Hard tabs
Column: 1

(MD010, no-hard-tabs)


292-292: Hard tabs
Column: 1

(MD010, no-hard-tabs)


294-294: Hard tabs
Column: 1

(MD010, no-hard-tabs)


428-428: Hard tabs
Column: 1

(MD010, no-hard-tabs)


429-429: Hard tabs
Column: 1

(MD010, no-hard-tabs)


430-430: Hard tabs
Column: 1

(MD010, no-hard-tabs)


431-431: Hard tabs
Column: 1

(MD010, no-hard-tabs)


437-437: Hard tabs
Column: 1

(MD010, no-hard-tabs)


437-437: Hard tabs
Column: 6

(MD010, no-hard-tabs)


438-438: Hard tabs
Column: 1

(MD010, no-hard-tabs)


445-445: Hard tabs
Column: 1

(MD010, no-hard-tabs)


445-445: Hard tabs
Column: 6

(MD010, no-hard-tabs)


446-446: Hard tabs
Column: 1

(MD010, no-hard-tabs)

🔇 Additional comments (3)
adr/cosmo-streams-v1.md (1)

15-232: ADR structure and content are sound.

The document clearly articulates the design rationale, interface contracts, and immutable-vs-mutable event trade-offs. The EmitLocalEvent terminology reflects the prior rename from WriteEvent, and examples demonstrate realistic module implementation patterns (authorization, filtering, data transformation). No substantive documentation gaps identified.

router/pkg/pubsub/kafka/adapter.go (2)

23-24: Adapter compile-time assertion is correct

The var _ datasource.Adapter = (*ProviderAdapter)(nil) check is a good safeguard to keep ProviderAdapter aligned with the public datasource.Adapter surface as it evolves.


51-118: topicPoller event construction and metrics wiring look consistent

The changes to topicPoller to:

  • use datasource.SubscriptionEventUpdater,
  • extract Kafka headers into a map[string][]byte,
  • wrap records as *Event{evt: &MutableEvent{...}}, and
  • emit Consume metrics with providerId from PollerOpts

are consistent with the Streams v1 StreamEvent / MutableStreamEvent model and preserve key, value, and headers as expected. No issues from a correctness or API-usage perspective.

Based on learnings

@dkorittki
Copy link
Contributor Author

dkorittki commented Nov 20, 2025

Waiting for Engine PR to get merged and new engine version to be released. Then I pull latest changes from main, update router go.mod, then its ready for merge.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (3)
router-tests/modules/stream_receive_test.go (2)

212-213: Fix typo in comment.

"garantueed" should be "guaranteed".

-		// By default events are immutable in Cosmo Streams hooks, because it is not garantueed they aren't
+		// By default events are immutable in Cosmo Streams hooks, because it is not guaranteed they aren't

844-845: Fix typo in comment.

"proceedes" should be "proceeds".

-		// Because the first hook exceeds the timeout, the subscription-updater gives up waiting for it
-		// and proceedes to process the second and third events immediately.
+		// Because the first hook exceeds the timeout, the subscription-updater gives up waiting for it
+		// and proceeds to process the second and third events immediately.
router/pkg/pubsub/kafka/adapter.go (1)

182-184: Update docstring to reflect synchronous behavior.

The doc comment claims "non-blocking" (line 182), but the implementation blocks on wg.Wait() (line 246) until all produce callbacks complete. Update the documentation to accurately describe the synchronous behavior:

-// Publish publishes the given events to the Kafka topic in a non-blocking way.
+// Publish publishes the given events to the Kafka topic and blocks until all
+// produce callbacks have completed.
 // Publish errors are logged and returned as a pubsub error.
 // The events are written with a dedicated write client.
🧹 Nitpick comments (5)
adr/cosmo-streams-v1.md (4)

1-491: Fix markdown formatting: replace hard tabs with spaces.

Static analysis detected hard tabs throughout the file (lines 87–124, 133–154, 170–189, 289–292, 428–438, 445–446). Replace all hard tabs with spaces to comply with markdown linting rules. Additionally, line 235 uses underscores (__) for strong emphasis; use asterisks (**) instead.

- <tabs>type SubscriptionOnStartHandlerContext interface {
- <tabs>	// Request is the original request received by the router.
- <tabs>	Request() *http.Request
+ type SubscriptionOnStartHandlerContext interface {
+     // Request is the original request received by the router.
+     Request() *http.Request
  ...
- __All examples reflect the current implementation and match the actual API__
+ **All examples reflect the current implementation and match the actual API**

20-26: Refine language for clarity and conciseness.

Apply these minor style improvements per static analysis:

- `StreamReceiveEventHandler`: Triggered for each client/subscription when a batch of events is received from the provider, prior to delivery.
- `StreamPublishEventHandler`: Called each time a batch of events is going to be sent to the provider.
+ `StreamReceiveEventHandler`: Triggered for each client/subscription when a batch of events is received from the provider, before delivery.
+ `StreamPublishEventHandler`: Called each time a batch of events will be sent to the provider.

219-219: Add hyphen to compound adjective.

- transforming events data from the format that could be used by the external system to/from Federation compatible Router events
+ transforming events data from the format that could be used by the external system to/from Federation-compatible Router events

225-225: Use compound adjective form.

- The new hooks can be integrated in the router in a fully backwards compatible way.
+ The new hooks can be integrated in the router in a fully backward-compatible way.
router/core/subscriptions_modules.go (1)

118-122: EmitEvent always returns true but documentation mentions dropping events.

The EmitEvent method unconditionally returns true, but the interface documentation (line 38) states: "The method returns true if the event was successfully emitted, or false if it was dropped."

If events can never be dropped in the current implementation, consider updating the documentation to reflect this. If dropping is possible in the underlying emitEventFn, ensure the return value is properly propagated:

 func (c *pubSubSubscriptionOnStartHookContext) EmitEvent(event datasource.StreamEvent) bool {
 	c.emitEventFn(event.GetData())
-	return true
+	// TODO: propagate actual success/failure from emitEventFn if supported
+	return true
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bffad18 and bb03b5f.

⛔ Files ignored due to path filters (2)
  • router-tests/go.sum is excluded by !**/*.sum
  • router/go.sum is excluded by !**/*.sum
📒 Files selected for processing (10)
  • adr/cosmo-streams-v1.md (1 hunks)
  • router-tests/modules/start_subscription_test.go (1 hunks)
  • router-tests/modules/stream_receive_test.go (1 hunks)
  • router/core/errors.go (2 hunks)
  • router/core/graphql_handler.go (1 hunks)
  • router/core/subscriptions_modules.go (1 hunks)
  • router/pkg/pubsub/datasource/subscription_event_updater.go (1 hunks)
  • router/pkg/pubsub/kafka/adapter.go (6 hunks)
  • router/pkg/pubsub/nats/adapter.go (7 hunks)
  • router/pkg/pubsub/redis/adapter.go (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • router/core/graphql_handler.go
  • router/core/errors.go
  • router-tests/modules/start_subscription_test.go
🧰 Additional context used
🧠 Learnings (10)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-11-13T10:10:47.680Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.

Applied to files:

  • router/pkg/pubsub/datasource/subscription_event_updater.go
  • router/pkg/pubsub/redis/adapter.go
  • router/pkg/pubsub/kafka/adapter.go
  • adr/cosmo-streams-v1.md
  • router/core/subscriptions_modules.go
  • router-tests/modules/stream_receive_test.go
  • router/pkg/pubsub/nats/adapter.go
📚 Learning: 2025-11-16T11:52:34.064Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2273
File: router/pkg/pubsub/redis/engine_datasource.go:37-42
Timestamp: 2025-11-16T11:52:34.064Z
Learning: In router/pkg/pubsub/redis/engine_datasource.go (and similar files for kafka/nats), the MutableEvent.GetData() method intentionally returns the internal Data slice without cloning, while Event.GetData() clones the slice. This is by design: MutableEvent is designed to be mutable (so callers can modify the data), while Event is immutable (cloning prevents modification). This difference is part of the Cosmo Streams v1 hook interface design.

Applied to files:

  • router/pkg/pubsub/datasource/subscription_event_updater.go
  • router/pkg/pubsub/redis/adapter.go
  • router/pkg/pubsub/kafka/adapter.go
  • adr/cosmo-streams-v1.md
  • router/core/subscriptions_modules.go
  • router-tests/modules/stream_receive_test.go
  • router/pkg/pubsub/nats/adapter.go
📚 Learning: 2025-08-28T09:18:10.121Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.

Applied to files:

  • router/pkg/pubsub/datasource/subscription_event_updater.go
  • router-tests/modules/stream_receive_test.go
📚 Learning: 2025-08-28T09:17:49.477Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.

Applied to files:

  • router/pkg/pubsub/datasource/subscription_event_updater.go
  • adr/cosmo-streams-v1.md
  • router-tests/modules/stream_receive_test.go
📚 Learning: 2025-11-19T15:13:57.821Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2273
File: router/core/graphql_handler.go:0-0
Timestamp: 2025-11-19T15:13:57.821Z
Learning: In the Cosmo router (wundergraph/cosmo), error handling follows a two-phase pattern: (1) Prehandler phase handles request parsing, validation, and setup errors using `httpGraphqlError` and `writeOperationError` (in files like graphql_prehandler.go, operation_processor.go, parse_multipart.go, batch.go); (2) Execution phase handles resolver execution errors using `WriteError` in GraphQLHandler.ServeHTTP. Because all `httpGraphqlError` instances are caught in the prehandler before ServeHTTP is invoked, any error type checks for `httpGraphqlError` in the execution-phase WriteError method are unreachable code.

Applied to files:

  • router/pkg/pubsub/redis/adapter.go
  • router/core/subscriptions_modules.go
📚 Learning: 2025-08-14T17:22:41.662Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2137
File: router/pkg/metric/oltp_connection_metric_store.go:46-49
Timestamp: 2025-08-14T17:22:41.662Z
Learning: In the OTLP connection metric store (router/pkg/metric/oltp_connection_metric_store.go), errors from startInitMetrics are intentionally logged but not propagated - this is existing behavior to ensure metrics failures don't block core system functionality.

Applied to files:

  • router/pkg/pubsub/redis/adapter.go
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.

Applied to files:

  • router-tests/modules/stream_receive_test.go
📚 Learning: 2025-09-24T12:54:00.765Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2222
File: router-tests/websocket_test.go:2238-2302
Timestamp: 2025-09-24T12:54:00.765Z
Learning: The wundergraph/cosmo project uses Go 1.25 (Go 1.23+ minimum), so fmt.Appendf and other newer Go standard library functions are available and can be used without compatibility concerns.

Applied to files:

  • router/pkg/pubsub/nats/adapter.go
📚 Learning: 2025-09-24T12:54:00.765Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2222
File: router-tests/websocket_test.go:2238-2302
Timestamp: 2025-09-24T12:54:00.765Z
Learning: The wundergraph/cosmo project uses Go 1.25 (Go 1.25 minimum), so fmt.Appendf and other newer Go standard library functions are available and can be used without compatibility concerns.

Applied to files:

  • router/pkg/pubsub/nats/adapter.go
🧬 Code graph analysis (6)
router/pkg/pubsub/datasource/subscription_event_updater.go (6)
router/pkg/pubsub/datasource/provider.go (2)
  • StreamEvent (86-91)
  • SubscriptionEventConfiguration (101-105)
router/pkg/pubsub/datasource/hooks.go (1)
  • Hooks (17-21)
router/pkg/pubsub/kafka/engine_datasource.go (1)
  • SubscriptionEventConfiguration (91-95)
router/pkg/pubsub/nats/engine_datasource.go (1)
  • SubscriptionEventConfiguration (92-97)
router/pkg/pubsub/redis/engine_datasource.go (1)
  • SubscriptionEventConfiguration (64-68)
router/pkg/pubsub/datasource/subscription_datasource.go (1)
  • EventBuilderFn (16-16)
router/pkg/pubsub/redis/adapter.go (5)
router/pkg/pubsub/datasource/provider.go (6)
  • Adapter (24-28)
  • SubscriptionEventConfiguration (101-105)
  • ProviderType (52-52)
  • ProviderTypeRedis (57-57)
  • StreamEvent (86-91)
  • PublishEventConfiguration (108-112)
router/pkg/pubsub/redis/engine_datasource.go (4)
  • SubscriptionEventConfiguration (64-68)
  • Event (20-22)
  • MutableEvent (35-37)
  • PublishEventConfiguration (141-145)
router/pkg/pubsub/datasource/subscription_event_updater.go (1)
  • SubscriptionEventUpdater (19-24)
router/pkg/pubsub/datasource/error.go (2)
  • NewError (12-17)
  • Error (3-6)
router/pkg/metric/stream_metric_store.go (3)
  • ProviderType (16-16)
  • ProviderTypeRedis (21-21)
  • StreamsEvent (25-31)
router/pkg/pubsub/kafka/adapter.go (3)
router/pkg/pubsub/datasource/provider.go (5)
  • Adapter (24-28)
  • ProviderType (52-52)
  • StreamEvent (86-91)
  • SubscriptionEventConfiguration (101-105)
  • PublishEventConfiguration (108-112)
router/pkg/metric/stream_metric_store.go (2)
  • StreamsEvent (25-31)
  • ProviderType (16-16)
router/pkg/pubsub/kafka/engine_datasource.go (4)
  • Event (21-23)
  • MutableEvent (62-66)
  • SubscriptionEventConfiguration (91-95)
  • PublishEventConfiguration (188-192)
router/core/subscriptions_modules.go (4)
router/core/context.go (1)
  • OperationContext (476-499)
router/pkg/pubsub/datasource/provider.go (6)
  • SubscriptionEventConfiguration (101-105)
  • StreamEvent (86-91)
  • MutableStreamEvent (94-98)
  • PublishEventConfiguration (108-112)
  • ProviderType (52-52)
  • StreamEvents (61-63)
router/pkg/pubsub/datasource/subscription_datasource.go (1)
  • EventBuilderFn (16-16)
router/pkg/pubsub/datasource/hooks.go (3)
  • SubscriptionOnStartFn (10-10)
  • OnPublishEventsFn (12-12)
  • OnReceiveEventsFn (14-14)
router-tests/modules/stream_receive_test.go (10)
router-tests/testenv/testenv.go (2)
  • Run (105-122)
  • Environment (1733-1769)
router-tests/modules/stream-receive/module.go (1)
  • StreamReceiveModule (14-18)
router/pkg/config/config.go (4)
  • Config (1020-1095)
  • Graph (24-29)
  • StreamsHandlerConfiguration (651-653)
  • OnReceiveEventsConfiguration (655-658)
router/core/router.go (5)
  • Option (172-172)
  • WithModulesConfig (1678-1682)
  • WithCustomModules (1785-1789)
  • WithAccessController (1838-1842)
  • WithStreamsHandlerConfiguration (2142-2147)
router-tests/events/utils.go (2)
  • KafkaEnsureTopicExists (15-32)
  • ProduceKafkaMessage (34-53)
router-tests/testenv/utils.go (1)
  • AwaitChannelWithT (10-19)
router/core/subscriptions_modules.go (1)
  • StreamReceiveEventHandlerContext (269-292)
router/pkg/pubsub/datasource/provider.go (4)
  • StreamEvents (61-63)
  • StreamEvent (86-91)
  • NewStreamEvents (81-83)
  • MutableStreamEvent (94-98)
router/pkg/authentication/jwks_token_decoder.go (3)
  • NewJwksTokenDecoder (78-241)
  • JWKSConfig (44-56)
  • TokenDecoder (21-23)
router/pkg/authentication/http_header_authenticator.go (2)
  • HttpHeaderAuthenticatorOptions (81-89)
  • NewHttpHeaderAuthenticator (93-113)
router/pkg/pubsub/nats/adapter.go (4)
router/pkg/pubsub/datasource/provider.go (5)
  • Adapter (24-28)
  • PublishEventConfiguration (108-112)
  • StreamEvent (86-91)
  • SubscriptionEventConfiguration (101-105)
  • ProviderType (52-52)
router/pkg/pubsub/nats/engine_datasource.go (5)
  • SubscriptionEventConfiguration (92-97)
  • StreamConfiguration (86-90)
  • Event (20-22)
  • MutableEvent (42-45)
  • PublishAndRequestEventConfiguration (168-172)
router/pkg/pubsub/datasource/subscription_event_updater.go (1)
  • SubscriptionEventUpdater (19-24)
router/pkg/pubsub/datasource/error.go (2)
  • NewError (12-17)
  • Error (3-6)
🪛 LanguageTool
adr/cosmo-streams-v1.md

[style] ~24-~24: ‘prior to’ might be wordy. Consider a shorter alternative.
Context: ...f events is received from the provider, prior to delivery. - StreamPublishEventHandler...

(EN_WORDINESS_PREMIUM_PRIOR_TO)


[style] ~25-~25: Use ‘will’ instead of ‘going to’ if the following action is certain.
Context: ...er`: Called each time a batch of events is going to be sent to the provider. ```go // STRU...

(GOING_TO_WILL)


[grammar] ~219-~219: Use a hyphen to join words.
Context: ...y the external system to/from Federation compatible Router events - **Event filte...

(QB_NEW_EN_HYPHEN)


[uncategorized] ~225-~225: If this is a compound adjective that modifies the following noun, use a hyphen.
Context: ... be integrated in the router in a fully backwards compatible way. When the new module system will b...

(EN_COMPOUND_ADJECTIVE_INTERNAL)

🪛 markdownlint-cli2 (0.18.1)
adr/cosmo-streams-v1.md

87-87: Hard tabs
Column: 1

(MD010, no-hard-tabs)


88-88: Hard tabs
Column: 1

(MD010, no-hard-tabs)


89-89: Hard tabs
Column: 1

(MD010, no-hard-tabs)


90-90: Hard tabs
Column: 1

(MD010, no-hard-tabs)


91-91: Hard tabs
Column: 1

(MD010, no-hard-tabs)


92-92: Hard tabs
Column: 1

(MD010, no-hard-tabs)


93-93: Hard tabs
Column: 1

(MD010, no-hard-tabs)


94-94: Hard tabs
Column: 1

(MD010, no-hard-tabs)


95-95: Hard tabs
Column: 1

(MD010, no-hard-tabs)


96-96: Hard tabs
Column: 1

(MD010, no-hard-tabs)


97-97: Hard tabs
Column: 1

(MD010, no-hard-tabs)


98-98: Hard tabs
Column: 1

(MD010, no-hard-tabs)


99-99: Hard tabs
Column: 1

(MD010, no-hard-tabs)


100-100: Hard tabs
Column: 1

(MD010, no-hard-tabs)


101-101: Hard tabs
Column: 1

(MD010, no-hard-tabs)


102-102: Hard tabs
Column: 1

(MD010, no-hard-tabs)


103-103: Hard tabs
Column: 1

(MD010, no-hard-tabs)


104-104: Hard tabs
Column: 1

(MD010, no-hard-tabs)


105-105: Hard tabs
Column: 1

(MD010, no-hard-tabs)


106-106: Hard tabs
Column: 1

(MD010, no-hard-tabs)


107-107: Hard tabs
Column: 1

(MD010, no-hard-tabs)


108-108: Hard tabs
Column: 1

(MD010, no-hard-tabs)


109-109: Hard tabs
Column: 1

(MD010, no-hard-tabs)


110-110: Hard tabs
Column: 1

(MD010, no-hard-tabs)


111-111: Hard tabs
Column: 1

(MD010, no-hard-tabs)


112-112: Hard tabs
Column: 1

(MD010, no-hard-tabs)


113-113: Hard tabs
Column: 1

(MD010, no-hard-tabs)


114-114: Hard tabs
Column: 1

(MD010, no-hard-tabs)


115-115: Hard tabs
Column: 1

(MD010, no-hard-tabs)


116-116: Hard tabs
Column: 1

(MD010, no-hard-tabs)


117-117: Hard tabs
Column: 1

(MD010, no-hard-tabs)


118-118: Hard tabs
Column: 1

(MD010, no-hard-tabs)


119-119: Hard tabs
Column: 1

(MD010, no-hard-tabs)


120-120: Hard tabs
Column: 1

(MD010, no-hard-tabs)


121-121: Hard tabs
Column: 1

(MD010, no-hard-tabs)


122-122: Hard tabs
Column: 1

(MD010, no-hard-tabs)


123-123: Hard tabs
Column: 1

(MD010, no-hard-tabs)


133-133: Hard tabs
Column: 1

(MD010, no-hard-tabs)


134-134: Hard tabs
Column: 1

(MD010, no-hard-tabs)


135-135: Hard tabs
Column: 1

(MD010, no-hard-tabs)


136-136: Hard tabs
Column: 1

(MD010, no-hard-tabs)


137-137: Hard tabs
Column: 1

(MD010, no-hard-tabs)


138-138: Hard tabs
Column: 1

(MD010, no-hard-tabs)


139-139: Hard tabs
Column: 1

(MD010, no-hard-tabs)


140-140: Hard tabs
Column: 1

(MD010, no-hard-tabs)


141-141: Hard tabs
Column: 1

(MD010, no-hard-tabs)


142-142: Hard tabs
Column: 1

(MD010, no-hard-tabs)


143-143: Hard tabs
Column: 1

(MD010, no-hard-tabs)


144-144: Hard tabs
Column: 1

(MD010, no-hard-tabs)


145-145: Hard tabs
Column: 1

(MD010, no-hard-tabs)


146-146: Hard tabs
Column: 1

(MD010, no-hard-tabs)


147-147: Hard tabs
Column: 1

(MD010, no-hard-tabs)


148-148: Hard tabs
Column: 1

(MD010, no-hard-tabs)


149-149: Hard tabs
Column: 1

(MD010, no-hard-tabs)


150-150: Hard tabs
Column: 1

(MD010, no-hard-tabs)


151-151: Hard tabs
Column: 1

(MD010, no-hard-tabs)


152-152: Hard tabs
Column: 1

(MD010, no-hard-tabs)


153-153: Hard tabs
Column: 1

(MD010, no-hard-tabs)


154-154: Hard tabs
Column: 1

(MD010, no-hard-tabs)


170-170: Hard tabs
Column: 1

(MD010, no-hard-tabs)


171-171: Hard tabs
Column: 1

(MD010, no-hard-tabs)


172-172: Hard tabs
Column: 1

(MD010, no-hard-tabs)


173-173: Hard tabs
Column: 1

(MD010, no-hard-tabs)


174-174: Hard tabs
Column: 1

(MD010, no-hard-tabs)


175-175: Hard tabs
Column: 1

(MD010, no-hard-tabs)


176-176: Hard tabs
Column: 1

(MD010, no-hard-tabs)


177-177: Hard tabs
Column: 1

(MD010, no-hard-tabs)


178-178: Hard tabs
Column: 1

(MD010, no-hard-tabs)


179-179: Hard tabs
Column: 1

(MD010, no-hard-tabs)


180-180: Hard tabs
Column: 1

(MD010, no-hard-tabs)


181-181: Hard tabs
Column: 1

(MD010, no-hard-tabs)


182-182: Hard tabs
Column: 1

(MD010, no-hard-tabs)


183-183: Hard tabs
Column: 1

(MD010, no-hard-tabs)


184-184: Hard tabs
Column: 1

(MD010, no-hard-tabs)


185-185: Hard tabs
Column: 1

(MD010, no-hard-tabs)


186-186: Hard tabs
Column: 1

(MD010, no-hard-tabs)


187-187: Hard tabs
Column: 1

(MD010, no-hard-tabs)


188-188: Hard tabs
Column: 1

(MD010, no-hard-tabs)


189-189: Hard tabs
Column: 1

(MD010, no-hard-tabs)


235-235: Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


235-235: Strong style
Expected: asterisk; Actual: underscore

(MD050, strong-style)


235-235: Strong style
Expected: asterisk; Actual: underscore

(MD050, strong-style)


289-289: Hard tabs
Column: 1

(MD010, no-hard-tabs)


290-290: Hard tabs
Column: 1

(MD010, no-hard-tabs)


291-291: Hard tabs
Column: 1

(MD010, no-hard-tabs)


292-292: Hard tabs
Column: 1

(MD010, no-hard-tabs)


294-294: Hard tabs
Column: 1

(MD010, no-hard-tabs)


428-428: Hard tabs
Column: 1

(MD010, no-hard-tabs)


429-429: Hard tabs
Column: 1

(MD010, no-hard-tabs)


430-430: Hard tabs
Column: 1

(MD010, no-hard-tabs)


431-431: Hard tabs
Column: 1

(MD010, no-hard-tabs)


437-437: Hard tabs
Column: 1

(MD010, no-hard-tabs)


437-437: Hard tabs
Column: 6

(MD010, no-hard-tabs)


438-438: Hard tabs
Column: 1

(MD010, no-hard-tabs)


445-445: Hard tabs
Column: 1

(MD010, no-hard-tabs)


445-445: Hard tabs
Column: 6

(MD010, no-hard-tabs)


446-446: Hard tabs
Column: 1

(MD010, no-hard-tabs)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: build-router
  • GitHub Check: integration_test (./events)
  • GitHub Check: build_push_image (nonroot)
  • GitHub Check: build_push_image
  • GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
  • GitHub Check: image_scan (nonroot)
  • GitHub Check: image_scan
  • GitHub Check: build_test
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: Analyze (go)
🔇 Additional comments (33)
adr/cosmo-streams-v1.md (1)

97-124: Verify method naming: EmitEvent vs. EmitLocalEvent.

Past review comments indicate that WriteEvent was renamed to EmitLocalEvent with a request to update the ADR. However, this ADR documents the method as EmitEvent. Please confirm the correct method name is reflected throughout (lines 109, 123, 154, 189) and update the ADR if the method was indeed renamed to EmitLocalEvent.

router-tests/modules/stream_receive_test.go (8)

37-115: LGTM! Well-structured basic hook invocation test.

The test correctly verifies that the receive hook is called when Kafka events arrive, uses proper synchronization with channels, and cleans up resources appropriately.


117-206: LGTM! Event mutation test correctly demonstrates the Clone() → SetData() pattern.

The test properly validates that hooks can modify events by first cloning them (to get mutability) and then changing their data, which is the intended design for event transformation.


208-303: LGTM! Immutability contract test is correct.

The test properly verifies that events cannot be type-asserted to MutableStreamEvent without explicit cloning, which enforces the immutability guarantee and prevents accidental mutation of shared data.


305-469: LGTM! Per-subscription authentication-based filtering works correctly.

The test properly demonstrates that hooks can access authentication context and conditionally modify events for specific subscribers, which is a key feature for implementing authorization and filtering logic.


471-572: LGTM! Header-based event transformation is correctly implemented.

The test validates that hooks can access WebSocket connection headers and use them for conditional event transformation, enabling header-based filtering and authorization patterns.


574-648: LGTM! Error handling test correctly validates subscription closure.

The test properly verifies that when a hook returns an error, the router closes the subscription connection and cleans up Kafka clients, preventing resource leaks.


650-835: LGTM! Comprehensive concurrency control test.

The test thoroughly validates that MaxConcurrentHandlers configuration properly limits concurrent hook execution across various scenarios, including edge cases where the final batch has fewer subscribers than the configured limit.


837-965: LGTM! Timeout and out-of-order delivery test is well-designed.

The test correctly validates the tradeoff between strict ordering and responsiveness: when a hook exceeds the timeout, subsequent events are processed immediately to avoid blocking, resulting in intentional out-of-order delivery. The assertions confirm this behavior and the corresponding warning is logged.

router/core/subscriptions_modules.go (7)

15-53: LGTM! Well-documented hook context interface.

The SubscriptionOnStartHandlerContext interface provides a clean API with clear documentation for event creation and emission, properly distinguishing between event-driven subscriptions (EDFS) and normal subscriptions.


128-163: LGTM! Event wrapper design correctly implements mutability contract.

The dual-wrapper pattern is properly implemented:

  • MutableEngineEvent.GetData() returns the internal slice directly (mutable by design)
  • EngineEvent.GetData() clones the slice (immutable by design)

This aligns with the Cosmo Streams v1 hook interface design.

Based on learnings


164-200: LGTM! Engine subscription context correctly implements interface.

The engineSubscriptionOnStartHookContext properly implements the handler context for engine subscriptions, with SubscriptionEventConfiguration() correctly returning nil as documented for engine subscriptions (line 24).


209-267: LGTM! Hook factory functions are well-structured.

The factory functions (NewPubSubSubscriptionOnStartHook, NewEngineSubscriptionOnStartHook) correctly adapt user-provided handlers into the internal hook system, with proper context extraction, logger setup with structured fields, and nil-safety checks.


338-443: LGTM! Publish and receive event hook factories are correct.

Both NewPubSubOnPublishEventsHook and NewPubSubOnReceiveEventsHook properly set up hook contexts with request context, logger (including provider metadata), and event builders, then invoke the user handler and unwrap results via Unsafe().


269-336: LGTM! Stream event handler interfaces are well-designed.

The StreamReceiveEventHandler and StreamPublishEventHandler interfaces provide clear contracts with comprehensive documentation about event immutability, cloning requirements, error handling, and the per-subscription invocation pattern.


445-456: LGTM! StreamHandlerError provides clear error signaling.

The custom error type enables hooks to signal subscription closure with a specific message, properly documented as writing an error event and closing the WebSocket with normal closure code 1000.

router/pkg/pubsub/datasource/subscription_event_updater.go (5)

14-34: LGTM! Uses golang.org/x/sync/semaphore for concurrency control.

The implementation properly uses *semaphore.Weighted from golang.org/x/sync/semaphore (line 32) to limit concurrent hook execution, addressing the architectural guidance from the review.


36-81: LGTM! Update method correctly implements concurrent hook execution with timeout.

The implementation properly:

  • Creates a fresh updaterCtx with deadline for each batch (line 46), ensuring each Update call has its own timeout window
  • Uses semaphore to limit concurrency via Acquire (line 53)
  • Passes the same events slice to all subscriptions safely (read-only access in updateSubscription)
  • Handles timeouts gracefully, logging warnings about potential out-of-order delivery while allowing in-flight work to complete

Based on learnings


95-129: LGTM! Per-subscription hook processing with proper error handling.

The updateSubscription method correctly:

  • Short-circuits the hook loop on first error (lines 110-112), ensuring all hook errors are properly captured
  • Sends any non-nil events regardless of hook errors (giving hooks control over what to send)
  • Closes the subscription on hook error (lines 126-128)
  • Recovers from panics with proper logging and cleanup (lines 98-100)

Based on learnings


83-93: LGTM! Lifecycle methods are correctly implemented.

The Close method properly delegates to the underlying updater without closing the semaphore channel (avoiding send-on-closed panics), and SetHooks correctly updates only the hook handlers as designed.

Based on learnings


131-164: LGTM! Constructor and panic recovery are well-implemented.

NewSubscriptionEventUpdater properly initializes the semaphore with the configured limit (defaulting to 1 if invalid), and recoverPanic logs the panic with a full stacktrace before closing the subscription with a downstream service error.

router/pkg/pubsub/redis/adapter.go (3)

21-45: LGTM! Proper compile-time interface compliance check.

The compile-time assertion (line 22) ensures ProviderAdapter implements datasource.Adapter, and the constructor correctly returns the interface type.


88-152: LGTM! Subscribe method correctly uses datasource abstractions.

The method properly:

  • Performs runtime type assertion with clear error message (lines 89-92)
  • Uses the new configuration accessors (ProviderID(), Channels)
  • Wraps events in the correct Event{evt: &MutableEvent{...}} structure (lines 132-136)

154-227: LGTM! Publish method implements proper per-event error handling.

The implementation correctly:

  • Returns early if no events (lines 170-172)
  • Processes each event individually with proper type checking (lines 179-183)
  • Aggregates errors without stopping on first failure (allows partial success)
  • Emits metrics for both successes and failures (lines 198-215)
  • Returns a combined error with clear context (lines 218-224)
router/pkg/pubsub/kafka/adapter.go (5)

23-51: LGTM! Interface compliance and updated topicPoller signature.

The compile-time assertion ensures interface compliance, and topicPoller correctly accepts datasource.SubscriptionEventUpdater for the new abstraction.


98-118: LGTM! Event construction includes headers and key.

The topicPoller properly extracts Kafka record headers (lines 98-101) and wraps the event with Data, Headers, and Key fields in the new event structure.


126-180: LGTM! Subscribe method properly uses datasource abstractions.

The method correctly performs runtime type assertion, uses configuration accessors, and passes provider context to the topic poller.


185-278: LGTM! Publish method implements proper per-event error handling.

The implementation correctly:

  • Validates event types via castToMutableEvent (lines 214-221)
  • Extracts and maps headers to Kafka record headers (lines 223-229)
  • Aggregates errors with mutex protection (lines 217-220, 238-242)
  • Emits metrics for successes and failures separately (lines 248-266)
  • Returns combined error with clear context (lines 269-275)

341-350: LGTM! Helper function handles both event types.

castToMutableEvent correctly handles both *Event (unwrapping to *MutableEvent) and direct *MutableEvent cases, with a clear error for invalid types.

router/pkg/pubsub/nats/adapter.go (4)

27-34: LGTM! NATS Adapter correctly extends base adapter interface.

The Adapter interface properly embeds datasource.Adapter and adds the NATS-specific Request method, with compile-time assertion ensuring compliance.


80-231: LGTM! Subscribe supports both JetStream and non-JetStream modes.

The method correctly:

  • Performs runtime type assertion (lines 81-84)
  • Handles JetStream subscriptions with durable consumers and stream configuration (lines 100-172)
  • Handles non-JetStream subscriptions with channel-based delivery (lines 174-230)
  • Wraps events in the proper structure with headers extraction (lines 153-156, 201-204)

233-296: LGTM! Publish method implements proper per-event error handling.

The implementation follows the same robust pattern as the other adapters:

  • Processes each event individually with type validation (lines 254-258)
  • Aggregates errors without stopping on first failure (lines 256, 262)
  • Emits separate metrics for successes and failures (lines 267-284)
  • Returns combined error with clear context (lines 287-293)

298-355: LGTM! Request method correctly implements NATS request-reply pattern.

The method properly:

  • Validates configuration and event types (lines 299-302, 314-317)
  • Uses the new configuration accessors for subject and provider metadata (lines 307, 321, 326-328)
  • Handles errors with detailed context logging (lines 323-329)
  • Emits appropriate metrics for both success and failure cases (lines 330-345)

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
router/pkg/pubsub/nats/adapter.go (1)

26-35: Update interface-compliance comment to match current adapter

The comment // Ensure ProviderAdapter implements ProviderSubscriptionHooks no longer matches the assertion var _ datasource.Adapter = (*ProviderAdapter)(nil). Consider updating the comment to avoid confusion for future readers.

router/pkg/pubsub/nats/engine_datasource.go (1)

189-235: SubscriptionSource wiring to datasource.Adapter is straightforward

SubscriptionSource.SubscriptionEventConfiguration and Start:

  • Deserialize the JSON into SubscriptionEventConfiguration.
  • Guard against bad configs (nil or wrong type) with clear errors.
  • Delegate to pubSub.Subscribe with the concrete config and updater.

Given SubscriptionSource.SubscriptionEventConfiguration always returns *SubscriptionEventConfiguration on success, the extra type assertion in Start is slightly redundant but harmless.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bb03b5f and ef45209.

📒 Files selected for processing (4)
  • router/pkg/pubsub/nats/adapter.go (8 hunks)
  • router/pkg/pubsub/nats/engine_datasource.go (3 hunks)
  • router/pkg/pubsub/redis/adapter.go (5 hunks)
  • router/pkg/pubsub/redis/engine_datasource.go (3 hunks)
🧰 Additional context used
🧠 Learnings (9)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-11-16T11:52:34.064Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2273
File: router/pkg/pubsub/redis/engine_datasource.go:37-42
Timestamp: 2025-11-16T11:52:34.064Z
Learning: In router/pkg/pubsub/redis/engine_datasource.go (and similar files for kafka/nats), the MutableEvent.GetData() method intentionally returns the internal Data slice without cloning, while Event.GetData() clones the slice. This is by design: MutableEvent is designed to be mutable (so callers can modify the data), while Event is immutable (cloning prevents modification). This difference is part of the Cosmo Streams v1 hook interface design.

Applied to files:

  • router/pkg/pubsub/redis/adapter.go
  • router/pkg/pubsub/nats/adapter.go
  • router/pkg/pubsub/redis/engine_datasource.go
  • router/pkg/pubsub/nats/engine_datasource.go
📚 Learning: 2025-11-13T10:10:47.680Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2329
File: router/pkg/pubsub/datasource/subscription_event_updater.go:86-88
Timestamp: 2025-11-13T10:10:47.680Z
Learning: In router/pkg/pubsub/datasource/subscription_event_updater.go, the SetHooks method is intentionally designed to only replace hook handlers, not reconfigure timeout or semaphore settings. The timeout and semaphore fields are meant to be set once during construction via NewSubscriptionEventUpdater and remain immutable. If different timeout or concurrency settings are needed, a new updater instance should be created rather than modifying the existing one.

Applied to files:

  • router/pkg/pubsub/redis/adapter.go
  • router/pkg/pubsub/nats/adapter.go
  • router/pkg/pubsub/redis/engine_datasource.go
  • router/pkg/pubsub/nats/engine_datasource.go
📚 Learning: 2025-11-19T15:13:57.821Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2273
File: router/core/graphql_handler.go:0-0
Timestamp: 2025-11-19T15:13:57.821Z
Learning: In the Cosmo router (wundergraph/cosmo), error handling follows a two-phase pattern: (1) Prehandler phase handles request parsing, validation, and setup errors using `httpGraphqlError` and `writeOperationError` (in files like graphql_prehandler.go, operation_processor.go, parse_multipart.go, batch.go); (2) Execution phase handles resolver execution errors using `WriteError` in GraphQLHandler.ServeHTTP. Because all `httpGraphqlError` instances are caught in the prehandler before ServeHTTP is invoked, any error type checks for `httpGraphqlError` in the execution-phase WriteError method are unreachable code.

Applied to files:

  • router/pkg/pubsub/redis/adapter.go
📚 Learning: 2025-08-14T17:22:41.662Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2137
File: router/pkg/metric/oltp_connection_metric_store.go:46-49
Timestamp: 2025-08-14T17:22:41.662Z
Learning: In the OTLP connection metric store (router/pkg/metric/oltp_connection_metric_store.go), errors from startInitMetrics are intentionally logged but not propagated - this is existing behavior to ensure metrics failures don't block core system functionality.

Applied to files:

  • router/pkg/pubsub/redis/adapter.go
📚 Learning: 2025-09-24T12:54:00.765Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2222
File: router-tests/websocket_test.go:2238-2302
Timestamp: 2025-09-24T12:54:00.765Z
Learning: The wundergraph/cosmo project uses Go 1.25 (Go 1.23+ minimum), so fmt.Appendf and other newer Go standard library functions are available and can be used without compatibility concerns.

Applied to files:

  • router/pkg/pubsub/nats/adapter.go
📚 Learning: 2025-09-24T12:54:00.765Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2222
File: router-tests/websocket_test.go:2238-2302
Timestamp: 2025-09-24T12:54:00.765Z
Learning: The wundergraph/cosmo project uses Go 1.25 (Go 1.25 minimum), so fmt.Appendf and other newer Go standard library functions are available and can be used without compatibility concerns.

Applied to files:

  • router/pkg/pubsub/nats/adapter.go
📚 Learning: 2025-08-28T09:17:49.477Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.

Applied to files:

  • router/pkg/pubsub/redis/engine_datasource.go
  • router/pkg/pubsub/nats/engine_datasource.go
📚 Learning: 2025-08-28T09:18:10.121Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.

Applied to files:

  • router/pkg/pubsub/redis/engine_datasource.go
🧬 Code graph analysis (4)
router/pkg/pubsub/redis/adapter.go (6)
router/pkg/pubsub/nats/adapter.go (1)
  • Adapter (27-31)
router/pkg/pubsub/datasource/provider.go (5)
  • Adapter (24-28)
  • SubscriptionEventConfiguration (101-105)
  • ProviderType (52-52)
  • StreamEvent (86-91)
  • PublishEventConfiguration (108-112)
router/pkg/pubsub/redis/engine_datasource.go (4)
  • SubscriptionEventConfiguration (64-68)
  • Event (20-22)
  • MutableEvent (35-37)
  • PublishEventConfiguration (141-145)
router/pkg/pubsub/datasource/subscription_event_updater.go (1)
  • SubscriptionEventUpdater (19-24)
router/pkg/pubsub/datasource/error.go (2)
  • NewError (12-17)
  • Error (3-6)
router/pkg/metric/stream_metric_store.go (2)
  • ProviderType (16-16)
  • StreamsEvent (25-31)
router/pkg/pubsub/nats/adapter.go (4)
router/pkg/pubsub/datasource/provider.go (5)
  • Adapter (24-28)
  • PublishEventConfiguration (108-112)
  • StreamEvent (86-91)
  • SubscriptionEventConfiguration (101-105)
  • ProviderType (52-52)
router/pkg/pubsub/nats/engine_datasource.go (5)
  • SubscriptionEventConfiguration (92-97)
  • StreamConfiguration (86-90)
  • Event (20-22)
  • MutableEvent (42-45)
  • PublishAndRequestEventConfiguration (168-172)
router/pkg/pubsub/datasource/subscription_event_updater.go (1)
  • SubscriptionEventUpdater (19-24)
router/pkg/pubsub/datasource/error.go (2)
  • NewError (12-17)
  • Error (3-6)
router/pkg/pubsub/redis/engine_datasource.go (6)
router/pkg/pubsub/nats/engine_datasource.go (3)
  • MutableEvent (42-45)
  • Event (20-22)
  • SubscriptionEventConfiguration (92-97)
router/pkg/pubsub/kafka/engine_datasource.go (5)
  • MutableEvent (62-66)
  • Event (21-23)
  • SubscriptionEventConfiguration (91-95)
  • PublishEventConfiguration (188-192)
  • SubscriptionDataSource (209-211)
router/pkg/pubsub/datasource/provider.go (8)
  • MutableStreamEvent (94-98)
  • Provider (31-39)
  • SubscriptionEventConfiguration (101-105)
  • ProviderType (52-52)
  • ProviderTypeRedis (57-57)
  • PublishEventConfiguration (108-112)
  • Adapter (24-28)
  • StreamEvent (86-91)
router/pkg/pubsub/nats/adapter.go (1)
  • Adapter (27-31)
router/pkg/pubsub/datasource/datasource.go (1)
  • SubscriptionDataSource (8-13)
router/pkg/pubsub/datasource/subscription_event_updater.go (1)
  • SubscriptionEventUpdater (19-24)
router/pkg/pubsub/nats/engine_datasource.go (3)
router/pkg/pubsub/datasource/provider.go (8)
  • MutableStreamEvent (94-98)
  • StreamEvent (86-91)
  • Provider (31-39)
  • SubscriptionEventConfiguration (101-105)
  • ProviderType (52-52)
  • ProviderTypeNats (55-55)
  • PublishEventConfiguration (108-112)
  • Adapter (24-28)
router/pkg/pubsub/datasource/subscription_event_updater.go (1)
  • SubscriptionEventUpdater (19-24)
router/pkg/pubsub/datasource/pubsubprovider.go (1)
  • PubSubProvider (12-19)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
  • GitHub Check: integration_test (./events)
  • GitHub Check: build_push_image (nonroot)
  • GitHub Check: build_push_image
  • GitHub Check: image_scan (nonroot)
  • GitHub Check: image_scan
  • GitHub Check: build_test
  • GitHub Check: Analyze (go)
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (14)
router/pkg/pubsub/redis/adapter.go (1)

170-227: Well-structured error aggregation and metrics.

The per-event publishing logic correctly aggregates errors, emits metrics for both successful and failed events, and provides clear error messages. The success count calculation and error collection are accurate.

router/pkg/pubsub/redis/engine_datasource.go (2)

102-138: Safe JSON construction with proper escaping.

The MarshalJSONTemplate implementation now properly escapes user-controlled fields (Channel, Provider, FieldName) using goccyjson.Marshal while preserving template placeholders in Event.Data. This addresses previous security concerns about unsafe string concatenation.


243-247: Good practice: compile-time interface compliance checks.

These compile-time assertions ensure that the types correctly implement their respective interfaces, catching any interface mismatches at build time.

router/pkg/pubsub/nats/adapter.go (4)

80-231: Subscribe wiring and metrics look consistent with datasource abstractions

The Subscribe implementation correctly:

  • Asserts the concrete *SubscriptionEventConfiguration.
  • Derives a durable consumer name from instance + subjects.
  • Configures JetStream consumer with optional InactiveThreshold.
  • Streams messages into SubscriptionEventUpdater.Update as datasource.StreamEvent wrappers and records consume metrics.
  • Falls back to simple subject subscriptions when StreamConfiguration is nil.

This matches the expected datasource.Adapter.Subscribe contract and is in line with other providers.


298-355: Request path type assertions and metrics look reasonable

The Request implementation:

  • Asserts the concrete *PublishAndRequestEventConfiguration and returns a datasource.Error on mismatch.
  • Clones the incoming StreamEvent and asserts it to *MutableEvent, ensuring a mutable copy for the NATS request.
  • Logs and emits metrics on request errors and on success, and writes the response into the provided writer.

This matches the expectations for the NATS request path and is consistent with the new configuration types defined in engine_datasource.go.


445-454: castToMutableEvent helper centralizes NATS event type handling

The castToMutableEvent helper cleanly handles both *Event and *MutableEvent and rejects other StreamEvent implementations with a clear error. This centralization simplifies the publish path and avoids scattering type assertions.


233-285: The review comment is incorrect and should be disregarded.

Go 1.22 introduced the ability to range over integers, making for range n syntax valid for iterating n times in loops. The repository targets Go 1.25 (as confirmed in router/go.mod), which fully supports this syntax. The code at lines 265 and 277 using for range successCount and for range len(errs) is valid Go and will compile without errors. No changes are necessary.

Likely an incorrect or invalid review comment.

router/pkg/pubsub/nats/engine_datasource.go (7)

20-84: Event / MutableEvent semantics align with Streams v1 design

The Event and MutableEvent implementations look good:

  • Event.GetData and Event.GetHeaders clone underlying slices/maps, making Event effectively immutable.
  • MutableEvent.GetData returns the underlying []byte without cloning, while SetData/Clone defensively copy incoming/outgoing data and headers.
  • cloneHeaders performs a proper deep copy of the map-of-slices.

This matches the intended design where MutableEvent is the mutable view and Event is the immutable wrapper. Based on learnings.


92-113: SubscriptionEventConfiguration correctly implements datasource contract

The updated SubscriptionEventConfiguration struct and its methods:

  • Use providerId / rootFieldName JSON tags consistent with the datasource layer.
  • Expose ProviderID, ProviderType (fixed to NATS), and RootFieldName as required by datasource.SubscriptionEventConfiguration.

This keeps the engine-side config aligned with the generic pubsub/datasource abstractions.


114-166: publishData and MarshalJSONTemplate now safely build templated JSON

publishData.PublishEventConfiguration cleanly adapts engine-side data into a PublishAndRequestEventConfiguration.

MarshalJSONTemplate improves on the previous approach by:

  • Using goccyjson.Marshal to JSON-escape Subject, Provider, and FieldName.
  • Inserting Event.Data raw to preserve template placeholders, as required by the templating semantics.
  • Building the JSON via strings.Builder to avoid repeated allocations.

This addresses earlier JSON-safety concerns while still supporting templated payloads.


168-187: PublishAndRequestEventConfiguration correctly implements PublishEventConfiguration

The PublishAndRequestEventConfiguration struct and its methods (ProviderID, ProviderType, RootFieldName) correctly satisfy datasource.PublishEventConfiguration for the NATS provider, mirroring the subscription configuration design.


237-254: NatsPublishDataSource.Load behavior matches pubsub error-handling expectations

NatsPublishDataSource.Load:

  • Parses the publishData input and wraps the event as &Event{evt: &publishData.Event} so the adapter sees the expected NATS-specific event type.
  • Calls pubSub.Publish with the derived PublishEventConfiguration.
  • Swallows publish errors in favor of writing {"success": false} to the buffer, as documented in the comment, and only returns writer errors.

This is consistent with the intention to avoid surfacing provider errors as "unable to fetch from subgraph" while still signaling success/failure in the JSON payload.


260-281: NatsRequestDataSource.Load correctly drills down to NATS-specific Adapter

NatsRequestDataSource.Load:

  • Deserializes publishData and then asserts s.pubSub to *datasource.PubSubProvider to reach the underlying provider adapter.
  • Asserts that inner adapter to this package’s Adapter (with Request support) and fails fast with a clear error message if the types don't match.
  • Invokes adapter.Request with the NATS-specific publish configuration and an Event wrapper around the mutable event.

This layering looks correct, assuming the NATS provider is always wired via a datasource.PubSubProvider in the call sites.

Please double-check the construction sites for NatsRequestDataSource to ensure pubSub is always a *datasource.PubSubProvider with a NATS Adapter underneath; otherwise the type assertions here will fail at runtime.


287-291: Interface compliance checks are helpful for future refactors

The explicit interface assertions for SubscriptionEventConfiguration, PublishAndRequestEventConfiguration, Event, and MutableEvent are a good guardrail to catch accidental signature changes during future refactors.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
router-tests/go.mod (1)

212-212: Minor: comment formatting deviates from Go style conventions.

The space after // was removed (// github.com///github.com/). Go's style guide recommends a space after //. Clarify whether this is intentional formatting consistency or unintentional drift.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8e79ea0 and da8f4a6.

⛔ Files ignored due to path filters (2)
  • router-tests/go.sum is excluded by !**/*.sum
  • router/go.sum is excluded by !**/*.sum
📒 Files selected for processing (2)
  • router-tests/go.mod (2 hunks)
  • router/go.mod (1 hunks)
🧰 Additional context used
🧠 Learnings (9)
📓 Common learnings
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.
📚 Learning: 2025-08-20T22:13:25.222Z
Learnt from: StarpTech
Repo: wundergraph/cosmo PR: 2157
File: router-tests/go.mod:16-16
Timestamp: 2025-08-20T22:13:25.222Z
Learning: github.com/mark3labs/mcp-go v0.38.0 has regressions and should not be used in the wundergraph/cosmo project. v0.36.0 is the stable version that should be used across router-tests and other modules.

Applied to files:

  • router-tests/go.mod
  • router/go.mod
📚 Learning: 2025-09-24T12:54:00.765Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2222
File: router-tests/websocket_test.go:2238-2302
Timestamp: 2025-09-24T12:54:00.765Z
Learning: The wundergraph/cosmo project uses Go 1.25 (Go 1.23+ minimum), so fmt.Appendf and other newer Go standard library functions are available and can be used without compatibility concerns.

Applied to files:

  • router-tests/go.mod
  • router/go.mod
📚 Learning: 2025-09-24T12:54:00.765Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2222
File: router-tests/websocket_test.go:2238-2302
Timestamp: 2025-09-24T12:54:00.765Z
Learning: The wundergraph/cosmo project uses Go 1.25 (Go 1.25 minimum), so fmt.Appendf and other newer Go standard library functions are available and can be used without compatibility concerns.

Applied to files:

  • router-tests/go.mod
  • router/go.mod
📚 Learning: 2025-08-15T10:21:45.838Z
Learnt from: StarpTech
Repo: wundergraph/cosmo PR: 2142
File: helm/cosmo/Chart.yaml:0-0
Timestamp: 2025-08-15T10:21:45.838Z
Learning: In the WunderGraph Cosmo project, helm chart version upgrades and README badge synchronization are handled in separate helm release PRs, not in the initial version bump PRs.

Applied to files:

  • router-tests/go.mod
  • router/go.mod
📚 Learning: 2025-11-19T15:13:57.821Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2273
File: router/core/graphql_handler.go:0-0
Timestamp: 2025-11-19T15:13:57.821Z
Learning: In the Cosmo router (wundergraph/cosmo), error handling follows a two-phase pattern: (1) Prehandler phase handles request parsing, validation, and setup errors using `httpGraphqlError` and `writeOperationError` (in files like graphql_prehandler.go, operation_processor.go, parse_multipart.go, batch.go); (2) Execution phase handles resolver execution errors using `WriteError` in GraphQLHandler.ServeHTTP. Because all `httpGraphqlError` instances are caught in the prehandler before ServeHTTP is invoked, any error type checks for `httpGraphqlError` in the execution-phase WriteError method are unreachable code.

Applied to files:

  • router-tests/go.mod
  • router/go.mod
📚 Learning: 2025-08-20T10:08:17.857Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2155
File: router/core/router.go:1857-1866
Timestamp: 2025-08-20T10:08:17.857Z
Learning: router/pkg/config/config.schema.json forbids null values for traffic_shaping.subgraphs: additionalProperties references $defs.traffic_shaping_subgraph_request_rule with type "object". Therefore, in core.NewSubgraphTransportOptions, dereferencing each subgraph rule pointer is safe under schema-validated configs, and a nil-check is unnecessary.

Applied to files:

  • router-tests/go.mod
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.

Applied to files:

  • router/go.mod
📚 Learning: 2025-08-28T09:17:49.477Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:17-55
Timestamp: 2025-08-28T09:17:49.477Z
Learning: The Cosmo router uses a custom, intentionally rigid multipart implementation for GraphQL subscriptions. The multipart parsing in test files should remain strict and not be made more tolerant, as this rigidity is by design.

Applied to files:

  • router/go.mod
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: build-router
  • GitHub Check: build_push_image (nonroot)
  • GitHub Check: integration_test (./. ./fuzzquery ./lifecycle ./modules)
  • GitHub Check: build_push_image
  • GitHub Check: image_scan (nonroot)
  • GitHub Check: build_test
  • GitHub Check: integration_test (./events)
  • GitHub Check: image_scan
  • GitHub Check: integration_test (./telemetry)
  • GitHub Check: build_test
  • GitHub Check: Analyze (go)
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (3)
router-tests/go.mod (1)

30-30: Dependency bump for Streams v1 engine integration.

The graphql-go-tools/v2 version bump from rc.238 to rc.239 aligns with the Cosmo Streams v1 implementation. Per the PR objectives, the engine PR (graphql-go-tools #1309) must merge first before updating to a released version; this interim RC state is expected.

Once the upstream engine PR is merged and a stable version is released, verify that go.mod is updated to reference the released version (not an RC) before this PR merges.

router/go.mod (2)

197-197: Commented replace directive is appropriate for development.

The local filesystem replace directive remains commented out, which is correct. This serves as a reference for future local development workflows and does not interfere with published builds.


34-34: Version consistency verified across router and router-tests go.mod files.

Verification confirms the graphql-go-tools/v2 version bump is properly coordinated:

  • Both router/go.mod (line 34) and router-tests/go.mod reference v2.0.0-rc.239
  • Local filesystem replace directives for graphql-go-tools are correctly commented out in both files
  • No inappropriate active replace directives are present

The changes align with the PR objectives and maintain proper dependency management.

Copy link
Contributor

@StarpTech StarpTech left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@dkorittki
Copy link
Contributor Author

Query Planner Test keeps failing. This should be clarified before merging. Need to find out why it's failing, does not seem obvious to me.

@dkorittki
Copy link
Contributor Author

Waiting for confirmation that failing query planner checks are indeed not because of the changes in this pull request.

@dkorittki
Copy link
Contributor Author

Discussed internally why the check fails and if it is connected to this issue.
There is a query, which fails the planner check https://github.com/wundergraph/cosmo-celestial/pull/760#user-content-query-plan-check-failures. Usually there should be a reference to the failing check in the pull request comments of the query planner bot but at the time of testing this pull request, there was some work on the query planner automation which could have prevented this. We observed other pull requests having the same problem.

So skipping query planner checks is okay here.

@dkorittki dkorittki merged commit e4e5bfb into main Nov 23, 2025
28 checks passed
@dkorittki dkorittki deleted the topic/streams-v1 branch November 23, 2025 14:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants