Skip to content

Conversation

@jwaldrip
Copy link
Contributor

@jwaldrip jwaldrip commented Sep 5, 2025

Summary

This PR implements the GraphQL @defer and @stream directives for incremental delivery as specified in the GraphQL Incremental Delivery RFC.

Note: These directives are still in draft/RFC stage and not yet part of the finalized GraphQL specification. The implementation may change as the specification evolves.

Features

Core Directives

  • @defer directive: Defer execution of fragments to reduce initial response time
  • @stream directive: Stream list fields incrementally with configurable batch sizes
  • Opt-in: Requires import_directives Absinthe.Type.BuiltIns.IncrementalDirectives in your schema

Unified Streaming Architecture

  • Subscriptions with @defer/@stream: Subscriptions now support incremental delivery automatically
  • Shared execution path: Both queries and subscriptions use the same TaskExecutor for deferred tasks
  • Backwards compatible: Existing PubSub implementations work unchanged

Pluggable Executors

  • Custom backends: Implement Absinthe.Streaming.Executor for Oban, RabbitMQ, GenStage, etc.
  • Flexible configuration: Schema-level, per-request, or application-wide
  • Default executor: Task.async_stream with configurable concurrency and timeouts

Transport & Integration

  • Transport support: GraphQL-WS, Server-Sent Events, and extensible transport layer
  • Dataloader integration: Maintains efficient batching with incremental delivery
  • Relay compatibility: Full support for streaming Relay connections

Observability

  • Telemetry events: [:absinthe, :incremental, :delivery, :*] for monitoring
  • on_event callback: Custom monitoring integrations (Sentry, DataDog, etc.)
  • Complexity analysis: Proper cost calculation for deferred/streamed operations

Architecture

Absinthe.Streaming
├── Executor        - Behaviour for pluggable execution backends
├── TaskExecutor    - Default executor (Task.async_stream)
└── Delivery        - Handles pubsub delivery for subscriptions

Query/Mutation Path:
  Request → StreamingResolution → Transport → TaskExecutor → Client

Subscription Path:
  Mutation → Subscription.Local → StreamingResolution → Streaming.Delivery
           → TaskExecutor → pubsub.publish_subscription/2 → Client

Usage

Enable in Schema

defmodule MyApp.Schema do
  use Absinthe.Schema

  # Required: opt-in to @defer/@stream directives
  import_directives Absinthe.Type.BuiltIns.IncrementalDirectives

  # Optional: custom executor for deferred tasks
  @streaming_executor MyApp.ObanExecutor

  query do
    # ...
  end

  subscription do
    # Subscriptions with @defer work automatically
  end
end

Query with @defer

query GetUser($id: ID!) {
  user(id: $id) {
    id
    name
    ... @defer(label: "profile") {
      email
      profile { bio avatar }
    }
  }
}

Subscription with @defer

subscription OnOrderUpdated($orderId: ID!) {
  orderUpdated(orderId: $orderId) {
    id
    status
    ... @defer(label: "customer") {
      customer { name email }
    }
  }
}

Custom Executor Example

defmodule MyApp.ObanExecutor do
  @behaviour Absinthe.Streaming.Executor

  @impl true
  def execute(tasks, opts) do
    tasks
    |> Enum.map(&queue_to_oban/1)
    |> stream_results(opts)
  end
end

Testing

Comprehensive test coverage including:

  • Unit tests for directives and executor
  • Backwards compatibility tests for subscriptions
  • Integration tests with dataloader
  • Transport protocol tests
  • Error handling scenarios

1516 tests, 0 failures

Breaking Changes

None - incremental delivery is opt-in and backward compatible:

  • Existing schemas work unchanged
  • Existing PubSub implementations work unchanged
  • @defer/@stream only available when explicitly imported

Documentation

@jwaldrip jwaldrip force-pushed the gigmart/defer-stream-incremental branch from ff57b93 to 59606f2 Compare September 5, 2025 23:15
@cschiewek
Copy link
Member

@jwaldrip This is fantastic. I'm going to try and get folks from my team to review this in the next couple weeks. If you could update your branch to kick off a new CI again, that'd be amazing. Thanks!

Copy link
Contributor

@bryanjos bryanjos left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work! I had a few comments and suggestions on my initial look

@@ -0,0 +1,230 @@
defmodule Absinthe.IncrementalSchema do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this file needed for something or could it be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed in a recent commit. This was a leftover from early development.

@@ -0,0 +1,483 @@
# Incremental Delivery
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this, INCREMENTAL_DELIVERY.md and README_INCREMENTAL.md and they look similar. Could one or more be removed and consolidated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! These have been consolidated. The duplicate files (INCREMENTAL_DELIVERY.md and README_INCREMENTAL.md) have been removed - only guides/incremental-delivery.md remains now.

.tool-versions Outdated
@@ -0,0 +1,2 @@
erlang 26.2.5
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this should be kept in or not. If so it could be set to the lowest version or elixir and erlang supported

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed. The mainline repo doesn't have a version file, so this has been deleted to stay aligned.

based on field complexity and performance characteristics.

This middleware can:
- Analyze field complexity and suggest defer/stream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be very useful!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! It's designed to be an optional optimization layer - users can enable it to have @defer/@stream automatically suggested for expensive fields based on configurable thresholds.

@@ -0,0 +1,240 @@
defmodule Absinthe.Incremental.Supervisor do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there something that starts this supervisor? I couldn't find anything but I might have missed it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The supervisor needs to be added to your application's supervision tree manually. I've updated the moduledoc with clear instructions showing exactly how to do this in application.ex. The supervisor is only required for actual incremental delivery over transports (SSE, WebSocket) - standard execution works without it.

Comment on lines 105 to 112
case Absinthe.Incremental.Supervisor.start_link(
enabled: true,
enable_defer: true,
enable_stream: true
) do
{:ok, _pid} -> :ok
{:error, {:already_started, _pid}} -> :ok
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use start_supervised here that'll start and stop the supervisor for each test

Suggested change
case Absinthe.Incremental.Supervisor.start_link(
enabled: true,
enable_defer: true,
enable_stream: true
) do
{:ok, _pid} -> :ok
{:error, {:already_started, _pid}} -> :ok
end
{:ok, _pid} = start_supervised({Absinthe.Incremental.Supervisor, enabled: true, enable_defer: true, enable_stream: true})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion! The test file has been refactored - it now focuses on directive definitions and parsing which don't require the supervisor. Integration tests that need the supervisor have been moved elsewhere.

@cschiewek cschiewek marked this pull request as draft December 9, 2025 14:41
@yordis
Copy link
Contributor

yordis commented Dec 11, 2025

Hey folks, any updates on this one? Is there anything I can do to help to push the feature forward?

@jwaldrip jwaldrip marked this pull request as ready for review January 12, 2026 16:50
@jwaldrip
Copy link
Contributor Author

jwaldrip commented Jan 12, 2026

Hey @yordis! The PR has been updated and all review feedback has been addressed:

  • Removed duplicate documentation files (consolidated to guides/incremental-delivery.md)
  • Removed .tool-versions to align with mainline
  • Removed leftover test files
  • Added clear documentation for Supervisor startup and Dataloader integration
  • Fixed several bugs in the incremental delivery implementation
  • All 1483 tests pass

The branch is up to date with mainline and ready for review. The companion PRs for transport layers are also ready:

jwaldrip and others added 23 commits January 13, 2026 08:35
- Fix mix absinthe.schema.json to use schema's adapter for introspection
- Fix mix absinthe.schema.sdl to use schema's adapter for directive names
- Update SDL renderer to accept adapter parameter and use it for directive definitions
- Ensure directive names follow naming conventions (camelCase, etc.) in generated SDL

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
When a field has no description, it now inherits the description from its
referenced type during introspection. This provides better documentation
for GraphQL APIs by automatically propagating type descriptions to fields.

- Modified __field introspection resolver to fall back to type descriptions
- Handles wrapped types (non_null, list_of) correctly by unwrapping first
- Added comprehensive test coverage for various inheritance scenarios
- Updated field documentation to explain the new behavior

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
- Add @defer directive for deferred fragment execution
- Add @stream directive for incremental list delivery
- Implement streaming resolution phase
- Add incremental response builder
- Add transport abstraction layer
- Implement Dataloader integration for streaming
- Add error handling and resource management
- Add complexity analysis for streaming operations
- Add auto-optimization middleware
- Add comprehensive test suite
- Add performance benchmarks
- Add pipeline integration hooks
- Add configuration system
- Complete usage guide with examples
- API reference for @defer and @stream directives
- Performance optimization guidelines
- Transport configuration details
- Troubleshooting and monitoring guidance

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
- Fix Ruby-style return statements in auto_defer_stream middleware
- Correct Elixir typespec syntax in response module
- Mark unused variables with underscore prefix
- Remove invalid optional() syntax from typespecs

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
- Fix supervisor startup handling in tests
- Simplify test helpers to use standard Absinthe.run
- Enable basic test execution for incremental delivery features
- Address compilation issues and warnings

Tests now run successfully and provide baseline for further development.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
This commit finalizes the implementation of GraphQL @defer and @stream directives
for incremental delivery in Absinthe:

- Fix streaming resolution phase to properly handle defer/stream flags
- Update projector to gracefully handle defer/stream flags without crashing
- Improve telemetry phases to handle missing blueprint context gracefully
- Add comprehensive test infrastructure for incremental delivery
- Create debug script for testing directive processing
- Add BuiltIns module for proper directive loading

The @defer and @stream directives now work correctly according to the GraphQL
specification, allowing for incremental query result delivery.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
Add detailed guide for @defer and @stream directives following
the same structure as other Absinthe feature guides.

Includes:
- Basic usage examples
- Configuration options
- Transport integration (WebSocket, SSE)
- Advanced patterns (conditional, nested)
- Error handling
- Performance considerations
- Relay integration
- Testing approaches
- Migration guidance

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
Include guides/incremental-delivery.md in the mix.exs extras list so it
appears in the generated documentation alongside other guides.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
Based on community feedback from PR absinthe-graphql#1373, automatic field description
inheritance was not well received. The community preferred explicit
field descriptions that are specific to each field's context rather
than automatically inheriting from the referenced type.

This commit:
- Reverts the automatic inheritance behavior in introspection
- Removes the associated test file
- Returns to the standard field description handling

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
Run mix format to fix formatting issues detected by CI.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
- Fix Absinthe.Type.list?/1 undefined function by using pattern matching
- Fix directive expand callbacks to return node directly (not {:ok, node})
- Add missing analyze_node clauses for Operation and Fragment.Named nodes
- Fix defer depth tracking for nested defers
- Fix projector to only skip __skip_initial__ flagged nodes, not all defer/stream
- Update introspection tests for new @defer/@stream directives
- Remove duplicate documentation files per PR review
- Add comprehensive complexity analysis tests

Co-Authored-By: Claude Opus 4.5 <[email protected]>
Address review comments:
- Add detailed documentation on how to start the Incremental Supervisor
- Include configuration options and examples in supervisor docs
- Add usage documentation for Dataloader integration
- Explain how streaming-aware resolvers work with batching

Co-Authored-By: Claude Opus 4.5 <[email protected]>
Add an `on_event` callback option to the incremental delivery system
that allows sending defer/stream events to external monitoring services
like Sentry, DataDog, or custom telemetry systems.

The callback is invoked at each stage of incremental delivery:
- `:initial` - When the initial response is sent
- `:incremental` - When each deferred/streamed payload is delivered
- `:complete` - When the stream completes successfully
- `:error` - When an error occurs during streaming

Each event includes payload data and metadata such as:
- `operation_id` - Unique identifier for tracking
- `path` - GraphQL path to the deferred field
- `label` - Label from @defer/@stream directive
- `duration_ms` - Time taken for the operation
- `task_type` - `:defer` or `:stream`

Example usage:

    Absinthe.run(query, schema,
      on_event: fn
        :error, payload, metadata ->
          Sentry.capture_message("GraphQL streaming error",
            extra: %{payload: payload, metadata: metadata}
          )
        _, _, _ -> :ok
      end
    )

Co-Authored-By: Claude Opus 4.5 <[email protected]>
Add telemetry events for the incremental delivery transport layer to
enable integration with instrumentation libraries like opentelemetry_absinthe.

New telemetry events:

- `[:absinthe, :incremental, :delivery, :initial]`
  Emitted when initial response is sent with has_next, pending_count

- `[:absinthe, :incremental, :delivery, :payload]`
  Emitted for each @defer/@stream payload with path, label, task_type,
  duration, and success status

- `[:absinthe, :incremental, :delivery, :complete]`
  Emitted when streaming completes successfully with total duration

- `[:absinthe, :incremental, :delivery, :error]`
  Emitted on errors with reason and message

All events include operation_id for correlation across spans.
Events follow the same pattern as existing Absinthe telemetry events
with measurements (system_time, duration) and metadata.

This enables opentelemetry_absinthe and other instrumentation libraries
to create proper spans for @defer/@stream operations.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
jwaldrip and others added 6 commits January 13, 2026 08:36
Update the telemetry guide to document the new @defer/@stream events:

- [:absinthe, :incremental, :delivery, :initial]
- [:absinthe, :incremental, :delivery, :payload]
- [:absinthe, :incremental, :delivery, :complete]
- [:absinthe, :incremental, :delivery, :error]

Includes detailed documentation of measurements and metadata for each
event, plus examples for attaching handlers and using the on_event
callback for custom monitoring integrations.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
The incremental delivery directives are still in the RFC stage and not
yet part of the finalized GraphQL specification. Updated documentation
to make this clear and link to the actual RFC.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
Move @defer and @stream directives from core built-ins to a new
opt-in module Absinthe.Type.BuiltIns.IncrementalDirectives.

Since @defer/@stream are draft-spec features (not yet finalized),
users must now explicitly opt-in by adding:

    import_types Absinthe.Type.BuiltIns.IncrementalDirectives

to their schema definition.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
Run mix format to fix whitespace and formatting issues that were
causing CI to fail.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
Restore Elixir 1.19 to the CI matrix to match upstream main.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
@jwaldrip jwaldrip force-pushed the gigmart/defer-stream-incremental branch from 43e71ba to 0d48992 Compare January 13, 2026 15:37
…delivery

- Add Absinthe.Streaming module with shared abstractions
- Add Absinthe.Streaming.Executor behaviour for pluggable task execution
- Add Absinthe.Streaming.TaskExecutor as default executor (Task.async_stream)
- Add Absinthe.Streaming.Delivery for pubsub incremental delivery
- Enable @defer/@stream in subscriptions (automatic multi-payload delivery)
- Refactor Transport to use shared TaskExecutor
- Update Subscription.Local to detect and handle incremental directives
- Add comprehensive backwards compatibility tests
- Update guides and documentation

Subscriptions with @defer/@stream now automatically deliver multiple payloads
using the standard GraphQL incremental format. Existing PubSub implementations
work unchanged - publish_subscription/2 is called multiple times.

Custom executors (Oban, RabbitMQ, etc.) can be configured via:
- Schema attribute: @streaming_executor MyApp.ObanExecutor
- Context: context: %{streaming_executor: MyApp.ObanExecutor}
- Application config: config :absinthe, :streaming_executor, MyApp.ObanExecutor

Co-Authored-By: Claude Opus 4.5 <[email protected]>
@jwaldrip
Copy link
Contributor Author

Unified Streaming Architecture Update

This commit unifies the subscription and incremental delivery systems, enabling @defer/@stream support in subscriptions with pluggable task execution.

New Modules

Module Purpose
Absinthe.Streaming Umbrella module with shared helpers
Absinthe.Streaming.Executor Behaviour for pluggable task execution backends
Absinthe.Streaming.TaskExecutor Default executor using Task.async_stream
Absinthe.Streaming.Delivery Handles pubsub delivery for subscriptions

Key Features

1. Subscriptions with @defer/@stream

Subscriptions now support incremental delivery automatically:

subscription {
  orderUpdated(orderId: "123") {
    id
    status
    ... @defer(label: "customer") {
      customer { name email }
    }
  }
}

Clients receive multiple payloads using the standard GraphQL incremental format. Existing PubSub implementations work unchanged.

2. Pluggable Executors

Custom execution backends (Oban, RabbitMQ, etc.) can be configured:

defmodule MyApp.Schema do
  use Absinthe.Schema
  
  @streaming_executor MyApp.ObanExecutor
  
  # ...
end

3. Unified Code Path

Both queries and subscriptions now share the same TaskExecutor for deferred task execution, ensuring consistent behavior and a single configuration point.

Architecture

Query/Mutation Path:
  Request → StreamingResolution → Transport → TaskExecutor → Client

Subscription Path:
  Mutation → Subscription.Local → StreamingResolution → Streaming.Delivery
           → TaskExecutor → pubsub.publish_subscription/2 → Client

Documentation

  • Updated guides/incremental-delivery.md with custom executor guide
  • Updated guides/subscriptions.md with @defer/@stream section
  • Updated CHANGELOG.md with new features

Tests

All 1516 tests pass, including new tests for:

  • TaskExecutor unit tests
  • Backwards compatibility tests for subscriptions

Copy link
Contributor

@bryanjos bryanjos left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome work! Approving as I only had a few nits and questions

end
end

defmodule Absinthe.Middleware.IncrementalComplexity do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be moved to its own file in the middleware folder so it is easier to find?

defp telemetry_reporter(_), do: nil
end

defmodule Absinthe.Incremental.TelemetryReporter do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this also be put in its own file so it's easier to find?

deferred = Map.get(context, :deferred_tasks, [])
streamed = Map.get(context, :stream_tasks, [])

deferred ++ streamed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any worries getting stream tasks for a defer or the other way around?

@@ -1,5 +1,38 @@
# Changelog

## Unreleased
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changelog should be updated automatically

@jwaldrip
Copy link
Contributor Author

Good question! This is actually safe because both task types can coexist in the same query and share the same execution path. Each task has a :type field (:defer or :stream) that determines how it's processed downstream:

  • In Streaming.Delivery.build_success_payload/3 (lines 200-218), we pattern match on task.type to call either Response.build_incremental/4 for defer or Response.build_stream_incremental/4 for stream
  • In Transport.build_task_response/3 (lines 323-356), same pattern matching ensures correct response formatting
  • The executor itself is type-agnostic - it just runs the task.execute/0 function and returns results

So mixing them is intentional and safe. A query like:

{
  user {
    ... @defer { profile }  # defer task
    posts @stream { title }  # stream task  
  }
}

Results in both task types being combined into a single stream and executed together, which is exactly what we want.

@jwaldrip
Copy link
Contributor Author

Thanks for the heads up! I've manually updated the changelog to document the new features (unified streaming architecture, pluggable executors, subscriptions with @defer/@stream).

If you have an automated changelog generation workflow (e.g., via conventional commits or release-please), I'm happy to adjust. Just let me know the process and I'll follow it.

…rability

- Move Absinthe.Middleware.IncrementalComplexity to its own file in lib/absinthe/middleware/
- Move Absinthe.Incremental.TelemetryReporter to its own file in lib/absinthe/incremental/
- Improves code organization and makes these modules easier to find

Addresses PR review feedback from @bryanjos

Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
@jwaldrip
Copy link
Contributor Author

PR Review Feedback Addressed

Thanks @bryanjos for the detailed review! I've addressed all the feedback:

1. ✅ Move IncrementalComplexity middleware to its own file

Commit: 96fa747

Moved Absinthe.Middleware.IncrementalComplexity from lib/absinthe/incremental/complexity.ex to lib/absinthe/middleware/incremental_complexity.ex for better discoverability.

2. ✅ Move TelemetryReporter to its own file

Commit: 96fa747

Moved Absinthe.Incremental.TelemetryReporter from lib/absinthe/incremental/supervisor.ex to lib/absinthe/incremental/telemetry_reporter.ex for better organization.

3. ✅ Defer/Stream task mixing concern

Responded in thread - this is intentional and safe. Both task types use the same executor and are distinguished by their :type field during response formatting.

4. ✅ CHANGELOG automatic update

Responded in thread - happy to adjust if you have an automated process.

Tests

All 1516 tests passing ✅

The refactoring improves code organization without changing any functionality.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants