@@ -3783,138 +3783,6 @@ def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_w
3783
3783
self ._cluster .metadata .refresh (connection , self ._timeout , fetch_size = self ._schema_meta_page_size , ** kwargs )
3784
3784
3785
3785
return True
3786
-
3787
- # Three functions below (_refresh_schema_async, _refresh_schema_async_inner, _wait_for_schema_agreement_async) are async
3788
- # versions of the functions without _async in name - instead of blocking and returning result, their first argument
3789
- # is a callback that will receive either a result or an exception.
3790
- # Purpose of those functions is to avoid filling whole thread pool and deadlocking.
3791
- def _refresh_schema_async (self , callback , force = False , ** kwargs ):
3792
- def new_callback (e ):
3793
- if isinstance (e , ReferenceError ):
3794
- # our weak reference to the Cluster is no good
3795
- callback (False )
3796
- return
3797
- elif isinstance (e , Exception ):
3798
- log .debug ("[control connection] Error refreshing schema" , exc_info = True )
3799
- self ._signal_error ()
3800
- callback (False )
3801
- return
3802
- else :
3803
- callback (e )
3804
- if self ._connection :
3805
- self ._refresh_schema_async_inner (new_callback , self ._connection , force = force , ** kwargs )
3806
- else :
3807
- callback (False )
3808
-
3809
- def _refresh_schema_async_inner (self , callback , connection , preloaded_results = None , schema_agreement_wait = None , force = False , ** kwargs ):
3810
- if self ._cluster .is_shutdown :
3811
- callback (False )
3812
- return
3813
-
3814
- def new_callback (e ):
3815
- if not self ._schema_meta_enabled and not force :
3816
- log .debug ("[control connection] Skipping schema refresh because schema metadata is disabled" )
3817
- callback (False )
3818
- return
3819
-
3820
- if not e :
3821
- log .debug ("Skipping schema refresh due to lack of schema agreement" )
3822
- callback (False )
3823
- return
3824
- self ._cluster .metadata .refresh (connection , self ._timeout , fetch_size = self ._schema_meta_page_size , ** kwargs )
3825
-
3826
- self ._wait_for_schema_agreement_async (new_callback ,
3827
- connection = self ._connection ,
3828
- preloaded_results = preloaded_results ,
3829
- wait_time = schema_agreement_wait )
3830
-
3831
- # INTENDED ONLY FOR INTERNAL USE
3832
- def _wait_for_schema_agreement_async (self , callback , connection = None , preloaded_results = None , wait_time = None ):
3833
- total_timeout = wait_time if wait_time is not None else self ._cluster .max_schema_agreement_wait
3834
- if total_timeout <= 0 :
3835
- callback (True )
3836
- return
3837
-
3838
- # Each schema change typically generates two schema refreshes, one
3839
- # from the response type and one from the pushed notification. Holding
3840
- # a lock is just a simple way to cut down on the number of schema queries
3841
- # we'll make.
3842
- if not self ._schema_agreement_lock .acquire (blocking = False ):
3843
- self ._cluster .scheduler .schedule_unique (0.2 , self ._wait_for_schema_agreement_async , callback , connection , preloaded_results , wait_time )
3844
- return
3845
-
3846
- try :
3847
- if self ._is_shutdown :
3848
- self ._schema_agreement_lock .release ()
3849
- callback (None )
3850
- return
3851
-
3852
- if not connection :
3853
- connection = self ._connection
3854
-
3855
- if preloaded_results :
3856
- log .debug ("[control connection] Attempting to use preloaded results for schema agreement" )
3857
-
3858
- peers_result = preloaded_results [0 ]
3859
- local_result = preloaded_results [1 ]
3860
- schema_mismatches = self ._get_schema_mismatches (peers_result , local_result , connection .endpoint )
3861
- if schema_mismatches is None :
3862
- self ._schema_agreement_lock .release ()
3863
- callback (True )
3864
- return
3865
-
3866
- log .debug ("[control connection] Waiting for schema agreement" )
3867
- start = self ._time .time ()
3868
- elapsed = 0
3869
- cl = ConsistencyLevel .ONE
3870
- schema_mismatches = None
3871
- select_peers_query = self ._get_peers_query (self .PeersQueryType .PEERS_SCHEMA , connection )
3872
- except Exception as e :
3873
- self ._schema_agreement_lock .release ()
3874
- callback (e )
3875
- return
3876
-
3877
- def inner (first_iter ):
3878
- try :
3879
- elapsed = self ._time .time () - start
3880
- if elapsed < total_timeout or first_iter :
3881
- peers_query = QueryMessage (query = select_peers_query , consistency_level = cl )
3882
- local_query = QueryMessage (query = self ._SELECT_SCHEMA_LOCAL , consistency_level = cl )
3883
- try :
3884
- timeout = min (self ._timeout , total_timeout - elapsed )
3885
- peers_result , local_result = connection .wait_for_responses (
3886
- peers_query , local_query , timeout = timeout )
3887
- except OperationTimedOut as timeout :
3888
- log .debug ("[control connection] Timed out waiting for "
3889
- "response during schema agreement check: %s" , timeout )
3890
- self ._cluster .scheduler .schedule_unique (0.2 , inner , False )
3891
- return
3892
- except ConnectionShutdown as e :
3893
- if self ._is_shutdown :
3894
- log .debug ("[control connection] Aborting wait for schema match due to shutdown" )
3895
- self ._schema_agreement_lock .release ()
3896
- callback (None )
3897
- return
3898
- else :
3899
- raise
3900
-
3901
- schema_mismatches = self ._get_schema_mismatches (peers_result , local_result , connection .endpoint )
3902
- if schema_mismatches is None :
3903
- self ._schema_agreement_lock .release ()
3904
- callback (True )
3905
- return
3906
-
3907
- log .debug ("[control connection] Schemas mismatched, trying again" )
3908
- self ._cluster .scheduler .schedule_unique (0.2 , inner , False )
3909
- else :
3910
- log .warning ("Node %s is reporting a schema disagreement: %s" ,
3911
- connection .endpoint , schema_mismatches )
3912
- self ._schema_agreement_lock .release ()
3913
- callback (False )
3914
- except Exception as e :
3915
- self ._schema_agreement_lock .release ()
3916
- callback (e )
3917
- inner (True )
3918
3786
3919
3787
def refresh_node_list_and_token_map (self , force_token_rebuild = False ):
3920
3788
try :
@@ -4171,7 +4039,7 @@ def _handle_schema_change(self, event):
4171
4039
if self ._schema_event_refresh_window < 0 :
4172
4040
return
4173
4041
delay = self ._delay_for_event_type ('schema_change' , self ._schema_event_refresh_window )
4174
- self ._cluster .scheduler .schedule_unique (delay , self ._refresh_schema_async , lambda * a , ** k : None , ** event )
4042
+ self ._cluster .scheduler .schedule_unique (delay , self .refresh_schema , ** event )
4175
4043
4176
4044
def wait_for_schema_agreement (self , connection = None , preloaded_results = None , wait_time = None ):
4177
4045
0 commit comments