Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions agentops/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ def __init__(
self.host_env = host_env
self.config = config
self.jwt = None
self._event_queue = []
self._lock = threading.Lock()
self._jwt_lock = threading.Lock()
self._end_session_lock = threading.Lock()
self.token_cost: Decimal = Decimal(0)
self._session_url: str = ""
Expand All @@ -239,6 +241,12 @@ def __init__(
}
# self.session_url: Optional[str] = None

self.is_running = False # Start as not running

# Start JWT fetch in the background
self._jwt_thread = threading.Thread(target=self._start_session_async)
self._jwt_thread.start()

# Start session first to get JWT
self.is_running = self._start_session()
if not self.is_running:
Expand Down Expand Up @@ -351,6 +359,17 @@ def end_session(
)
logger.info(analytics)

def cleanup():
try:
if hasattr(self, "_span_processor"):
self._span_processor.force_flush(timeout_millis=5000)
self._span_processor.shutdown()
except Exception as e:
logger.warning(f"Error during cleanup: {e}")

cleanup_thread = threading.Thread(target=cleanup)
cleanup_thread.start()

except Exception as e:
logger.exception(f"Error during session end: {e}")
finally:
Expand Down Expand Up @@ -406,6 +425,12 @@ def record(self, event: Union[Event, ErrorEvent], flush_now=False):
"""Record an event using OpenTelemetry spans"""
if not self.is_running:
return
# Check if JWT is available
if self.jwt is None:
with self._jwt_lock:
if self.jwt is None:
self._event_queue.append(event)
return

# Ensure event has all required base attributes
if not hasattr(event, "id"):
Expand Down Expand Up @@ -508,6 +533,18 @@ def _reauthorize_jwt(self) -> Union[str, None]:
self.jwt = jwt
return jwt

def _start_session_async(self):
success = self._start_session()
if success:
self.is_running = True
# Process queued events
with self._jwt_lock:
for event in self._event_queue:
self.record(event)
self._event_queue.clear()
else:
self.is_running = False

def _start_session(self):
with self._lock:
payload = {"session": self.__dict__}
Expand Down
Loading