diff --git a/benchmarks/callback_full_pipeline.py b/benchmarks/callback_full_pipeline.py
index a4a4c33315..87eb999cfe 100644
--- a/benchmarks/callback_full_pipeline.py
+++ b/benchmarks/callback_full_pipeline.py
@@ -49,10 +49,7 @@ def insert_next(self, previous_result=sentinel):

     def run(self):
         self.start_profile()
-        if self.protocol_version >= 3:
-            concurrency = 1000
-        else:
-            concurrency = 100
+        concurrency = 1000

         for _ in range(min(concurrency, self.num_queries)):
             self.insert_next()
diff --git a/cassandra/__init__.py b/cassandra/__init__.py
index dfded7d1a6..c1ee195aa2 100644
--- a/cassandra/__init__.py
+++ b/cassandra/__init__.py
@@ -135,16 +135,6 @@ class ProtocolVersion(object):
     """
     Defines native protocol versions supported by this driver.
     """
-    V1 = 1
-    """
-    v1, supported in Cassandra 1.2-->2.2
-    """
-
-    V2 = 2
-    """
-    v2, supported in Cassandra 2.0-->2.2;
-    added support for lightweight transactions, batch operations, and automatic query paging.
-    """

     V3 = 3
     """
@@ -180,7 +170,7 @@ class ProtocolVersion(object):
     DSE private protocol v2, supported in DSE 6.0+
     """

-    SUPPORTED_VERSIONS = (DSE_V2, DSE_V1, V6, V5, V4, V3, V2, V1)
+    SUPPORTED_VERSIONS = (DSE_V2, DSE_V1, V6, V5, V4, V3)
     """
     A tuple of all supported protocol versions
     """
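Note (illustration, not part of the patch): with ``V1`` and ``V2`` gone from ``SUPPORTED_VERSIONS``, automatic protocol downgrade stops at V3. This is easy to check against the patched module; ``get_lower_supported`` is the same helper the unit tests below exercise::

    from cassandra import ProtocolVersion

    # V1/V2 no longer exist, so nothing can be negotiated below V3.
    assert 1 not in ProtocolVersion.SUPPORTED_VERSIONS
    assert 2 not in ProtocolVersion.SUPPORTED_VERSIONS
    assert ProtocolVersion.get_lower_supported(ProtocolVersion.V4) == ProtocolVersion.V3
    assert ProtocolVersion.get_lower_supported(ProtocolVersion.V3) == 0  # no fallback left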
- """ - if self.protocol_version >= 3: - raise UnsupportedOperation( - "Cluster.set_min_requests_per_connection() only has an effect " - "when using protocol_version 1 or 2.") - if min_requests < 0 or min_requests > 126 or \ - min_requests >= self._max_requests_per_connection[host_distance]: - raise ValueError("min_requests must be 0-126 and less than the max_requests for this host_distance (%d)" % - (self._min_requests_per_connection[host_distance],)) - self._min_requests_per_connection[host_distance] = min_requests - - def get_max_requests_per_connection(self, host_distance): - return self._max_requests_per_connection[host_distance] - - def set_max_requests_per_connection(self, host_distance, max_requests): - """ - Sets a threshold for concurrent requests per connection, above which new - connections will be created to a host (up to max connections; - see :meth:`~Cluster.set_max_connections_per_host`). - - Pertains to connection pool management in protocol versions {1,2}. - """ - if self.protocol_version >= 3: - raise UnsupportedOperation( - "Cluster.set_max_requests_per_connection() only has an effect " - "when using protocol_version 1 or 2.") - if max_requests < 1 or max_requests > 127 or \ - max_requests <= self._min_requests_per_connection[host_distance]: - raise ValueError("max_requests must be 1-127 and greater than the min_requests for this host_distance (%d)" % - (self._min_requests_per_connection[host_distance],)) - self._max_requests_per_connection[host_distance] = max_requests - def get_core_connections_per_host(self, host_distance): """ Gets the minimum number of connections per Session that will be opened @@ -3101,13 +3044,11 @@ def _create_response_future(self, query, parameters, trace, custom_payload, spec_exec_policy = execution_profile.speculative_execution_policy fetch_size = query.fetch_size - if fetch_size is FETCH_SIZE_UNSET and self._protocol_version >= 2: + if fetch_size is FETCH_SIZE_UNSET: fetch_size = self.default_fetch_size - elif self._protocol_version == 1: - fetch_size = None start_time = time.time() - if self._protocol_version >= 3 and self.use_client_timestamp: + if self.use_client_timestamp: timestamp = self.cluster.timestamp_generator() else: timestamp = None @@ -3378,11 +3319,7 @@ def add_or_renew_pool(self, host, is_host_addition): def run_add_or_renew_pool(): try: - if self._protocol_version >= 3: - new_pool = HostConnection(host, distance, self) - else: - # TODO remove host pool again ??? - new_pool = HostConnectionPool(host, distance, self) + new_pool = HostConnection(host, distance, self) except AuthenticationFailed as auth_exc: conn_exc = ConnectionException(str(auth_exc), endpoint=host) self.cluster.signal_connection_failure(host, conn_exc, is_host_addition) diff --git a/cassandra/connection.py b/cassandra/connection.py index e1646cafc1..2d998fb805 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -123,7 +123,6 @@ def decompress(byts): DEFAULT_LOCAL_PORT_LOW = 49152 DEFAULT_LOCAL_PORT_HIGH = 65535 -frame_header_v1_v2 = struct.Struct('>BbBi') frame_header_v3 = struct.Struct('>BhBi') @@ -817,17 +816,12 @@ def __init__(self, host='127.0.0.1', port=9042, authenticator=None, if not self.ssl_context and self.ssl_options: self.ssl_context = self._build_ssl_context_from_options() - if protocol_version >= 3: - self.max_request_id = min(self.max_in_flight - 1, (2 ** 15) - 1) - # Don't fill the deque with 2**15 items right away. Start with some and add - # more if needed. 
diff --git a/cassandra/connection.py b/cassandra/connection.py
index e1646cafc1..2d998fb805 100644
--- a/cassandra/connection.py
+++ b/cassandra/connection.py
@@ -123,7 +123,6 @@ def decompress(byts):
 DEFAULT_LOCAL_PORT_LOW = 49152
 DEFAULT_LOCAL_PORT_HIGH = 65535

-frame_header_v1_v2 = struct.Struct('>BbBi')
 frame_header_v3 = struct.Struct('>BhBi')

@@ -817,17 +816,12 @@ def __init__(self, host='127.0.0.1', port=9042, authenticator=None,
         if not self.ssl_context and self.ssl_options:
             self.ssl_context = self._build_ssl_context_from_options()

-        if protocol_version >= 3:
-            self.max_request_id = min(self.max_in_flight - 1, (2 ** 15) - 1)
-            # Don't fill the deque with 2**15 items right away. Start with some and add
-            # more if needed.
-            initial_size = min(300, self.max_in_flight)
-            self.request_ids = deque(range(initial_size))
-            self.highest_request_id = initial_size - 1
-        else:
-            self.max_request_id = min(self.max_in_flight, (2 ** 7) - 1)
-            self.request_ids = deque(range(self.max_request_id + 1))
-            self.highest_request_id = self.max_request_id
+        self.max_request_id = min(self.max_in_flight - 1, (2 ** 15) - 1)
+        # Don't fill the deque with 2**15 items right away. Start with some and add
+        # more if needed.
+        initial_size = min(300, self.max_in_flight)
+        self.request_ids = deque(range(initial_size))
+        self.highest_request_id = initial_size - 1

         self.lock = RLock()
         self.connected_event = Event()
@@ -1205,11 +1199,10 @@ def _read_frame_header(self):
             version = buf[0] & PROTOCOL_VERSION_MASK
             if version not in ProtocolVersion.SUPPORTED_VERSIONS:
                 raise ProtocolError("This version of the driver does not support protocol version %d" % version)
-            frame_header = frame_header_v3 if version >= 3 else frame_header_v1_v2
             # this frame header struct is everything after the version byte
-            header_size = frame_header.size + 1
+            header_size = frame_header_v3.size + 1
             if pos >= header_size:
-                flags, stream, op, body_len = frame_header.unpack_from(buf, 1)
+                flags, stream, op, body_len = frame_header_v3.unpack_from(buf, 1)
                 if body_len < 0:
                     raise ProtocolError("Received negative body length: %r" % body_len)
                 self._current_frame = _Frame(version, flags, stream, op, header_size, body_len + header_size)
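Note (illustration, not part of the patch): every frame now carries the v3-style header — after the version byte come one flags byte, a signed two-byte stream id, an opcode byte, and a four-byte body length. A standalone sketch of that layout, with illustrative values::

    import struct

    frame_header_v3 = struct.Struct('>BhBi')  # flags, stream id, opcode, body length

    # 0x83 = direction-to-client bit (0x80) | protocol version 3
    header = bytes([0x83]) + frame_header_v3.pack(0, 42, 0x08, 0)
    assert len(header) == frame_header_v3.size + 1 == 9

    # The signed 16-bit stream id is also why max_request_id above is capped
    # at 2 ** 15 - 1, where the v1/v2 single signed byte allowed only 127.
    flags, stream, opcode, body_len = frame_header_v3.unpack_from(header, 1)
    assert stream == 42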
diff --git a/cassandra/cqltypes.py b/cassandra/cqltypes.py
index d1580f00ff..e36c48563c 100644
--- a/cassandra/cqltypes.py
+++ b/cassandra/cqltypes.py
@@ -812,18 +812,13 @@ class _SimpleParameterizedType(_ParameterizedType):
     @classmethod
     def deserialize_safe(cls, byts, protocol_version):
         subtype, = cls.subtypes
-        if protocol_version >= 3:
-            unpack = int32_unpack
-            length = 4
-        else:
-            unpack = uint16_unpack
-            length = 2
-        numelements = unpack(byts[:length])
+        length = 4
+        numelements = int32_unpack(byts[:length])
         p = length
         result = []
         inner_proto = max(3, protocol_version)
         for _ in range(numelements):
-            itemlen = unpack(byts[p:p + length])
+            itemlen = int32_unpack(byts[p:p + length])
             p += length
             if itemlen < 0:
                 result.append(None)
@@ -839,16 +834,15 @@ def serialize_safe(cls, items, protocol_version):
             raise TypeError("Received a string for a type that expects a sequence")

         subtype, = cls.subtypes
-        pack = int32_pack if protocol_version >= 3 else uint16_pack
         buf = io.BytesIO()
-        buf.write(pack(len(items)))
+        buf.write(int32_pack(len(items)))
         inner_proto = max(3, protocol_version)
         for item in items:
             if item is None:
-                buf.write(pack(-1))
+                buf.write(int32_pack(-1))
             else:
                 itembytes = subtype.to_binary(item, inner_proto)
-                buf.write(pack(len(itembytes)))
+                buf.write(int32_pack(len(itembytes)))
                 buf.write(itembytes)

         return buf.getvalue()
@@ -872,18 +866,13 @@ class MapType(_ParameterizedType):
     @classmethod
     def deserialize_safe(cls, byts, protocol_version):
         key_type, value_type = cls.subtypes
-        if protocol_version >= 3:
-            unpack = int32_unpack
-            length = 4
-        else:
-            unpack = uint16_unpack
-            length = 2
-        numelements = unpack(byts[:length])
+        length = 4
+        numelements = int32_unpack(byts[:length])
         p = length
         themap = util.OrderedMapSerializedKey(key_type, protocol_version)
         inner_proto = max(3, protocol_version)
         for _ in range(numelements):
-            key_len = unpack(byts[p:p + length])
+            key_len = int32_unpack(byts[p:p + length])
             p += length
             if key_len < 0:
                 keybytes = None
             else:
                 keybytes = byts[p:p + key_len]
                 p += key_len
             key = key_type.from_binary(keybytes, inner_proto)
-            val_len = unpack(byts[p:p + length])
+            val_len = int32_unpack(byts[p:p + length])
             p += length
             if val_len < 0:
                 val = None
@@ -908,9 +897,8 @@ def deserialize_safe(cls, byts, protocol_version):
     @classmethod
     def serialize_safe(cls, themap, protocol_version):
         key_type, value_type = cls.subtypes
-        pack = int32_pack if protocol_version >= 3 else uint16_pack
         buf = io.BytesIO()
-        buf.write(pack(len(themap)))
+        buf.write(int32_pack(len(themap)))
         try:
             items = themap.items()
         except AttributeError:
@@ -919,16 +907,16 @@ def serialize_safe(cls, themap, protocol_version):
         for key, val in items:
             if key is not None:
                 keybytes = key_type.to_binary(key, inner_proto)
-                buf.write(pack(len(keybytes)))
+                buf.write(int32_pack(len(keybytes)))
                 buf.write(keybytes)
             else:
-                buf.write(pack(-1))
+                buf.write(int32_pack(-1))

             if val is not None:
                 valbytes = value_type.to_binary(val, inner_proto)
-                buf.write(pack(len(valbytes)))
+                buf.write(int32_pack(len(valbytes)))
                 buf.write(valbytes)
             else:
-                buf.write(pack(-1))
+                buf.write(int32_pack(-1))

         return buf.getvalue()
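Note (illustration, not driver code): the collection change above is the whole story of v3 framing for lists, sets, and maps — a 4-byte signed element count, then each element behind a 4-byte signed length prefix, with -1 encoding NULL. Under v1/v2 these were unsigned 2-byte values, which is why the int32 packers replace the uint16 ones throughout. A self-contained sketch for a list of text values::

    import struct

    def serialize_string_list_v3(items):
        # v3+: 4-byte signed count, then 4-byte signed length (-1 = NULL) + bytes.
        out = [struct.pack('>i', len(items))]
        for item in items:
            if item is None:
                out.append(struct.pack('>i', -1))
            else:
                body = item.encode('utf-8')
                out.append(struct.pack('>i', len(body)) + body)
        return b''.join(out)

    # An empty collection is now four zero bytes rather than two -- the same
    # widening visible in the test_marshalling fixtures later in this patch.
    assert serialize_string_list_v3([]) == b'\x00\x00\x00\x00'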
- """ - - host = None - host_distance = None - - is_shutdown = False - open_count = 0 - _scheduled_for_creation = 0 - _next_trash_allowed_at = 0 - _keyspace = None - - def __init__(self, host, host_distance, session): - self.host = host - self.host_distance = host_distance - - self._session = weakref.proxy(session) - self._lock = RLock() - self._conn_available_condition = Condition() - - log.debug("Initializing new connection pool for host %s", self.host) - core_conns = session.cluster.get_core_connections_per_host(host_distance) - self._connections = [session.cluster.connection_factory(host.endpoint, on_orphaned_stream_released=self.on_orphaned_stream_released) - for i in range(core_conns)] - - self._keyspace = session.keyspace - if self._keyspace: - for conn in self._connections: - conn.set_keyspace_blocking(self._keyspace) - - self._trash = set() - self._next_trash_allowed_at = time.time() - self.open_count = core_conns - log.debug("Finished initializing new connection pool for host %s", self.host) - - def borrow_connection(self, timeout, routing_key=None): - if self.is_shutdown: - raise ConnectionException( - "Pool for %s is shutdown" % (self.host,), self.host) - - conns = self._connections - if not conns: - # handled specially just for simpler code - log.debug("Detected empty pool, opening core conns to %s", self.host) - core_conns = self._session.cluster.get_core_connections_per_host(self.host_distance) - with self._lock: - # we check the length of self._connections again - # along with self._scheduled_for_creation while holding the lock - # in case multiple threads hit this condition at the same time - to_create = core_conns - (len(self._connections) + self._scheduled_for_creation) - for i in range(to_create): - self._scheduled_for_creation += 1 - self._session.submit(self._create_new_connection) - - # in_flight is incremented by wait_for_conn - conn = self._wait_for_conn(timeout) - return conn - else: - # note: it would be nice to push changes to these config settings - # to pools instead of doing a new lookup on every - # borrow_connection() call - max_reqs = self._session.cluster.get_max_requests_per_connection(self.host_distance) - max_conns = self._session.cluster.get_max_connections_per_host(self.host_distance) - - least_busy = min(conns, key=lambda c: c.in_flight) - request_id = None - # to avoid another thread closing this connection while - # trashing it (through the return_connection process), hold - # the connection lock from this point until we've incremented - # its in_flight count - need_to_wait = False - with least_busy.lock: - if least_busy.in_flight < least_busy.max_request_id: - least_busy.in_flight += 1 - request_id = least_busy.get_request_id() - else: - # once we release the lock, wait for another connection - need_to_wait = True - - if need_to_wait: - # wait_for_conn will increment in_flight on the conn - least_busy, request_id = self._wait_for_conn(timeout) - - # if we have too many requests on this connection but we still - # have space to open a new connection against this host, go ahead - # and schedule the creation of a new connection - if least_busy.in_flight >= max_reqs and len(self._connections) < max_conns: - self._maybe_spawn_new_connection() - - return least_busy, request_id - - def _maybe_spawn_new_connection(self): - with self._lock: - if self._scheduled_for_creation >= _MAX_SIMULTANEOUS_CREATION: - return - if self.open_count >= self._session.cluster.get_max_connections_per_host(self.host_distance): - return - self._scheduled_for_creation += 1 - - 
log.debug("Submitting task for creation of new Connection to %s", self.host) - self._session.submit(self._create_new_connection) - - def _create_new_connection(self): - try: - self._add_conn_if_under_max() - except (ConnectionException, socket.error) as exc: - log.warning("Failed to create new connection to %s: %s", self.host, exc) - except Exception: - log.exception("Unexpectedly failed to create new connection") - finally: - with self._lock: - self._scheduled_for_creation -= 1 - - def _add_conn_if_under_max(self): - max_conns = self._session.cluster.get_max_connections_per_host(self.host_distance) - with self._lock: - if self.is_shutdown: - return True - - if self.open_count >= max_conns: - return True - - self.open_count += 1 - - log.debug("Going to open new connection to host %s", self.host) - try: - conn = self._session.cluster.connection_factory(self.host.endpoint, on_orphaned_stream_released=self.on_orphaned_stream_released) - if self._keyspace: - conn.set_keyspace_blocking(self._session.keyspace) - self._next_trash_allowed_at = time.time() + _MIN_TRASH_INTERVAL - with self._lock: - new_connections = self._connections[:] + [conn] - self._connections = new_connections - log.debug("Added new connection (%s) to pool for host %s, signaling availablility", - id(conn), self.host) - self._signal_available_conn() - return True - except (ConnectionException, socket.error) as exc: - log.warning("Failed to add new connection to pool for host %s: %s", self.host, exc) - with self._lock: - self.open_count -= 1 - if self._session.cluster.signal_connection_failure(self.host, exc, is_host_addition=False): - self.shutdown() - return False - except AuthenticationFailed: - with self._lock: - self.open_count -= 1 - return False - - def _await_available_conn(self, timeout): - with self._conn_available_condition: - self._conn_available_condition.wait(timeout) - - def _signal_available_conn(self): - with self._conn_available_condition: - self._conn_available_condition.notify() - - def _signal_all_available_conn(self): - with self._conn_available_condition: - self._conn_available_condition.notify_all() - - def _wait_for_conn(self, timeout): - start = time.time() - remaining = timeout - - while remaining > 0: - # wait on our condition for the possibility that a connection - # is useable - self._await_available_conn(remaining) - - # self.shutdown() may trigger the above Condition - if self.is_shutdown: - raise ConnectionException("Pool is shutdown") - - conns = self._connections - if conns: - least_busy = min(conns, key=lambda c: c.in_flight) - with least_busy.lock: - if least_busy.in_flight < least_busy.max_request_id: - least_busy.in_flight += 1 - return least_busy, least_busy.get_request_id() - - remaining = timeout - (time.time() - start) - - raise NoConnectionsAvailable() - - def return_connection(self, connection, stream_was_orphaned=False): - with connection.lock: - if not stream_was_orphaned: - connection.in_flight -= 1 - in_flight = connection.in_flight - - if connection.is_defunct or connection.is_closed: - if not connection.signaled_error: - log.debug("Defunct or closed connection (%s) returned to pool, potentially " - "marking host %s as down", id(connection), self.host) - is_down = self._session.cluster.signal_connection_failure( - self.host, connection.last_error, is_host_addition=False) - connection.signaled_error = True - if is_down: - self.shutdown() - else: - self._replace(connection) - elif connection in self._trash: - with connection.lock: - no_pending_requests = connection.in_flight <= 
-            if no_pending_requests:
-                with self._lock:
-                    close_connection = False
-                    if connection in self._trash:
-                        self._trash.remove(connection)
-                        close_connection = True
-                if close_connection:
-                    log.debug("Closing trashed connection (%s) to %s", id(connection), self.host)
-                    connection.close()
-            return
-
-        core_conns = self._session.cluster.get_core_connections_per_host(self.host_distance)
-        min_reqs = self._session.cluster.get_min_requests_per_connection(self.host_distance)
-        # we can use in_flight here without holding the connection lock
-        # because the fact that in_flight dipped below the min at some
-        # point is enough to start the trashing procedure
-        if len(self._connections) > core_conns and in_flight <= min_reqs and \
-                time.time() >= self._next_trash_allowed_at:
-            self._maybe_trash_connection(connection)
-        else:
-            self._signal_available_conn()
-
-    def on_orphaned_stream_released(self):
-        """
-        Called when a response for an orphaned stream (timed out on the client
-        side) was received.
-        """
-        self._signal_available_conn()
-
-    def _maybe_trash_connection(self, connection):
-        core_conns = self._session.cluster.get_core_connections_per_host(self.host_distance)
-        did_trash = False
-        with self._lock:
-            if connection not in self._connections:
-                return
-
-            if self.open_count > core_conns:
-                did_trash = True
-                self.open_count -= 1
-
-                new_connections = self._connections[:]
-                new_connections.remove(connection)
-                self._connections = new_connections
-
-        if did_trash:
-            with connection.lock:
-                no_pending_requests = connection.in_flight <= len(connection.orphaned_request_ids)
-                if no_pending_requests:
-                    log.debug("Skipping trash and closing unused connection (%s) to %s", id(connection), self.host)
-                    connection.close()
-                    return
-            with self._lock:
-                self._trash.add(connection)
-
-        if did_trash:
-            self._next_trash_allowed_at = time.time() + _MIN_TRASH_INTERVAL
-            log.debug("Trashed connection (%s) to %s", id(connection), self.host)
-
-    def _replace(self, connection):
-        should_replace = False
-        with self._lock:
-            if connection in self._connections:
-                new_connections = self._connections[:]
-                new_connections.remove(connection)
-                self._connections = new_connections
-                self.open_count -= 1
-                should_replace = True
-
-        if should_replace:
-            log.debug("Replacing connection (%s) to %s", id(connection), self.host)
-            connection.close()
-            self._session.submit(self._retrying_replace)
-        else:
-            log.debug("Closing connection (%s) to %s", id(connection), self.host)
-            connection.close()
-
-    def _retrying_replace(self):
-        replaced = False
-        try:
-            replaced = self._add_conn_if_under_max()
-        except Exception:
-            log.exception("Failed replacing connection to %s", self.host)
-        if not replaced:
-            log.debug("Failed replacing connection to %s. Retrying.", self.host)
Retrying.", self.host) - self._session.submit(self._retrying_replace) - - def shutdown(self): - with self._lock: - if self.is_shutdown: - return - else: - self.is_shutdown = True - - self._signal_all_available_conn() - - connections_to_close = [] - with self._lock: - connections_to_close.extend(self._connections) - self.open_count -= len(self._connections) - self._connections.clear() - connections_to_close.extend(self._trash) - self._trash.clear() - - for conn in connections_to_close: - conn.close() - - def ensure_core_connections(self): - if self.is_shutdown: - return - - core_conns = self._session.cluster.get_core_connections_per_host(self.host_distance) - with self._lock: - to_create = core_conns - (len(self._connections) + self._scheduled_for_creation) - for i in range(to_create): - self._scheduled_for_creation += 1 - self._session.submit(self._create_new_connection) - - def _set_keyspace_for_all_conns(self, keyspace, callback): - """ - Asynchronously sets the keyspace for all connections. When all - connections have been set, `callback` will be called with two - arguments: this pool, and a list of any errors that occurred. - """ - remaining_callbacks = set(self._connections) - errors = [] - - if not remaining_callbacks: - callback(self, errors) - return - - def connection_finished_setting_keyspace(conn, error): - self.return_connection(conn) - remaining_callbacks.remove(conn) - if error: - errors.append(error) - - if not remaining_callbacks: - callback(self, errors) - - self._keyspace = keyspace - for conn in self._connections: - conn.set_keyspace_async(keyspace, connection_finished_setting_keyspace) - - def get_connections(self): - return self._connections - - def get_state(self): - in_flights = [c.in_flight for c in self._connections] - orphan_requests = [c.orphaned_request_ids for c in self._connections] - return {'shutdown': self.is_shutdown, 'open_count': self.open_count, \ - 'in_flights': in_flights, 'orphan_requests': orphan_requests} diff --git a/cassandra/protocol.py b/cassandra/protocol.py index 29ae404048..5fe4ed2be4 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -36,7 +36,7 @@ TupleType, lookup_casstype, SimpleDateType, TimeType, ByteType, ShortType, DurationType) from cassandra.marshal import (int32_pack, int32_unpack, uint16_pack, uint16_unpack, - uint8_pack, int8_unpack, uint64_pack, header_pack, + uint8_pack, int8_unpack, uint64_pack, v3_header_pack, uint32_pack, uint32_le_unpack, uint32_le_pack) from cassandra.policies import ColDesc from cassandra import WriteType @@ -563,29 +563,13 @@ def _write_query_params(self, f, protocol_version): flags |= _VALUES_FLAG # also v2+, but we're only setting params internally right now if self.serial_consistency_level: - if protocol_version >= 2: - flags |= _WITH_SERIAL_CONSISTENCY_FLAG - else: - raise UnsupportedOperation( - "Serial consistency levels require the use of protocol version " - "2 or higher. Consider setting Cluster.protocol_version to 2 " - "to support serial consistency levels.") + flags |= _WITH_SERIAL_CONSISTENCY_FLAG if self.fetch_size: - if protocol_version >= 2: - flags |= _PAGE_SIZE_FLAG - else: - raise UnsupportedOperation( - "Automatic query paging may only be used with protocol version " - "2 or higher. Consider setting Cluster.protocol_version to 2.") + flags |= _PAGE_SIZE_FLAG if self.paging_state: - if protocol_version >= 2: - flags |= _WITH_PAGING_STATE_FLAG - else: - raise UnsupportedOperation( - "Automatic query paging may only be used with protocol version " - "2 or higher. 
Consider setting Cluster.protocol_version to 2.") + flags |= _WITH_PAGING_STATE_FLAG if self.timestamp is not None: flags |= _PROTOCOL_TIMESTAMP_FLAG @@ -664,22 +648,7 @@ def __init__(self, query_id, query_params, consistency_level, paging_state, timestamp, skip_meta, continuous_paging_options) def _write_query_params(self, f, protocol_version): - if protocol_version == 1: - if self.serial_consistency_level: - raise UnsupportedOperation( - "Serial consistency levels require the use of protocol version " - "2 or higher. Consider setting Cluster.protocol_version to 2 " - "to support serial consistency levels.") - if self.fetch_size or self.paging_state: - raise UnsupportedOperation( - "Automatic query paging may only be used with protocol version " - "2 or higher. Consider setting Cluster.protocol_version to 2.") - write_short(f, len(self.query_params)) - for param in self.query_params: - write_value(f, param) - write_consistency_level(f, self.consistency_level) - else: - super(ExecuteMessage, self)._write_query_params(f, protocol_version) + super(ExecuteMessage, self)._write_query_params(f, protocol_version) def send_body(self, f, protocol_version): write_string(f, self.query_id) @@ -853,8 +822,7 @@ def recv_prepared_metadata(self, f, protocol_version, user_type_map): coltype = self.read_type(f, user_type_map) bind_metadata.append(ColumnMetadata(colksname, colcfname, colname, coltype)) - if protocol_version >= 2: - self.recv_results_metadata(f, user_type_map) + self.recv_results_metadata(f, user_type_map) self.bind_metadata = bind_metadata self.pk_indexes = pk_indexes @@ -969,33 +937,31 @@ def send_body(self, f, protocol_version): write_value(f, param) write_consistency_level(f, self.consistency_level) - if protocol_version >= 3: - flags = 0 - if self.serial_consistency_level: - flags |= _WITH_SERIAL_CONSISTENCY_FLAG - if self.timestamp is not None: - flags |= _PROTOCOL_TIMESTAMP_FLAG - if self.keyspace: - if ProtocolVersion.uses_keyspace_flag(protocol_version): - flags |= _WITH_KEYSPACE_FLAG - else: - raise UnsupportedOperation( - "Keyspaces may only be set on queries with protocol version " - "5 or higher. Consider setting Cluster.protocol_version to 5.") - - if ProtocolVersion.uses_int_query_flags(protocol_version): - write_int(f, flags) + flags = 0 + if self.serial_consistency_level: + flags |= _WITH_SERIAL_CONSISTENCY_FLAG + if self.timestamp is not None: + flags |= _PROTOCOL_TIMESTAMP_FLAG + if self.keyspace: + if ProtocolVersion.uses_keyspace_flag(protocol_version): + flags |= _WITH_KEYSPACE_FLAG else: - write_byte(f, flags) + raise UnsupportedOperation( + "Keyspaces may only be set on queries with protocol version " + "5 or higher. 
Consider setting Cluster.protocol_version to 5.") + if ProtocolVersion.uses_int_query_flags(protocol_version): + write_int(f, flags) + else: + write_byte(f, flags) - if self.serial_consistency_level: - write_consistency_level(f, self.serial_consistency_level) - if self.timestamp is not None: - write_long(f, self.timestamp) + if self.serial_consistency_level: + write_consistency_level(f, self.serial_consistency_level) + if self.timestamp is not None: + write_long(f, self.timestamp) - if ProtocolVersion.uses_keyspace_flag(protocol_version): - if self.keyspace is not None: - write_string(f, self.keyspace) + if ProtocolVersion.uses_keyspace_flag(protocol_version): + if self.keyspace is not None: + write_string(f, self.keyspace) known_event_types = frozenset(( @@ -1050,25 +1016,17 @@ def recv_status_change(cls, f, protocol_version): def recv_schema_change(cls, f, protocol_version): # "CREATED", "DROPPED", or "UPDATED" change_type = read_string(f) - if protocol_version >= 3: - target = read_string(f) - keyspace = read_string(f) - event = {'target_type': target, 'change_type': change_type, 'keyspace': keyspace} - if target != SchemaTargetType.KEYSPACE: - target_name = read_string(f) - if target == SchemaTargetType.FUNCTION: - event['function'] = UserFunctionDescriptor(target_name, [read_string(f) for _ in range(read_short(f))]) - elif target == SchemaTargetType.AGGREGATE: - event['aggregate'] = UserAggregateDescriptor(target_name, [read_string(f) for _ in range(read_short(f))]) - else: - event[target.lower()] = target_name - else: - keyspace = read_string(f) - table = read_string(f) - if table: - event = {'target_type': SchemaTargetType.TABLE, 'change_type': change_type, 'keyspace': keyspace, 'table': table} + target = read_string(f) + keyspace = read_string(f) + event = {'target_type': target, 'change_type': change_type, 'keyspace': keyspace} + if target != SchemaTargetType.KEYSPACE: + target_name = read_string(f) + if target == SchemaTargetType.FUNCTION: + event['function'] = UserFunctionDescriptor(target_name, [read_string(f) for _ in range(read_short(f))]) + elif target == SchemaTargetType.AGGREGATE: + event['aggregate'] = UserAggregateDescriptor(target_name, [read_string(f) for _ in range(read_short(f))]) else: - event = {'target_type': SchemaTargetType.KEYSPACE, 'change_type': change_type, 'keyspace': keyspace} + event[target.lower()] = target_name return event @@ -1164,8 +1122,7 @@ def _write_header(f, version, flags, stream_id, opcode, length): """ Write a CQL protocol frame header. """ - pack = v3_header_pack if version >= 3 else header_pack - f.write(pack(version, flags, stream_id, opcode)) + f.write(v3_header_pack(version, flags, stream_id, opcode)) write_int(f, length) @classmethod diff --git a/docs/api/cassandra/cluster.rst b/docs/api/cassandra/cluster.rst index a9a9d378a4..81da28c052 100644 --- a/docs/api/cassandra/cluster.rst +++ b/docs/api/cassandra/cluster.rst @@ -86,14 +86,6 @@ .. automethod:: add_execution_profile - .. automethod:: set_max_requests_per_connection - - .. automethod:: get_max_requests_per_connection - - .. automethod:: set_min_requests_per_connection - - .. automethod:: get_min_requests_per_connection - .. automethod:: get_core_connections_per_host .. 
diff --git a/docs/index.rst b/docs/index.rst
index f4abf44320..8c38dc315c 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -41,9 +41,6 @@ Contents
 :doc:`security`
     An overview of the security features of the driver

-:doc:`upgrading`
-    A guide to upgrading versions of the driver
-
 :doc:`user-defined-types`
     Working with Scylla's user-defined types (UDT)

@@ -66,7 +63,6 @@ Contents
    installation
    getting-started
    scylla-specific
-   upgrading
    execution-profiles
    performance
    query-paging
diff --git a/docs/security.rst b/docs/security.rst
index c30189562f..57e2be71da 100644
--- a/docs/security.rst
+++ b/docs/security.rst
@@ -37,23 +37,6 @@ If these do not suit your needs, you may need to create your own
 subclasses of :class:`~.AuthProvider` and :class:`~.Authenticator`. You can
 use the Sasl classes as example implementations.

-Protocol v1 Authentication
-^^^^^^^^^^^^^^^^^^^^^^^^^^
-When working with Cassandra 1.2 (or a higher version with
-:attr:`~.Cluster.protocol_version` set to ``1``), you will not pass in
-an :class:`~.AuthProvider` instance. Instead, you should pass in a
-function that takes one argument, the IP address of a host, and returns
-a dict of credentials with a ``username`` and ``password`` key:
-
-.. code-block:: python
-
-    from cassandra.cluster import Cluster
-
-    def get_credentials(host_address):
-        return {'username': 'joe', 'password': '1234'}
-
-    cluster = Cluster(auth_provider=get_credentials, protocol_version=1)
-
 SSL
 ---
 SSL should be used when client encryption is enabled in Cassandra.
diff --git a/docs/upgrading.rst b/docs/upgrading.rst
deleted file mode 100644
index d0d40e46b5..0000000000
--- a/docs/upgrading.rst
+++ /dev/null
@@ -1,353 +0,0 @@
-Upgrading
-=========
-
-.. toctree::
-   :maxdepth: 1
-
-Installation
-^^^^^^^^^^^^
-
-Only the `scylla-driver` package should be installed. `dse-driver` and `dse-graph`
-are not required anymore::
-
-    pip install scylla-driver
-
-See :doc:`installation` for more details.
-
-Import from the cassandra module
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-There is no `dse` module, so you should import from the `cassandra` module. You
-need to change only the first module of your import statements, not the submodules.
-
-.. code-block:: python
-
-    from dse.cluster import Cluster, EXEC_PROFILE_GRAPH_DEFAULT
-    from dse.auth import PlainTextAuthProvider
-    from dse.policies import WhiteListRoundRobinPolicy
-
-    # becomes
-
-    from cassandra.cluster import Cluster, EXEC_PROFILE_GRAPH_DEFAULT
-    from cassandra.auth import PlainTextAuthProvider
-    from cassandra.policies import WhiteListRoundRobinPolicy
-
-Also note that the cassandra.hosts module doesn't exist in scylla-driver. This
-module is named cassandra.pool.
-
-Session.execute and Session.execute_async API
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-Although it is not common to use this API with positional arguments, it is
-important to be aware that the `host` and `execute_as` parameters have had
-their positional order swapped. This is only because `execute_as` was added
-in dse-driver before `host`.
-
-See :meth:`.Session.execute`.
-
-Deprecations
-^^^^^^^^^^^^
-
-These changes are optional, but recommended:
-
-* Use :class:`~.policies.DefaultLoadBalancingPolicy` instead of DSELoadBalancingPolicy.
-
-Upgrading to 3.0
-----------------
-Version 3.0 of the DataStax Python driver for Apache Cassandra
-adds support for Cassandra 3.0 while maintaining support for
-previously supported versions. In addition to substantial internal rework,
-there are several updates to the API that integrators will need
-to consider:
-
-Default consistency is now ``LOCAL_ONE``
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-Previous value was ``ONE``. The new value is introduced to mesh with the default
-DC-aware load balancing policy and to match other drivers.
-
-Execution API Updates
-^^^^^^^^^^^^^^^^^^^^^
-
-Result return normalization
----------------------------
-`PYTHON-368 `_
-
-Previously results would be returned as a ``list`` of rows for result rows
-up to ``fetch_size``, and ``PagedResult`` afterward. This could break
-application code that assumed one type and got another.
-
-Now, all results are returned as an iterable :class:`~.ResultSet`.
-
-The preferred way to consume results of unknown size is to iterate through
-them, letting automatic paging occur as they are consumed.
-
-.. code-block:: python
-
-    results = session.execute("SELECT * FROM system.local")
-    for row in results:
-        process(row)
-
-If the expected size of the results is known, it is still possible to
-materialize a list using the iterator:
-
-.. code-block:: python
-
-    results = session.execute("SELECT * FROM system.local")
-    row_list = list(results)
-
-For backward compatibility, :class:`~.ResultSet` supports indexing. When
-accessed at an index, a `~.ResultSet` object will materialize all its pages:
-
-.. code-block:: python
-
-    results = session.execute("SELECT * FROM system.local")
-    first_result = results[0]  # materializes results, fetching all pages
-
-This can send requests and load (possibly large) results into memory, so
-`~.ResultSet` will log a warning on implicit materialization.
-
-Trace information is not attached to executed Statements
---------------------------------------------------------
-`PYTHON-318 `_
-
-Previously trace data was attached to Statements if tracing was enabled. This
-could lead to confusion if the same statement was used for multiple executions.
-
-Now, trace data is associated with the ``ResponseFuture`` and ``ResultSet``
-returned for each query:
-
-:meth:`.ResponseFuture.get_query_trace()`
-
-:meth:`.ResponseFuture.get_all_query_traces()`
-
-:meth:`.ResultSet.get_query_trace()`
-
-:meth:`.ResultSet.get_all_query_traces()`
-
-Binding named parameters now ignores extra names
-------------------------------------------------
-`PYTHON-178 `_
-
-Previously, :meth:`.BoundStatement.bind()` would raise if a mapping
-was passed with extra names not found in the prepared statement.
-
-Behavior in 3.0+ is to ignore extra names.
-
-blist removed as soft dependency
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-`PYTHON-385 `_
-
-Previously the driver had a soft dependency on ``blist sortedset``, using
-that where available and using an internal fallback where possible.
-
-Now, the driver never chooses the ``blist`` variant, instead returning the
-internal :class:`.util.SortedSet` for all ``set`` results. The class implements
-all standard set operations, so no integration code should need to change unless
-it explicitly checks for ``sortedset`` type.
-
-Metadata API Updates
-^^^^^^^^^^^^^^^^^^^^
-`PYTHON-276 `_, `PYTHON-408 `_, `PYTHON-400 `_, `PYTHON-422 `_
-
-Cassandra 3.0 brought a substantial overhaul to the internal schema metadata representation.
-This version of the driver supports that metadata in addition to the legacy version. Doing so
-also brought some changes to the metadata model.
-
-The present API is documented: :any:`cassandra.metadata`. Changes highlighted below:
-
-* All types are now exposed as CQL types instead of types derived from the internal server implementation
-* Some metadata attributes have changed names to match current nomenclature (for example, :attr:`.Index.kind` in place of ``Index.type``).
-* Some metadata attributes removed
-
-  * ``TableMetadata.keyspace`` reference replaced with :attr:`.TableMetadata.keyspace_name`
-  * ``ColumnMetadata.index`` is removed table- and keyspace-level mappings are still maintained
-
-Several deprecated features are removed
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-`PYTHON-292 `_
-
-* ``ResponseFuture.result`` timeout parameter is removed, use ``Session.execute`` timeout instead (`031ebb0 `_)
-* ``Cluster.refresh_schema`` removed, use ``Cluster.refresh_*_metadata`` instead (`419fcdf `_)
-* ``Cluster.submit_schema_refresh`` removed (`574266d `_)
-* ``cqltypes`` time/date functions removed, use ``util`` entry points instead (`bb984ee `_)
-* ``decoder`` module removed (`e16a073 `_)
-* ``TableMetadata.keyspace`` attribute replaced with ``keyspace_name`` (`cc94073 `_)
-* ``cqlengine.columns.TimeUUID.from_datetime`` removed, use ``util`` variant instead (`96489cc `_)
-* ``cqlengine.columns.Float(double_precision)`` parameter removed, use ``columns.Double`` instead (`a2d3a98 `_)
-* ``cqlengine`` keyspace management functions are removed in favor of the strategy-specific entry points (`4bd5909 `_)
-* ``cqlengine.Model.__polymorphic_*__`` attributes removed, use ``__discriminator*`` attributes instead (`9d98c8e `_)
-* ``cqlengine.statements`` will no longer warn about list list prepend behavior (`79efe97 `_)
-
-
-Upgrading to 2.1 from 2.0
--------------------------
-Version 2.1 of the DataStax Python driver for Apache Cassandra
-adds support for Cassandra 2.1 and version 3 of the native protocol.
-
-Cassandra 1.2, 2.0, and 2.1 are all supported. However, 1.2 only
-supports protocol version 1, and 2.0 only supports versions 1 and
-2, so some features may not be available.
-
-Using the v3 Native Protocol
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-By default, the driver will attempt to use version 2 of the
-native protocol. To use version 3, you must explicitly
-set the :attr:`~.Cluster.protocol_version`:
-
-.. code-block:: python
-
-    from cassandra.cluster import Cluster
-
-    cluster = Cluster(protocol_version=3)
-
-Note that protocol version 3 is only supported by Cassandra 2.1+.
-
-In future releases, the driver may default to using protocol version
-3.
-
-Working with User-Defined Types
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-Cassandra 2.1 introduced the ability to define new types::
-
-    USE KEYSPACE mykeyspace;
-
-    CREATE TYPE address (street text, city text, zip int);
-
-The driver generally expects you to use instances of a specific
-class to represent column values of this type. You can let the
-driver know what class to use with :meth:`.Cluster.register_user_type`:
-
-.. code-block:: python
-
-    cluster = Cluster()
-
-    class Address(object):
-
-        def __init__(self, street, city, zipcode):
-            self.street = street
-            self.city = text
-            self.zipcode = zipcode
-
-    cluster.register_user_type('mykeyspace', 'address', Address)
-
-When inserting data for ``address`` columns, you should pass in
-instances of ``Address``. When querying data, ``address`` column
-values will be instances of ``Address``.
-
-If no class is registered for a user-defined type, query results
-will use a ``namedtuple`` class and data may only be inserted
-though prepared statements.
-
-See :ref:`udts` for more details.
-
-Customizing Encoders for Non-prepared Statements
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-Starting with version 2.1 of the driver, it is possible to customize
-how Python types are converted to CQL literals when working with
-non-prepared statements. This is done on a per-:class:`~.Session`
-basis through :attr:`.Session.encoder`:
-
-.. code-block:: python
-
-    cluster = Cluster()
-    session = cluster.connect()
-    session.encoder.mapping[tuple] = session.encoder.cql_encode_tuple
-
-See :ref:`type-conversions` for the table of default CQL literal conversions.
-
-Using Client-Side Protocol-Level Timestamps
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-With version 3 of the native protocol, timestamps may be supplied by the
-client at the protocol level. (Normally, if they are not specified within
-the CQL query itself, a timestamp is generated server-side.)
-
-When :attr:`~.Cluster.protocol_version` is set to 3 or higher, the driver
-will automatically use client-side timestamps with microsecond precision
-unless :attr:`.Session.use_client_timestamp` is changed to :const:`False`.
-If a timestamp is specified within the CQL query, it will override the
-timestamp generated by the driver.
-
-Upgrading to 2.0 from 1.x
--------------------------
-Version 2.0 of the DataStax Python driver for Apache Cassandra
-includes some notable improvements over version 1.x. This version
-of the driver supports Cassandra 1.2, 2.0, and 2.1. However, not
-all features may be used with Cassandra 1.2, and some new features
-in 2.1 are not yet supported.
-
-Using the v2 Native Protocol
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-By default, the driver will attempt to use version 2 of Cassandra's
-native protocol. You can explicitly set the protocol version to
-2, though:
-
-.. code-block:: python
-
-    from cassandra.cluster import Cluster
-
-    cluster = Cluster(protocol_version=2)
-
-When working with Cassandra 1.2, you will need to
-explicitly set the :attr:`~.Cluster.protocol_version` to 1:
-
-.. code-block:: python
-
-    from cassandra.cluster import Cluster
-
-    cluster = Cluster(protocol_version=1)
-
-Automatic Query Paging
-^^^^^^^^^^^^^^^^^^^^^^
-Version 2 of the native protocol adds support for automatic query
-paging, which can make dealing with large result sets much simpler.
-
-See :ref:`query-paging` for full details.
-
-Protocol-Level Batch Statements
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-With version 1 of the native protocol, batching of statements required
-using a `BATCH cql query `_.
-With version 2 of the native protocol, you can now batch statements at
-the protocol level. This allows you to use many different prepared
-statements within a single batch.
-
-See :class:`~.query.BatchStatement` for details and usage examples.
-
-SASL-based Authentication
-^^^^^^^^^^^^^^^^^^^^^^^^^
-Also new in version 2 of the native protocol is SASL-based authentication.
-See the section on :ref:`security` for details and examples.
-
-Lightweight Transactions
-^^^^^^^^^^^^^^^^^^^^^^^^
-`Lightweight transactions `_ are another new feature. To use lightweight transactions, add ``IF`` clauses
-to your CQL queries and set the :attr:`~.Statement.serial_consistency_level`
-on your statements.
-
-Calling Cluster.shutdown()
-^^^^^^^^^^^^^^^^^^^^^^^^^^
-In order to fix some issues around garbage collection and unclean interpreter
-shutdowns, version 2.0 of the driver requires you to call :meth:`.Cluster.shutdown()`
-on your :class:`~.Cluster` objects when you are through with them.
-This helps to guarantee a clean shutdown.
-
-Deprecations
-^^^^^^^^^^^^
-The following functions have moved from ``cassandra.decoder`` to ``cassandra.query``.
-The original functions have been left in place with a :exc:`DeprecationWarning` for
-now:
-
-* :attr:`cassandra.decoder.tuple_factory` has moved to
-  :attr:`cassandra.query.tuple_factory`
-* :attr:`cassandra.decoder.named_tuple_factory` has moved to
-  :attr:`cassandra.query.named_tuple_factory`
-* :attr:`cassandra.decoder.dict_factory` has moved to
-  :attr:`cassandra.query.dict_factory`
-* :attr:`cassandra.decoder.ordered_dict_factory` has moved to
-  :attr:`cassandra.query.ordered_dict_factory`
-
-Dependency Changes
-^^^^^^^^^^^^^^^^^^
-The following dependencies have officially been made optional:
-
-* ``scales``
-* ``blist``
diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py
index f00d4c7126..75e1df4261 100644
--- a/tests/integration/__init__.py
+++ b/tests/integration/__init__.py
@@ -241,8 +241,6 @@ def get_default_protocol():
         return 4
     elif CASSANDRA_VERSION >= Version('2.1'):
         return 3
-    elif CASSANDRA_VERSION >= Version('2.0'):
-        return 2
     else:
         raise Exception("Running tests with an unsupported Cassandra version: {0}".format(CASSANDRA_VERSION))
@@ -260,10 +258,8 @@ def get_scylla_default_protocol():

 def get_supported_protocol_versions():
     """
-    1.2 -> 1
-    2.0 -> 2, 1
-    2.1 -> 3, 2, 1
-    2.2 -> 4, 3, 2, 1
+    2.1 -> 3
+    2.2 -> 4, 3
     3.X -> 4, 3
     3.10(C*) -> 5(beta),4,3
     3.10(DSE) -> DSE_V1,4,3
@@ -286,13 +282,11 @@ def get_supported_protocol_versions():
     elif CASSANDRA_VERSION >= Version('3.0'):
         return (3, 4)
     elif CASSANDRA_VERSION >= Version('2.2'):
-        return (1,2, 3, 4)
+        return (3, 4)
     elif CASSANDRA_VERSION >= Version('2.1'):
-        return (1, 2, 3)
-    elif CASSANDRA_VERSION >= Version('2.0'):
-        return (1, 2)
+        return (3,)
     else:
-        return (1,)
+        return (3,)
@@ -354,8 +348,6 @@ def _id_and_mark(f):

 local = local_decorator_creator()
 notprotocolv1 = unittest.skipUnless(PROTOCOL_VERSION > 1, 'Protocol v1 not supported')
-lessthenprotocolv4 = unittest.skipUnless(PROTOCOL_VERSION < 4, 'Protocol versions 4 or greater not supported')
-lessthanprotocolv3 = unittest.skipUnless(PROTOCOL_VERSION < 3, 'Protocol versions 3 or greater not supported')
 greaterthanprotocolv3 = unittest.skipUnless(PROTOCOL_VERSION >= 4, 'Protocol versions less than 4 are not supported')
 protocolv6 = unittest.skipUnless(6 in get_supported_protocol_versions(), 'Protocol versions less than 6 are not supported')
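Note (standalone illustration): a Python gotcha lurks in single-version returns like the one above — parentheses alone don't make a tuple, the trailing comma does, and callers of ``get_supported_protocol_versions()`` do membership checks that would fail on a bare ``int``::

    # (3) is just the integer 3; (3,) is a one-element tuple.
    assert (3) == 3 and not isinstance((3), tuple)
    assert isinstance((3,), tuple)

    supported = (3,)
    assert 3 in supported        # membership works on the tuple
    # whereas `3 in (3)` would raise TypeError: int is not iterable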
diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py
index cdfc7c1b82..9c01fc00a9 100644
--- a/tests/integration/standard/test_cluster.py
+++ b/tests/integration/standard/test_cluster.py
@@ -41,7 +41,7 @@
 from tests import notwindows, notasyncio
 from tests.integration import use_cluster, get_server_versions, CASSANDRA_VERSION, \
     execute_until_pass, execute_with_long_wait_retry, get_node, MockLoggingHandler, get_unsupported_lower_protocol, \
-    get_unsupported_upper_protocol, lessthanprotocolv3, protocolv6, local, CASSANDRA_IP, greaterthanorequalcass30, \
+    get_unsupported_upper_protocol, protocolv6, local, CASSANDRA_IP, greaterthanorequalcass30, \
     lessthanorequalcass40, DSE_VERSION, TestCluster, PROTOCOL_VERSION, xfail_scylla, incorrect_test
 from tests.integration.util import assert_quiescent_pool_state
 import sys
@@ -408,34 +408,6 @@ def test_connect_to_bad_hosts(self):
                           protocol_version=PROTOCOL_VERSION)
         self.assertRaises(NoHostAvailable, cluster.connect)

-    @lessthanprotocolv3
-    def test_cluster_settings(self):
-        """
-        Test connection setting getters and setters
-        """
-
-        cluster = TestCluster()
-
-        min_requests_per_connection = cluster.get_min_requests_per_connection(HostDistance.LOCAL)
-        self.assertEqual(cassandra.cluster.DEFAULT_MIN_REQUESTS, min_requests_per_connection)
-        cluster.set_min_requests_per_connection(HostDistance.LOCAL, min_requests_per_connection + 1)
-        self.assertEqual(cluster.get_min_requests_per_connection(HostDistance.LOCAL), min_requests_per_connection + 1)
-
-        max_requests_per_connection = cluster.get_max_requests_per_connection(HostDistance.LOCAL)
-        self.assertEqual(cassandra.cluster.DEFAULT_MAX_REQUESTS, max_requests_per_connection)
-        cluster.set_max_requests_per_connection(HostDistance.LOCAL, max_requests_per_connection + 1)
-        self.assertEqual(cluster.get_max_requests_per_connection(HostDistance.LOCAL), max_requests_per_connection + 1)
-
-        core_connections_per_host = cluster.get_core_connections_per_host(HostDistance.LOCAL)
-        self.assertEqual(cassandra.cluster.DEFAULT_MIN_CONNECTIONS_PER_LOCAL_HOST, core_connections_per_host)
-        cluster.set_core_connections_per_host(HostDistance.LOCAL, core_connections_per_host + 1)
-        self.assertEqual(cluster.get_core_connections_per_host(HostDistance.LOCAL), core_connections_per_host + 1)
-
-        max_connections_per_host = cluster.get_max_connections_per_host(HostDistance.LOCAL)
-        self.assertEqual(cassandra.cluster.DEFAULT_MAX_CONNECTIONS_PER_LOCAL_HOST, max_connections_per_host)
-        cluster.set_max_connections_per_host(HostDistance.LOCAL, max_connections_per_host + 1)
-        self.assertEqual(cluster.get_max_connections_per_host(HostDistance.LOCAL), max_connections_per_host + 1)
-
     def test_refresh_schema(self):
         cluster = TestCluster()
         session = cluster.connect()
diff --git a/tests/integration/standard/test_connection.py b/tests/integration/standard/test_connection.py
index b86a4445af..4a5c23d6bf 100644
--- a/tests/integration/standard/test_connection.py
+++ b/tests/integration/standard/test_connection.py
@@ -27,7 +27,6 @@
 from cassandra.cluster import NoHostAvailable, ConnectionShutdown, ExecutionProfile, EXEC_PROFILE_DEFAULT
 from cassandra.protocol import QueryMessage
 from cassandra.policies import HostFilterPolicy, RoundRobinPolicy, HostStateListener
-from cassandra.pool import HostConnectionPool

 from tests import is_monkey_patched
 from tests.integration import use_singledc, get_node, CASSANDRA_IP, local, \
@@ -168,12 +167,8 @@ def fetch_connections(self, host, cluster):
         holders = cluster.get_connection_holders()
         for conn in holders:
             if host == str(getattr(conn, 'host', '')):
-                if isinstance(conn, HostConnectionPool):
-                    if conn._connections is not None and (conn._connections):
-                        connections.extend(conn._connections)
-                else:
-                    if conn._connections and conn._connections:
-                        connections.extend(conn._connections.values())
+                if conn._connections:
+                    connections.extend(conn._connections.values())
         return connections

     def wait_for_connections(self, host, cluster):
diff --git a/tests/unit/io/utils.py b/tests/unit/io/utils.py
index a26d898c5f..fa9017ffa2 100644
--- a/tests/unit/io/utils.py
+++ b/tests/unit/io/utils.py
@@ -197,10 +197,11 @@ def get_socket(self, connection):
     def set_socket(self, connection, obj):
         return setattr(connection, self.socket_attr_name, obj)

-    def make_header_prefix(self, message_class, version=2, stream_id=0):
+    def make_header_prefix(self, message_class, version=3, stream_id=0):
         return bytes().join(map(uint8_pack, [
             0xff & (HEADER_DIRECTION_TO_CLIENT | version),
             0,  # flags (compression)
+            0,  # MSB for v3+ stream
             stream_id,
             message_class.opcode  # opcode
         ]))
diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py
index ec7d51bc2d..e656dad005 100644
--- a/tests/unit/test_cluster.py
+++ b/tests/unit/test_cluster.py
@@ -105,20 +105,6 @@ def test_invalid_contact_point_types(self):
         with self.assertRaises(TypeError):
             Cluster(contact_points="not a sequence", protocol_version=4, connect_timeout=1)

-    def test_requests_in_flight_threshold(self):
-        d = HostDistance.LOCAL
-        mn = 3
-        mx = 5
-        c = Cluster(protocol_version=2)
-        c.set_min_requests_per_connection(d, mn)
-        c.set_max_requests_per_connection(d, mx)
-        # min underflow, max, overflow
-        for n in (-1, mx, 127):
-            self.assertRaises(ValueError, c.set_min_requests_per_connection, d, n)
-        # max underflow, under min, overflow
-        for n in (0, mn, 128):
-            self.assertRaises(ValueError, c.set_max_requests_per_connection, d, n)
-
     def test_port_str(self):
         """Check port passed as tring is converted and checked properly"""
         cluster = Cluster(contact_points=['127.0.0.1'], port='1111')
@@ -230,10 +216,6 @@ def test_protocol_downgrade_test(self):
         lower = ProtocolVersion.get_lower_supported(ProtocolVersion.V4)
         self.assertEqual(ProtocolVersion.V3,lower)
         lower = ProtocolVersion.get_lower_supported(ProtocolVersion.V3)
-        self.assertEqual(ProtocolVersion.V2,lower)
-        lower = ProtocolVersion.get_lower_supported(ProtocolVersion.V2)
-        self.assertEqual(ProtocolVersion.V1, lower)
-        lower = ProtocolVersion.get_lower_supported(ProtocolVersion.V1)
         self.assertEqual(0, lower)

         self.assertTrue(ProtocolVersion.uses_error_code_map(ProtocolVersion.DSE_V1))
diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py
index 51e6247313..460179e9ff 100644
--- a/tests/unit/test_connection.py
+++ b/tests/unit/test_connection.py
@@ -38,21 +38,13 @@ def make_connection(self):
         return c

     def make_header_prefix(self, message_class, version=Connection.protocol_version, stream_id=0):
-        if Connection.protocol_version < 3:
-            return bytes().join(map(uint8_pack, [
-                0xff & (HEADER_DIRECTION_TO_CLIENT | version),
-                0,  # flags (compression)
-                stream_id,
-                message_class.opcode  # opcode
-            ]))
-        else:
-            return bytes().join(map(uint8_pack, [
-                0xff & (HEADER_DIRECTION_TO_CLIENT | version),
-                0,  # flags (compression)
-                0,  # MSB for v3+ stream
-                stream_id,
-                message_class.opcode  # opcode
-            ]))
+        return bytes().join(map(uint8_pack, [
+            0xff & (HEADER_DIRECTION_TO_CLIENT | version),
+            0,  # flags (compression)
+            0,  # MSB for v3+ stream
+            stream_id,
+            message_class.opcode  # opcode
+        ]))

     def make_options_body(self):
         options_buf = BytesIO()
diff --git a/tests/unit/test_host_connection_pool.py b/tests/unit/test_host_connection_pool.py
index b4cb067d2f..252ccb49ca 100644
--- a/tests/unit/test_host_connection_pool.py
+++ b/tests/unit/test_host_connection_pool.py
@@ -24,7 +24,7 @@

 from cassandra.cluster import Session, ShardAwareOptions
 from cassandra.connection import Connection
-from cassandra.pool import HostConnection, HostConnectionPool
+from cassandra.pool import HostConnection
 from cassandra.pool import Host, NoConnectionsAvailable
 from cassandra.policies import HostDistance, SimpleConvictionPolicy

@@ -39,7 +39,6 @@ class _PoolTests(unittest.TestCase):
     def make_session(self):
         session = NonCallableMagicMock(spec=Session, keyspace='foobarkeyspace')
         session.cluster.get_core_connections_per_host.return_value = 1
-        session.cluster.get_max_requests_per_connection.return_value = 1
         session.cluster.get_max_connections_per_host.return_value = 1
         return session

@@ -223,55 +222,6 @@ def test_host_equality(self):
diff --git a/tests/unit/test_host_connection_pool.py b/tests/unit/test_host_connection_pool.py
index b4cb067d2f..252ccb49ca 100644
--- a/tests/unit/test_host_connection_pool.py
+++ b/tests/unit/test_host_connection_pool.py
@@ -24,7 +24,7 @@
 from cassandra.cluster import Session, ShardAwareOptions
 from cassandra.connection import Connection
-from cassandra.pool import HostConnection, HostConnectionPool
+from cassandra.pool import HostConnection
 from cassandra.pool import Host, NoConnectionsAvailable
 from cassandra.policies import HostDistance, SimpleConvictionPolicy

@@ -39,7 +39,6 @@ class _PoolTests(unittest.TestCase):
     def make_session(self):
         session = NonCallableMagicMock(spec=Session, keyspace='foobarkeyspace')
         session.cluster.get_core_connections_per_host.return_value = 1
-        session.cluster.get_max_requests_per_connection.return_value = 1
         session.cluster.get_max_connections_per_host.return_value = 1
         return session

@@ -223,55 +222,6 @@ def test_host_equality(self):
         self.assertNotEqual(b, c, 'Two Host instances should NOT be equal when using two different addresses.')


-class HostConnectionPoolTests(_PoolTests):
-    __test__ = True
-    PoolImpl = HostConnectionPool
-    uses_single_connection = False
-
-    def test_all_connections_trashed(self):
-        host = Mock(spec=Host, address='ip1')
-        session = self.make_session()
-        conn = NonCallableMagicMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False, max_request_id=100,
-                                    lock=Lock())
-        session.cluster.connection_factory.return_value = conn
-        session.cluster.get_core_connections_per_host.return_value = 1
-
-        # manipulate the core connection setting so that we can
-        # trash the only connection
-        pool = self.PoolImpl(host, HostDistance.LOCAL, session)
-        session.cluster.get_core_connections_per_host.return_value = 0
-        pool._maybe_trash_connection(conn)
-        session.cluster.get_core_connections_per_host.return_value = 1
-
-        submit_called = Event()
-
-        def fire_event(*args, **kwargs):
-            submit_called.set()
-
-        session.submit.side_effect = fire_event
-
-        def get_conn():
-            conn.reset_mock()
-            c, request_id = pool.borrow_connection(1.0)
-            self.assertIs(conn, c)
-            self.assertEqual(1, conn.in_flight)
-            conn.set_keyspace_blocking.assert_called_once_with('foobarkeyspace')
-            pool.return_connection(c)
-
-        t = Thread(target=get_conn)
-        t.start()
-
-        submit_called.wait()
-        self.assertEqual(1, pool._scheduled_for_creation)
-        session.submit.assert_called_once_with(pool._create_new_connection)
-
-        # now run the create_new_connection call
-        pool._create_new_connection()
-
-        t.join()
-        self.assertEqual(0, conn.in_flight)
-
-
 class HostConnectionTests(_PoolTests):
     __test__ = True
     PoolImpl = HostConnection
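Dropping HostConnectionPoolTests follows from dropping the pool itself: with
v3's wider stream ids a single connection can multiplex far more in-flight
requests, so the multi-connection pool and its trash/replenish machinery are
unnecessary. Back-of-the-envelope math, where the 2 ** 15 figure matches the
max_request_id cap retained in cassandra/connection.py:

    # Stream ids are signed on the wire; only non-negative ids are usable.
    V1_V2_STREAMS = 2 ** 7   # one-byte stream id: 128 requests per connection
    V3_STREAMS = 2 ** 15     # two-byte stream id: 32768 requests per connection
    print(V3_STREAMS // V1_V2_STREAMS)  # 256x more multiplexing on one socket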
diff --git a/tests/unit/test_marshalling.py b/tests/unit/test_marshalling.py
index 1fdbfa6a4b..9c368860f3 100644
--- a/tests/unit/test_marshalling.py
+++ b/tests/unit/test_marshalling.py
@@ -75,10 +75,10 @@
     (b'', 'MapType(AsciiType, BooleanType)', None),
     (b'', 'ListType(FloatType)', None),
     (b'', 'SetType(LongType)', None),
-    (b'\x00\x00', 'MapType(DecimalType, BooleanType)', OrderedMapSerializedKey(DecimalType, 0)),
-    (b'\x00\x00', 'ListType(FloatType)', []),
-    (b'\x00\x00', 'SetType(IntegerType)', sortedset()),
-    (b'\x00\x01\x00\x10\xafYC\xa3\xea<\x11\xe1\xabc\xc4,\x03"y\xf0', 'ListType(TimeUUIDType)', [UUID(bytes=b'\xafYC\xa3\xea<\x11\xe1\xabc\xc4,\x03"y\xf0')]),
+    (b'\x00\x00\x00\x00', 'MapType(DecimalType, BooleanType)', OrderedMapSerializedKey(DecimalType, 3)),
+    (b'\x00\x00\x00\x00', 'ListType(FloatType)', []),
+    (b'\x00\x00\x00\x00', 'SetType(IntegerType)', sortedset()),
+    (b'\x00\x00\x00\x01\x00\x00\x00\x10\xafYC\xa3\xea<\x11\xe1\xabc\xc4,\x03"y\xf0', 'ListType(TimeUUIDType)', [UUID(bytes=b'\xafYC\xa3\xea<\x11\xe1\xabc\xc4,\x03"y\xf0')]),
     (b'\x80\x00\x00\x01', 'SimpleDateType', Date(1)),
     (b'\x7f\xff\xff\xff', 'SimpleDateType', Date('1969-12-31')),
     (b'\x00\x00\x00\x00\x00\x00\x00\x01', 'TimeType', Time(1)),
@@ -88,7 +88,7 @@
     (b'\x80\x00', 'ShortType', -32768)
 )

-ordered_map_value = OrderedMapSerializedKey(UTF8Type, 2)
+ordered_map_value = OrderedMapSerializedKey(UTF8Type, 3)
 ordered_map_value._insert(u'\u307fbob', 199)
 ordered_map_value._insert(u'', -1)
 ordered_map_value._insert(u'\\', 0)
@@ -96,8 +96,8 @@
 # these following entries work for me right now, but they're dependent on
 # vagaries of internal python ordering for unordered types
 marshalled_value_pairs_unsafe = (
-    (b'\x00\x03\x00\x06\xe3\x81\xbfbob\x00\x04\x00\x00\x00\xc7\x00\x00\x00\x04\xff\xff\xff\xff\x00\x01\\\x00\x04\x00\x00\x00\x00', 'MapType(UTF8Type, Int32Type)', ordered_map_value),
-    (b'\x00\x02\x00\x08@\x01\x99\x99\x99\x99\x99\x9a\x00\x08@\x14\x00\x00\x00\x00\x00\x00', 'SetType(DoubleType)', sortedset([2.2, 5.0])),
+    (b'\x00\x00\x00\x03\x00\x00\x00\x06\xe3\x81\xbfbob\x00\x00\x00\x04\x00\x00\x00\xc7\x00\x00\x00\x00\x00\x00\x00\x04\xff\xff\xff\xff\x00\x00\x00\x01\\\x00\x00\x00\x04\x00\x00\x00\x00', 'MapType(UTF8Type, Int32Type)', ordered_map_value),
+    (b'\x00\x00\x00\x02\x00\x00\x00\x08@\x01\x99\x99\x99\x99\x99\x9a\x00\x00\x00\x08@\x14\x00\x00\x00\x00\x00\x00', 'SetType(DoubleType)', sortedset([2.2, 5.0])),
     (b'\x00', 'IntegerType', 0),
 )

@@ -111,7 +111,7 @@ class UnmarshalTest(unittest.TestCase):
     def test_unmarshalling(self):
         for serializedval, valtype, nativeval in marshalled_value_pairs:
             unmarshaller = lookup_casstype(valtype)
-            whatwegot = unmarshaller.from_binary(serializedval, 1)
+            whatwegot = unmarshaller.from_binary(serializedval, 3)
             self.assertEqual(whatwegot, nativeval,
                              msg='Unmarshaller for %s (%s) failed: unmarshal(%r) got %r instead of %r' %
                                  (valtype, unmarshaller, serializedval, whatwegot, nativeval))
@@ -122,7 +122,7 @@ def test_unmarshalling(self):
     def test_marshalling(self):
         for serializedval, valtype, nativeval in marshalled_value_pairs:
             marshaller = lookup_casstype(valtype)
-            whatwegot = marshaller.to_binary(nativeval, 1)
+            whatwegot = marshaller.to_binary(nativeval, 3)
             self.assertEqual(whatwegot, serializedval,
                              msg='Marshaller for %s (%s) failed: marshal(%r) got %r instead of %r' %
                                  (valtype, marshaller, nativeval, whatwegot, serializedval))
@@ -132,14 +132,14 @@ def test_marshalling(self):

     def test_date(self):
         # separate test because it will deserialize as datetime
-        self.assertEqual(DateType.from_binary(DateType.to_binary(date(2015, 11, 2), 1), 1), datetime(2015, 11, 2))
+        self.assertEqual(DateType.from_binary(DateType.to_binary(date(2015, 11, 2), 3), 3), datetime(2015, 11, 2))

     def test_decimal(self):
         # testing implicit numeric conversion
         # int, tuple(sign, digits, exp), float
         converted_types = (10001, (0, (1, 0, 0, 0, 0, 1), -3), 100.1, -87.629798)
-        for proto_ver in range(1, ProtocolVersion.MAX_SUPPORTED + 1):
+        for proto_ver in range(3, ProtocolVersion.MAX_SUPPORTED + 1):
             for n in converted_types:
                 expected = Decimal(n)
                 self.assertEqual(DecimalType.from_binary(DecimalType.to_binary(n, proto_ver), proto_ver), expected)
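Every byte-string change in this file follows from a single wire-format
change: protocol v3 widened collection counts and per-element length prefixes
from an unsigned 16-bit short to a signed 32-bit int. A standalone sketch
using raw struct packing (independent of the driver's marshallers) that
reproduces the fixtures above:

    import struct

    # An empty list is just its element count.
    empty_v2 = struct.pack('>H', 0)  # b'\x00\x00'          (v1/v2: 16-bit count)
    empty_v3 = struct.pack('>i', 0)  # b'\x00\x00\x00\x00'  (v3+: 32-bit count)

    # A one-element list of a 16-byte timeuuid under v3: a 4-byte count,
    # then a 4-byte length prefix before the element payload.
    uuid_bytes = b'\xafYC\xa3\xea<\x11\xe1\xabc\xc4,\x03"y\xf0'
    one_uuid_v3 = struct.pack('>i', 1) + struct.pack('>i', len(uuid_bytes)) + uuid_bytes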
diff --git a/tests/unit/test_orderedmap.py b/tests/unit/test_orderedmap.py
index 5d99fc74a8..318b98a52d 100644
--- a/tests/unit/test_orderedmap.py
+++ b/tests/unit/test_orderedmap.py
@@ -163,7 +163,7 @@ def test_delitem(self):

 class OrderedMapSerializedKeyTest(unittest.TestCase):
     def test_init(self):
-        om = OrderedMapSerializedKey(UTF8Type, 2)
+        om = OrderedMapSerializedKey(UTF8Type, 3)
         self.assertEqual(om, {})

     def test_normalized_lookup(self):
diff --git a/tests/unit/test_parameter_binding.py b/tests/unit/test_parameter_binding.py
index 78f3898e01..9c557c0208 100644
--- a/tests/unit/test_parameter_binding.py
+++ b/tests/unit/test_parameter_binding.py
@@ -68,10 +68,9 @@ def test_float_precision(self):
         f = 3.4028234663852886e+38
         self.assertEqual(float(bind_params("%s", (f,), Encoder())), f)

+class BoundStatementTestV3(unittest.TestCase):
-
-class BoundStatementTestV1(unittest.TestCase):
-
-    protocol_version = 1
+    protocol_version = 3

     @classmethod
     def setUpClass(cls):
@@ -179,15 +178,7 @@ def test_unset_value(self):
         self.assertRaises(ValueError, self.bound.bind, (0, 0, 0, UNSET_VALUE))


-class BoundStatementTestV2(BoundStatementTestV1):
-    protocol_version = 2
-
-
-class BoundStatementTestV3(BoundStatementTestV1):
-    protocol_version = 3
-
-
-class BoundStatementTestV4(BoundStatementTestV1):
+class BoundStatementTestV4(BoundStatementTestV3):
     protocol_version = 4

     def test_dict_missing_routing_key(self):