Skip to content

Commit e8e40fc

Browse files
committed
(feat)metadata_refresh_window - refresh cluster metadata at most once per window
Today, after every schema change, we refresh the cluster metadata unconditionally. Each refresh costs roughly 8-10 queries, most of which are unneeded. This change adds a metadata_refresh_window parameter (in seconds, default 60) so that the metadata is refreshed only when it has never been fetched or when the window has elapsed since the last update. Signed-off-by: Yaniv Kaul <[email protected]>
1 parent a401409 commit e8e40fc

File tree

2 files changed

+37
-7
lines changed

2 files changed

+37
-7
lines changed

cassandra/cluster.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from copy import copy
2828
from functools import partial, reduce, wraps
2929
from itertools import groupby, count, chain
30+
import datetime
3031
import json
3132
import logging
3233
from warnings import warn
@@ -3630,6 +3631,7 @@ class PeersQueryType(object):
36303631
_schema_meta_enabled = True
36313632
_token_meta_enabled = True
36323633
_schema_meta_page_size = 1000
3634+
_metadata_refresh_window = 60 # seconds
36333635

36343636
_uses_peers_v2 = True
36353637
_tablets_routing_v1 = False
@@ -3643,7 +3645,8 @@ def __init__(self, cluster, timeout,
36433645
status_event_refresh_window,
36443646
schema_meta_enabled=True,
36453647
token_meta_enabled=True,
3646-
schema_meta_page_size=1000):
3648+
schema_meta_page_size=1000,
3649+
metadata_refresh_window=60):
36473650
# use a weak reference to allow the Cluster instance to be GC'ed (and
36483651
# shutdown) since implementing __del__ disables the cycle detector
36493652
self._cluster = weakref.proxy(cluster)
@@ -3656,6 +3659,7 @@ def __init__(self, cluster, timeout,
36563659
self._schema_meta_enabled = schema_meta_enabled
36573660
self._token_meta_enabled = token_meta_enabled
36583661
self._schema_meta_page_size = schema_meta_page_size
3662+
self._metadata_refresh_window = metadata_refresh_window
36593663

36603664
self._lock = RLock()
36613665
self._schema_agreement_lock = Lock()
@@ -3909,12 +3913,24 @@ def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_w
39093913
log.debug("Skipping schema refresh due to lack of schema agreement")
39103914
return False
39113915

3912-
self._cluster.metadata.refresh(
3913-
connection,
3914-
self._timeout,
3915-
fetch_size=self._schema_meta_page_size,
3916-
metadata_request_timeout=self._metadata_request_timeout,
3917-
**kwargs)
3916+
metadata_last_update_time = self._cluster.metadata.last_update_time
3917+
if metadata_last_update_time is None:
3918+
self._cluster.metadata.refresh(
3919+
connection,
3920+
self._timeout,
3921+
fetch_size=self._schema_meta_page_size,
3922+
metadata_request_timeout=self._metadata_request_timeout,
3923+
**kwargs)
3924+
# if the last update time is older than the refresh window, refresh
3925+
# the metadata
3926+
elif (metadata_last_update_time + datetime.timedelta(seconds=self._metadata_refresh_window) <
3927+
datetime.datetime.now()):
3928+
self._cluster.metadata.refresh(
3929+
connection,
3930+
self._timeout,
3931+
fetch_size=self._schema_meta_page_size,
3932+
metadata_request_timeout=self._metadata_request_timeout,
3933+
**kwargs)
39183934

39193935
return True
39203936

cassandra/metadata.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from collections.abc import Mapping
1919
from functools import total_ordering
2020
from hashlib import md5
21+
import datetime
2122
import json
2223
import logging
2324
import re
@@ -121,9 +122,13 @@ class Metadata(object):
121122
dbaas = False
122123
""" A boolean indicating if connected to a DBaaS cluster """
123124

125+
_last_update_time = None
126+
""" The time of the last metadata update. """
127+
124128
def __init__(self):
125129
self.keyspaces = {}
126130
self.dbaas = False
131+
self._last_update_time = None
127132
self._hosts = {}
128133
self._host_id_by_endpoint = {}
129134
self._hosts_lock = RLock()
@@ -145,6 +150,7 @@ def refresh(self, connection, timeout, target_type=None, change_type=None, fetch
145150

146151
if not target_type:
147152
self._rebuild_all(parser)
153+
self._last_update_time = datetime.datetime.now()
148154
return
149155

150156
tt_lower = target_type.lower()
@@ -162,6 +168,7 @@ def refresh(self, connection, timeout, target_type=None, change_type=None, fetch
162168
else:
163169
drop_method = getattr(self, '_drop_' + tt_lower)
164170
drop_method(**kwargs)
171+
self._last_update_time = datetime.datetime.now()
165172
except AttributeError:
166173
raise ValueError("Unknown schema target_type: '%s'" % target_type)
167174

@@ -406,6 +413,13 @@ def all_hosts_items(self):
406413
with self._hosts_lock:
407414
return list(self._hosts.items())
408415

416+
@property
417+
def last_update_time(self):
418+
"""
419+
The time of the last metadata update.
420+
"""
421+
return self._last_update_time
422+
409423

410424
REPLICATION_STRATEGY_CLASS_PREFIX = "org.apache.cassandra.locator."
411425

0 commit comments

Comments
 (0)