1616import orjson
1717import logging
1818from pygqlc .helper_modules .Singleton import Singleton
19+ from pygqlc .logging import log , LogLevel
1920from tenacity import (
2021 retry ,
2122 retry_if_result ,
@@ -417,7 +418,7 @@ def subscribe(
417418 # ! initialize websocket only once
418419 if not self ._conn :
419420 if not self ._new_conn ():
420- print ( 'Error creating WSS connection for subscription' )
421+ log ( LogLevel . ERROR , 'Error creating WSS connection for subscription' )
421422 return None
422423
423424 _cb = callback if callback is not None else self ._on_message
@@ -446,21 +447,20 @@ def unsubscribe():
446447 def _unsubscribe (self , _id ):
447448 sub = self .subs .get (_id )
448449 if not sub :
449- print ( 'Subscription already cleared' )
450+ log ( LogLevel . INFO , 'Subscription already cleared' )
450451 return
451452 self .unsubscribing = True
452453 sub ['kill' ] = True
453454 try :
454455 self ._stop (_id )
455456 except BrokenPipeError as e :
456- print ('WSS Pipe broken, nothing to stop' )
457- print (f'original message: { e } ' )
457+ log (LogLevel .WARNING , 'WSS Pipe broken, nothing to stop' )
458458 sub ['thread' ].join ()
459459 sub ['running' ] = False
460460 self .unsubscribing = False
461461
462462 def _sub_routing_loop (self ):
463- print ( 'first subscription, starting routing loop' )
463+ log ( LogLevel . INFO , 'first subscription, starting routing loop' )
464464 last_reconnect_attempt = 0
465465 reconnect_delay = 1.0
466466
@@ -469,13 +469,13 @@ def _sub_routing_loop(self):
469469 # Rate limit reconnection attempts
470470 current_time = time .time ()
471471 if current_time - last_reconnect_attempt >= reconnect_delay :
472- print ( 'Connection halted, attempting reconnection...' )
472+ log ( LogLevel . INFO , 'Connection halted, attempting reconnection...' )
473473 if self ._new_conn ():
474474 self .wss_conn_halted = False
475- print (
475+ log ( LogLevel . INFO ,
476476 'WSS Reconnection succeeded, attempting resubscription to lost subs' )
477477 self ._resubscribe_all ()
478- print ( 'finished resubscriptions' )
478+ log ( LogLevel . INFO , 'finished resubscriptions' )
479479 reconnect_delay = 1.0 # Reset delay on success
480480 else :
481481 # Use exponential backoff for reconnection attempts (up to 5 seconds)
@@ -513,8 +513,7 @@ def _sub_routing_loop(self):
513513 continue
514514 except Exception as e :
515515 if not self .closing :
516- print (f'Some error trying to receive WSS' )
517- print (f'original message: { e } ' )
516+ log (LogLevel .ERROR , 'Some error trying to receive WSS' )
518517 self .wss_conn_halted = True
519518 continue
520519
@@ -533,7 +532,7 @@ def _sub_routing_loop(self):
533532 elif message_type == PONG_TYPE :
534533 pass
535534 else :
536- print ( f'unknown msg type: { message } ' )
535+ log ( LogLevel . WARNING , f'unknown msg type: { message } ' )
537536
538537 # Use non-blocking sleep
539538 time .sleep (self .poll_interval )
@@ -575,7 +574,8 @@ def _subscription_loop(self, _cb, _id, _ecb):
575574 self .subs [_id ].update ({'running' : True , 'starting' : False })
576575 while self .subs [_id ]['running' ]:
577576 if self .subs [_id ]['kill' ]:
578- print (f'stopping subscription id={ _id } on Unsubscribe' )
577+ log (LogLevel .INFO ,
578+ f'stopping subscription id={ _id } on Unsubscribe' )
579579 break
580580
581581 # Get message without copying the queue
@@ -591,22 +591,24 @@ def _subscription_loop(self, _cb, _id, _ecb):
591591 elif message_type == ERROR_TYPE :
592592 if _ecb :
593593 _ecb (message )
594- print (f'stopping subscription id={ _id } on { message_type } ' )
594+ log (LogLevel .WARNING ,
595+ f'stopping subscription id={ _id } on { message_type } ' )
595596 break
596597 elif message_type == COMPLETE_TYPE :
597- print (f'stopping subscription id={ _id } on { message_type } ' )
598+ log (LogLevel .INFO ,
599+ f'stopping subscription id={ _id } on { message_type } ' )
598600 break
599601 else :
600- print ( f'unknown msg type: { message } ' )
602+ log ( LogLevel . WARNING , f'unknown msg type: { message } ' )
601603 continue
602604
603605 # Payload handling
604606 if is_ws_payloadErrors_msg (message ):
605607 if _ecb :
606608 _ecb (message )
607609 continue
608- print ( 'Subscription message has payload Errors' )
609- print ( message )
610+ log ( LogLevel . ERROR , 'Subscription message has payload Errors' )
611+ log ( LogLevel . ERROR , f' { message } ' )
610612 elif is_ws_connection_init_msg (message ):
611613 # Subscription successfully initialized
612614 pass
@@ -617,19 +619,21 @@ def _subscription_loop(self, _cb, _id, _ecb):
617619 _cb (gql_msg ) # execute callback function
618620 # Increment counter without locking
619621 self .subs [_id ]['runs' ] += 1
620- except Exception as e :
621- print ( f'Error on subscription callback: { e } ' )
622+ except Exception as _e :
623+ log ( LogLevel . ERROR , f'Error on subscription callback' )
622624 sub_query = self .subs [_id ].get ('query' )
623625 sub_variables = self .subs [_id ].get ('variables' )
624626 if sub_query :
625- print (f'subscription document: \n \t { sub_query } ' )
627+ log (LogLevel .ERROR ,
628+ f'subscription document: \n \t { sub_query } ' )
626629 if sub_variables :
627- print (f'subscription variables: \n \t { sub_variables } ' )
628- print (traceback .format_exc ())
630+ log (LogLevel .ERROR ,
631+ f'subscription variables: \n \t { sub_variables } ' )
632+ log (LogLevel .ERROR , traceback .format_exc ())
629633
630634 # Subscription stopped, update state atomically
631635 self .subs [_id ].update ({'running' : False , 'kill' : True })
632- print ( f'Subscription id={ _id } stopped' )
636+ log ( LogLevel . INFO , f'Subscription id={ _id } stopped' )
633637
634638 def _clean_sub_message (self , _id , message ):
635639 data = py_ .get (message , 'payload' , {})
@@ -644,8 +648,7 @@ def _new_conn(self):
644648 self ._conn_init ()
645649 return True
646650 except Exception as e :
647- print (f'Failed connecting to { self .ws_url } ' )
648- print (f'original message: { e } ' )
651+ log (LogLevel .ERROR , f'Failed connecting to { self .ws_url } ' )
649652 return False
650653
651654 def close (self ):
@@ -655,7 +658,7 @@ def close(self):
655658 # ! ask subscription message router to stop
656659 self .closing = True
657660 if not self .sub_router_thread :
658- print ( 'connection not stablished, nothing to close' )
661+ log ( LogLevel . INFO , 'connection not stablished, nothing to close' )
659662 self .closing = False
660663 return
661664 for sub in self .subs .values ():
@@ -721,8 +724,8 @@ def _ping_pong(self):
721724 # No need to log normal ping operations
722725 except Exception as e :
723726 if not self .closing :
724- print ( 'error trying to send ping, WSS Pipe is broken' )
725- print ( f'original message: { e } ' )
727+ log ( LogLevel . ERROR ,
728+ 'error trying to send ping, WSS Pipe is broken ' )
726729 self .wss_conn_halted = True
727730
728731 def _registerSub (self , _id = None ):
@@ -747,19 +750,20 @@ def resetSubsConnection(self):
747750 (boolean): Returns if the reconnection has been possible.
748751 """
749752 if not self .sub_router_thread :
750- print ( 'connection not stablished, nothing to reset' )
753+ log ( LogLevel . INFO , 'connection not stablished, nothing to reset' )
751754 return False
752755 if self .sub_router_thread .is_alive (): # check that _sub_routing_loop() is running
753756 self ._conn .close () # forces connection halted (wss_conn_halted)
754757 return True
755758 # in case for some reason _sub_routing_loop() is not running
756759 if self ._new_conn ():
757- print ('WSS Reconnection succeeded, attempting resubscription to lost subs' )
760+ log (LogLevel .INFO ,
761+ 'WSS Reconnection succeeded, attempting resubscription to lost subs' )
758762 self ._resubscribe_all ()
759- print ( 'finished resubscriptions' )
763+ log ( LogLevel . INFO , 'finished resubscriptions' )
760764 return True
761765 else :
762- print ( 'Reconnection has not been possible' )
766+ log ( LogLevel . ERROR , 'Reconnection has not been possible' )
763767 return False
764768
765769 # * END SUBSCRIPTION functions ******************************
@@ -1210,7 +1214,8 @@ async def async_cleanup(self):
12101214 await self ._async_client .aclose ()
12111215 except Exception as e : # pylint: disable=broad-except
12121216 # If closing fails, log but continue
1213- print (f"Warning: Error closing async client: { str (e )} " )
1217+ log (LogLevel .WARNING ,
1218+ f"Warning: Error closing async client: { str (e )} " )
12141219 finally :
12151220 # Always set to None to allow garbage collection and recreation
12161221 self ._async_client = None
0 commit comments