2727from copy import copy
2828from functools import partial , reduce , wraps
2929from itertools import groupby , count , chain
30+ import datetime
3031import json
3132import logging
3233from warnings import warn
@@ -3630,6 +3631,7 @@ class PeersQueryType(object):
36303631 _schema_meta_enabled = True
36313632 _token_meta_enabled = True
36323633 _schema_meta_page_size = 1000
3634+ _metadata_refresh_window = 0 # seconds
36333635
36343636 _uses_peers_v2 = True
36353637 _tablets_routing_v1 = False
@@ -3643,7 +3645,8 @@ def __init__(self, cluster, timeout,
36433645 status_event_refresh_window ,
36443646 schema_meta_enabled = True ,
36453647 token_meta_enabled = True ,
3646- schema_meta_page_size = 1000 ):
3648+ schema_meta_page_size = 1000 ,
3649+ metadata_refresh_window = 0 ):
36473650 # use a weak reference to allow the Cluster instance to be GC'ed (and
36483651 # shutdown) since implementing __del__ disables the cycle detector
36493652 self ._cluster = weakref .proxy (cluster )
@@ -3656,6 +3659,7 @@ def __init__(self, cluster, timeout,
36563659 self ._schema_meta_enabled = schema_meta_enabled
36573660 self ._token_meta_enabled = token_meta_enabled
36583661 self ._schema_meta_page_size = schema_meta_page_size
3662+ self ._metadata_refresh_window = metadata_refresh_window
36593663
36603664 self ._lock = RLock ()
36613665 self ._schema_agreement_lock = Lock ()
@@ -3909,12 +3913,26 @@ def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_w
39093913 log .debug ("Skipping schema refresh due to lack of schema agreement" )
39103914 return False
39113915
3912- self ._cluster .metadata .refresh (
3913- connection ,
3914- self ._timeout ,
3915- fetch_size = self ._schema_meta_page_size ,
3916- metadata_request_timeout = self ._metadata_request_timeout ,
3917- ** kwargs )
3916+ # if we never updated cluster metadata or we wish to update it every time,
3917+ # (refresh window == 0), do so
3918+ metadata_last_update_time = self ._cluster .metadata .last_update_time
3919+ if metadata_last_update_time is None or self ._metadata_refresh_window == 0 :
3920+ self ._cluster .metadata .refresh (
3921+ connection ,
3922+ self ._timeout ,
3923+ fetch_size = self ._schema_meta_page_size ,
3924+ metadata_request_timeout = self ._metadata_request_timeout ,
3925+ ** kwargs )
3926+ # if the last update time is older than the refresh window, refresh
3927+ # the metadata
3928+ elif (metadata_last_update_time + datetime .timedelta (seconds = self ._metadata_refresh_window ) <
3929+ datetime .datetime .now ()):
3930+ self ._cluster .metadata .refresh (
3931+ connection ,
3932+ self ._timeout ,
3933+ fetch_size = self ._schema_meta_page_size ,
3934+ metadata_request_timeout = self ._metadata_request_timeout ,
3935+ ** kwargs )
39183936
39193937 return True
39203938
@@ -4394,6 +4412,16 @@ def return_connection(self, connection):
43944412 if connection is self ._connection and (connection .is_defunct or connection .is_closed ):
43954413 self .reconnect ()
43964414
4415+ @property
4416+ def metadata_refresh_window (self ):
4417+ return self ._metadata_refresh_window
4418+
4419+ @metadata_refresh_window .setter
4420+ def metadata_refresh_window (self , window ):
4421+ if not isinstance (window , int ) or window < 0 :
4422+ raise ValueError ("metadata_refresh_window must be an int and >= 0" )
4423+ self ._metadata_refresh_window = window
4424+
43974425
43984426def _stop_scheduler (scheduler , thread ):
43994427 try :
0 commit comments