
Conversation

@alepane21 (Contributor) commented Mar 18, 2025

Motivation and Context

Today, changing or adding an EDFS provider requires changes to both cosmo and the engine, making it harder than it should be.
The main point of this PR is to make it easier to add future EDFS providers.

To address this, I moved all the pubsub code from the engine into the pubsub package in the router and refactored it to fit the new structure.

There is a README that will help with implementing new EDFS providers (manually or using AI).

Important points

  • I added a test to verify that Kafka publish works; nothing was testing it before.
  • Connection operations (opening and closing connections) behave as before.
  • Switching from a previous version to the new one should just work, without any change to the configuration or schema.

Design

Goals

  1. Plug‑n‑play providers – adding Kafka, NATS, etc. must not touch core engine code.

  2. Provider freedom – each provider may implement protocol‑specific quirks without affecting others.


Current pain point

Engine and Router share provider logic; every new provider requires coordinated changes in both repositories.


Architecture

The refactor introduces four interfaces (sketched in Go after the table):

| Interface | Responsibility |
| --- | --- |
| ProviderFactory | Creates a concrete PubSubProvider given a DataSourceConfiguration + router event config. |
| PubSubProvider | Owns provider-wide resources (e.g. NATS connection pool). |
| PubSubDataSource | Implements ResolveDataSource + SubscribeDataSource for one GraphQL field. |
| Adapter | Thin, protocol-specific driver that actually publishes / subscribes. |
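
For orientation, here is a minimal Go sketch of how the four interfaces could relate to each other. The method names, signatures, and placeholder types are illustrative only and not necessarily the ones used in the PR.

```go
package pubsub

import "context"

// Placeholder types standing in for the engine/router structures mentioned
// above; the real types live in the router and graphql-go-tools packages.
type (
	DataSourceConfiguration struct{} // the node's datasource configuration
	EventConfiguration      struct{} // the router's event-configuration block
	EngineDataSource        any      // stands in for the engine's resolve.DataSource
	EngineSubscription      any      // stands in for the engine's resolve.SubscribeDataSource
)

// ProviderFactory creates a concrete PubSubProvider from a
// DataSourceConfiguration plus the router's event configuration.
type ProviderFactory interface {
	BuildProvider(ctx context.Context, dsCfg DataSourceConfiguration, eventCfg EventConfiguration) (PubSubProvider, error)
}

// PubSubProvider owns provider-wide resources, e.g. a NATS connection pool.
type PubSubProvider interface {
	Startup(ctx context.Context) error
	Shutdown(ctx context.Context) error
	// DataSourceForField returns the PubSubDataSource serving a field, if any.
	DataSourceForField(fieldName string) (PubSubDataSource, bool)
}

// PubSubDataSource implements resolution and subscription for one GraphQL field.
type PubSubDataSource interface {
	ResolveDataSource() (EngineDataSource, error)
	SubscribeDataSource() (EngineSubscription, error)
}

// Adapter is the thin, protocol-specific driver that actually publishes/subscribes.
type Adapter interface {
	Publish(ctx context.Context, subject string, data []byte) error
	Subscribe(ctx context.Context, subject string, handler func(data []byte)) error
}
```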

Execution flow

When the router’s Loader (router/core/factoryresolver.go) encounters a PubSub node, it does the following (see the code sketch after this list):

  1. Iterates over every registered ProviderFactory, passing each factory

    • the node’s DataSourceConfiguration, and

    • the router’s event‑configuration block.

      Each factory creates (or retrieves) its own PubSubProvider instance as needed.

  2. Builds a new datasource.Factory, injecting the full slice of initialised providers.

    This factory implements Planner(), returning a datasource.Planner that keeps the list of PubSubProviders.

  3. At execution time the engine calls the planner for each GraphQL field.

    The planner scans its providers and picks the matching PubSubDataSource—e.g. a NatsDataSource for a field annotated with @edfs_natsSubscribe.

    The data source already contains a pre‑configured Adapter.

  4. The chosen PubSubDataSource supplies the engine with both resolve.DataSource and resolve.SubscribeDataSource, always propagating the Adapter so downstream code can read any value from the event configuration before invoking it.

  5. Finally, the Adapter hands off the actual publish / subscribe work to the appropriate external driver (Kafka, NATS, etc.).
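
A condensed sketch of steps 1 and 3 above, reusing the hypothetical interfaces from the earlier sketch (same illustrative package); buildProviders and planField are made-up names, not the actual functions in router/core/factoryresolver.go.

```go
// Step 1: every registered ProviderFactory receives the node's configuration
// and the router's event-configuration block, and builds (or reuses) its provider.
func buildProviders(ctx context.Context, factories []ProviderFactory, dsCfg DataSourceConfiguration, eventCfg EventConfiguration) ([]PubSubProvider, error) {
	providers := make([]PubSubProvider, 0, len(factories))
	for _, f := range factories {
		p, err := f.BuildProvider(ctx, dsCfg, eventCfg)
		if err != nil {
			return nil, err
		}
		providers = append(providers, p)
	}
	return providers, nil
}

// Step 3: at planning time the planner scans the providers and picks the
// PubSubDataSource that matches the field, e.g. one annotated with @edfs_natsSubscribe.
func planField(providers []PubSubProvider, fieldName string) (PubSubDataSource, bool) {
	for _, p := range providers {
		if ds, ok := p.DataSourceForField(fieldName); ok {
			return ds, true
		}
	}
	return nil, false
}
```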


Lifecycle notes

  • A single PubSubProvider instance lives as long as the graphServer, opening connections when it starts and shutting them down when the router is stopped (see the sketch below);

  • The Adapter hides third‑party SDKs; swap it to migrate from one client library to another.
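
A rough sketch of that lifecycle, again using the hypothetical interfaces from the earlier sketches; the graphServer struct and method names here are illustrative, not the router's actual types.

```go
// graphServer holds one PubSubProvider instance per configured provider for
// its whole lifetime (illustrative struct, not the router's actual type).
type graphServer struct {
	pubSubProviders []PubSubProvider
}

// startProviders opens the provider connections when the graph server starts.
func (s *graphServer) startProviders(ctx context.Context) error {
	for _, p := range s.pubSubProviders {
		if err := p.Startup(ctx); err != nil {
			return err
		}
	}
	return nil
}

// stopProviders shuts the connections down when the router is stopped.
func (s *graphServer) stopProviders(ctx context.Context) error {
	var firstErr error
	for _, p := range s.pubSubProviders {
		if err := p.Shutdown(ctx); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
```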


Outcome

So adding Redis now means:

  1. Implement redis.ProviderFactory, redis.Provider, redis.DataSource, redis.Adapter.

  2. Register the factory once (see the sketch at the end of this section).

  3. Implement the required changes on the composition side.

No engine or router code changes → Goal 1 satisfied.

Provider can decide batching, ACK strategy, etc. internally → Goal 2 satisfied.
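
As an illustration of steps 1 and 2, a hypothetical Redis provider could plug into the interfaces from the earlier sketch like this; every identifier here (redisProviderFactory, redisProvider, registeredFactories) is made up for the example and does not exist in the PR.

```go
// Hypothetical Redis implementations of the interfaces from the earlier sketch.
type redisProviderFactory struct{}

func (f *redisProviderFactory) BuildProvider(ctx context.Context, dsCfg DataSourceConfiguration, eventCfg EventConfiguration) (PubSubProvider, error) {
	// Open the Redis connection pool and build the per-field data sources,
	// each carrying its own pre-configured Adapter.
	return &redisProvider{}, nil
}

type redisProvider struct{}

func (p *redisProvider) Startup(ctx context.Context) error  { return nil } // connect to Redis
func (p *redisProvider) Shutdown(ctx context.Context) error { return nil } // close the pool
func (p *redisProvider) DataSourceForField(field string) (PubSubDataSource, bool) {
	return nil, false // would return the Redis PubSubDataSource for EDFS Redis fields
}

// Step 2: register the factory once, next to the existing NATS and Kafka factories.
var registeredFactories = []ProviderFactory{
	// ...existing NATS and Kafka factories...
	&redisProviderFactory{},
}
```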

Checklist

  • I have discussed my proposed changes in an issue and have received approval to proceed.
  • I have followed the coding standards of the project.
  • Tests or benchmarks have been added or updated.
  • Documentation has been updated on https://github.com/wundergraph/cosmo-docs.
  • I have read the Contributors Guide.

@github-actions bot commented Mar 24, 2025

Router image scan passed

✅ No security vulnerabilities found in image:

ghcr.io/wundergraph/cosmo/router:sha-8138e124a9aefb7e5e2eedf3287b2b7917117b4a

@alepane21 alepane21 changed the title edfs refactor kafka and nats as datasource in router feat: edfs refactor kafka and nats as datasource in router Apr 4, 2025
@alepane21 alepane21 requested review from Copilot and jensneuse May 8, 2025 10:18
Copilot AI left a comment

Pull Request Overview

This PR refactors the pubsub integration for EDFS providers by replacing the previous Kafka/NATS data source types with a unified NATS adapter interface, supporting the plug‑n‑play provider design.

  • Updated pubsub adapter interfaces across multiple subgraph modules to use the new nats.AdapterInterface.
  • Modified the mood schema and resolvers to accept a getPubSubName function for dynamic topic naming.
  • Adjusted error handling in adapter initialization in the New() function.

Reviewed Changes

Copilot reviewed 64 out of 64 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| demo/pkg/subgraphs/subgraphs.go | Refactored NATS adapter initialization and error handling, switching the type of map values to nats.AdapterInterface. |
| demo/pkg/subgraphs/products_fg/subgraph/resolver.go | Updated adapter interface type in dependency injection for product subgraph resolvers. |
| demo/pkg/subgraphs/products_fg/products.go | Adjusted NewSchema signature to utilize nats.AdapterInterface. |
| demo/pkg/subgraphs/products/subgraph/resolver.go | Updated adapter interface in product subgraph resolver. |
| demo/pkg/subgraphs/products/products.go | Modified schema creation to reflect new adapter interface type. |
| demo/pkg/subgraphs/mood/subgraph/schema.resolvers.go | Added usage of GetPubSubName in topic naming and updated adapter interface usage. |
| demo/pkg/subgraphs/mood/subgraph/resolver.go | Updated resolver to inject the new adapter interface and topic naming function. |
| demo/pkg/subgraphs/mood/mood.go | Changed NewSchema signature to accept getPubSubName for dynamic topic naming. |
| demo/pkg/subgraphs/hobbies/subgraph/resolver.go | Refactored dependency injector to use the new adapter interface. |
| demo/pkg/subgraphs/hobbies/hobbies.go | Updated schema creation to use nats.AdapterInterface. |
| demo/pkg/subgraphs/family/subgraph/resolver.go | Updated dependency injection to use the new adapter interface. |
| demo/pkg/subgraphs/family/family.go | Modified schema creation signature to reflect the adapter type change. |
| demo/pkg/subgraphs/employees/subgraph/resolver.go | Changed adapter interface injection to nats.AdapterInterface. |
| demo/pkg/subgraphs/employees/employees.go | Updated schema creation to use the new adapter interface. |
| demo/pkg/subgraphs/countries/subgraph/resolver.go | Adjusted dependency injection to use nats.AdapterInterface. |
| demo/pkg/subgraphs/countries/countries.go | Modified NewSchema to utilize nats.AdapterInterface. |
| demo/pkg/subgraphs/availability/subgraph/schema.resolvers.go | Updated error handling and adapter interface usage with new types. |
| demo/pkg/subgraphs/availability/subgraph/resolver.go | Refactored dependency injection to use the new adapter interface. |
| demo/pkg/subgraphs/availability/availability.go | Adjusted schema creation to accept a getPubSubName function and the new adapter interface. |
| demo/cmd/mood/main.go | Updated instantiation of mood schema to supply a trivial getPubSubName function. |

@devsergiy (Member) left a comment

The most questionable thing is that we are still looking for all providers/datasources during planning time

I would prefer to have a single datasource per event or per single provider id

@alepane21 (Contributor, Author) commented

> The most questionable thing is that we are still looking for all providers/datasources during planning time
>
> I would prefer to have a single datasource per event or per single provider id

Fixed in #1848

@alepane21 alepane21 closed this Jun 6, 2025
@devsergiy devsergiy deleted the ale/eng-6482-edfs-refactor-kafka-and-nats-as-datasource-in-router branch June 6, 2025 12:54