diff --git a/agentops/session.py b/agentops/session.py index 95d1fba15..ba4faa2e8 100644 --- a/agentops/session.py +++ b/agentops/session.py @@ -226,7 +226,9 @@ def __init__( self.host_env = host_env self.config = config self.jwt = None + self._event_queue = [] self._lock = threading.Lock() + self._jwt_lock = threading.Lock() self._end_session_lock = threading.Lock() self.token_cost: Decimal = Decimal(0) self._session_url: str = "" @@ -239,6 +241,12 @@ def __init__( } # self.session_url: Optional[str] = None + self.is_running = False # Start as not running + + # Start JWT fetch in the background + self._jwt_thread = threading.Thread(target=self._start_session_async) + self._jwt_thread.start() + # Start session first to get JWT self.is_running = self._start_session() if not self.is_running: @@ -351,6 +359,17 @@ def end_session( ) logger.info(analytics) + def cleanup(): + try: + if hasattr(self, "_span_processor"): + self._span_processor.force_flush(timeout_millis=5000) + self._span_processor.shutdown() + except Exception as e: + logger.warning(f"Error during cleanup: {e}") + + cleanup_thread = threading.Thread(target=cleanup) + cleanup_thread.start() + except Exception as e: logger.exception(f"Error during session end: {e}") finally: @@ -406,6 +425,12 @@ def record(self, event: Union[Event, ErrorEvent], flush_now=False): """Record an event using OpenTelemetry spans""" if not self.is_running: return + # Check if JWT is available + if self.jwt is None: + with self._jwt_lock: + if self.jwt is None: + self._event_queue.append(event) + return # Ensure event has all required base attributes if not hasattr(event, "id"): @@ -508,6 +533,18 @@ def _reauthorize_jwt(self) -> Union[str, None]: self.jwt = jwt return jwt + def _start_session_async(self): + success = self._start_session() + if success: + self.is_running = True + # Process queued events + with self._jwt_lock: + for event in self._event_queue: + self.record(event) + self._event_queue.clear() + else: + self.is_running = False + def _start_session(self): with self._lock: payload = {"session": self.__dict__}