@@ -52,6 +52,10 @@ class UnauthorizedException(Exception):
5252 pass
5353
5454
55+ class UnauthenticatedException (Exception ):
56+ pass
57+
58+
5559@dataclass
5660class Session ():
5761
@@ -94,14 +98,20 @@ class Connector(object):
9498
9599 def __init__ (self , host = "localhost" , port = 55555 ,
96100 user = "" , password = "" , token = "" ,
97- use_ssl = True , shared_data = None , authenticate = True ,
101+ use_ssl = True ,
102+ shared_data = None ,
103+ authenticate = True ,
98104 use_keepalive = True ,
99- retry_connect_interval_seconds = 1 ,
100- retry_connect_max_attempts = 3 ,
105+ retry_interval_seconds = 1 ,
106+ retry_max_attempts = 3 ,
101107 config : Configuration = None ):
108+ """
109+ Constructor for the Connector class.
110+ """
102111 self .connected = False
103- self .last_response = ''
112+ self .last_response = ''
104113 self .last_query_time = 0
114+ self .authenticated = False
105115
106116 if config is None :
107117 self .host = host
@@ -117,8 +127,8 @@ def __init__(self, host="localhost", port=55555,
117127 password = password ,
118128 name = "runtime" ,
119129 use_keepalive = use_keepalive ,
120- retry_connect_interval_seconds = retry_connect_interval_seconds ,
121- retry_connect_max_attempts = retry_connect_max_attempts
130+ retry_interval_seconds = retry_interval_seconds ,
131+ retry_max_attempts = retry_max_attempts
122132 )
123133 else :
124134 self .config = config
@@ -128,34 +138,36 @@ def __init__(self, host="localhost", port=55555,
128138 self .use_keepalive = config .use_keepalive
129139
130140 self .conn = None
131- connect_attempts = 1
132- while True :
133- self .connect (
134- details = f"Will retry in { self .config .retry_connect_interval_seconds } seconds" )
135- if self .connected or connect_attempts == self .config .retry_connect_max_attempts :
136- break
137- time .sleep (self .config .retry_connect_interval_seconds )
138- connect_attempts += 1
139141
140- if not self .connected :
141- raise Exception (
142- f"Could not connect to apertureDB server: { self .config } " )
143-
144- if authenticate :
145- self .authenticate (shared_data , user , password , token )
142+ self .token = token
146143
147- def authenticate (self , shared_data , user , password , token ):
148144 if shared_data is None :
149145 self .shared_data = SimpleNamespace ()
150146 self .shared_data .session = None
151147 self .shared_data .lock = Lock ()
152- try :
153- self ._authenticate (user , password , token )
154- except Exception as e :
155- raise Exception ("Authentication failed:" , str (e ))
156148 else :
157149 self .shared_data = shared_data
158150
151+ self .should_authenticate = authenticate
152+
153+ def authenticate (self , shared_data , user , password , token ):
154+ """
155+ Authenticate with the database. This will be called automatically from query.
156+ This is separate from session refresh mechanism, and is called only once.
157+ Failure leads to exception.
158+ """
159+ if not self .authenticated :
160+ if shared_data .session is None :
161+ self .shared_data .lock = Lock ()
162+ try :
163+ self ._authenticate (user , password , token )
164+ except Exception as e :
165+ raise UnauthenticatedException (
166+ "Authentication failed:" , str (e ))
167+ else :
168+ self .shared_data = shared_data
169+ self .authenticated = True
170+
159171 def __del__ (self ):
160172 if self .connected :
161173 self .conn .close ()
@@ -300,7 +312,8 @@ def _connect(self):
300312 self .conn = self .context .wrap_socket (self .conn )
301313
302314 except BaseException as e :
303- logger .error (f"Error connecting to server: { str (e )} { self .config } " )
315+ logger .error (
316+ f"Error connecting to server: { str (e )} { self .config } " , exc_info = True , stack_info = True )
304317 self .conn .close ()
305318 self .connected = False
306319 raise
@@ -343,7 +356,7 @@ def _query(self, query, blob_array = [], try_resume=True):
343356
344357 # this is for session refresh attempts
345358 tries = 0
346- while tries < 3 :
359+ while tries < self . config . retry_max_attempts :
347360 try :
348361 if self ._send_msg (data ):
349362 response = self ._recv_msg ()
@@ -362,23 +375,32 @@ def _query(self, query, blob_array = [], try_resume=True):
362375 except OSError as ose :
363376 logger .exception (ose )
364377 logger .warning (f"OS error on process { os .getpid ()} " )
378+ except AttributeError as ae :
379+ if self .connected :
380+ # Only log if we got this while connected.
381+ # else it is expected after unification of query/connect
382+ logger .exception (ae )
383+ logger .warning (f"Attribute error on process { os .getpid ()} " )
384+
365385 tries += 1
366386 logger .warning (
367- f"Connection broken. Reconnectng attempt [{ tries } /{ self .config .retry_connect_max_attempts } ] .. PID = { os .getpid ()} " )
368- self .conn .close ()
369- self .connected = False
387+ f"Connection broken. Reconnecting attempt [{ tries } /{ self .config .retry_max_attempts } ] .. PID = { os .getpid ()} " )
388+
389+ if self .connected :
390+ self .conn .close ()
391+ self .connected = False
370392
371393 self .connect (
372- details = f"Will retry in { self .config .retry_connect_interval_seconds } seconds" )
373- time .sleep (self .config .retry_connect_interval_seconds )
394+ details = f"Will retry in { self .config .retry_interval_seconds } seconds" )
395+ time .sleep (self .config .retry_interval_seconds )
374396
375397 # Try to resume the session, in cases where the connection is severed.
376398 # For example aperturedb server is restarted, or network is lost.
377399 # While this is useful bit of code, when executed in a refresh token
378400 # path, this can cause a deadlock. Hence the try_resume flag.
379401 if try_resume :
380402 self ._renew_session ()
381- if tries == self .config .retry_connect_max_attempts :
403+ if tries == self .config .retry_max_attempts :
382404 raise Exception (
383405 f"Could not query apertureDB using TCP." )
384406 return (self .last_response , response_blob_array )
@@ -399,7 +421,13 @@ def query(self, q, blobs=[]):
399421 Returns:
400422 _type_: _description_
401423 """
402- self ._renew_session ()
424+ if self .should_authenticate :
425+ self .authenticate (
426+ shared_data = self .shared_data ,
427+ user = self .config .username ,
428+ password = self .config .password ,
429+ token = self .token )
430+
403431 try :
404432 start = time .time ()
405433 self .response , self .blobs = self ._query (q , blobs )
@@ -410,14 +438,15 @@ def query(self, q, blobs=[]):
410438 # Hope is that the query send won't be longer than the session
411439 # ttl.
412440 logger .warning (
413- f"Session expired while query was sent. Retrying... { self .config } " , stack_info = True )
441+ f"Session expired while query was sent. Retrying... { self .config } " )
414442 self ._renew_session ()
415443 start = time .time ()
416444 self .response , self .blobs = self ._query (q , blobs )
417445 self .last_query_time = time .time () - start
418446 return self .response , self .blobs
419447 except BaseException as e :
420- logger .critical ("Failed to query" , exc_info = True , stack_info = True )
448+ logger .critical ("Failed to query" ,
449+ exc_info = True , stack_info = True )
421450 raise
422451
423452 def _renew_session (self ):
@@ -436,6 +465,9 @@ def create_new_connection(self) -> Connector:
436465 return type (self )(
437466 self .host ,
438467 self .port ,
468+ self .config .username ,
469+ self .config .password ,
470+ self .token ,
439471 use_ssl = self .use_ssl ,
440472 shared_data = self .shared_data )
441473
0 commit comments