42 changes: 35 additions & 7 deletions cassandra/cluster.py
@@ -27,6 +27,7 @@
from copy import copy
from functools import partial, reduce, wraps
from itertools import groupby, count, chain
import datetime
import json
import logging
from warnings import warn
@@ -3630,6 +3631,7 @@ class PeersQueryType(object):
_schema_meta_enabled = True
_token_meta_enabled = True
_schema_meta_page_size = 1000
_metadata_refresh_window = 0 # seconds

_uses_peers_v2 = True
_tablets_routing_v1 = False
@@ -3643,7 +3645,8 @@ def __init__(self, cluster, timeout,
status_event_refresh_window,
schema_meta_enabled=True,
token_meta_enabled=True,
schema_meta_page_size=1000):
schema_meta_page_size=1000,
metadata_refresh_window=0):
# use a weak reference to allow the Cluster instance to be GC'ed (and
# shutdown) since implementing __del__ disables the cycle detector
self._cluster = weakref.proxy(cluster)
@@ -3656,6 +3659,7 @@ def __init__(self, cluster, timeout,
self._schema_meta_enabled = schema_meta_enabled
self._token_meta_enabled = token_meta_enabled
self._schema_meta_page_size = schema_meta_page_size
self._metadata_refresh_window = metadata_refresh_window

self._lock = RLock()
self._schema_agreement_lock = Lock()
@@ -3909,12 +3913,26 @@ def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_w
log.debug("Skipping schema refresh due to lack of schema agreement")
return False

self._cluster.metadata.refresh(
connection,
self._timeout,
fetch_size=self._schema_meta_page_size,
metadata_request_timeout=self._metadata_request_timeout,
**kwargs)
# refresh unconditionally if cluster metadata has never been updated,
# or if we want to update it on every event (refresh window == 0)
metadata_last_update_time = self._cluster.metadata.last_update_time
if metadata_last_update_time is None or self._metadata_refresh_window == 0:
self._cluster.metadata.refresh(
connection,
self._timeout,
fetch_size=self._schema_meta_page_size,
metadata_request_timeout=self._metadata_request_timeout,
**kwargs)
# if the last update time is older than the refresh window, refresh
# the metadata
elif (metadata_last_update_time + datetime.timedelta(seconds=self._metadata_refresh_window) <
datetime.datetime.now()):
self._cluster.metadata.refresh(
connection,
self._timeout,
fetch_size=self._schema_meta_page_size,
metadata_request_timeout=self._metadata_request_timeout,
**kwargs)

return True
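For reference, the gating logic introduced above reduces to the following standalone sketch; the helper name is illustrative only, and the real code passes the connection, timeout, and paging arguments shown in the diff to metadata.refresh():

import datetime

def _should_refresh_metadata(last_update_time, refresh_window_seconds):
    # Refresh unconditionally if metadata was never fetched, or if
    # throttling is disabled (a window of 0 means refresh every time).
    if last_update_time is None or refresh_window_seconds == 0:
        return True
    # Otherwise refresh only once the window has elapsed since the last update.
    stale_after = last_update_time + datetime.timedelta(seconds=refresh_window_seconds)
    return stale_after < datetime.datetime.now()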

@@ -4394,6 +4412,16 @@ def return_connection(self, connection):
if connection is self._connection and (connection.is_defunct or connection.is_closed):
self.reconnect()

@property
def metadata_refresh_window(self):
return self._metadata_refresh_window

@metadata_refresh_window.setter
def metadata_refresh_window(self, window):
if not isinstance(window, int) or window < 0:
raise ValueError("metadata_refresh_window must be an int and >= 0")
self._metadata_refresh_window = window
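A usage sketch for the new knob, assuming a node listening on 127.0.0.1 and that the control connection is reachable as cluster.control_connection (the wiring from Cluster is not shown in this diff):

from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"])
session = cluster.connect()

# Throttle metadata refreshes to at most one every 30 seconds; 0 keeps the
# default behavior of refreshing on every schema event.
cluster.control_connection.metadata_refresh_window = 30

# The setter validates its input: a negative value or a non-int raises ValueError.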


def _stop_scheduler(scheduler, thread):
try:
14 changes: 14 additions & 0 deletions cassandra/metadata.py
@@ -18,6 +18,7 @@
from collections.abc import Mapping
from functools import total_ordering
from hashlib import md5
import datetime
import json
import logging
import re
@@ -121,9 +122,13 @@ class Metadata(object):
dbaas = False
""" A boolean indicating if connected to a DBaaS cluster """

_last_update_time = None
""" The time of the last metadata update. """

def __init__(self):
self.keyspaces = {}
self.dbaas = False
self._last_update_time = None
self._hosts = {}
self._host_id_by_endpoint = {}
self._hosts_lock = RLock()
@@ -145,6 +150,7 @@ def refresh(self, connection, timeout, target_type=None, change_type=None, fetch

if not target_type:
self._rebuild_all(parser)
self._last_update_time = datetime.datetime.now()
return

tt_lower = target_type.lower()
@@ -162,6 +168,7 @@ def refresh(self, connection, timeout, target_type=None, change_type=None, fetch
else:
drop_method = getattr(self, '_drop_' + tt_lower)
drop_method(**kwargs)
self._last_update_time = datetime.datetime.now()
except AttributeError:
raise ValueError("Unknown schema target_type: '%s'" % target_type)

@@ -406,6 +413,13 @@ def all_hosts_items(self):
with self._hosts_lock:
return list(self._hosts.items())

@property
def last_update_time(self):
"""
The time of the last metadata update.
"""
return self._last_update_time
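A short sketch of reading the new property through a connected Cluster (the 127.0.0.1 contact point is illustrative):

import datetime
from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"])
session = cluster.connect()

# None until the first successful refresh, afterwards the datetime stamped
# by Metadata.refresh().
updated = cluster.metadata.last_update_time
if updated is None:
    print("cluster metadata has never been refreshed")
else:
    age = (datetime.datetime.now() - updated).total_seconds()
    print("cluster metadata is %.1f seconds old" % age)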


REPLICATION_STRATEGY_CLASS_PREFIX = "org.apache.cassandra.locator."
