51 | 51 | from cassandra.connection import (ConnectionException, ConnectionShutdown,
52 | 52 |                                    ConnectionHeartbeat, ProtocolVersionUnsupported,
53 | 53 |                                    EndPoint, DefaultEndPoint, DefaultEndPointFactory,
54 |    | -                                 ContinuousPagingState)
   | 54 | +                                 ContinuousPagingState, SniEndPointFactory)
55 | 55 | from cassandra.cqltypes import UserType
56 | 56 | from cassandra.encoder import Encoder
57 | 57 | from cassandra.protocol import (QueryMessage, ResultMessage,

 93 |  93 |                                       graph_graphson2_row_factory, graph_graphson3_row_factory,
 94 |  94 |                                       GraphSON3Serializer)
 95 |  95 | from cassandra.datastax.graph.query import _request_timeout_key, _GraphSONContextRowFactory
    |  96 | + from cassandra.datastax import cloud as dscloud
 96 |  97 |
 97 |  98 | if six.PY3:
 98 |  99 |     long = int
 99 | 100 |
100 |     | -
101 | 101 | def _is_eventlet_monkey_patched():
102 | 102 |     if 'eventlet.patcher' not in sys.modules:
103 | 103 |         return False

@@ -357,19 +357,28 @@ class ExecutionProfile(object):
357 | 357 |
358 | 358 |     # indicates if lbp was set explicitly or uses default values
359 | 359 |     _load_balancing_policy_explicit = False
    | 360 | +     _consistency_level_explicit = False
360 | 361 |
361 | 362 |     def __init__(self, load_balancing_policy=_NOT_SET, retry_policy=None,
362 |     | -                  consistency_level=ConsistencyLevel.LOCAL_ONE, serial_consistency_level=None,
    | 363 | +                  consistency_level=_NOT_SET, serial_consistency_level=None,
363 | 364 |                  request_timeout=10.0, row_factory=named_tuple_factory, speculative_execution_policy=None,
364 | 365 |                  continuous_paging_options=None):
    | 366 | +
365 | 367 |         if load_balancing_policy is _NOT_SET:
366 | 368 |             self._load_balancing_policy_explicit = False
367 | 369 |             self.load_balancing_policy = default_lbp_factory()
368 | 370 |         else:
369 | 371 |             self._load_balancing_policy_explicit = True
370 | 372 |             self.load_balancing_policy = load_balancing_policy
    | 373 | +
    | 374 | +         if consistency_level is _NOT_SET:
    | 375 | +             self._consistency_level_explicit = False
    | 376 | +             self.consistency_level = ConsistencyLevel.LOCAL_ONE
    | 377 | +         else:
    | 378 | +             self._consistency_level_explicit = True
    | 379 | +             self.consistency_level = consistency_level
    | 380 | +
371 | 381 |         self.retry_policy = retry_policy or RetryPolicy()
372 |     | -         self.consistency_level = consistency_level
373 | 382 |
374 | 383 |         if (serial_consistency_level is not None and
375 | 384 |                 not ConsistencyLevel.is_serial(serial_consistency_level)):

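Why the sentinel: with the old signature, an explicit consistency_level=LOCAL_ONE was indistinguishable from the default LOCAL_ONE, so the driver could not later promote only the implicit case (used by the DBaaS handling further down in this change). A minimal usage sketch of the distinction, illustrative only:

    from cassandra import ConsistencyLevel
    from cassandra.cluster import ExecutionProfile

    implicit = ExecutionProfile()                                              # consistency left at the default
    explicit = ExecutionProfile(consistency_level=ConsistencyLevel.LOCAL_ONE)  # pinned by the caller

    # Same effective level today, but only the implicit profile may be adjusted later.
    assert implicit.consistency_level == explicit.consistency_level == ConsistencyLevel.LOCAL_ONE
    assert not implicit._consistency_level_explicit and explicit._consistency_level_explicit
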
@@ -964,6 +973,19 @@ def default_retry_policy(self, policy):
964 | 973 |     A string identifiying this application's version to Insights
965 | 974 |     """
966 | 975 |
    | 976 | +     cloud = None
    | 977 | +     """
    | 978 | +     A dict of the cloud configuration. Example::
    | 979 | +
    | 980 | +         {
    | 981 | +             # path to the secure connect bundle
    | 982 | +             'secure_connect_bundle': '/path/to/secure-connect-dbname.zip'
    | 983 | +         }
    | 984 | +
    | 985 | +     The zip file will be temporarily extracted in the same directory to
    | 986 | +     load the configuration and certificates.
    | 987 | +     """
    | 988 | +
967 | 989 |     @property
968 | 990 |     def schema_metadata_enabled(self):
969 | 991 |         """
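This attribute is fed by the new cloud constructor argument handled below; a minimal connection sketch, assuming a valid secure connect bundle and credentials (the path and credentials here are placeholders):

    from cassandra.auth import PlainTextAuthProvider
    from cassandra.cluster import Cluster

    cloud_config = {'secure_connect_bundle': '/path/to/secure-connect-dbname.zip'}  # placeholder path
    auth_provider = PlainTextAuthProvider('username', 'password')                   # placeholder credentials
    cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
    session = cluster.connect()
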
@@ -1064,13 +1086,34 @@ def __init__(self,
1064 | 1086 |                  application_version=None,
1065 | 1087 |                  monitor_reporting_enabled=True,
1066 | 1088 |                  monitor_reporting_interval=30,
1067 |      | -                  client_id=None):
     | 1089 | +                  client_id=None,
     | 1090 | +                  cloud=None):
1068 | 1091 |         """
1069 | 1092 |         ``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
1070 | 1093 |         extablishing connection pools or refreshing metadata.
1071 | 1094 |
1072 | 1095 |         Any of the mutable Cluster attributes may be set as keyword arguments to the constructor.
1073 | 1096 |         """
     | 1097 | +
     | 1098 | +         if cloud is not None:
     | 1099 | +             if contact_points is not _NOT_SET or endpoint_factory or ssl_context or ssl_options:
     | 1100 | +                 raise ValueError("contact_points, endpoint_factory, ssl_context, and ssl_options "
     | 1101 | +                                  "cannot be specified with a cloud configuration")
     | 1102 | +
     | 1103 | +             cloud_config = dscloud.get_cloud_config(cloud)
     | 1104 | +
     | 1105 | +             ssl_context = cloud_config.ssl_context
     | 1106 | +             ssl_options = {'check_hostname': True}
     | 1107 | +             if (auth_provider is None and cloud_config.username
     | 1108 | +                     and cloud_config.password):
     | 1109 | +                 auth_provider = PlainTextAuthProvider(cloud_config.username, cloud_config.password)
     | 1110 | +
     | 1111 | +             endpoint_factory = SniEndPointFactory(cloud_config.sni_host, cloud_config.sni_port)
     | 1112 | +             contact_points = [
     | 1113 | +                 endpoint_factory.create_from_sni(host_id)
     | 1114 | +                 for host_id in cloud_config.host_ids
     | 1115 | +             ]
     | 1116 | +
1074 | 1117 |         if contact_points is not None:
1075 | 1118 |             if contact_points is _NOT_SET:
1076 | 1119 |                 self._contact_points_explicit = False

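The guard at the top of this block makes the cloud configuration mutually exclusive with explicit endpoint settings, and it runs before the bundle is opened, so the rejection can be shown without a real bundle (sketch; the path is a placeholder):

    from cassandra.cluster import Cluster

    try:
        Cluster(cloud={'secure_connect_bundle': '/path/to/bundle.zip'},
                contact_points=['127.0.0.1'])
    except ValueError as exc:
        print(exc)  # "contact_points, endpoint_factory, ssl_context, and ssl_options cannot be specified ..."
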
@@ -1160,12 +1203,12 @@ def __init__(self,
1160 | 1203 |         self.timestamp_generator = MonotonicTimestampGenerator()
1161 | 1204 |
1162 | 1205 |         self.profile_manager = ProfileManager()
1163 |      | -         self.profile_manager.profiles[EXEC_PROFILE_DEFAULT] = ExecutionProfile(self.load_balancing_policy,
1164 |      | -                                                                                 self.default_retry_policy,
1165 |      | -                                                                                 Session._default_consistency_level,
1166 |      | -                                                                                 Session._default_serial_consistency_level,
1167 |      | -                                                                                 Session._default_timeout,
1168 |      | -                                                                                 Session._row_factory)
     | 1206 | +         self.profile_manager.profiles[EXEC_PROFILE_DEFAULT] = ExecutionProfile(
     | 1207 | +             self.load_balancing_policy,
     | 1208 | +             self.default_retry_policy,
     | 1209 | +             request_timeout=Session._default_timeout,
     | 1210 | +             row_factory=Session._row_factory
     | 1211 | +         )
1169 | 1212 |
1170 | 1213 |         # legacy mode if either of these is not default
1171 | 1214 |         if load_balancing_policy or default_retry_policy:

@@ -1430,6 +1473,7 @@ def add_execution_profile(self, name, profile, pool_wait_timeout=5):
1430 | 1473 |             profile.load_balancing_policy.on_up(host)
1431 | 1474 |         futures = set()
1432 | 1475 |         for session in tuple(self.sessions):
     | 1476 | +             self._set_default_dbaas_consistency(session)
1433 | 1477 |             futures.update(session.update_created_pools())
1434 | 1478 |         _, not_done = wait_futures(futures, pool_wait_timeout)
1435 | 1479 |         if not_done:

@@ -1648,8 +1692,18 @@ def connect(self, keyspace=None, wait_for_all_pools=False):
1648 | 1692 |         session = self._new_session(keyspace)
1649 | 1693 |         if wait_for_all_pools:
1650 | 1694 |             wait_futures(session._initial_connect_futures)
     | 1695 | +
     | 1696 | +         self._set_default_dbaas_consistency(session)
     | 1697 | +
1651 | 1698 |         return session
1652 | 1699 |
     | 1700 | +     def _set_default_dbaas_consistency(self, session):
     | 1701 | +         if session.cluster.metadata.dbaas:
     | 1702 | +             for profile in self.profile_manager.profiles.values():
     | 1703 | +                 if not profile._consistency_level_explicit:
     | 1704 | +                     profile.consistency_level = ConsistencyLevel.LOCAL_QUORUM
     | 1705 | +             session._default_consistency_level = ConsistencyLevel.LOCAL_QUORUM
     | 1706 | +
1653 | 1707 |     def get_connection_holders(self):
1654 | 1708 |         holders = []
1655 | 1709 |         for s in tuple(self.sessions):

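The promotion rule used by _set_default_dbaas_consistency can be exercised without a live cluster; a small sketch mirroring the loop above (the session._default_consistency_level update is omitted since it needs a real session):

    from cassandra import ConsistencyLevel
    from cassandra.cluster import ExecutionProfile

    profiles = {
        'default': ExecutionProfile(),                                        # consistency left implicit
        'pinned': ExecutionProfile(consistency_level=ConsistencyLevel.ONE),   # consistency set explicitly
    }
    for profile in profiles.values():
        if not profile._consistency_level_explicit:
            profile.consistency_level = ConsistencyLevel.LOCAL_QUORUM

    assert profiles['default'].consistency_level == ConsistencyLevel.LOCAL_QUORUM
    assert profiles['pinned'].consistency_level == ConsistencyLevel.ONE
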
@@ -3341,7 +3395,7 @@ class ControlConnection(object):
3341 | 3395 |     # Used only when token_metadata_enabled is set to False
3342 | 3396 |     _SELECT_LOCAL_NO_TOKENS_RPC_ADDRESS = "SELECT rpc_address FROM system.local WHERE key='local'"
3343 | 3397 |
3344 |      | -     _SELECT_SCHEMA_PEERS_TEMPLATE = "SELECT peer, {nt_col_name}, schema_version FROM system.peers"
     | 3398 | +     _SELECT_SCHEMA_PEERS_TEMPLATE = "SELECT peer, host_id, {nt_col_name}, schema_version FROM system.peers"
3345 | 3399 |     _SELECT_SCHEMA_LOCAL = "SELECT schema_version FROM system.local WHERE key='local'"
3346 | 3400 |
3347 | 3401 |     _MINIMUM_NATIVE_ADDRESS_VERSION = "4.0"

@@ -3393,6 +3447,8 @@ def connect(self):
3393 | 3447 |         self._protocol_version = self._cluster.protocol_version
3394 | 3448 |         self._set_new_connection(self._reconnect_internal())
3395 | 3449 |
     | 3450 | +         self._cluster.metadata.dbaas = self._connection._product_type == dscloud.PRODUCT_APOLLO
     | 3451 | +
3396 | 3452 |     def _set_new_connection(self, conn):
3397 | 3453 |         """
3398 | 3454 |         Replace existing connection (if there is one) and close it.

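The flag set here is what _set_default_dbaas_consistency consults; from application code it is visible as cluster.metadata.dbaas once the control connection is up (sketch, assuming a valid bundle; the path is a placeholder):

    from cassandra.cluster import Cluster

    cluster = Cluster(cloud={'secure_connect_bundle': '/path/to/secure-connect-dbname.zip'})
    session = cluster.connect()
    if cluster.metadata.dbaas:
        # the control connection reported the DataStax Apollo (cloud) product type
        print("connected to a DBaaS cluster; implicit profile consistency is now LOCAL_QUORUM")
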
@@ -4206,10 +4262,14 @@ def _on_timeout(self, _attempts=0):
4206 | 4262 |         if self._connection is not None:
4207 | 4263 |             try:
4208 | 4264 |                 self._connection._requests.pop(self._req_id)
4209 |      | -             # This prevents the race condition of the
4210 |      | -             # event loop thread just receiving the waited message
4211 |      | -             # If it arrives after this, it will be ignored
     | 4265 | +             # PYTHON-1044
     | 4266 | +             # This request might have been removed from the connection after the latter was defunct by heartbeat.
     | 4267 | +             # We should still raise OperationTimedOut to reject the future so that the main event thread will not
     | 4268 | +             # wait for it endlessly
4212 | 4269 |             except KeyError:
     | 4270 | +                 key = "Connection defunct by heartbeat"
     | 4271 | +                 errors = {key: "Client request timeout. See Session.execute[_async](timeout)"}
     | 4272 | +                 self._set_final_exception(OperationTimedOut(errors, self._current_host))
4213 | 4273 |                 return
4214 | 4274 |
4215 | 4275 |         pool = self.session._pools.get(self._current_host)