Skip to content

Commit 9a1c04d

Browse files
authored
PYTHON-1129: Ability to execute Fluent Graph queries asynchronously (scylladb#24)
Ability to execute Fluent Graph queries asynchronously
1 parent 591b1da commit 9a1c04d

File tree

4 files changed

+139
-9
lines changed

4 files changed

+139
-9
lines changed

CHANGELOG.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ Features
77
* Allow passing ssl context for Twisted (PYTHON-1161)
88
* Cloud Twisted support (PYTHON-1163)
99
* Add additional_write_policy and read_repair to system schema parsing (PYTHON-1048)
10-
* Remove *read_repair_chance table options (PYTHON-1140)
10+
* Remove *read_repair_chance* table options (PYTHON-1140)
11+
* [GRAPH] Ability to execute Fluent Graph queries asynchronously (PYTHON-1129)
1112

1213
Bug Fixes
1314
---------

cassandra/datastax/graph/fluent/__init__.py

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import logging
1616
import copy
1717

18+
from concurrent.futures import Future
19+
1820
HAVE_GREMLIN = False
1921
try:
2022
import gremlin_python
@@ -117,22 +119,52 @@ def __init__(self, session, graph_name=None, execution_profile=EXEC_PROFILE_GRAP
117119
self.graph_name = graph_name
118120
self.execution_profile = execution_profile
119121

120-
def submit(self, bytecode):
121-
122+
def _prepare_query(self, bytecode):
122123
query = DseGraph.query_from_traversal(bytecode)
123124
ep = self.session.execution_profile_clone_update(self.execution_profile,
124125
row_factory=graph_traversal_row_factory)
125126
graph_options = ep.graph_options.copy()
126127
graph_options.graph_language = DseGraph.DSE_GRAPH_QUERY_LANGUAGE
127128
if self.graph_name:
128129
graph_options.graph_name = self.graph_name
129-
130130
ep.graph_options = graph_options
131131

132+
return query, ep
133+
134+
@staticmethod
135+
def _handle_query_results(result_set, gremlin_future):
136+
try:
137+
traversers = [Traverser(t) for t in result_set]
138+
gremlin_future.set_result(
139+
RemoteTraversal(iter(traversers), TraversalSideEffects())
140+
)
141+
except Exception as e:
142+
gremlin_future.set_exception(e)
143+
144+
@staticmethod
145+
def _handle_query_error(response, gremlin_future):
146+
gremlin_future.set_exception(response)
147+
148+
def submit(self, bytecode):
149+
# the only reason I don't use submitAsync here
150+
# is to avoid an unuseful future wrap
151+
query, ep = self._prepare_query(bytecode)
152+
132153
traversers = self.session.execute_graph(query, execution_profile=ep)
133154
traversers = [Traverser(t) for t in traversers]
134155
return RemoteTraversal(iter(traversers), TraversalSideEffects())
135156

157+
def submitAsync(self, bytecode):
158+
query, ep = self._prepare_query(bytecode)
159+
160+
# to be compatible with gremlinpython, we need to return a concurrent.futures.Future
161+
gremlin_future = Future()
162+
response_future = self.session.execute_graph_async(query, execution_profile=ep)
163+
response_future.add_callback(self._handle_query_results, gremlin_future)
164+
response_future.add_errback(self._handle_query_error, gremlin_future)
165+
166+
return gremlin_future
167+
136168
def __str__(self):
137169
return "<DSESessionRemoteGraphConnection: graph_name='{0}'>".format(self.graph_name)
138170

docs/graph_fluent.rst

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,16 +109,45 @@ that session. We call this *implicit execution* because the `Session` is not
109109
explicitly involved. Everything is managed internally by TinkerPop while
110110
traversing the graph and the results are TinkerPop types as well.
111111

112-
For example:
112+
Synchronous Example
113+
-------------------
113114

114115
.. code-block:: python
115116
116117
# Build the GraphTraversalSource
117118
g = DseGraph.traversal_source(session)
118119
# implicitly execute the query by traversing the TraversalSource
119120
g.addV('genre').property('genreId', 1).property('name', 'Action').next()
120-
# view the results of the execution
121-
pprint(g.V().toList())
121+
122+
# blocks until the query is completed and return the results
123+
results = g.V().toList()
124+
pprint(results)
125+
126+
Asynchronous Exemple
127+
--------------------
128+
129+
You can execute a graph traversal query asynchronously by using `.promise()`. It returns a
130+
python `Future <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future>`_.
131+
132+
.. code-block:: python
133+
134+
# Build the GraphTraversalSource
135+
g = DseGraph.traversal_source(session)
136+
# implicitly execute the query by traversing the TraversalSource
137+
g.addV('genre').property('genreId', 1).property('name', 'Action').next() # not async
138+
139+
# get a future and wait
140+
future = g.V().promise()
141+
results = list(future.result())
142+
pprint(results)
143+
144+
# or set a callback
145+
def cb(f):
146+
results = list(f.result())
147+
pprint(results)
148+
future = g.V().promise()
149+
future.add_done_callback(cb)
150+
# do other stuff...
122151
123152
Specify the Execution Profile explicitly
124153
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

tests/integration/advanced/graph/fluent/test_graph.py

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414

1515

1616
import sys
17+
18+
from concurrent.futures import Future
19+
1720
from cassandra.datastax.graph.fluent import DseGraph
1821
from gremlin_python.process.graph_traversal import GraphTraversal, GraphTraversalSource
1922
from gremlin_python.process.traversal import P
@@ -35,6 +38,7 @@
3538
if six.PY3:
3639
import ipaddress
3740

41+
3842
def setup_module():
3943
if DSE_VERSION:
4044
dse_options = {'graph': {'realtime_evaluation_timeout_in_seconds': 60}}
@@ -398,13 +402,13 @@ def _check_equality(self, g, original, read_value):
398402

399403

400404
@requiredse
401-
class ImplicitExecutionTest(AbstractTraversalTest, BasicGraphUnitTestCase):
405+
class BaseImplicitExecutionTest(BasicGraphUnitTestCase):
402406
"""
403407
This test class will execute all tests of the AbstractTraversalTestClass using implicit execution
404408
This all traversal will be run directly using toList()
405409
"""
406410
def setUp(self):
407-
super(ImplicitExecutionTest, self).setUp()
411+
super(BaseImplicitExecutionTest, self).setUp()
408412
if DSE_VERSION:
409413
self.ep = DseGraph().create_execution_profile(self.graph_name)
410414
self.cluster.add_execution_profile(self.graph_name, self.ep)
@@ -487,6 +491,70 @@ def _validate_type(self, g, vertex):
487491
_validate_prop(key, value, self)
488492

489493

494+
@requiredse
495+
class ImplicitExecutionTest(BaseImplicitExecutionTest, AbstractTraversalTest):
496+
pass
497+
498+
499+
@requiredse
500+
class ImplicitAsyncExecutionTest(BaseImplicitExecutionTest):
501+
"""
502+
Test to validate that the traversal async execution works properly.
503+
504+
@since 3.21.0
505+
@jira_ticket PYTHON-1129
506+
507+
@test_category dse graph
508+
"""
509+
510+
def _validate_results(self, results):
511+
results = list(results)
512+
self.assertEqual(len(results), 2)
513+
self.assertIn('vadas', results)
514+
self.assertIn('josh', results)
515+
516+
def test_promise(self):
517+
generate_classic(self.session)
518+
g = self.fetch_traversal_source()
519+
traversal_future = g.V().has('name', 'marko').out('knows').values('name').promise()
520+
self._validate_results(traversal_future.result())
521+
522+
def test_promise_error_is_propagated(self):
523+
generate_classic(self.session)
524+
g = DseGraph().traversal_source(self.session, 'wrong_graph', execution_profile=self.ep)
525+
traversal_future = g.V().has('name', 'marko').out('knows').values('name').promise()
526+
with self.assertRaises(Exception):
527+
traversal_future.result()
528+
529+
def test_promise_callback(self):
530+
generate_classic(self.session)
531+
g = self.fetch_traversal_source()
532+
future = Future()
533+
534+
def cb(f):
535+
future.set_result(f.result())
536+
537+
traversal_future = g.V().has('name', 'marko').out('knows').values('name').promise()
538+
traversal_future.add_done_callback(cb)
539+
self._validate_results(future.result())
540+
541+
def test_promise_callback_on_error(self):
542+
generate_classic(self.session)
543+
g = DseGraph().traversal_source(self.session, 'wrong_graph', execution_profile=self.ep)
544+
future = Future()
545+
546+
def cb(f):
547+
try:
548+
f.result()
549+
except Exception as e:
550+
future.set_exception(e)
551+
552+
traversal_future = g.V().has('name', 'marko').out('knows').values('name').promise()
553+
traversal_future.add_done_callback(cb)
554+
with self.assertRaises(Exception):
555+
future.result()
556+
557+
490558
@requiredse
491559
class ExplicitExecutionBase(BasicGraphUnitTestCase):
492560
def setUp(self):

0 commit comments

Comments
 (0)