Skip to content

Commit 7f1bfe7

Browse files
jwaldripclaude
andcommitted
feat: unify streaming architecture for subscriptions and incremental delivery
- Add Absinthe.Streaming module with shared abstractions - Add Absinthe.Streaming.Executor behaviour for pluggable task execution - Add Absinthe.Streaming.TaskExecutor as default executor (Task.async_stream) - Add Absinthe.Streaming.Delivery for pubsub incremental delivery - Enable @defer/@stream in subscriptions (automatic multi-payload delivery) - Refactor Transport to use shared TaskExecutor - Update Subscription.Local to detect and handle incremental directives - Add comprehensive backwards compatibility tests - Update guides and documentation Subscriptions with @defer/@stream now automatically deliver multiple payloads using the standard GraphQL incremental format. Existing PubSub implementations work unchanged - publish_subscription/2 is called multiple times. Custom executors (Oban, RabbitMQ, etc.) can be configured via: - Schema attribute: @streaming_executor MyApp.ObanExecutor - Context: context: %{streaming_executor: MyApp.ObanExecutor} - Application config: config :absinthe, :streaming_executor, MyApp.ObanExecutor Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 0d48992 commit 7f1bfe7

File tree

17 files changed

+1748
-257
lines changed

17 files changed

+1748
-257
lines changed

CHANGELOG.md

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,26 @@
66

77
* **draft-spec:** Add `@defer` and `@stream` directives for incremental delivery ([#1377](https://github.com/absinthe-graphql/absinthe/pull/1377))
88
- **Note:** These directives are still in draft/RFC stage and not yet part of the finalized GraphQL specification
9-
- **Opt-in required:** `import_types Absinthe.Type.BuiltIns.IncrementalDirectives` in your schema
9+
- **Opt-in required:** `import_directives Absinthe.Type.BuiltIns.IncrementalDirectives` in your schema
1010
- Split GraphQL responses into initial + incremental payloads
1111
- Configure via `Absinthe.Pipeline.Incremental.enable/2`
1212
- Resource limits (max concurrent streams, memory, duration)
1313
- Dataloader integration for batched loading
1414
- SSE and WebSocket transport support
15+
* **subscriptions:** Support `@defer` and `@stream` in subscriptions
16+
- Subscriptions with deferred content deliver multiple payloads automatically
17+
- Existing PubSub implementations work unchanged (calls `publish_subscription/2` multiple times)
18+
- Uses standard GraphQL incremental delivery format that clients already understand
19+
* **streaming:** Unified streaming architecture for queries and subscriptions
20+
- New `Absinthe.Streaming` module consolidates shared abstractions
21+
- `Absinthe.Streaming.Executor` behaviour for pluggable task execution backends
22+
- `Absinthe.Streaming.TaskExecutor` default executor using `Task.async_stream`
23+
- `Absinthe.Streaming.Delivery` handles pubsub delivery for subscriptions
24+
- Both query and subscription incremental delivery share the same execution path
25+
* **executors:** Pluggable task execution backends
26+
- Implement `Absinthe.Streaming.Executor` to use custom backends (Oban, RabbitMQ, etc.)
27+
- Configure via `@streaming_executor` schema attribute, context, or application config
28+
- Default executor uses `Task.async_stream` with configurable concurrency and timeouts
1529
* **telemetry:** Add telemetry events for incremental delivery
1630
- `[:absinthe, :incremental, :delivery, :initial]` - initial response
1731
- `[:absinthe, :incremental, :delivery, :payload]` - each deferred/streamed payload

guides/incremental-delivery.md

Lines changed: 171 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ defmodule MyApp.Schema do
3838
use Absinthe.Schema
3939

4040
# Import the draft-spec @defer and @stream directives
41-
import_types Absinthe.Type.BuiltIns.IncrementalDirectives
41+
import_directives Absinthe.Type.BuiltIns.IncrementalDirectives
4242

4343
query do
4444
# ...
@@ -496,6 +496,176 @@ Existing queries work without changes. To add incremental delivery:
496496
4. **Configure transport** to handle streaming responses
497497
5. **Add monitoring** to track performance improvements
498498

499+
## Subscriptions with @defer/@stream
500+
501+
Subscriptions support the same `@defer` and `@stream` directives as queries. When a subscription contains deferred content, clients receive multiple payloads:
502+
503+
1. **Initial payload**: Immediately available subscription data
504+
2. **Incremental payloads**: Deferred/streamed content as it resolves
505+
506+
```graphql
507+
subscription OnOrderUpdated($orderId: ID!) {
508+
orderUpdated(orderId: $orderId) {
509+
id
510+
status
511+
512+
# Defer expensive customer lookup
513+
... @defer(label: "customer") {
514+
customer {
515+
name
516+
email
517+
loyaltyTier
518+
}
519+
}
520+
}
521+
}
522+
```
523+
524+
This is handled automatically by the subscription system. Existing PubSub implementations work unchanged - the same `publish_subscription/2` callback is called multiple times with the standard GraphQL incremental format.
525+
526+
### How It Works
527+
528+
When a mutation triggers a subscription with `@defer`/`@stream`:
529+
530+
1. `Subscription.Local` detects the directives in the subscription document
531+
2. The `StreamingResolution` phase executes, collecting deferred tasks
532+
3. `Streaming.Delivery` publishes the initial payload via `pubsub.publish_subscription/2`
533+
4. Deferred tasks are executed via the configured executor
534+
5. Each result is published as an incremental payload
535+
536+
```elixir
537+
# What happens internally (you don't need to do this manually)
538+
pubsub.publish_subscription(topic, %{
539+
data: %{orderUpdated: %{id: "123", status: "SHIPPED"}},
540+
pending: [%{id: "0", label: "customer", path: ["orderUpdated"]}],
541+
hasNext: true
542+
})
543+
544+
# Later...
545+
pubsub.publish_subscription(topic, %{
546+
incremental: [%{
547+
id: "0",
548+
data: %{customer: %{name: "John", email: "john@example.com", loyaltyTier: "GOLD"}}
549+
}],
550+
hasNext: false
551+
})
552+
```
553+
554+
## Custom Executors
555+
556+
By default, deferred and streamed tasks are executed using `Task.async_stream` for in-process concurrent execution. You can implement a custom executor for alternative backends:
557+
558+
- **Oban** - Persistent, retryable job processing
559+
- **RabbitMQ** - Distributed task queuing
560+
- **GenStage** - Backpressure-aware pipelines
561+
- **Custom** - Any execution strategy you need
562+
563+
### Implementing a Custom Executor
564+
565+
Implement the `Absinthe.Streaming.Executor` behaviour:
566+
567+
```elixir
568+
defmodule MyApp.ObanExecutor do
569+
@behaviour Absinthe.Streaming.Executor
570+
571+
@impl true
572+
def execute(tasks, opts) do
573+
timeout = Keyword.get(opts, :timeout, 30_000)
574+
575+
# Queue tasks to Oban and stream results
576+
tasks
577+
|> Enum.map(&queue_to_oban/1)
578+
|> stream_results(timeout)
579+
end
580+
581+
defp queue_to_oban(task) do
582+
%{task_id: task.id, execute_fn: task.execute}
583+
|> MyApp.DeferredWorker.new()
584+
|> Oban.insert!()
585+
end
586+
587+
defp stream_results(jobs, timeout) do
588+
# Return an enumerable of results matching this shape:
589+
# %{
590+
# task: original_task,
591+
# result: {:ok, data} | {:error, reason},
592+
# has_next: boolean,
593+
# success: boolean,
594+
# duration_ms: integer
595+
# }
596+
Stream.resource(
597+
fn -> {jobs, timeout} end,
598+
&poll_next_result/1,
599+
fn _ -> :ok end
600+
)
601+
end
602+
end
603+
```
604+
605+
### Configuring a Custom Executor
606+
607+
**Schema-level** (recommended):
608+
609+
```elixir
610+
defmodule MyApp.Schema do
611+
use Absinthe.Schema
612+
613+
# Use custom executor for all @defer/@stream operations
614+
@streaming_executor MyApp.ObanExecutor
615+
616+
import_directives Absinthe.Type.BuiltIns.IncrementalDirectives
617+
618+
query do
619+
# ...
620+
end
621+
end
622+
```
623+
624+
**Per-request** (via context):
625+
626+
```elixir
627+
Absinthe.run(query, MyApp.Schema,
628+
context: %{streaming_executor: MyApp.ObanExecutor}
629+
)
630+
```
631+
632+
**Application config** (global default):
633+
634+
```elixir
635+
# config/config.exs
636+
config :absinthe, :streaming_executor, MyApp.ObanExecutor
637+
```
638+
639+
### When to Use Custom Executors
640+
641+
| Use Case | Recommended Executor |
642+
|----------|---------------------|
643+
| Simple deployments | Default `TaskExecutor` |
644+
| Long-running deferred operations | Oban (with persistence) |
645+
| Distributed systems | RabbitMQ or similar |
646+
| High-throughput with backpressure | GenStage |
647+
| Retry on failure | Oban |
648+
649+
## Architecture
650+
651+
The streaming system is unified across queries, mutations, and subscriptions:
652+
653+
```
654+
Absinthe.Streaming
655+
├── Executor - Behaviour for pluggable execution backends
656+
├── TaskExecutor - Default executor (Task.async_stream)
657+
└── Delivery - Handles pubsub delivery for subscriptions
658+
659+
Query/Mutation Path:
660+
Request → Pipeline → StreamingResolution → Transport → Client
661+
662+
Subscription Path:
663+
Mutation → Subscription.Local → StreamingResolution → Streaming.Delivery
664+
→ pubsub.publish_subscription/2 (multiple times) → Client
665+
```
666+
667+
Both paths share the same `Executor` for task execution, ensuring consistent behavior and allowing a single configuration point for custom backends.
668+
499669
## See Also
500670

501671
- [Subscriptions](subscriptions.md) for real-time data

guides/subscriptions.md

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,3 +266,101 @@ Since we provided a `context_id`, Absinthe will only run two documents per publi
266266

267267
1. Once for _user 1_ and _user 3_ because they have the same context ID (`"global"`) and sent the same document.
268268
2. Once for _user 2_. While _user 2_ has the same context ID (`"global"`), they provided a different document, so it cannot be de-duplicated with the other two.
269+
270+
### Incremental Delivery with Subscriptions
271+
272+
Subscriptions support `@defer` and `@stream` directives for incremental delivery. This allows you to receive subscription data progressively - immediately available data first, followed by deferred content.
273+
274+
First, import the incremental directives in your schema:
275+
276+
```elixir
277+
defmodule MyAppWeb.Schema do
278+
use Absinthe.Schema
279+
280+
# Enable @defer and @stream directives
281+
import_directives Absinthe.Type.BuiltIns.IncrementalDirectives
282+
283+
# ... rest of schema
284+
end
285+
```
286+
287+
Then use `@defer` in your subscription queries:
288+
289+
```graphql
290+
subscription {
291+
commentAdded(repoName: "absinthe-graphql/absinthe") {
292+
id
293+
content
294+
author {
295+
name
296+
}
297+
298+
# Defer expensive operations
299+
... @defer(label: "authorDetails") {
300+
author {
301+
email
302+
avatarUrl
303+
recentActivity {
304+
type
305+
timestamp
306+
}
307+
}
308+
}
309+
}
310+
}
311+
```
312+
313+
When a mutation triggers this subscription, clients receive multiple payloads:
314+
315+
**Initial payload** (sent immediately):
316+
```json
317+
{
318+
"data": {
319+
"commentAdded": {
320+
"id": "123",
321+
"content": "Great library!",
322+
"author": { "name": "John" }
323+
}
324+
},
325+
"pending": [{"id": "0", "label": "authorDetails", "path": ["commentAdded"]}],
326+
"hasNext": true
327+
}
328+
```
329+
330+
**Incremental payload** (sent when deferred data resolves):
331+
```json
332+
{
333+
"incremental": [{
334+
"id": "0",
335+
"data": {
336+
"author": {
337+
"email": "john@example.com",
338+
"avatarUrl": "https://...",
339+
"recentActivity": [...]
340+
}
341+
}
342+
}],
343+
"hasNext": false
344+
}
345+
```
346+
347+
This is handled automatically by the subscription system. Your existing PubSub implementation works unchanged - it receives multiple `publish_subscription/2` calls with the standard GraphQL incremental format.
348+
349+
#### Custom Executors for Subscriptions
350+
351+
For long-running deferred operations in subscriptions, you can configure a custom executor (e.g., Oban for persistence):
352+
353+
```elixir
354+
defmodule MyAppWeb.Schema do
355+
use Absinthe.Schema
356+
357+
# Use Oban for deferred task execution
358+
@streaming_executor MyApp.ObanExecutor
359+
360+
import_directives Absinthe.Type.BuiltIns.IncrementalDirectives
361+
362+
# ...
363+
end
364+
```
365+
366+
See the [Incremental Delivery guide](incremental-delivery.md) for details on implementing custom executors.

0 commit comments

Comments
 (0)