Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from itertools import groupby, count, chain
import json
import logging
from typing import Optional
from typing import Optional, Union
from warnings import warn
from random import random
import re
Expand All @@ -51,7 +51,7 @@
from cassandra.connection import (ConnectionException, ConnectionShutdown,
ConnectionHeartbeat, ProtocolVersionUnsupported,
EndPoint, DefaultEndPoint, DefaultEndPointFactory,
SniEndPointFactory, ConnectionBusy)
SniEndPointFactory, ConnectionBusy, locally_supported_compressions)
from cassandra.cqltypes import UserType
import cassandra.cqltypes as types
from cassandra.encoder import Encoder
Expand Down Expand Up @@ -686,7 +686,7 @@
Used for testing new protocol features incrementally before the new version is complete.
"""

compression = True
compression: Union[bool, str] = True
"""
Controls compression for communications between the driver and Cassandra.
If left as the default of :const:`True`, either lz4 or snappy compression
Expand Down Expand Up @@ -1173,7 +1173,7 @@
def __init__(self,
contact_points=_NOT_SET,
port=9042,
compression=True,
compression: Union[bool, str] = True,
auth_provider=None,
load_balancing_policy=None,
reconnection_policy=None,
Expand Down Expand Up @@ -1302,6 +1302,24 @@

self._resolve_hostnames()

if isinstance(compression, bool):
if compression and not locally_supported_compressions:
log.error(
"Compression is enabled, but no compression libraries are available. "
"Disabling compression, consider installing one of the Python packages: lz4 and/or python-snappy."
)
compression = False
elif isinstance(compression, str):
if not locally_supported_compressions.get(compression):
raise ValueError(
"Compression '%s' was requested, but it is not available. "
"Consider installing the corresponding Python package." % compression
)
else:
raise TypeError(
"The 'compression' option must be either a string (e.g., 'lz4' or 'snappy') "
"or a boolean (True to enable any available compression, False to disable it)."
)
self.compression = compression

if protocol_version is not _NOT_SET:
Expand Down Expand Up @@ -4315,7 +4333,7 @@
self._scheduled_tasks.discard(task)
fn, args, kwargs = task
kwargs = dict(kwargs)
future = self._executor.submit(fn, *args, **kwargs)

Check failure on line 4336 in cassandra/cluster.py

View workflow job for this annotation

GitHub Actions / test asyncore (3.11)

cannot schedule new futures after shutdown
future.add_done_callback(self._log_if_failed)
else:
self._queue.put_nowait((run_at, i, task))
Expand Down
15 changes: 8 additions & 7 deletions cassandra/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import weakref
import random
import itertools
from typing import Optional
from typing import Optional, Union

from cassandra.application_info import ApplicationInfoBase
from cassandra.protocol_features import ProtocolFeatures
Expand Down Expand Up @@ -679,7 +679,7 @@ class Connection(object):
protocol_version = ProtocolVersion.MAX_SUPPORTED

keyspace = None
compression = True
compression: Union[bool, str] = True
_compression_type = None
compressor = None
decompressor = None
Expand Down Expand Up @@ -760,7 +760,7 @@ def _iobuf(self):
return self._io_buffer.io_buffer

def __init__(self, host='127.0.0.1', port=9042, authenticator=None,
ssl_options=None, sockopts=None, compression=True,
ssl_options=None, sockopts=None, compression: Union[bool, str] = True,
cql_version=None, protocol_version=ProtocolVersion.MAX_SUPPORTED, is_control_connection=False,
user_type_map=None, connect_timeout=None, allow_beta_protocol_version=False, no_compact=False,
ssl_context=None, owning_pool=None, shard_id=None, total_shards=None,
Expand Down Expand Up @@ -1383,10 +1383,11 @@ def _handle_options_response(self, options_response):
overlap = (set(locally_supported_compressions.keys()) &
set(remote_supported_compressions))
if len(overlap) == 0:
log.error("No available compression types supported on both ends."
" locally supported: %r. remotely supported: %r",
locally_supported_compressions.keys(),
remote_supported_compressions)
if locally_supported_compressions:
log.error("No available compression types supported on both ends."
" locally supported: %r. remotely supported: %r",
locally_supported_compressions.keys(),
remote_supported_compressions)
else:
compression_type = None
if isinstance(self.compression, str):
Expand Down
Loading