83
83
from cassandra .marshal import int64_pack
84
84
from cassandra .timestamps import MonotonicTimestampGenerator
85
85
from cassandra .compat import Mapping
86
- from cassandra .util import _resolve_contact_points_to_string_map
86
+ from cassandra .util import _resolve_contact_points_to_string_map , Version
87
87
88
88
from cassandra .datastax .insights .reporter import MonitorReporter
89
89
from cassandra .datastax .insights .util import version_supports_insights
90
90
91
91
from cassandra .datastax .graph import (graph_object_row_factory , GraphOptions , GraphSON1Serializer ,
92
- GraphProtocol , GraphSON2Serializer , GraphStatement , SimpleGraphStatement )
93
- from cassandra .datastax .graph .query import _request_timeout_key
92
+ GraphProtocol , GraphSON2Serializer , GraphStatement , SimpleGraphStatement ,
93
+ graph_graphson2_row_factory , graph_graphson3_row_factory ,
94
+ GraphSON3Serializer )
95
+ from cassandra .datastax .graph .query import _request_timeout_key , _GraphSONContextRowFactory
94
96
95
97
if six .PY3 :
96
98
long = int
@@ -141,6 +143,7 @@ def _is_gevent_monkey_patched():
141
143
DEFAULT_MIN_CONNECTIONS_PER_REMOTE_HOST = 1
142
144
DEFAULT_MAX_CONNECTIONS_PER_REMOTE_HOST = 2
143
145
146
+ _GRAPH_PAGING_MIN_DSE_VERSION = Version ('6.8.0' )
144
147
145
148
_NOT_SET = object ()
146
149
@@ -395,20 +398,21 @@ class GraphExecutionProfile(ExecutionProfile):
395
398
396
399
def __init__(self, load_balancing_policy=_NOT_SET, retry_policy=None,
             consistency_level=ConsistencyLevel.LOCAL_ONE, serial_consistency_level=None,
             request_timeout=30.0, row_factory=None,
             graph_options=None, continuous_paging_options=_NOT_SET):
    """
    Default execution profile for graph execution.

    See :class:`.ExecutionProfile` for base attributes. When ``row_factory`` and
    ``graph_options.graph_protocol`` are not set explicitly, they are resolved
    during the query execution.

    In addition to default parameters shown in the signature, this profile also defaults
    ``retry_policy`` to :class:`cassandra.policies.NeverRetryPolicy`, and ``graph_options``
    to a :class:`GraphOptions` with source ``b'g'`` and language ``b'gremlin-groovy'``.
    """
    # A falsy retry policy (typically None) falls back to never retrying.
    if not retry_policy:
        retry_policy = NeverRetryPolicy()

    super(GraphExecutionProfile, self).__init__(
        load_balancing_policy, retry_policy, consistency_level,
        serial_consistency_level, request_timeout, row_factory,
        continuous_paging_options=continuous_paging_options)

    # Falsy graph options (typically None) fall back to the gremlin-groovy defaults.
    if not graph_options:
        graph_options = GraphOptions(graph_source=b'g',
                                     graph_language=b'gremlin-groovy')
    self.graph_options = graph_options
414
418
@@ -417,7 +421,7 @@ class GraphAnalyticsExecutionProfile(GraphExecutionProfile):
417
421
418
422
def __init__ (self , load_balancing_policy = None , retry_policy = None ,
419
423
consistency_level = ConsistencyLevel .LOCAL_ONE , serial_consistency_level = None ,
420
- request_timeout = 3600. * 24. * 7. , row_factory = graph_object_row_factory ,
424
+ request_timeout = 3600. * 24. * 7. , row_factory = None ,
421
425
graph_options = None ):
422
426
"""
423
427
Execution profile with timeout and load balancing appropriate for graph analytics queries.
@@ -2434,6 +2438,7 @@ def default_serial_consistency_level(self, cl):
2434
2438
_profile_manager = None
2435
2439
_metrics = None
2436
2440
_request_init_callbacks = None
2441
+ _graph_paging_available = False
2437
2442
2438
2443
def __init__ (self , cluster , hosts , keyspace = None ):
2439
2444
self .cluster = cluster
@@ -2466,6 +2471,8 @@ def __init__(self, cluster, hosts, keyspace=None):
2466
2471
msg += " using keyspace '%s'" % self .keyspace
2467
2472
raise NoHostAvailable (msg , [h .address for h in hosts ])
2468
2473
2474
+ self ._graph_paging_available = self ._check_graph_paging_available ()
2475
+
2469
2476
cc_host = self .cluster .get_control_connection_host ()
2470
2477
valid_insights_version = (cc_host and version_supports_insights (cc_host .dse_version ))
2471
2478
if self .cluster .monitor_reporting_enabled and valid_insights_version :
@@ -2605,18 +2612,31 @@ def execute_graph_async(self, query, parameters=None, trace=False, execution_pro
2605
2612
if not isinstance (query , GraphStatement ):
2606
2613
query = SimpleGraphStatement (query )
2607
2614
2608
- execution_profile = self ._maybe_get_execution_profile (execution_profile ) # look up instance here so we can apply the extended attributes
2615
+ # Clone and look up instance here so we can resolve and apply the extended attributes
2616
+ execution_profile = self .execution_profile_clone_update (execution_profile )
2617
+
2618
+ if not hasattr (execution_profile , 'graph_options' ):
2619
+ raise ValueError (
2620
+ "Execution profile for graph queries must derive from GraphExecutionProfile, and provide graph_options" )
2621
+
2622
+ self ._resolve_execution_profile_options (execution_profile )
2609
2623
2624
+ # make sure the graphson context row factory is bound to this cluster
2610
2625
try :
2611
- options = execution_profile .graph_options .copy ()
2612
- except AttributeError :
2613
- raise ValueError ("Execution profile for graph queries must derive from GraphExecutionProfile, and provide graph_options" )
2626
+ if issubclass (execution_profile .row_factory , _GraphSONContextRowFactory ):
2627
+ execution_profile .row_factory = execution_profile .row_factory (self .cluster )
2628
+ except TypeError :
2629
+ # issubclass might fail if arg1 is an instance
2630
+ pass
2631
+
2632
+ # set graph paging if needed
2633
+ self ._maybe_set_graph_paging (execution_profile )
2614
2634
2615
2635
graph_parameters = None
2616
2636
if parameters :
2617
- graph_parameters = self ._transform_params (parameters , graph_options = options )
2637
+ graph_parameters = self ._transform_params (parameters , graph_options = execution_profile . graph_options )
2618
2638
2619
- custom_payload = options .get_options_map ()
2639
+ custom_payload = execution_profile . graph_options .get_options_map ()
2620
2640
if execute_as :
2621
2641
custom_payload [_proxy_execute_key ] = six .b (execute_as )
2622
2642
custom_payload [_request_timeout_key ] = int64_pack (long (execution_profile .request_timeout * 1000 ))
@@ -2627,25 +2647,98 @@ def execute_graph_async(self, query, parameters=None, trace=False, execution_pro
2627
2647
future .message .query_params = graph_parameters
2628
2648
future ._protocol_handler = self .client_protocol_handler
2629
2649
2630
- if options .is_analytics_source and isinstance (execution_profile .load_balancing_policy , DefaultLoadBalancingPolicy ):
2650
+ if execution_profile .graph_options .is_analytics_source and \
2651
+ isinstance (execution_profile .load_balancing_policy , DefaultLoadBalancingPolicy ):
2631
2652
self ._target_analytics_master (future )
2632
2653
else :
2633
2654
future .send_request ()
2634
2655
return future
2635
2656
2657
def _maybe_set_graph_paging(self, execution_profile):
    """
    Resolve the continuous paging options of a graph execution profile in place.

    A value that was set explicitly (anything other than the ``_NOT_SET`` sentinel)
    is written back unchanged. Otherwise the profile gets a default
    ``ContinuousPagingOptions`` when ``self._graph_paging_available`` is true,
    and ``None`` when it is not.

    :param execution_profile: profile whose ``continuous_paging_options`` is updated.
    """
    paging_options = execution_profile.continuous_paging_options
    if paging_options is _NOT_SET:
        if self._graph_paging_available:
            paging_options = ContinuousPagingOptions()
        else:
            paging_options = None

    execution_profile.continuous_paging_options = paging_options
2663
+
2664
def _check_graph_paging_available(self):
    """
    Tell whether graph paging can be enabled for this session.

    Paging is available only when the session protocol version satisfies
    ``ProtocolVersion.has_continuous_paging_next_pages`` and no host in the
    cluster metadata is missing a DSE version or reports one lower than
    ``_GRAPH_PAGING_MIN_DSE_VERSION``.

    :return: ``True`` if graph paging can be used, ``False`` otherwise.
    """
    if not ProtocolVersion.has_continuous_paging_next_pages(self._protocol_version):
        return False

    def host_supports_paging(host):
        # A host with no DSE version cannot be checked, so it disables paging.
        dse_version = host.dse_version
        if dse_version is None:
            return False
        return not (Version(dse_version) < _GRAPH_PAGING_MIN_DSE_VERSION)

    # all() stops at the first unsupported host and is True when there are no hosts.
    return all(host_supports_paging(host) for host in self.cluster.metadata.all_hosts())
2679
+
2680
+ def _resolve_execution_profile_options (self , execution_profile ):
2681
+ """
2682
+ Determine the GraphSON protocol and row factory for a graph query. This is useful
2683
+ to configure automatically the execution profile when executing a query on a
2684
+ core graph.
2685
+ If `graph_protocol` is not explicitly specified, the following rules apply:
2686
+ - Default to GraphProtocol.GRAPHSON_1_0, or GRAPHSON_2_0 if the `graph_language` is not gremlin-groovy.
2687
+ - If `graph_options.graph_name` is specified and is a Core graph, set GraphSON_3_0.
2688
+ If `row_factory` is not explicitly specified, the following rules apply:
2689
+ - Default to graph_object_row_factory.
2690
+ - If `graph_options.graph_name` is specified and is a Core graph, set graph_graphson3_row_factory.
2691
+ """
2692
+ if execution_profile .graph_options .graph_protocol is not None and \
2693
+ execution_profile .row_factory is not None :
2694
+ return
2695
+
2696
+ graph_options = execution_profile .graph_options
2697
+
2698
+ is_core_graph = False
2699
+ if graph_options .graph_name :
2700
+ # graph_options.graph_name is bytes ...
2701
+ name = graph_options .graph_name .decode ('utf-8' )
2702
+ if name in self .cluster .metadata .keyspaces :
2703
+ ks_metadata = self .cluster .metadata .keyspaces [name ]
2704
+ if ks_metadata .graph_engine == 'Core' :
2705
+ is_core_graph = True
2706
+
2707
+ if is_core_graph :
2708
+ graph_protocol = GraphProtocol .GRAPHSON_3_0
2709
+ row_factory = graph_graphson3_row_factory
2710
+ else :
2711
+ if graph_options .graph_language == GraphOptions .DEFAULT_GRAPH_LANGUAGE :
2712
+ graph_protocol = GraphOptions .DEFAULT_GRAPH_PROTOCOL
2713
+ row_factory = graph_object_row_factory
2714
+ else :
2715
+ # if not gremlin-groovy, GraphSON_2_0
2716
+ graph_protocol = GraphProtocol .GRAPHSON_2_0
2717
+ row_factory = graph_graphson2_row_factory
2718
+
2719
+ # Only apply if not set explicitly
2720
+ if graph_options .graph_protocol is None :
2721
+ graph_options .graph_protocol = graph_protocol
2722
+ if execution_profile .row_factory is None :
2723
+ execution_profile .row_factory = row_factory
2724
+
2636
2725
def _transform_params (self , parameters , graph_options ):
2637
2726
if not isinstance (parameters , dict ):
2638
2727
raise ValueError ('The parameters must be a dictionary. Unnamed parameters are not allowed.' )
2639
2728
2640
2729
# Serialize python types to graphson
2641
2730
serializer = GraphSON1Serializer
2642
2731
if graph_options .graph_protocol == GraphProtocol .GRAPHSON_2_0 :
2643
- serializer = GraphSON2Serializer
2644
-
2645
- serialized_parameters = {
2646
- p : serializer .serialize (v )
2647
- for p , v in six .iteritems (parameters )
2648
- }
2732
+ serializer = GraphSON2Serializer ()
2733
+ elif graph_options .graph_protocol == GraphProtocol .GRAPHSON_3_0 :
2734
+ # only required for core graphs
2735
+ context = {
2736
+ 'cluster' : self .cluster ,
2737
+ 'graph_name' : graph_options .graph_name .decode ('utf-8' ) if graph_options .graph_name else None
2738
+ }
2739
+ serializer = GraphSON3Serializer (context )
2740
+
2741
+ serialized_parameters = serializer .serialize (parameters )
2649
2742
return [json .dumps (serialized_parameters ).encode ('utf-8' )]
2650
2743
2651
2744
def _target_analytics_master (self , future ):
0 commit comments