diff --git a/cassandra/cluster.py b/cassandra/cluster.py
index fd73803eb8..b4510b2356 100644
--- a/cassandra/cluster.py
+++ b/cassandra/cluster.py
@@ -27,6 +27,7 @@
 from copy import copy
 from functools import partial, reduce, wraps
 from itertools import groupby, count, chain
+import datetime
 import json
 import logging
 from warnings import warn
@@ -3630,6 +3631,7 @@ class PeersQueryType(object):
     _schema_meta_enabled = True
     _token_meta_enabled = True
     _schema_meta_page_size = 1000
+    _metadata_refresh_window = 0  # seconds
     _uses_peers_v2 = True
     _tablets_routing_v1 = False
 
@@ -3643,7 +3645,8 @@ def __init__(self, cluster, timeout,
                  status_event_refresh_window,
                  schema_meta_enabled=True,
                  token_meta_enabled=True,
-                 schema_meta_page_size=1000):
+                 schema_meta_page_size=1000,
+                 metadata_refresh_window=0):
         # use a weak reference to allow the Cluster instance to be GC'ed (and
         # shutdown) since implementing __del__ disables the cycle detector
         self._cluster = weakref.proxy(cluster)
@@ -3656,6 +3659,7 @@
         self._schema_meta_enabled = schema_meta_enabled
         self._token_meta_enabled = token_meta_enabled
         self._schema_meta_page_size = schema_meta_page_size
+        self._metadata_refresh_window = metadata_refresh_window
 
         self._lock = RLock()
         self._schema_agreement_lock = Lock()
@@ -3909,12 +3913,26 @@ def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_w
             log.debug("Skipping schema refresh due to lack of schema agreement")
             return False
 
-        self._cluster.metadata.refresh(
-            connection,
-            self._timeout,
-            fetch_size=self._schema_meta_page_size,
-            metadata_request_timeout=self._metadata_request_timeout,
-            **kwargs)
+        # if we never updated cluster metadata or we wish to update it every time,
+        # (refresh window == 0), do so
+        metadata_last_update_time = self._cluster.metadata.last_update_time
+        if metadata_last_update_time is None or self._metadata_refresh_window == 0:
+            self._cluster.metadata.refresh(
+                connection,
+                self._timeout,
+                fetch_size=self._schema_meta_page_size,
+                metadata_request_timeout=self._metadata_request_timeout,
+                **kwargs)
+        # if the last update time is older than the refresh window, refresh
+        # the metadata
+        elif (metadata_last_update_time + datetime.timedelta(seconds=self._metadata_refresh_window) <
+              datetime.datetime.now()):
+            self._cluster.metadata.refresh(
+                connection,
+                self._timeout,
+                fetch_size=self._schema_meta_page_size,
+                metadata_request_timeout=self._metadata_request_timeout,
+                **kwargs)
 
         return True
 
@@ -4394,6 +4412,16 @@ def return_connection(self, connection):
         if connection is self._connection and (connection.is_defunct or connection.is_closed):
             self.reconnect()
 
+    @property
+    def metadata_refresh_window(self):
+        return self._metadata_refresh_window
+
+    @metadata_refresh_window.setter
+    def metadata_refresh_window(self, window):
+        if not isinstance(window, int) or window < 0:
+            raise ValueError("metadata_refresh_window must be an int and >= 0")
+        self._metadata_refresh_window = window
+
 
 def _stop_scheduler(scheduler, thread):
     try:
diff --git a/cassandra/metadata.py b/cassandra/metadata.py
index 30bcf81654..53a35f8607 100644
--- a/cassandra/metadata.py
+++ b/cassandra/metadata.py
@@ -18,6 +18,7 @@
 from collections.abc import Mapping
 from functools import total_ordering
 from hashlib import md5
+import datetime
 import json
 import logging
 import re
@@ -121,9 +122,13 @@ class Metadata(object):
     dbaas = False
     """ A boolean indicating if connected to a DBaaS cluster """
 
+    _last_update_time = None
+    """ The time of the last metadata update. """
+
     def __init__(self):
         self.keyspaces = {}
         self.dbaas = False
+        self._last_update_time = None
         self._hosts = {}
         self._host_id_by_endpoint = {}
         self._hosts_lock = RLock()
@@ -145,6 +150,7 @@ def refresh(self, connection, timeout, target_type=None, change_type=None, fetch
 
         if not target_type:
             self._rebuild_all(parser)
+            self._last_update_time = datetime.datetime.now()
             return
 
         tt_lower = target_type.lower()
@@ -162,6 +168,7 @@
             else:
                 drop_method = getattr(self, '_drop_' + tt_lower)
                 drop_method(**kwargs)
+                self._last_update_time = datetime.datetime.now()
         except AttributeError:
             raise ValueError("Unknown schema target_type: '%s'" % target_type)
 
@@ -406,6 +413,13 @@ def all_hosts_items(self):
         with self._hosts_lock:
             return list(self._hosts.items())
 
+    @property
+    def last_update_time(self):
+        """
+        The time of the last metadata update.
+        """
+        return self._last_update_time
+
 
 REPLICATION_STRATEGY_CLASS_PREFIX = "org.apache.cassandra.locator."
 