|
51 | 51 | from cassandra.connection import (ConnectionException, ConnectionShutdown,
|
52 | 52 | ConnectionHeartbeat, ProtocolVersionUnsupported,
|
53 | 53 | EndPoint, DefaultEndPoint, DefaultEndPointFactory,
|
54 |
| - ContinuousPagingState) |
| 54 | + ContinuousPagingState, SniEndPointFactory) |
55 | 55 | from cassandra.cqltypes import UserType
|
56 | 56 | from cassandra.encoder import Encoder
|
57 | 57 | from cassandra.protocol import (QueryMessage, ResultMessage,
|
|
91 | 91 | from cassandra.datastax.graph import (graph_object_row_factory, GraphOptions, GraphSON1Serializer,
|
92 | 92 | GraphProtocol, GraphSON2Serializer, GraphStatement, SimpleGraphStatement)
|
93 | 93 | from cassandra.datastax.graph.query import _request_timeout_key
|
| 94 | +from cassandra.datastax import cloud as dscloud |
94 | 95 |
|
95 | 96 | if six.PY3:
|
96 | 97 | long = int
|
97 | 98 |
|
98 |
| - |
99 | 99 | def _is_eventlet_monkey_patched():
|
100 | 100 | if 'eventlet.patcher' not in sys.modules:
|
101 | 101 | return False
|
@@ -354,19 +354,28 @@ class ExecutionProfile(object):
|
354 | 354 |
|
355 | 355 | # indicates if lbp was set explicitly or uses default values
|
356 | 356 | _load_balancing_policy_explicit = False
|
| 357 | + _consistency_level_explicit = False |
357 | 358 |
|
358 | 359 | def __init__(self, load_balancing_policy=_NOT_SET, retry_policy=None,
|
359 |
| - consistency_level=ConsistencyLevel.LOCAL_ONE, serial_consistency_level=None, |
| 360 | + consistency_level=ConsistencyLevel._NOT_SET, serial_consistency_level=None, |
360 | 361 | request_timeout=10.0, row_factory=named_tuple_factory, speculative_execution_policy=None,
|
361 | 362 | continuous_paging_options=None):
|
| 363 | + |
362 | 364 | if load_balancing_policy is _NOT_SET:
|
363 | 365 | self._load_balancing_policy_explicit = False
|
364 | 366 | self.load_balancing_policy = default_lbp_factory()
|
365 | 367 | else:
|
366 | 368 | self._load_balancing_policy_explicit = True
|
367 | 369 | self.load_balancing_policy = load_balancing_policy
|
| 370 | + |
| 371 | + if consistency_level is _NOT_SET: |
| 372 | + self._consistency_level_explicit = False |
| 373 | + self.consistency_level = ConsistencyLevel.LOCAL_ONE |
| 374 | + else: |
| 375 | + self._consistency_level_explicit = True |
| 376 | + self.consistency_level = consistency_level |
| 377 | + |
368 | 378 | self.retry_policy = retry_policy or RetryPolicy()
|
369 |
| - self.consistency_level = consistency_level |
370 | 379 |
|
371 | 380 | if (serial_consistency_level is not None and
|
372 | 381 | not ConsistencyLevel.is_serial(serial_consistency_level)):
|
@@ -960,6 +969,19 @@ def default_retry_policy(self, policy):
|
960 | 969 | A string identifying this application's version to Insights
|
961 | 970 | """
|
962 | 971 |
|
| 972 | + cloud = None |
| 973 | + """ |
| 974 | + A dict of the cloud configuration. Example:: |
| 975 | + |
| 976 | + { |
| 977 | + # path to the secure connect bundle |
| 978 | + 'secure_connect_bundle': '/path/to/secure-connect-dbname.zip' |
| 979 | + } |
| 980 | +
|
| 981 | + The zip file will be temporarily extracted in the same directory to |
| 982 | + load the configuration and certificates. |
| 983 | + """ |
| 984 | + |
963 | 985 | @property
|
964 | 986 | def schema_metadata_enabled(self):
|
965 | 987 | """
|
@@ -1060,13 +1082,34 @@ def __init__(self,
|
1060 | 1082 | application_version=None,
|
1061 | 1083 | monitor_reporting_enabled=True,
|
1062 | 1084 | monitor_reporting_interval=30,
|
1063 |
| - client_id=None): |
| 1085 | + client_id=None, |
| 1086 | + cloud=None): |
1064 | 1087 | """
|
1065 | 1088 | ``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
|
1066 | 1089 | establishing connection pools or refreshing metadata.
|
1067 | 1090 |
|
1068 | 1091 | Any of the mutable Cluster attributes may be set as keyword arguments to the constructor.
|
1069 | 1092 | """
|
| 1093 | + |
| 1094 | + if cloud is not None: |
| 1095 | + if contact_points is not _NOT_SET or endpoint_factory or ssl_context or ssl_options: |
| 1096 | + raise ValueError("contact_points, endpoint_factory, ssl_context, and ssl_options " |
| 1097 | + "cannot be specified with a cloud configuration") |
| 1098 | + |
| 1099 | + cloud_config = dscloud.get_cloud_config(cloud) |
| 1100 | + |
| 1101 | + ssl_context = cloud_config.ssl_context |
| 1102 | + ssl_options = {'check_hostname': True} |
| 1103 | + if (auth_provider is None and cloud_config.username |
| 1104 | + and cloud_config.password): |
| 1105 | + auth_provider = PlainTextAuthProvider(cloud_config.username, cloud_config.password) |
| 1106 | + |
| 1107 | + endpoint_factory = SniEndPointFactory(cloud_config.sni_host, cloud_config.sni_port) |
| 1108 | + contact_points = [ |
| 1109 | + endpoint_factory.create_from_sni(host_id) |
| 1110 | + for host_id in cloud_config.host_ids |
| 1111 | + ] |
| 1112 | + |
1070 | 1113 | if contact_points is not None:
|
1071 | 1114 | if contact_points is _NOT_SET:
|
1072 | 1115 | self._contact_points_explicit = False
|
@@ -1156,12 +1199,12 @@ def __init__(self,
|
1156 | 1199 | self.timestamp_generator = MonotonicTimestampGenerator()
|
1157 | 1200 |
|
1158 | 1201 | self.profile_manager = ProfileManager()
|
1159 |
| - self.profile_manager.profiles[EXEC_PROFILE_DEFAULT] = ExecutionProfile(self.load_balancing_policy, |
1160 |
| - self.default_retry_policy, |
1161 |
| - Session._default_consistency_level, |
1162 |
| - Session._default_serial_consistency_level, |
1163 |
| - Session._default_timeout, |
1164 |
| - Session._row_factory) |
| 1202 | + self.profile_manager.profiles[EXEC_PROFILE_DEFAULT] = ExecutionProfile( |
| 1203 | + self.load_balancing_policy, |
| 1204 | + self.default_retry_policy, |
| 1205 | + request_timeout=Session._default_timeout, |
| 1206 | + row_factory=Session._row_factory |
| 1207 | + ) |
1165 | 1208 |
|
1166 | 1209 | # legacy mode if either of these is not default
|
1167 | 1210 | if load_balancing_policy or default_retry_policy:
|
@@ -1426,6 +1469,7 @@ def add_execution_profile(self, name, profile, pool_wait_timeout=5):
|
1426 | 1469 | profile.load_balancing_policy.on_up(host)
|
1427 | 1470 | futures = set()
|
1428 | 1471 | for session in tuple(self.sessions):
|
| 1472 | + self._set_default_dbaas_consistency(session) |
1429 | 1473 | futures.update(session.update_created_pools())
|
1430 | 1474 | _, not_done = wait_futures(futures, pool_wait_timeout)
|
1431 | 1475 | if not_done:
|
@@ -1644,8 +1688,18 @@ def connect(self, keyspace=None, wait_for_all_pools=False):
|
1644 | 1688 | session = self._new_session(keyspace)
|
1645 | 1689 | if wait_for_all_pools:
|
1646 | 1690 | wait_futures(session._initial_connect_futures)
|
| 1691 | + |
| 1692 | + self._set_default_dbaas_consistency(session) |
| 1693 | + |
1647 | 1694 | return session
|
1648 | 1695 |
|
| 1696 | + def _set_default_dbaas_consistency(self, session): |
| 1697 | + if session.cluster.metadata.dbaas: |
| 1698 | + for profile in self.profile_manager.profiles.values(): |
| 1699 | + if not profile._consistency_level_explicit: |
| 1700 | + profile.consistency_level = ConsistencyLevel.LOCAL_QUORUM |
| 1701 | + session._default_consistency_level = ConsistencyLevel.LOCAL_QUORUM |
| 1702 | + |
1649 | 1703 | def get_connection_holders(self):
|
1650 | 1704 | holders = []
|
1651 | 1705 | for s in tuple(self.sessions):
|
@@ -3248,7 +3302,7 @@ class ControlConnection(object):
|
3248 | 3302 | # Used only when token_metadata_enabled is set to False
|
3249 | 3303 | _SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS = "SELECT rpc_address FROM system.local WHERE key='local'"
|
3250 | 3304 |
|
3251 |
| - _SELECT_SCHEMA_PEERS_TEMPLATE = "SELECT peer, {nt_col_name}, schema_version FROM system.peers" |
| 3305 | + _SELECT_SCHEMA_PEERS_TEMPLATE = "SELECT peer, host_id, {nt_col_name}, schema_version FROM system.peers" |
3252 | 3306 | _SELECT_SCHEMA_LOCAL = "SELECT schema_version FROM system.local WHERE key='local'"
|
3253 | 3307 |
|
3254 | 3308 | _MINIMUM_NATIVE_ADDRESS_VERSION = "4.0"
|
@@ -3300,6 +3354,8 @@ def connect(self):
|
3300 | 3354 | self._protocol_version = self._cluster.protocol_version
|
3301 | 3355 | self._set_new_connection(self._reconnect_internal())
|
3302 | 3356 |
|
| 3357 | + self._cluster.metadata.dbaas = self._connection._product_type == dscloud.PRODUCT_APOLLO |
| 3358 | + |
3303 | 3359 | def _set_new_connection(self, conn):
|
3304 | 3360 | """
|
3305 | 3361 | Replace existing connection (if there is one) and close it.
|
@@ -4113,10 +4169,14 @@ def _on_timeout(self, _attempts=0):
|
4113 | 4169 | if self._connection is not None:
|
4114 | 4170 | try:
|
4115 | 4171 | self._connection._requests.pop(self._req_id)
|
4116 |
| - # This prevents the race condition of the |
4117 |
| - # event loop thread just receiving the waited message |
4118 |
| - # If it arrives after this, it will be ignored |
| 4172 | + # PYTHON-1044 |
| 4173 | + # This request might have been removed from the connection after the connection was marked defunct by the heartbeat. |
| 4174 | + # We should still raise OperationTimedOut to reject the future so that the main event thread will not |
| 4175 | + # wait for it endlessly |
4119 | 4176 | except KeyError:
|
| 4177 | + key = "Connection defunct by heartbeat" |
| 4178 | + errors = {key: "Client request timeout. See Session.execute[_async](timeout)"} |
| 4179 | + self._set_final_exception(OperationTimedOut(errors, self._current_host)) |
4120 | 4180 | return
|
4121 | 4181 |
|
4122 | 4182 | pool = self.session._pools.get(self._current_host)
|
|
0 commit comments