Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 0 additions & 88 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,6 @@
log = logging.getLogger(__name__)


DEFAULT_MIN_REQUESTS = 5
DEFAULT_MAX_REQUESTS = 100

DEFAULT_MIN_CONNECTIONS_PER_LOCAL_HOST = 2
DEFAULT_MAX_CONNECTIONS_PER_LOCAL_HOST = 8

DEFAULT_MIN_CONNECTIONS_PER_REMOTE_HOST = 1
DEFAULT_MAX_CONNECTIONS_PER_REMOTE_HOST = 2

_GRAPH_PAGING_MIN_DSE_VERSION = Version('6.8.0')

_NOT_SET = object()
Expand Down Expand Up @@ -1449,18 +1440,6 @@

self._user_types = defaultdict(dict)

self._core_connections_per_host = {
HostDistance.LOCAL_RACK: DEFAULT_MIN_CONNECTIONS_PER_LOCAL_HOST,
HostDistance.LOCAL: DEFAULT_MIN_CONNECTIONS_PER_LOCAL_HOST,
HostDistance.REMOTE: DEFAULT_MIN_CONNECTIONS_PER_REMOTE_HOST
}

self._max_connections_per_host = {
HostDistance.LOCAL_RACK: DEFAULT_MAX_CONNECTIONS_PER_LOCAL_HOST,
HostDistance.LOCAL: DEFAULT_MAX_CONNECTIONS_PER_LOCAL_HOST,
HostDistance.REMOTE: DEFAULT_MAX_CONNECTIONS_PER_REMOTE_HOST
}

self.executor = self._create_thread_pool_executor(max_workers=executor_threads)
self.scheduler = _Scheduler(self.executor)

Expand Down Expand Up @@ -1651,73 +1630,6 @@
if not_done:
raise OperationTimedOut("Failed to create all new connection pools in the %ss timeout.")

def get_core_connections_per_host(self, host_distance):
"""
Gets the minimum number of connections per Session that will be opened
for each host with :class:`~.HostDistance` equal to `host_distance`.
The default is 2 for :attr:`~HostDistance.LOCAL` and 1 for
:attr:`~HostDistance.REMOTE`.

This property is ignored if :attr:`~.Cluster.protocol_version` is
3 or higher.
"""
return self._core_connections_per_host[host_distance]

def set_core_connections_per_host(self, host_distance, core_connections):
"""
Sets the minimum number of connections per Session that will be opened
for each host with :class:`~.HostDistance` equal to `host_distance`.
The default is 2 for :attr:`~HostDistance.LOCAL` and 1 for
:attr:`~HostDistance.REMOTE`.

Protocol version 1 and 2 are limited in the number of concurrent
requests they can send per connection. The driver implements connection
pooling to support higher levels of concurrency.

If :attr:`~.Cluster.protocol_version` is set to 3 or higher, this
is not supported (there is always one connection per host, unless
the host is remote and :attr:`connect_to_remote_hosts` is :const:`False`)
and using this will result in an :exc:`~.UnsupportedOperation`.
"""
if self.protocol_version >= 3:
raise UnsupportedOperation(
"Cluster.set_core_connections_per_host() only has an effect "
"when using protocol_version 1 or 2.")
old = self._core_connections_per_host[host_distance]
self._core_connections_per_host[host_distance] = core_connections
if old < core_connections:
self._ensure_core_connections()

def get_max_connections_per_host(self, host_distance):
"""
Gets the maximum number of connections per Session that will be opened
for each host with :class:`~.HostDistance` equal to `host_distance`.
The default is 8 for :attr:`~HostDistance.LOCAL` and 2 for
:attr:`~HostDistance.REMOTE`.

This property is ignored if :attr:`~.Cluster.protocol_version` is
3 or higher.
"""
return self._max_connections_per_host[host_distance]

def set_max_connections_per_host(self, host_distance, max_connections):
"""
Sets the maximum number of connections per Session that will be opened
for each host with :class:`~.HostDistance` equal to `host_distance`.
The default is 2 for :attr:`~HostDistance.LOCAL` and 1 for
:attr:`~HostDistance.REMOTE`.

If :attr:`~.Cluster.protocol_version` is set to 3 or higher, this
is not supported (there is always one connection per host, unless
the host is remote and :attr:`connect_to_remote_hosts` is :const:`False`)
and using this will result in an :exc:`~.UnsupportedOperation`.
"""
if self.protocol_version >= 3:
raise UnsupportedOperation(
"Cluster.set_max_connections_per_host() only has an effect "
"when using protocol_version 1 or 2.")
self._max_connections_per_host[host_distance] = max_connections

def connection_factory(self, endpoint, host_conn = None, *args, **kwargs):
"""
Called to create a new connection with proper configuration.
Expand Down Expand Up @@ -4401,7 +4313,7 @@
self._scheduled_tasks.discard(task)
fn, args, kwargs = task
kwargs = dict(kwargs)
future = self._executor.submit(fn, *args, **kwargs)

Check failure on line 4316 in cassandra/cluster.py

View workflow job for this annotation

GitHub Actions / test libev (3.12)

cannot schedule new futures after shutdown

Check failure on line 4316 in cassandra/cluster.py

View workflow job for this annotation

GitHub Actions / test libev (3.12)

cannot schedule new futures after shutdown

Check failure on line 4316 in cassandra/cluster.py

View workflow job for this annotation

GitHub Actions / test asyncore (3.9)

cannot schedule new futures after shutdown
future.add_done_callback(self._log_if_failed)
else:
self._queue.put_nowait((run_at, i, task))
Expand Down
8 changes: 1 addition & 7 deletions cassandra/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,7 @@ def execute_concurrent(session, statements_and_parameters, concurrency=100, rais
``parameters`` item must be a sequence or :const:`None`.

The `concurrency` parameter controls how many statements will be executed
concurrently. When :attr:`.Cluster.protocol_version` is set to 1 or 2,
it is recommended that this be kept below 100 times the number of
core connections per host times the number of connected hosts (see
:meth:`.Cluster.set_core_connections_per_host`). If that amount is exceeded,
the event loop thread may attempt to block on new connection creation,
substantially impacting throughput. If :attr:`~.Cluster.protocol_version`
is 3 or higher, you can safely experiment with higher levels of concurrency.
concurrently.

If `raise_on_first_error` is left as :const:`True`, execution will stop
after the first failed statement and the corresponding exception will be
Expand Down
8 changes: 0 additions & 8 deletions docs/api/cassandra/cluster.rst
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,6 @@

.. automethod:: add_execution_profile

.. automethod:: get_core_connections_per_host

.. automethod:: set_core_connections_per_host

.. automethod:: get_max_connections_per_host

.. automethod:: set_max_connections_per_host

.. automethod:: get_control_connection_host

.. automethod:: refresh_schema_metadata
Expand Down
2 changes: 0 additions & 2 deletions tests/integration/standard/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,8 +729,6 @@ def test_idle_heartbeat(self):
interval = 2
cluster = TestCluster(idle_heartbeat_interval=interval,
monitor_reporting_enabled=False)
if PROTOCOL_VERSION < 3:
cluster.set_core_connections_per_host(HostDistance.LOCAL, 1)
session = cluster.connect(wait_for_all_pools=True)

# This test relies on impl details of connection req id management to see if heartbeats
Expand Down
2 changes: 0 additions & 2 deletions tests/integration/standard/test_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ def setUpClass(cls):
EXEC_PROFILE_DICT: ExecutionProfile(row_factory=dict_factory)
}
)
if PROTOCOL_VERSION < 3:
cls.cluster.set_core_connections_per_host(HostDistance.LOCAL, 1)
cls.session = cls.cluster.connect()

@classmethod
Expand Down
4 changes: 0 additions & 4 deletions tests/integration/standard/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,8 +664,6 @@ def setUp(self):
% (PROTOCOL_VERSION,))

self.cluster = TestCluster()
if PROTOCOL_VERSION < 3:
self.cluster.set_core_connections_per_host(HostDistance.LOCAL, 1)
self.session = self.cluster.connect(wait_for_all_pools=True)

def tearDown(self):
Expand Down Expand Up @@ -800,8 +798,6 @@ def setUp(self):
% (PROTOCOL_VERSION,))

self.cluster = TestCluster()
if PROTOCOL_VERSION < 3:
self.cluster.set_core_connections_per_host(HostDistance.LOCAL, 1)
self.session = self.cluster.connect()

def tearDown(self):
Expand Down
2 changes: 0 additions & 2 deletions tests/integration/standard/test_query_paging.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ def setUp(self):
self.cluster = TestCluster(
execution_profiles={EXEC_PROFILE_DEFAULT: ExecutionProfile(consistency_level=ConsistencyLevel.LOCAL_QUORUM)}
)
if PROTOCOL_VERSION < 3:
self.cluster.set_core_connections_per_host(HostDistance.LOCAL, 1)
self.session = self.cluster.connect(wait_for_all_pools=True)
self.session.execute("TRUNCATE test3rf.test")

Expand Down
5 changes: 0 additions & 5 deletions tests/unit/test_host_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ class _PoolTests(unittest.TestCase):

def make_session(self):
session = NonCallableMagicMock(spec=Session, keyspace='foobarkeyspace')
session.cluster.get_core_connections_per_host.return_value = 1
session.cluster.get_max_connections_per_host.return_value = 1
return session

def test_borrow_and_return(self):
Expand Down Expand Up @@ -113,9 +111,6 @@ def test_spawn_when_at_max(self):
conn.max_request_id = 100
session.cluster.connection_factory.return_value = conn

# core conns = 1, max conns = 2
session.cluster.get_max_connections_per_host.return_value = 2

pool = self.PoolImpl(host, HostDistance.LOCAL, session)
session.cluster.connection_factory.assert_called_once_with(host.endpoint, on_orphaned_stream_released=pool.on_orphaned_stream_released)

Expand Down
Loading