80
80
from cassandra .marshal import int64_pack
81
81
from cassandra .timestamps import MonotonicTimestampGenerator
82
82
from cassandra .compat import Mapping
83
- from cassandra .util import _resolve_contact_points_to_string_map
83
+ from cassandra .util import _resolve_contact_points_to_string_map , Version
84
84
85
85
from cassandra .datastax .insights .reporter import MonitorReporter
86
86
from cassandra .datastax .insights .util import version_supports_insights
87
87
88
88
from cassandra .datastax .graph import (graph_object_row_factory , GraphOptions , GraphSON1Serializer ,
89
- GraphProtocol , GraphSON2Serializer , GraphStatement , SimpleGraphStatement )
90
- from cassandra .datastax .graph .query import _request_timeout_key
89
+ GraphProtocol , GraphSON2Serializer , GraphStatement , SimpleGraphStatement ,
90
+ graph_graphson2_row_factory , graph_graphson3_row_factory ,
91
+ GraphSON3Serializer )
92
+ from cassandra .datastax .graph .query import _request_timeout_key , _GraphSONContextRowFactory
91
93
from cassandra .datastax import cloud as dscloud
92
94
93
95
try :
@@ -153,6 +155,7 @@ def _is_gevent_monkey_patched():
153
155
DEFAULT_MIN_CONNECTIONS_PER_REMOTE_HOST = 1
154
156
DEFAULT_MAX_CONNECTIONS_PER_REMOTE_HOST = 2
155
157
158
+ _GRAPH_PAGING_MIN_DSE_VERSION = Version ('6.8.0' )
156
159
157
160
_NOT_SET = object ()
158
161
@@ -416,20 +419,21 @@ class GraphExecutionProfile(ExecutionProfile):
416
419
417
420
def __init__ (self , load_balancing_policy = _NOT_SET , retry_policy = None ,
418
421
consistency_level = ConsistencyLevel .LOCAL_ONE , serial_consistency_level = None ,
419
- request_timeout = 30.0 , row_factory = graph_object_row_factory ,
420
- graph_options = None ):
422
+ request_timeout = 30.0 , row_factory = None ,
423
+ graph_options = None , continuous_paging_options = _NOT_SET ):
421
424
"""
422
425
Default execution profile for graph execution.
423
426
424
- See :class:`.ExecutionProfile`
425
- for base attributes .
427
+ See :class:`.ExecutionProfile` for base attributes. Note that if not explicitly set,
428
+ the row_factory and graph_options.graph_protocol are resolved during the query execution .
426
429
427
430
In addition to default parameters shown in the signature, this profile also defaults ``retry_policy`` to
428
431
:class:`cassandra.policies.NeverRetryPolicy`.
429
432
"""
430
433
retry_policy = retry_policy or NeverRetryPolicy ()
431
434
super (GraphExecutionProfile , self ).__init__ (load_balancing_policy , retry_policy , consistency_level ,
432
- serial_consistency_level , request_timeout , row_factory )
435
+ serial_consistency_level , request_timeout , row_factory ,
436
+ continuous_paging_options = continuous_paging_options )
433
437
self .graph_options = graph_options or GraphOptions (graph_source = b'g' ,
434
438
graph_language = b'gremlin-groovy' )
435
439
@@ -438,7 +442,7 @@ class GraphAnalyticsExecutionProfile(GraphExecutionProfile):
438
442
439
443
def __init__ (self , load_balancing_policy = None , retry_policy = None ,
440
444
consistency_level = ConsistencyLevel .LOCAL_ONE , serial_consistency_level = None ,
441
- request_timeout = 3600. * 24. * 7. , row_factory = graph_object_row_factory ,
445
+ request_timeout = 3600. * 24. * 7. , row_factory = None ,
442
446
graph_options = None ):
443
447
"""
444
448
Execution profile with timeout and load balancing appropriate for graph analytics queries.
@@ -2506,6 +2510,7 @@ def default_serial_consistency_level(self, cl):
2506
2510
_profile_manager = None
2507
2511
_metrics = None
2508
2512
_request_init_callbacks = None
2513
+ _graph_paging_available = False
2509
2514
2510
2515
def __init__ (self , cluster , hosts , keyspace = None ):
2511
2516
self .cluster = cluster
@@ -2539,6 +2544,7 @@ def __init__(self, cluster, hosts, keyspace=None):
2539
2544
raise NoHostAvailable (msg , [h .address for h in hosts ])
2540
2545
2541
2546
self .session_id = uuid .uuid4 ()
2547
+ self ._graph_paging_available = self ._check_graph_paging_available ()
2542
2548
2543
2549
cc_host = self .cluster .get_control_connection_host ()
2544
2550
valid_insights_version = (cc_host and version_supports_insights (cc_host .dse_version ))
@@ -2678,18 +2684,31 @@ def execute_graph_async(self, query, parameters=None, trace=False, execution_pro
2678
2684
if not isinstance (query , GraphStatement ):
2679
2685
query = SimpleGraphStatement (query )
2680
2686
2681
- execution_profile = self ._maybe_get_execution_profile (execution_profile ) # look up instance here so we can apply the extended attributes
2687
+ # Clone and look up instance here so we can resolve and apply the extended attributes
2688
+ execution_profile = self .execution_profile_clone_update (execution_profile )
2689
+
2690
+ if not hasattr (execution_profile , 'graph_options' ):
2691
+ raise ValueError (
2692
+ "Execution profile for graph queries must derive from GraphExecutionProfile, and provide graph_options" )
2682
2693
2694
+ self ._resolve_execution_profile_options (execution_profile )
2695
+
2696
+ # make sure the graphson context row factory is binded to this cluster
2683
2697
try :
2684
- options = execution_profile .graph_options .copy ()
2685
- except AttributeError :
2686
- raise ValueError ("Execution profile for graph queries must derive from GraphExecutionProfile, and provide graph_options" )
2698
+ if issubclass (execution_profile .row_factory , _GraphSONContextRowFactory ):
2699
+ execution_profile .row_factory = execution_profile .row_factory (self .cluster )
2700
+ except TypeError :
2701
+ # issubclass might fail if arg1 is an instance
2702
+ pass
2703
+
2704
+ # set graph paging if needed
2705
+ self ._maybe_set_graph_paging (execution_profile )
2687
2706
2688
2707
graph_parameters = None
2689
2708
if parameters :
2690
- graph_parameters = self ._transform_params (parameters , graph_options = options )
2709
+ graph_parameters = self ._transform_params (parameters , graph_options = execution_profile . graph_options )
2691
2710
2692
- custom_payload = options .get_options_map ()
2711
+ custom_payload = execution_profile . graph_options .get_options_map ()
2693
2712
if execute_as :
2694
2713
custom_payload [_proxy_execute_key ] = six .b (execute_as )
2695
2714
custom_payload [_request_timeout_key ] = int64_pack (long (execution_profile .request_timeout * 1000 ))
@@ -2700,25 +2719,99 @@ def execute_graph_async(self, query, parameters=None, trace=False, execution_pro
2700
2719
future .message .query_params = graph_parameters
2701
2720
future ._protocol_handler = self .client_protocol_handler
2702
2721
2703
- if options .is_analytics_source and isinstance (execution_profile .load_balancing_policy , DefaultLoadBalancingPolicy ):
2722
+ if execution_profile .graph_options .is_analytics_source and \
2723
+ isinstance (execution_profile .load_balancing_policy , DefaultLoadBalancingPolicy ):
2704
2724
self ._target_analytics_master (future )
2705
2725
else :
2706
2726
future .send_request ()
2707
2727
return future
2708
2728
2729
+ def _maybe_set_graph_paging (self , execution_profile ):
2730
+ graph_paging = execution_profile .continuous_paging_options
2731
+ if execution_profile .continuous_paging_options is _NOT_SET :
2732
+ graph_paging = ContinuousPagingOptions () if self ._graph_paging_available else None
2733
+
2734
+ execution_profile .continuous_paging_options = graph_paging
2735
+
2736
+ def _check_graph_paging_available (self ):
2737
+ """Verify if we can enable graph paging. This executed only once when the session is created."""
2738
+
2739
+ if not ProtocolVersion .has_continuous_paging_next_pages (self ._protocol_version ):
2740
+ return False
2741
+
2742
+ for host in self .cluster .metadata .all_hosts ():
2743
+ if host .dse_version is None :
2744
+ return False
2745
+
2746
+ version = Version (host .dse_version )
2747
+ if version < _GRAPH_PAGING_MIN_DSE_VERSION :
2748
+ return False
2749
+
2750
+ return True
2751
+
2752
+ def _resolve_execution_profile_options (self , execution_profile ):
2753
+ """
2754
+ Determine the GraphSON protocol and row factory for a graph query. This is useful
2755
+ to configure automatically the execution profile when executing a query on a
2756
+ core graph.
2757
+
2758
+ If `graph_protocol` is not explicitly specified, the following rules apply:
2759
+ - Default to GraphProtocol.GRAPHSON_1_0, or GRAPHSON_2_0 if the `graph_language` is not gremlin-groovy.
2760
+ - If `graph_options.graph_name` is specified and is a Core graph, set GraphSON_3_0.
2761
+ If `row_factory` is not explicitly specified, the following rules apply:
2762
+ - Default to graph_object_row_factory.
2763
+ - If `graph_options.graph_name` is specified and is a Core graph, set graph_graphson3_row_factory.
2764
+ """
2765
+ if execution_profile .graph_options .graph_protocol is not None and \
2766
+ execution_profile .row_factory is not None :
2767
+ return
2768
+
2769
+ graph_options = execution_profile .graph_options
2770
+
2771
+ is_core_graph = False
2772
+ if graph_options .graph_name :
2773
+ # graph_options.graph_name is bytes ...
2774
+ name = graph_options .graph_name .decode ('utf-8' )
2775
+ if name in self .cluster .metadata .keyspaces :
2776
+ ks_metadata = self .cluster .metadata .keyspaces [name ]
2777
+ if ks_metadata .graph_engine == 'Core' :
2778
+ is_core_graph = True
2779
+
2780
+ if is_core_graph :
2781
+ graph_protocol = GraphProtocol .GRAPHSON_3_0
2782
+ row_factory = graph_graphson3_row_factory
2783
+ else :
2784
+ if graph_options .graph_language == GraphOptions .DEFAULT_GRAPH_LANGUAGE :
2785
+ graph_protocol = GraphOptions .DEFAULT_GRAPH_PROTOCOL
2786
+ row_factory = graph_object_row_factory
2787
+ else :
2788
+ # if not gremlin-groovy, GraphSON_2_0
2789
+ graph_protocol = GraphProtocol .GRAPHSON_2_0
2790
+ row_factory = graph_graphson2_row_factory
2791
+
2792
+ # Only apply if not set explicitly
2793
+ if graph_options .graph_protocol is None :
2794
+ graph_options .graph_protocol = graph_protocol
2795
+ if execution_profile .row_factory is None :
2796
+ execution_profile .row_factory = row_factory
2797
+
2709
2798
def _transform_params (self , parameters , graph_options ):
2710
2799
if not isinstance (parameters , dict ):
2711
2800
raise ValueError ('The parameters must be a dictionary. Unnamed parameters are not allowed.' )
2712
2801
2713
2802
# Serialize python types to graphson
2714
2803
serializer = GraphSON1Serializer
2715
2804
if graph_options .graph_protocol == GraphProtocol .GRAPHSON_2_0 :
2716
- serializer = GraphSON2Serializer
2717
-
2718
- serialized_parameters = {
2719
- p : serializer .serialize (v )
2720
- for p , v in six .iteritems (parameters )
2721
- }
2805
+ serializer = GraphSON2Serializer ()
2806
+ elif graph_options .graph_protocol == GraphProtocol .GRAPHSON_3_0 :
2807
+ # only required for core graphs
2808
+ context = {
2809
+ 'cluster' : self .cluster ,
2810
+ 'graph_name' : graph_options .graph_name .decode ('utf-8' ) if graph_options .graph_name else None
2811
+ }
2812
+ serializer = GraphSON3Serializer (context )
2813
+
2814
+ serialized_parameters = serializer .serialize (parameters )
2722
2815
return [json .dumps (serialized_parameters ).encode ('utf-8' )]
2723
2816
2724
2817
def _target_analytics_master (self , future ):
0 commit comments