Skip to content

Commit b563c7b

Browse files
committed
Initial integration of dse-graph
1 parent 0a6025b commit b563c7b

File tree

8 files changed

+844
-0
lines changed

8 files changed

+844
-0
lines changed

cassandra/datastax/graph/__init__.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,15 @@
2020
graph_result_row_factory, graph_graphson2_row_factory
2121
)
2222
from cassandra.datastax.graph.graphson import *
23+
24+
25+
# Optional-dependency probe: the names from ``_dse_graph`` are only re-exported
# from this package when the ``gremlin_python`` package can be imported.
HAVE_GREMLIN = False
try:
    import gremlin_python
    HAVE_GREMLIN = True
except ImportError:
    # gremlinpython is not installed.
    pass

if HAVE_GREMLIN:
    from cassandra.datastax.graph._dse_graph import *
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
# Copyright DataStax, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
import logging
17+
import copy
18+
19+
from gremlin_python.structure.graph import Graph
20+
from gremlin_python.driver.remote_connection import RemoteConnection, RemoteTraversal
21+
from gremlin_python.process.traversal import Traverser, TraversalSideEffects
22+
from gremlin_python.process.graph_traversal import GraphTraversal
23+
24+
from cassandra.cluster import Session, GraphExecutionProfile, EXEC_PROFILE_GRAPH_DEFAULT
25+
from cassandra.datastax.graph import GraphOptions, GraphProtocol
26+
27+
from cassandra.datastax.graph.serializers import (
28+
GremlinGraphSONReader,
29+
deserializers,
30+
gremlin_deserializers
31+
)
32+
from cassandra.datastax.graph.query import _DefaultTraversalBatch, _query_from_traversal
33+
34+
log = logging.getLogger(__name__)

__all__ = ['BaseGraphRowFactory', 'dse_graphson_reader', 'graphson_reader', 'graph_traversal_row_factory',
           'graph_traversal_dse_object_row_factory', 'DSESessionRemoteGraphConnection', 'DseGraph']


# Module-level GraphSON readers: one built with the ``deserializers`` map and
# one built with the ``gremlin_deserializers`` map.
dse_graphson_reader = GremlinGraphSONReader(deserializer_map=deserializers)
graphson_reader = GremlinGraphSONReader(deserializer_map=gremlin_deserializers)
43+
44+
# Traversal result keys
_bulk_key = 'bulk'
_result_key = 'result'


class BaseGraphRowFactory(object):
    """
    Base row factory for graph traversal. This class basically wraps a
    graphson reader function to handle additional features of Gremlin/DSE
    and is callable as a normal row factory.

    Currently supported:
      - bulk results

    :param graphson_reader: The function used to read the graphson.

    Use example::

        my_custom_row_factory = BaseGraphRowFactory(custom_graphson_reader.readObject)
    """

    def __init__(self, graphson_reader):
        self._graphson_reader = graphson_reader

    def __call__(self, column_names, rows):
        """
        Decode every row and expand bulked results.

        :param column_names: Unused; present so the instance has the
            row-factory call signature.
        :param rows: Iterable of rows; only the first item of each row is
            passed to the graphson reader.
        :return: A list with one entry per decoded result. A result whose
            ``'bulk'`` value is ``n > 1`` appears ``n`` times (``n - 1``
            deep copies followed by the decoded object itself).
        """
        results = []

        for row in rows:
            parsed_row = self._graphson_reader(row[0])
            bulk = parsed_row.get(_bulk_key, 1)
            if bulk > 1:  # Avoid deepcopy call if bulk <= 1
                results.extend([copy.deepcopy(parsed_row[_result_key])
                                for _ in range(bulk - 1)])

            results.append(parsed_row[_result_key])

        return results
81+
82+
83+
# Ready-made row factories, one per module-level GraphSON reader. The
# docstring is set on each instance so that it can be documented individually.
graph_traversal_row_factory = BaseGraphRowFactory(graphson_reader.readObject)
graph_traversal_row_factory.__doc__ = "Row Factory that returns the decoded graphson."

graph_traversal_dse_object_row_factory = BaseGraphRowFactory(dse_graphson_reader.readObject)
graph_traversal_dse_object_row_factory.__doc__ = "Row Factory that returns the decoded graphson as DSE types."
88+
89+
90+
class DSESessionRemoteGraphConnection(RemoteConnection):
    """
    A Tinkerpop RemoteConnection to execute traversal queries on DSE.

    :param session: A DSE session
    :param graph_name: (Optional) DSE Graph name.
    :param execution_profile: (Optional) Execution profile for traversal queries. Default is set to `EXEC_PROFILE_GRAPH_DEFAULT`.
    """

    session = None
    graph_name = None
    execution_profile = None

    def __init__(self, session, graph_name=None, execution_profile=EXEC_PROFILE_GRAPH_DEFAULT):
        """
        :raises ValueError: if ``session`` is not a ``Session`` instance.
        """
        # Both base-class constructor arguments are passed as None; this
        # class executes everything through ``session`` instead.
        super(DSESessionRemoteGraphConnection, self).__init__(None, None)

        if not isinstance(session, Session):
            raise ValueError('A DSE Session must be provided to execute graph traversal queries.')

        self.session = session
        self.graph_name = graph_name
        self.execution_profile = execution_profile

    def submit(self, bytecode):
        """
        Execute ``bytecode`` through the session and return the results.

        :param bytecode: The traversal bytecode to execute.
        :return: A ``RemoteTraversal`` iterating over one ``Traverser`` per
            result, with empty ``TraversalSideEffects``.
        """
        query = DseGraph.query_from_traversal(bytecode)

        # Work on a clone of the profile and a copy of its graph options so
        # that the registered execution profile is left untouched.
        ep = self.session.execution_profile_clone_update(self.execution_profile,
                                                         row_factory=graph_traversal_row_factory)
        graph_options = ep.graph_options.copy()
        graph_options.graph_language = DseGraph.DSE_GRAPH_QUERY_LANGUAGE
        if self.graph_name:
            graph_options.graph_name = self.graph_name

        ep.graph_options = graph_options

        traversers = self.session.execute_graph(query, execution_profile=ep)
        traversers = [Traverser(t) for t in traversers]
        return RemoteTraversal(iter(traversers), TraversalSideEffects())

    def __str__(self):
        return "<DSESessionRemoteGraphConnection: graph_name='{0}'>".format(self.graph_name)
    __repr__ = __str__
131+
132+
133+
class DseGraph(object):
    """
    Dse Graph utility class for GraphTraversal construction and execution.
    """

    DSE_GRAPH_QUERY_LANGUAGE = 'bytecode-json'
    """
    Graph query language, Default is 'bytecode-json' (GraphSON).
    """

    @staticmethod
    def query_from_traversal(traversal):
        """
        From a GraphTraversal, return a query string based on the language specified in `DseGraph.DSE_GRAPH_QUERY_LANGUAGE`.

        A warning is logged when the traversal is bound to a
        :class:`DSESessionRemoteGraphConnection`, because the connection's
        session, graph name and execution profile are not used when only the
        query is extracted.

        :param traversal: The GraphTraversal object
        """

        if isinstance(traversal, GraphTraversal):
            for strategy in traversal.traversal_strategies.traversal_strategies:
                # Not every strategy necessarily exposes a remote connection.
                rc = getattr(strategy, 'remote_connection', None)
                # The attribute checks are parenthesized so they are only
                # evaluated on our own connection type; without the
                # parentheses ``and`` binds tighter than ``or`` and
                # ``rc.graph_name`` would be read on any connection object.
                if (isinstance(rc, DSESessionRemoteGraphConnection) and
                        (rc.session or rc.graph_name or rc.execution_profile)):
                    log.warning("GraphTraversal session, graph_name and execution_profile are "
                                "only taken into account when executed with TinkerPop.")

        return _query_from_traversal(traversal)

    @staticmethod
    def traversal_source(session=None, graph_name=None, execution_profile=EXEC_PROFILE_GRAPH_DEFAULT, traversal_class=None):
        """
        Returns a TinkerPop GraphTraversalSource bound to the session and graph_name if provided.

        :param session: (Optional) A DSE session
        :param graph_name: (Optional) DSE Graph name
        :param execution_profile: (Optional) Execution profile for traversal queries. Default is set to `EXEC_PROFILE_GRAPH_DEFAULT`.
        :param traversal_class: (Optional) The GraphTraversalSource class to use (DSL).

        .. code-block:: python

            from cassandra.cluster import Cluster
            from cassandra.datastax.graph import DseGraph

            c = Cluster()
            session = c.connect()

            g = DseGraph.traversal_source(session, 'my_graph')
            print(g.V().valueMap().toList())

        """

        graph = Graph()
        traversal_source = graph.traversal(traversal_class)

        # Without a session the source is returned unbound.
        if session:
            traversal_source = traversal_source.withRemote(
                DSESessionRemoteGraphConnection(session, graph_name, execution_profile))

        return traversal_source

    @staticmethod
    def create_execution_profile(graph_name):
        """
        Creates an ExecutionProfile for GraphTraversal execution. You need to register that execution profile to the
        cluster by using `cluster.add_execution_profile`.

        :param graph_name: The graph name
        """

        ep = GraphExecutionProfile(row_factory=graph_traversal_dse_object_row_factory,
                                   graph_options=GraphOptions(graph_name=graph_name,
                                                              graph_language=DseGraph.DSE_GRAPH_QUERY_LANGUAGE,
                                                              graph_protocol=GraphProtocol.GRAPHSON_2_0))
        return ep

    @staticmethod
    def batch(*args, **kwargs):
        """
        Returns a traversal batch object (built by ``_DefaultTraversalBatch``)
        allowing to execute multiple traversals in the same transaction.

        All positional and keyword arguments are forwarded unchanged.
        """
        return _DefaultTraversalBatch(*args, **kwargs)
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
# Copyright DataStax, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import math
16+
17+
from gremlin_python.process.traversal import P
18+
19+
from cassandra.util import Distance
20+
21+
# Names exported by a star-import of this module.
__all__ = ['GeoP', 'TextDistanceP', 'Search', 'GeoUnit', 'Geo']
22+
23+
24+
class GeoP(object):
    """
    Geo predicate: an operator name with one value and an optional second one.

    :param operator: The predicate operator name (e.g. ``"inside"``).
    :param value: The predicate value.
    :param other: (Optional) A second predicate value.
    """

    def __init__(self, operator, value, other=None):
        self.operator = operator
        self.value = value
        self.other = other

    @staticmethod
    def inside(*args, **kwargs):
        """Build an ``"inside"`` predicate; arguments are those of the constructor after ``operator``."""
        return GeoP("inside", *args, **kwargs)

    def __eq__(self, other):
        return (isinstance(other, self.__class__) and
                self.operator == other.operator and
                self.value == other.value and
                self.other == other.other)

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``; keep both consistent.
        return not self == other

    def __repr__(self):
        if self.other is None:
            return self.operator + "(" + str(self.value) + ")"
        return self.operator + "(" + str(self.value) + "," + str(self.other) + ")"
42+
43+
44+
class TextDistanceP(object):
    """
    Text predicate carrying a value and a distance.

    :param operator: The predicate operator name.
    :param value: The value to look for.
    :param distance: The distance associated with the search.
    """

    def __init__(self, operator, value, distance):
        self.operator = operator
        self.value = value
        self.distance = distance

    @staticmethod
    def fuzzy(*args):
        """Build a ``"fuzzy"`` predicate from ``(value, distance)``."""
        return TextDistanceP("fuzzy", *args)

    @staticmethod
    def token_fuzzy(*args):
        """Build a ``"tokenFuzzy"`` predicate from ``(value, distance)``."""
        return TextDistanceP("tokenFuzzy", *args)

    @staticmethod
    def phrase(*args):
        """Build a ``"phrase"`` predicate from ``(value, distance)``."""
        return TextDistanceP("phrase", *args)

    def __eq__(self, other):
        return (isinstance(other, self.__class__) and
                self.operator == other.operator and
                self.value == other.value and
                self.distance == other.distance)

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``; keep both consistent.
        return not self == other

    def __repr__(self):
        return self.operator + "(" + str(self.value) + "," + str(self.distance) + ")"
69+
70+
71+
class Search(object):
    """
    Factory methods for text-search predicates.
    """

    @staticmethod
    def token(value):
        """
        Search any instance of a certain token within the text property targeted.
        :param value: the value to look for.
        """
        return P('token', value)

    @staticmethod
    def token_prefix(value):
        """
        Search any instance of a certain token prefix within the text property targeted.
        :param value: the value to look for.
        """
        return P('tokenPrefix', value)

    @staticmethod
    def token_regex(value):
        """
        Search any instance of the provided regular expression for the targeted property.
        :param value: the value to look for.
        """
        return P('tokenRegex', value)

    @staticmethod
    def prefix(value):
        """
        Search for a specific prefix at the beginning of the text property targeted.
        :param value: the value to look for.
        """
        return P('prefix', value)

    @staticmethod
    def regex(value):
        """
        Search for this regular expression inside the text property targeted.
        :param value: the value to look for.
        """
        return P('regex', value)

    @staticmethod
    def fuzzy(value, distance):
        """
        Search for a fuzzy string inside the text property targeted.
        :param value: the value to look for.
        :param distance: The distance for the fuzzy search, e.g. 1 to allow a one-letter misspelling.
        """
        return TextDistanceP.fuzzy(value, distance)

    @staticmethod
    def token_fuzzy(value, distance):
        """
        Search for a token fuzzy inside the text property targeted.
        :param value: the value to look for.
        :param distance: The distance for the token fuzzy search, e.g. 1 to allow a one-letter misspelling.
        """
        return TextDistanceP.token_fuzzy(value, distance)

    @staticmethod
    def phrase(value, proximity):
        """
        Search for a phrase inside the text property targeted.
        :param value: the value to look for.
        :param proximity: The proximity for the phrase search, e.g. phrase('David Felcey', 2) to find 'David Felcey' with up to two middle names.
        """
        return TextDistanceP.phrase(value, proximity)
139+
140+
141+
class GeoUnit(object):
    """
    Multipliers that convert a length expressed in the named unit to degrees.
    """

    # Intermediate conversion factors, derived from the mean Earth radius.
    _EARTH_MEAN_RADIUS_KM = 6371.0087714
    _DEGREES_TO_RADIANS = math.pi / 180
    _DEG_TO_KM = _DEGREES_TO_RADIANS * _EARTH_MEAN_RADIUS_KM
    _KM_TO_DEG = 1 / _DEG_TO_KM
    _MILES_TO_KM = 1.609344001

    # Public unit multipliers; DEGREES is the identity.
    MILES = _MILES_TO_KM * _KM_TO_DEG
    KILOMETERS = _KM_TO_DEG
    METERS = _KM_TO_DEG / 1000.0
    DEGREES = 1
152+
153+
154+
class Geo(object):
    """
    Factory methods for geo-search predicates.
    """

    @staticmethod
    def inside(value, units=GeoUnit.DEGREES):
        """
        Search any instance of geometry inside the Distance targeted.

        :param value: A Distance to look for.
        :param units: The units for ``value``. See GeoUnit enum. (Can also
            provide an integer to use as a multiplier to convert ``value`` to
            degrees.)
        :return: A :class:`GeoP` ``inside`` predicate wrapping a new Distance
            with the same ``x``/``y`` and the radius multiplied by ``units``.
        """
        return GeoP.inside(
            value=Distance(x=value.x, y=value.y, radius=value.radius * units)
        )

0 commit comments

Comments
 (0)