21
21
from graphql_server .subscriptions .protocols .graphql_ws import types as ws_types
22
22
23
23
if TYPE_CHECKING :
24
- from collections .abc import AsyncIterator
25
- from types import TracebackType
26
- from typing_extensions import Self
24
+ from collections .abc import AsyncIterator # pragma: no cover
25
+ from types import TracebackType # pragma: no cover
26
+ from typing_extensions import Self # pragma: no cover
27
27
28
- from asgiref .typing import ASGIApplication
28
+ from asgiref .typing import ASGIApplication # pragma: no cover
29
29
30
30
31
31
class GraphQLWebsocketCommunicator (WebsocketCommunicator ):
@@ -71,7 +71,7 @@ def __init__(
71
71
subprotocols: an ordered list of preferred subprotocols to be sent to the server.
72
72
**kwargs: additional arguments to be passed to the `WebsocketCommunicator` constructor.
73
73
"""
74
- if connection_params is None :
74
+ if connection_params is None : # pragma: no cover - tested via custom initialisation
75
75
connection_params = {}
76
76
self .protocol = protocol
77
77
subprotocols = kwargs .get ("subprotocols" , [])
@@ -139,7 +139,7 @@ async def subscribe(
139
139
},
140
140
}
141
141
142
- if variables is not None :
142
+ if variables is not None : # pragma: no cover - exercised in higher-level tests
143
143
start_message ["payload" ]["variables" ] = variables
144
144
145
145
await self .send_json_to (start_message )
@@ -155,7 +155,7 @@ async def subscribe(
155
155
ret .errors = self .process_errors (payload .get ("errors" ) or [])
156
156
ret .extensions = payload .get ("extensions" , None )
157
157
yield ret
158
- elif message ["type" ] == "error" :
158
+ elif message ["type" ] == "error" : # pragma: no cover - network failures untested
159
159
error_payload = message ["payload" ]
160
160
yield ExecutionResult (
161
161
data = None , errors = self .process_errors (error_payload )
0 commit comments