Skip to content

Commit c89ad2a

Browse files
committed
Introduce ShardConnectionBackoffPolicy
Commit introduces two abstract classes: 1. `ShardConnectionBackoffPolicy` - a base class for policy that controls pase of shard connections creation 2. Auxiliary `ShardConnectionScheduler` - a scheduler that is instatiated by `ShardConnectionBackoffPolicy` at session initialization
1 parent 3f7bcbb commit c89ad2a

File tree

1 file changed

+74
-4
lines changed

1 file changed

+74
-4
lines changed

cassandra/policies.py

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,25 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
from __future__ import annotations
15+
1416
import random
1517

1618
from collections import namedtuple
17-
from functools import lru_cache
1819
from itertools import islice, cycle, groupby, repeat
1920
import logging
2021
from random import randint, shuffle
2122
from threading import Lock
2223
import socket
2324
import warnings
24-
25-
log = logging.getLogger(__name__)
26-
25+
from typing import Callable, TYPE_CHECKING
26+
from abc import ABC, abstractmethod
2727
from cassandra import WriteType as WT
2828

29+
if TYPE_CHECKING:
30+
from cluster import _Scheduler
31+
32+
log = logging.getLogger(__name__)
2933

3034
# This is done this way because WriteType was originally
3135
# defined here and in order not to break the API.
@@ -864,6 +868,72 @@ def _add_jitter(self, value):
864868
return min(max(self.base_delay, delay), self.max_delay)
865869

866870

871+
class ShardConnectionScheduler(ABC):
872+
"""
873+
A base class for a scheduler for a shard connection backoff policy.
874+
``ShardConnectionScheduler`` is a per Session instance that schedules per shard connections according to
875+
``ShardConnectionBackoffPolicy`` that instantiates it.
876+
"""
877+
878+
@abstractmethod
879+
def schedule(
880+
self,
881+
host_id: str,
882+
shard_id: int,
883+
method: Callable[[], None],
884+
) -> bool:
885+
"""
886+
Schedules a request to create a connection to the given host and shard according to the scheduling policy.
887+
888+
The `schedule` method is called whenever `HostConnection` needs to establish a connection to the specified
889+
(host_id, shard_id) pair.
890+
891+
The responsibilities of `schedule` are as follows:
892+
1. Deduplicate requests for the same (host_id, shard_id), considering both queued and currently running requests.
893+
2. Ensure that requests are executed at a pace consistent with the expected behavior of the
894+
`ShardConnectionScheduler` implementation.
895+
896+
The `schedule` method must never execute `method` immediately; `method` should always be run in a separate thread.
897+
Handling of failed `method` executions is managed by upper logic (`HostConnection`) and should not be performed by the
898+
implementation of `schedule`.
899+
900+
Parameters:
901+
``host_id`` - an id of the host of the shard.
902+
``shard_id`` - an id of the shard.
903+
``method`` - a callable that creates connection and stores it in the connection pool, it handles closed session,
904+
HostConnection and existence of the connection to the shard.
905+
Currently, it is ``HostConnection._open_connection_to_missing_shard``.
906+
"""
907+
raise NotImplementedError()
908+
909+
@abstractmethod
910+
def shutdown(self):
911+
"""
912+
Shutdown the scheduler.
913+
914+
It should stop the scheduler from execution any creation requests.
915+
Ones that already scheduled should be canceled.
916+
It is acceptable for currently running requests to complete.
917+
"""
918+
raise NotImplementedError()
919+
920+
921+
class ShardConnectionBackoffPolicy(ABC):
922+
"""
923+
Base class for shard connection backoff policies.
924+
These policies allow user to control pace of establishing new connections.
925+
"""
926+
927+
@abstractmethod
928+
def new_connection_scheduler(self, scheduler: _Scheduler) -> ShardConnectionScheduler:
929+
"""
930+
Instantiate a connection scheduler that behaves according to the policy.
931+
932+
It is called on session initialization.
933+
"""
934+
raise NotImplementedError()
935+
936+
867937
class RetryPolicy(object):
868938
"""
869939
A policy that describes whether to retry, rethrow, or ignore coordinator

0 commit comments

Comments
 (0)