Skip to content
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ The complete documentation for GQL can be found at

## Features

* Execute GraphQL queries using [different protocols](https://gql.readthedocs.io/en/latest/transports/index.html):
* Execute GraphQL requests using [different protocols](https://gql.readthedocs.io/en/latest/transports/index.html):
* http
* including the multipart protocol for subscriptions
* websockets:
* apollo or graphql-ws protocol
* Phoenix channels
* AWS AppSync realtime protocol (experimental)
* Possibility to [validate the queries locally](https://gql.readthedocs.io/en/latest/usage/validation.html) using a GraphQL schema provided locally or fetched from the backend using an instrospection query
* AWS AppSync realtime protocol
* Possibility to [validate the requests locally](https://gql.readthedocs.io/en/latest/usage/validation.html) using a GraphQL schema provided locally or fetched from the backend using an instrospection query
* Supports GraphQL queries, mutations and [subscriptions](https://gql.readthedocs.io/en/latest/usage/subscriptions.html)
* Supports [sync](https://gql.readthedocs.io/en/latest/usage/sync_usage.html) or [async](https://gql.readthedocs.io/en/latest/usage/async_usage.html) usage, [allowing concurrent requests](https://gql.readthedocs.io/en/latest/advanced/async_advanced_usage.html#async-advanced-usage)
* Supports [File uploads](https://gql.readthedocs.io/en/latest/usage/file_upload.html)
Expand Down
37 changes: 37 additions & 0 deletions docs/code_examples/aiohttp_multipart_subscription.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import asyncio
import logging

from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport

logging.basicConfig(level=logging.INFO)


async def main():

transport = AIOHTTPTransport(url="https://gql-book-server.fly.dev/graphql")

# Using `async with` on the client will start a connection on the transport
# and provide a `session` variable to execute queries on this connection
async with Client(
transport=transport,
) as session:

# Request subscription
subscription = gql(
"""
subscription {
book {
title
author
}
}
"""
)

# Subscribe and receive streaming updates
async for result in session.subscribe(subscription):
print(f"Received: {result}")


asyncio.run(main())
76 changes: 71 additions & 5 deletions docs/transports/aiohttp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,79 @@ This transport uses the `aiohttp`_ library and allows you to send GraphQL querie

Reference: :class:`gql.transport.aiohttp.AIOHTTPTransport`

.. note::
This transport supports both standard GraphQL operations (queries, mutations) and subscriptions.
Subscriptions are implemented using the `multipart subscription protocol`_
as implemented by Apollo GraphOS Router and other compatible servers.

GraphQL subscriptions are not supported on the HTTP transport.
For subscriptions you should use a websockets transport:
:ref:`WebsocketsTransport <websockets_transport>` or
:ref:`AIOHTTPWebsocketsTransport <aiohttp_websockets_transport>`.
This provides an HTTP-based alternative to WebSocket transports for receiving streaming
subscription updates. It's particularly useful when:

- WebSocket connections are not available or blocked by infrastructure
- You want to use standard HTTP with existing load balancers and proxies
- The backend implements the multipart subscription protocol

Queries
-------

.. literalinclude:: ../code_examples/aiohttp_async.py

Subscriptions
-------------

The transport sends a standard HTTP POST request with an ``Accept`` header indicating
support for multipart responses:

.. code-block:: text

Accept: multipart/mixed;subscriptionSpec="1.0", application/json

The server responds with a ``multipart/mixed`` content type and streams subscription
updates as separate parts in the response body. Each part contains a JSON payload
with GraphQL execution results.

.. literalinclude:: ../code_examples/aiohttp_multipart_subscription.py

How It Works
^^^^^^^^^^^^

**Message Format**

Each message part follows this structure:

.. code-block:: text

--graphql
Content-Type: application/json

{"payload": {"data": {...}, "errors": [...]}}

**Heartbeats**

Servers may send empty JSON objects (``{}``) as heartbeat messages to keep the
connection alive. These are automatically filtered out by the transport.

**Error Handling**

The protocol distinguishes between two types of errors:

- **GraphQL errors**: Returned within the ``payload`` property alongside data
- **Transport errors**: Returned with a top-level ``errors`` field and ``null`` payload

**End of Stream**

The subscription ends when the server sends the final boundary marker:

.. code-block:: text

--graphql--

Limitations
^^^^^^^^^^^

- Subscriptions require the server to implement the multipart subscription protocol
- Long-lived connections may be terminated by intermediate proxies or load balancers
- Some server configurations may not support HTTP/1.1 chunked transfer encoding required for streaming

Authentication
--------------

Expand Down Expand Up @@ -52,3 +116,5 @@ and you can save these cookies in a cookie jar to reuse them in a following conn

.. _aiohttp: https://docs.aiohttp.org
.. _issue 197: https://github.com/graphql-python/gql/issues/197
.. _multipart subscription protocol: https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol

183 changes: 179 additions & 4 deletions gql/transport/aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)

import aiohttp
from aiohttp import BodyPartReader, MultipartReader
from aiohttp.client_exceptions import ClientResponseError
from aiohttp.client_reqrep import Fingerprint
from aiohttp.helpers import BasicAuth
Expand Down Expand Up @@ -421,12 +422,186 @@ async def execute_batch(
except Exception as e:
raise TransportConnectionFailed(str(e)) from e

def subscribe(
async def subscribe(
self,
request: GraphQLRequest,
) -> AsyncGenerator[ExecutionResult, None]:
"""Subscribe is not supported on HTTP.
"""Execute a GraphQL subscription and yield results from multipart response.

:meta private:
:param request: GraphQL request to execute
:yields: ExecutionResult objects as they arrive in the multipart stream
"""
raise NotImplementedError(" The HTTP transport does not support subscriptions")
if self.session is None:
raise TransportClosed("Transport is not connected")

post_args = self._prepare_request(request)

# Add headers for multipart subscription
headers = post_args.get("headers", {})
headers.update(
{
"Content-Type": "application/json",
"Accept": (
"multipart/mixed;boundary=graphql;"
"subscriptionSpec=1.0,application/json"
),
}
)
post_args["headers"] = headers

try:
async with self.session.post(self.url, ssl=self.ssl, **post_args) as resp:
# Saving latest response headers in the transport
self.response_headers = resp.headers

# Check for errors
if resp.status >= 400:
# Raise a TransportServerError if status > 400
self._raise_transport_server_error_if_status_more_than_400(resp)

initial_content_type = resp.headers.get("Content-Type", "")
if (
"application/json" in initial_content_type
and "multipart/mixed" not in initial_content_type
):
yield await self._prepare_result(resp)
return

if (
("multipart/mixed" not in initial_content_type)
or ("boundary=graphql" not in initial_content_type)
or ("subscriptionSpec=1.0" not in initial_content_type)
):
raise TransportProtocolError(
f"Unexpected content-type: {initial_content_type}. "
"Server may not support the multipart subscription protocol."
)

# Parse multipart response
async for result in self._parse_multipart_response(resp):
yield result

except TransportError:
raise
except Exception as e:
raise TransportConnectionFailed(str(e)) from e

async def _parse_multipart_response(
self,
response: aiohttp.ClientResponse,
) -> AsyncGenerator[ExecutionResult, None]:
"""
Parse a multipart response stream and yield execution results.

Uses aiohttp's built-in MultipartReader to handle the multipart protocol.

:param response: The aiohttp response object
:yields: ExecutionResult objects
"""
# Use aiohttp's built-in multipart reader
reader = MultipartReader.from_response(response)

# Iterate through each part in the multipart response
while True:
try:
part = await reader.next()
except Exception:
# reader.next() throws on empty parts at the end of the stream.
# (some servers may send this.)
# see: https://github.com/aio-libs/aiohttp/pull/11857
# As an ugly workaround for now, we can check if we've reached
# EOF and assume this was the case.
if reader.at_eof():
break

# Otherwise, re-raise unexpected errors
raise # pragma: no cover

if part is None:
# No more parts
break

assert not isinstance(
part, MultipartReader
), "Nested multipart parts are not supported in GraphQL subscriptions"

result = await self._parse_multipart_part(part)
if result:
yield result

async def _parse_multipart_part(
self, part: BodyPartReader
) -> Optional[ExecutionResult]:
"""
Parse a single part from a multipart response.

:param part: aiohttp BodyPartReader for the part
:return: ExecutionResult or None if part is empty/heartbeat
"""
# Verify the part has the correct content type
content_type = part.headers.get(aiohttp.hdrs.CONTENT_TYPE, "")
if not content_type.startswith("application/json"):
raise TransportProtocolError(
f"Unexpected part content-type: {content_type}. "
"Expected 'application/json'."
)

try:
# Read the part content as text
body = await part.text()
body = body.strip()

if log.isEnabledFor(logging.DEBUG):
log.debug("<<< %s", ascii(body or "(empty body, skipping)"))

if not body:
return None

# Parse JSON body using custom deserializer
data = self.json_deserialize(body)

# Handle heartbeats - empty JSON objects
if not data:
log.debug("Received heartbeat, ignoring")
return None

# The multipart subscription protocol wraps data in a "payload" property
if "payload" not in data:
log.warning("Invalid response: missing 'payload' field")
return None

payload = data["payload"]

# Check for transport-level errors (payload is null)
if payload is None:
# If there are errors, this is a transport-level error
errors = data.get("errors")
if errors:
error_messages = [
error.get("message", "Unknown transport error")
for error in errors
]

for message in error_messages:
log.error(f"Transport error: {message}")

raise TransportServerError("\n\n".join(error_messages))
else:
# Null payload without errors - just skip this part
return None

# Extract GraphQL data from payload
return ExecutionResult(
data=payload.get("data"),
errors=payload.get("errors"),
extensions=payload.get("extensions"),
)
except json.JSONDecodeError as e:
log.warning(
f"Failed to parse JSON: {ascii(e)}, "
f"body: {ascii(body[:100]) if body else ''}"
)
return None
except UnicodeDecodeError as e:
log.warning(f"Failed to decode part: {ascii(e)}")
return None
Loading
Loading