Skip to content

Commit 06df8e6

Browse files
committed
implemented graphql server python over rsocket
1 parent 9c020e0 commit 06df8e6

File tree

6 files changed

+108
-53
lines changed

6 files changed

+108
-53
lines changed

examples/graphql/client_graphql.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import logging
33
import sys
4+
from pathlib import Path
45

56
from gql import gql, Client
67

@@ -16,16 +17,11 @@ async def main(server_port: int):
1617

1718
async with RSocketClient(single_transport_provider(TransportTCP(*connection)),
1819
metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA) as client:
20+
with (Path(__file__).parent / 'rsocket.graphqls').open() as fd:
21+
schema = fd.read()
22+
1923
graphql = Client(
20-
schema="""
21-
type Query {
22-
greeting: Greeting
23-
}
24-
25-
type Greeting {
26-
message: String
27-
}
28-
""",
24+
schema=schema,
2925
transport=RSocketTransport(client),
3026
)
3127

examples/graphql/rsocket.graphqls

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
type Query {
2+
greeting: Greeting
3+
}
4+
5+
type Subscription {
6+
greetings: Greeting
7+
}
8+
9+
type Greeting {
10+
message: String
11+
}

examples/graphql/server_graphql.py

Lines changed: 67 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,101 @@
1+
import asyncio
12
import json
23
import logging
34
import sys
5+
from pathlib import Path
46
from typing import AsyncGenerator, Tuple
57

6-
from graphql import parse, subscribe
7-
from graphql_server import get_graphql_params
8-
from quart import Quart
8+
from graphql import build_schema, subscribe, parse
99

10-
from examples.graphql.schema import AsyncSchema
1110
from rsocket.frame_helpers import str_to_bytes
12-
from rsocket.graphql.helpers import execute_query_in_payload
11+
from rsocket.graphql.helpers import execute_query_in_payload, get_graphql_params
1312
from rsocket.helpers import create_future
1413
from rsocket.payload import Payload
1514
from rsocket.routing.request_router import RequestRouter
1615
from rsocket.routing.routing_request_handler import RoutingRequestHandler
16+
from rsocket.rsocket_server import RSocketServer
1717
from rsocket.streams.stream_from_async_generator import StreamFromAsyncGenerator
18-
from rsocket.transports.quart_websocket import websocket_handler
18+
from rsocket.transports.tcp import TransportTCP
1919

20-
app = Quart(__name__)
2120

22-
router = RequestRouter()
21+
def greeting(*args):
22+
return {
23+
'message': "Hello world"
24+
}
2325

2426

25-
@router.response('graphql')
26-
async def graphql_query(payload: Payload):
27-
execution_result = await execute_query_in_payload(payload, AsyncSchema)
27+
def greetings(*args):
28+
async def results():
29+
for i in range(10):
30+
yield {'greetings': {'message': f"Hello world {i}"}}
31+
await asyncio.sleep(1)
2832

29-
response_data = str_to_bytes(json.dumps({
30-
'data': execution_result.data
31-
}))
33+
return results()
3234

33-
return create_future(Payload(response_data))
3435

36+
class GraphqlRequestHandler:
3537

36-
@router.stream('graphql')
37-
async def graphql_subscription(payload: Payload):
38-
async def generator() -> AsyncGenerator[Tuple[Payload, bool], None]:
39-
data = json.loads(payload.data.decode('utf-8'))
40-
params = get_graphql_params(data, {})
41-
schema = AsyncSchema
42-
document = parse(params.query)
38+
def __init__(self):
39+
with (Path(__file__).parent / 'rsocket.graphqls').open() as fd:
40+
schema = build_schema(fd.read())
41+
42+
schema.query_type.fields['greeting'].resolve = greeting
43+
schema.subscription_type.fields['greetings'].subscribe = greetings
44+
45+
router = RequestRouter()
46+
47+
@router.response('graphql')
48+
async def graphql_query(payload: Payload):
49+
execution_result = await execute_query_in_payload(payload, schema)
4350

44-
async for execution_result in await subscribe(
45-
schema,
46-
document,
47-
variable_values=params.variables,
48-
operation_name=params.operation_name
49-
):
50-
item = execution_result.data
5151
response_data = str_to_bytes(json.dumps({
52-
'data': item[0]
52+
'data': execution_result.data
5353
}))
54-
yield Payload(response_data), item[1]
5554

56-
return StreamFromAsyncGenerator(generator)
55+
return create_future(Payload(response_data))
56+
57+
@router.stream('graphql')
58+
async def graphql_subscription(payload: Payload):
59+
async def generator() -> AsyncGenerator[Tuple[Payload, bool], None]:
60+
data = json.loads(payload.data.decode('utf-8'))
61+
params = get_graphql_params(data, {})
62+
document = parse(params.query)
63+
64+
async for execution_result in await subscribe(
65+
schema,
66+
document,
67+
operation_name=params.operation_name
68+
):
69+
item = execution_result.data
70+
response_data = str_to_bytes(json.dumps({
71+
'data': item
72+
}))
73+
yield Payload(response_data), False
5774

75+
yield Payload(), True
5876

59-
@router.response('ping')
60-
async def ping():
61-
return create_future(Payload(b'pong'))
77+
return StreamFromAsyncGenerator(generator)
78+
79+
self.router = router
6280

6381

6482
def handler_factory():
65-
return RoutingRequestHandler(router)
83+
return RoutingRequestHandler(GraphqlRequestHandler().router)
84+
85+
86+
async def run_server(server_port):
87+
logging.info('Starting server at localhost:%s', server_port)
88+
89+
def session(*connection):
90+
RSocketServer(TransportTCP(*connection), handler_factory=handler_factory)
6691

92+
server = await asyncio.start_server(session, 'localhost', server_port)
6793

68-
@app.websocket("/")
69-
async def ws():
70-
await websocket_handler(handler_factory=handler_factory)
94+
async with server:
95+
await server.serve_forever()
7196

7297

73-
if __name__ == "__main__":
74-
port = sys.argv[1] if len(sys.argv) > 1 else 7000
98+
if __name__ == '__main__':
99+
port = sys.argv[1] if len(sys.argv) > 1 else 9191
75100
logging.basicConfig(level=logging.DEBUG)
76-
app.run(port=port)
101+
asyncio.run(run_server(port))

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,4 @@ cbitstruct==1.0.9
2121
cloudevents==1.10.0
2222
pydantic==1.10.13
2323
Werkzeug==3.0.0
24+
graphql-core==3.2.3

rsocket/graphql/helpers.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,42 @@
11
import json
2+
from collections import namedtuple
3+
from typing import Dict, Optional, Union
24

35
from graphql import execute, parse, ExecutionResult, GraphQLSchema
4-
from graphql_server import get_graphql_params
56

67
from rsocket.payload import Payload
78

9+
GraphQLParams = namedtuple("GraphQLParams", "query variables operation_name")
10+
811

912
async def execute_query_in_payload(payload: Payload,
1013
schema: GraphQLSchema) -> ExecutionResult:
1114
data = json.loads(payload.data.decode('utf-8'))
1215
params = get_graphql_params(data, {})
1316
document = parse(params.query)
1417

15-
execution_result = await execute(
18+
execution_result = execute(
1619
schema,
1720
document,
1821
variable_values=params.variables,
1922
operation_name=params.operation_name
2023
)
2124

2225
return execution_result
26+
27+
28+
def get_graphql_params(data: Dict, query_data: Dict) -> GraphQLParams:
29+
query = data.get("query") or query_data.get("query")
30+
variables = data.get("variables") or query_data.get("variables")
31+
operation_name = data.get("operationName") or query_data.get("operationName")
32+
33+
return GraphQLParams(query, load_json_variables(variables), operation_name)
34+
35+
36+
def load_json_variables(variables: Optional[Union[str, Dict]]) -> Optional[Dict]:
37+
if variables and isinstance(variables, str):
38+
try:
39+
return json.loads(variables)
40+
except Exception:
41+
raise Exception("Variables are invalid JSON.")
42+
return variables # type: ignore

setup.cfg

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ optimized = cbitstruct>=1.0.9
5757
cloudevents =
5858
cloudevents>=1.9.0
5959
pydantic>=1.10.0
60+
graphql =
61+
graphql-core>=3.2.0
6062

6163
[options.entry_points]
6264
cli.console_scripts =

0 commit comments

Comments
 (0)