Skip to content

Commit d7754a9

Browse files
authored
Support HTTP multipart subscriptions in AIOHTTPTransport (#574)
This commit adds support for HTTP multipart protocol, which allows GraphQL subscriptions to work over HTTP using the multipart response format.
1 parent 05a8e98 commit d7754a9

File tree

8 files changed

+996
-45
lines changed

8 files changed

+996
-45
lines changed

README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,14 @@ The complete documentation for GQL can be found at
3030

3131
## Features
3232

33-
* Execute GraphQL queries using [different protocols](https://gql.readthedocs.io/en/latest/transports/index.html):
33+
* Execute GraphQL requests using [different protocols](https://gql.readthedocs.io/en/latest/transports/index.html):
3434
* http
35+
* including the multipart protocol for subscriptions
3536
* websockets:
3637
* apollo or graphql-ws protocol
3738
* Phoenix channels
38-
* AWS AppSync realtime protocol (experimental)
39-
* Possibility to [validate the queries locally](https://gql.readthedocs.io/en/latest/usage/validation.html) using a GraphQL schema provided locally or fetched from the backend using an instrospection query
39+
* AWS AppSync realtime protocol
40+
* Possibility to [validate the requests locally](https://gql.readthedocs.io/en/latest/usage/validation.html) using a GraphQL schema provided locally or fetched from the backend using an instrospection query
4041
* Supports GraphQL queries, mutations and [subscriptions](https://gql.readthedocs.io/en/latest/usage/subscriptions.html)
4142
* Supports [sync](https://gql.readthedocs.io/en/latest/usage/sync_usage.html) or [async](https://gql.readthedocs.io/en/latest/usage/async_usage.html) usage, [allowing concurrent requests](https://gql.readthedocs.io/en/latest/advanced/async_advanced_usage.html#async-advanced-usage)
4243
* Supports [File uploads](https://gql.readthedocs.io/en/latest/usage/file_upload.html)
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import asyncio
2+
import logging
3+
4+
from gql import Client, gql
5+
from gql.transport.aiohttp import AIOHTTPTransport
6+
7+
logging.basicConfig(level=logging.INFO)
8+
9+
10+
async def main():
11+
12+
transport = AIOHTTPTransport(url="https://gql-book-server.fly.dev/graphql")
13+
14+
# Using `async with` on the client will start a connection on the transport
15+
# and provide a `session` variable to execute queries on this connection
16+
async with Client(
17+
transport=transport,
18+
) as session:
19+
20+
# Request subscription
21+
subscription = gql(
22+
"""
23+
subscription {
24+
book {
25+
title
26+
author
27+
}
28+
}
29+
"""
30+
)
31+
32+
# Subscribe and receive streaming updates
33+
async for result in session.subscribe(subscription):
34+
print(f"Received: {result}")
35+
36+
37+
asyncio.run(main())

docs/transports/aiohttp.rst

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,79 @@ This transport uses the `aiohttp`_ library and allows you to send GraphQL querie
77

88
Reference: :class:`gql.transport.aiohttp.AIOHTTPTransport`
99

10-
.. note::
10+
This transport supports both standard GraphQL operations (queries, mutations) and subscriptions.
11+
Subscriptions are implemented using the `multipart subscription protocol`_
12+
as implemented by Apollo GraphOS Router and other compatible servers.
1113

12-
GraphQL subscriptions are not supported on the HTTP transport.
13-
For subscriptions you should use a websockets transport:
14-
:ref:`WebsocketsTransport <websockets_transport>` or
15-
:ref:`AIOHTTPWebsocketsTransport <aiohttp_websockets_transport>`.
14+
This provides an HTTP-based alternative to WebSocket transports for receiving streaming
15+
subscription updates. It's particularly useful when:
16+
17+
- WebSocket connections are not available or blocked by infrastructure
18+
- You want to use standard HTTP with existing load balancers and proxies
19+
- The backend implements the multipart subscription protocol
20+
21+
Queries
22+
-------
1623

1724
.. literalinclude:: ../code_examples/aiohttp_async.py
1825

26+
Subscriptions
27+
-------------
28+
29+
The transport sends a standard HTTP POST request with an ``Accept`` header indicating
30+
support for multipart responses:
31+
32+
.. code-block:: text
33+
34+
Accept: multipart/mixed;subscriptionSpec="1.0", application/json
35+
36+
The server responds with a ``multipart/mixed`` content type and streams subscription
37+
updates as separate parts in the response body. Each part contains a JSON payload
38+
with GraphQL execution results.
39+
40+
.. literalinclude:: ../code_examples/aiohttp_multipart_subscription.py
41+
42+
How It Works
43+
^^^^^^^^^^^^
44+
45+
**Message Format**
46+
47+
Each message part follows this structure:
48+
49+
.. code-block:: text
50+
51+
--graphql
52+
Content-Type: application/json
53+
54+
{"payload": {"data": {...}, "errors": [...]}}
55+
56+
**Heartbeats**
57+
58+
Servers may send empty JSON objects (``{}``) as heartbeat messages to keep the
59+
connection alive. These are automatically filtered out by the transport.
60+
61+
**Error Handling**
62+
63+
The protocol distinguishes between two types of errors:
64+
65+
- **GraphQL errors**: Returned within the ``payload`` property alongside data
66+
- **Transport errors**: Returned with a top-level ``errors`` field and ``null`` payload
67+
68+
**End of Stream**
69+
70+
The subscription ends when the server sends the final boundary marker:
71+
72+
.. code-block:: text
73+
74+
--graphql--
75+
76+
Limitations
77+
^^^^^^^^^^^
78+
79+
- Subscriptions require the server to implement the multipart subscription protocol
80+
- Long-lived connections may be terminated by intermediate proxies or load balancers
81+
- Some server configurations may not support HTTP/1.1 chunked transfer encoding required for streaming
82+
1983
Authentication
2084
--------------
2185

@@ -52,3 +116,5 @@ and you can save these cookies in a cookie jar to reuse them in a following conn
52116
53117
.. _aiohttp: https://docs.aiohttp.org
54118
.. _issue 197: https://github.com/graphql-python/gql/issues/197
119+
.. _multipart subscription protocol: https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol
120+

gql/transport/aiohttp.py

Lines changed: 179 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
)
1717

1818
import aiohttp
19+
from aiohttp import BodyPartReader, MultipartReader
1920
from aiohttp.client_exceptions import ClientResponseError
2021
from aiohttp.client_reqrep import Fingerprint
2122
from aiohttp.helpers import BasicAuth
@@ -421,12 +422,186 @@ async def execute_batch(
421422
except Exception as e:
422423
raise TransportConnectionFailed(str(e)) from e
423424

424-
def subscribe(
425+
async def subscribe(
425426
self,
426427
request: GraphQLRequest,
427428
) -> AsyncGenerator[ExecutionResult, None]:
428-
"""Subscribe is not supported on HTTP.
429+
"""Execute a GraphQL subscription and yield results from multipart response.
429430
430-
:meta private:
431+
:param request: GraphQL request to execute
432+
:yields: ExecutionResult objects as they arrive in the multipart stream
431433
"""
432-
raise NotImplementedError(" The HTTP transport does not support subscriptions")
434+
if self.session is None:
435+
raise TransportClosed("Transport is not connected")
436+
437+
post_args = self._prepare_request(request)
438+
439+
# Add headers for multipart subscription
440+
headers = post_args.get("headers", {})
441+
headers.update(
442+
{
443+
"Content-Type": "application/json",
444+
"Accept": (
445+
"multipart/mixed;boundary=graphql;"
446+
"subscriptionSpec=1.0,application/json"
447+
),
448+
}
449+
)
450+
post_args["headers"] = headers
451+
452+
try:
453+
async with self.session.post(self.url, ssl=self.ssl, **post_args) as resp:
454+
# Saving latest response headers in the transport
455+
self.response_headers = resp.headers
456+
457+
# Check for errors
458+
if resp.status >= 400:
459+
# Raise a TransportServerError if status > 400
460+
self._raise_transport_server_error_if_status_more_than_400(resp)
461+
462+
initial_content_type = resp.headers.get("Content-Type", "")
463+
if (
464+
"application/json" in initial_content_type
465+
and "multipart/mixed" not in initial_content_type
466+
):
467+
yield await self._prepare_result(resp)
468+
return
469+
470+
if (
471+
("multipart/mixed" not in initial_content_type)
472+
or ("boundary=graphql" not in initial_content_type)
473+
or ("subscriptionSpec=1.0" not in initial_content_type)
474+
):
475+
raise TransportProtocolError(
476+
f"Unexpected content-type: {initial_content_type}. "
477+
"Server may not support the multipart subscription protocol."
478+
)
479+
480+
# Parse multipart response
481+
async for result in self._parse_multipart_response(resp):
482+
yield result
483+
484+
except TransportError:
485+
raise
486+
except Exception as e:
487+
raise TransportConnectionFailed(str(e)) from e
488+
489+
async def _parse_multipart_response(
490+
self,
491+
response: aiohttp.ClientResponse,
492+
) -> AsyncGenerator[ExecutionResult, None]:
493+
"""
494+
Parse a multipart response stream and yield execution results.
495+
496+
Uses aiohttp's built-in MultipartReader to handle the multipart protocol.
497+
498+
:param response: The aiohttp response object
499+
:yields: ExecutionResult objects
500+
"""
501+
# Use aiohttp's built-in multipart reader
502+
reader = MultipartReader.from_response(response)
503+
504+
# Iterate through each part in the multipart response
505+
while True:
506+
try:
507+
part = await reader.next()
508+
except Exception:
509+
# reader.next() throws on empty parts at the end of the stream.
510+
# (some servers may send this.)
511+
# see: https://github.com/aio-libs/aiohttp/pull/11857
512+
# As an ugly workaround for now, we can check if we've reached
513+
# EOF and assume this was the case.
514+
if reader.at_eof():
515+
break
516+
517+
# Otherwise, re-raise unexpected errors
518+
raise # pragma: no cover
519+
520+
if part is None:
521+
# No more parts
522+
break
523+
524+
assert not isinstance(
525+
part, MultipartReader
526+
), "Nested multipart parts are not supported in GraphQL subscriptions"
527+
528+
result = await self._parse_multipart_part(part)
529+
if result:
530+
yield result
531+
532+
async def _parse_multipart_part(
533+
self, part: BodyPartReader
534+
) -> Optional[ExecutionResult]:
535+
"""
536+
Parse a single part from a multipart response.
537+
538+
:param part: aiohttp BodyPartReader for the part
539+
:return: ExecutionResult or None if part is empty/heartbeat
540+
"""
541+
# Verify the part has the correct content type
542+
content_type = part.headers.get(aiohttp.hdrs.CONTENT_TYPE, "")
543+
if not content_type.startswith("application/json"):
544+
raise TransportProtocolError(
545+
f"Unexpected part content-type: {content_type}. "
546+
"Expected 'application/json'."
547+
)
548+
549+
try:
550+
# Read the part content as text
551+
body = await part.text()
552+
body = body.strip()
553+
554+
if log.isEnabledFor(logging.DEBUG):
555+
log.debug("<<< %s", ascii(body or "(empty body, skipping)"))
556+
557+
if not body:
558+
return None
559+
560+
# Parse JSON body using custom deserializer
561+
data = self.json_deserialize(body)
562+
563+
# Handle heartbeats - empty JSON objects
564+
if not data:
565+
log.debug("Received heartbeat, ignoring")
566+
return None
567+
568+
# The multipart subscription protocol wraps data in a "payload" property
569+
if "payload" not in data:
570+
log.warning("Invalid response: missing 'payload' field")
571+
return None
572+
573+
payload = data["payload"]
574+
575+
# Check for transport-level errors (payload is null)
576+
if payload is None:
577+
# If there are errors, this is a transport-level error
578+
errors = data.get("errors")
579+
if errors:
580+
error_messages = [
581+
error.get("message", "Unknown transport error")
582+
for error in errors
583+
]
584+
585+
for message in error_messages:
586+
log.error(f"Transport error: {message}")
587+
588+
raise TransportServerError("\n\n".join(error_messages))
589+
else:
590+
# Null payload without errors - just skip this part
591+
return None
592+
593+
# Extract GraphQL data from payload
594+
return ExecutionResult(
595+
data=payload.get("data"),
596+
errors=payload.get("errors"),
597+
extensions=payload.get("extensions"),
598+
)
599+
except json.JSONDecodeError as e:
600+
log.warning(
601+
f"Failed to parse JSON: {ascii(e)}, "
602+
f"body: {ascii(body[:100]) if body else ''}"
603+
)
604+
return None
605+
except UnicodeDecodeError as e:
606+
log.warning(f"Failed to decode part: {ascii(e)}")
607+
return None

0 commit comments

Comments
 (0)