1
- from typing import Awaitable , Union
1
+ from inspect import isawaitable
2
+ from typing import Any , AsyncGenerator , AsyncIterator , Awaitable , Coroutine , cast
2
3
3
- from graphql import DocumentNode , ExecutionResult , GraphQLSchema , execute
4
+ from graphql import DocumentNode , ExecutionResult , GraphQLSchema , execute , subscribe
4
5
5
- from gql .transport import Transport
6
+ from gql .transport import AsyncTransport
6
7
7
8
8
- class LocalSchemaTransport (Transport ):
9
+ class LocalSchemaTransport (AsyncTransport ):
9
10
"""A transport for executing GraphQL queries against a local schema."""
10
11
11
12
def __init__ (
@@ -17,14 +18,59 @@ def __init__(
17
18
"""
18
19
self .schema = schema
19
20
20
- def execute (
21
- self , document : DocumentNode , * args , ** kwargs
22
- ) -> Union [ExecutionResult , Awaitable [ExecutionResult ]]:
23
- """Execute the given document against the configured local schema.
21
+ async def connect (self ):
22
+ """No connection needed on local transport
23
+ """
24
+ pass
25
+
26
+ async def close (self ):
27
+ """No close needed on local transport
28
+ """
29
+ pass
24
30
25
- :param document: GraphQL query as AST Node object.
26
- :param args: Positional options for execute method from graphql-core.
27
- :param kwargs: Keyword options passed to execute method from graphql-core.
28
- :return: ExecutionResult (either as value or awaitable)
31
+ async def execute (
32
+ self , document : DocumentNode , * args , ** kwargs ,
33
+ ) -> ExecutionResult :
34
+ """Execute the provided document AST for on a local GraphQL Schema.
29
35
"""
30
- return execute (self .schema , document , * args , ** kwargs )
36
+
37
+ result_or_awaitable = execute (self .schema , document , * args , ** kwargs )
38
+
39
+ execution_result : ExecutionResult
40
+
41
+ if isawaitable (result_or_awaitable ):
42
+ result_or_awaitable = cast (Awaitable [ExecutionResult ], result_or_awaitable )
43
+ execution_result = await result_or_awaitable
44
+ else :
45
+ result_or_awaitable = cast (ExecutionResult , result_or_awaitable )
46
+ execution_result = result_or_awaitable
47
+
48
+ return execution_result
49
+
50
+ async def subscribe (
51
+ self , document : DocumentNode , * args , ** kwargs ,
52
+ ) -> AsyncGenerator [ExecutionResult , None ]:
53
+ """Send a query and receive the results using an async generator
54
+
55
+ The query can be a graphql query, mutation or subscription
56
+
57
+ The results are sent as an ExecutionResult object
58
+ """
59
+
60
+ subscribe_result = subscribe (self .schema , document , * args , ** kwargs )
61
+
62
+ if isinstance (subscribe_result , ExecutionResult ):
63
+ yield ExecutionResult
64
+
65
+ else :
66
+ # if we don't get an ExecutionResult, then we should receive
67
+ # a Coroutine returning an AsyncIterator[ExecutionResult]
68
+
69
+ subscribe_coro = cast (
70
+ Coroutine [Any , Any , AsyncIterator [ExecutionResult ]], subscribe_result
71
+ )
72
+
73
+ subscribe_generator = await subscribe_coro
74
+
75
+ async for result in subscribe_generator :
76
+ yield result
0 commit comments