Skip to content

Commit 0b15212

Browse files
committed
Execute with promises
1 parent 6d7b68c commit 0b15212

23 files changed

+411
-544
lines changed

graphql/execution/__init__.py

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,42 +20,6 @@
2020
"""
2121
from .execute import execute as _execute
2222
from .base import ExecutionResult
23-
# from .executor import Executor
24-
# from .middlewares.sync import SynchronousExecutionMiddleware
2523

2624
def execute(schema, root, ast, operation_name='', args=None):
2725
return _execute(schema, ast, root, variable_values=args, operation_name=operation_name)
28-
29-
# def execute(schema, root, ast, operation_name='', args=None):
30-
# """
31-
# Executes an AST synchronously. Assumes that the AST is already validated.
32-
# """
33-
# return get_default_executor().execute(schema, ast, root, args, operation_name, validate_ast=False)
34-
35-
36-
# _default_executor = None
37-
38-
39-
# def get_default_executor():
40-
# """
41-
# Gets the default executor to be used in the `execute` function above.
42-
# """
43-
# global _default_executor
44-
# if _default_executor is None:
45-
# _default_executor = Executor([SynchronousExecutionMiddleware()])
46-
47-
# return _default_executor
48-
49-
50-
# def set_default_executor(executor):
51-
# """
52-
# Sets the default executor to be used in the `execute` function above.
53-
54-
# If passed `None` will reset to the original default synchronous executor.
55-
# """
56-
# assert isinstance(executor, Executor) or executor is None
57-
# global _default_executor
58-
# _default_executor = executor
59-
60-
61-
# __all__ = ['ExecutionResult', 'Executor', 'execute', 'get_default_executor', 'set_default_executor']

graphql/execution/base.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ class ExecutionContext(object):
1919
and the fragments defined in the query document"""
2020

2121
__slots__ = 'schema', 'fragments', 'root_value', 'operation', 'variable_values', 'errors', 'context_value', \
22-
'argument_values_cache'
22+
'argument_values_cache', 'executor'
2323

24-
def __init__(self, schema, document_ast, root_value, context_value, variable_values, operation_name):
24+
def __init__(self, schema, document_ast, root_value, context_value, variable_values, operation_name, executor):
2525
"""Constructs a ExecutionContext object from the arguments passed
2626
to execute, which we will pass throughout the other execution
2727
methods."""
@@ -63,6 +63,7 @@ def __init__(self, schema, document_ast, root_value, context_value, variable_val
6363
self.errors = errors
6464
self.context_value = context_value
6565
self.argument_values_cache = {}
66+
self.executor = executor
6667

6768
def get_argument_values(self, field_def, field_ast):
6869
k = field_def, field_ast

graphql/execution/execute.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,24 @@
1616
from .base import (ExecutionContext, ExecutionResult, ResolveInfo, Undefined,
1717
collect_fields, default_resolve_fn, get_field_def,
1818
get_operation_root_type)
19+
from .executors.sync import SyncExecutor
1920

2021

21-
22-
def execute(schema, document_ast, root_value=None, context_value=None, variable_values=None, operation_name=None):
22+
def execute(schema, document_ast, root_value=None, context_value=None, variable_values=None, operation_name=None, executor=None):
2323
assert schema, 'Must provide schema'
2424
assert isinstance(schema, GraphQLSchema), 'Schema must be an instance of GraphQLSchema. Also ensure that there are not multiple versions of GraphQL installed in your node_modules directory.'
2525

26+
if executor is None:
27+
executor = SyncExecutor()
28+
2629
context = ExecutionContext(
2730
schema,
2831
document_ast,
2932
root_value,
3033
context_value,
3134
variable_values,
32-
operation_name
35+
operation_name,
36+
executor
3337
)
3438

3539
def executor(resolve, reject):
@@ -42,7 +46,10 @@ def on_rejected(error):
4246
def on_resolve(data):
4347
return ExecutionResult(data=data, errors=context.errors)
4448

45-
return Promise(executor).catch(on_rejected).then(on_resolve).value
49+
p = Promise(executor).catch(on_rejected).then(on_resolve)
50+
context.executor.wait_until_finished()
51+
return p.value
52+
4653

4754

4855
def execute_operation(exe_context, operation, root_value):
@@ -177,7 +184,7 @@ def resolve_field(exe_context, parent_type, source, field_asts):
177184
def resolve_or_error(resolve_fn, source, args, exe_context, info):
178185
try:
179186
# return resolve_fn(source, args, exe_context, info)
180-
return resolve_fn(source, args, info)
187+
return exe_context.executor.execute(resolve_fn, source, args, info)
181188
except Exception as e:
182189
return e
183190

graphql/execution/executors/__init__.py

Whitespace-only changes.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from __future__ import absolute_import
2+
3+
from asyncio import Future, ensure_future, iscoroutine, get_event_loop, wait
4+
from graphql.pyutils.aplus import Promise
5+
6+
7+
def process_future_result(promise):
8+
def handle_future_result(future):
9+
exception = future.exception()
10+
if exception:
11+
promise.reject(exception)
12+
else:
13+
promise.fulfill(future.result())
14+
15+
return handle_future_result
16+
17+
18+
class AsyncioExecutor(object):
19+
def __init__(self):
20+
self.loop = get_event_loop()
21+
self.futures = []
22+
23+
def wait_until_finished(self):
24+
self.loop.run_until_complete(wait(self.futures))
25+
26+
def execute(self, fn, *args, **kwargs):
27+
result = fn(*args, **kwargs)
28+
if isinstance(result, Future) or iscoroutine(result):
29+
promise = Promise()
30+
future = ensure_future(result)
31+
self.futures.append(future)
32+
future.add_done_callback(process_future_result(promise))
33+
return promise
34+
return result

graphql/execution/executors/gevent.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from __future__ import absolute_import
2+
3+
import gevent
4+
from .utils import process
5+
from ...pyutils.aplus import Promise
6+
7+
8+
class GeventExecutor(object):
9+
def __init__(self):
10+
self.jobs = []
11+
12+
def wait_until_finished(self):
13+
[j.join() for j in self.jobs]
14+
# gevent.joinall(self.jobs)
15+
16+
def execute(self, fn, *args, **kwargs):
17+
promise = Promise()
18+
job = gevent.spawn(process, promise, fn, args, kwargs)
19+
self.jobs.append(job)
20+
return promise
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from multiprocessing import Pool, Process, Queue
2+
from ...pyutils.aplus import Promise
3+
from .utils import process
4+
5+
def queue_process(q):
6+
promise, fn, args, kwargs = q.get()
7+
process(promise, fn, args, kwargs)
8+
9+
10+
class ProcessExecutor(object):
11+
def __init__(self):
12+
self.processes = []
13+
self.q = Queue()
14+
15+
def wait_until_finished(self):
16+
for _process in self.processes:
17+
_process.join()
18+
self.q.close()
19+
self.q.join_thread()
20+
21+
def execute(self, fn, *args, **kwargs):
22+
promise = Promise()
23+
24+
q.put([promise, fn, args, kwargs], False)
25+
_process = Process(target=queue_process, args=(self.q))
26+
_process.start()
27+
self.processes.append(_process)
28+
return promise

graphql/execution/executors/sync.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
class SyncExecutor(object):
2+
def wait_until_finished(self):
3+
pass
4+
5+
def execute(self, fn, *args, **kwargs):
6+
return fn(*args, **kwargs)

graphql/execution/executors/thread.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from threading import Thread
2+
from ...pyutils.aplus import Promise
3+
from .utils import process
4+
5+
6+
class ThreadExecutor(object):
7+
def __init__(self):
8+
self.threads = []
9+
10+
def wait_until_finished(self):
11+
for thread in self.threads:
12+
thread.join()
13+
14+
def execute(self, fn, *args, **kwargs):
15+
promise = Promise()
16+
thread = Thread(target=process, args=(promise, fn, args, kwargs))
17+
thread.start()
18+
self.threads.append(thread)
19+
return promise

graphql/execution/executors/utils.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
def process(p, f, args, kwargs):
2+
try:
3+
val = f(*args, **kwargs)
4+
p.fulfill(val)
5+
except Exception as e:
6+
p.reject(e)

0 commit comments

Comments
 (0)