Skip to content

Commit e1c8562

Browse files
authored
Add session management during shutdown and improve session handling (#915)
* Add session management during shutdown and improve session handling
* Enhance span finalization documentation and error handling
1 parent e01e162 commit e1c8562

File tree

4 files changed

+141
-40
lines changed

4 files changed

+141
-40
lines changed

agentops/client/client.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from typing import List, Optional, Union
2+
import atexit
23

34
from agentops.client.api import ApiClient
45
from agentops.config import Config
@@ -8,6 +9,28 @@
89
from agentops.logging.config import configure_logging, intercept_opentelemetry_logging
910
from agentops.sdk.core import TracingCore
1011

12+
# Global registry for active session
13+
_active_session = None
14+
15+
# Single atexit handler registered flag
16+
_atexit_registered = False
17+
18+
def _end_active_session():
    """Global handler to end the active session during shutdown.

    Intended to be registered with ``atexit``. If a session is stored in the
    module-level ``_active_session`` registry, it is ended through
    ``agentops.legacy.end_session``. If that raises, the handler falls back to
    calling ``session.span.end()`` directly. The registry is always cleared
    afterwards, so running the handler a second time is a no-op.

    Failures are logged and never propagated.
    """
    global _active_session

    # Keep a local reference: end_session() may reset the module-level
    # registry before it fails, and the fallback below still needs the session.
    session = _active_session
    if session is None:
        return

    logger.debug("Auto-ending active session during shutdown")
    try:
        # Imported lazily -- presumably to avoid a circular import, since
        # agentops.legacy itself imports this module.
        from agentops.legacy import end_session

        end_session(session)
    except Exception as e:
        logger.warning(f"Error ending active session during shutdown: {e}")
        # Final fallback: try to end the span directly
        try:
            span = getattr(session, "span", None)
            if hasattr(span, "end"):
                span.end()
        except Exception as fallback_error:
            # Best effort only, but leave a trace instead of swallowing silently.
            logger.debug(f"Fallback span end failed during shutdown: {fallback_error}")
    finally:
        # Drop the reference so the session cannot be ended twice.
        _active_session = None
1134

1235
class Client:
1336
"""Singleton client for AgentOps service"""
@@ -61,6 +84,12 @@ def init(self, **kwargs):
6184

6285
self.initialized = True
6386

87+
# Register a single global atexit handler for session management
88+
global _atexit_registered
89+
if not _atexit_registered:
90+
atexit.register(_end_active_session)
91+
_atexit_registered = True
92+
6493
# Start a session if auto_start_session is True
6594
session = None
6695
if self.config.auto_start_session:
@@ -71,6 +100,10 @@ def init(self, **kwargs):
71100
session = start_session(tags=list(self.config.default_tags))
72101
else:
73102
session = start_session()
103+
104+
# Register this session globally
105+
global _active_session
106+
_active_session = session
74107

75108
return session
76109

agentops/legacy/__init__.py

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -132,15 +132,21 @@ def start_session(
132132
# Pass auto_start_session=False to prevent circular dependency
133133
Client().init(auto_start_session=False)
134134

135-
span, context, token = _create_session_span(tags)
135+
span, ctx, token = _create_session_span(tags)
136136
session = Session(span, token)
137137

138138
# Set the global session reference
139139
_current_session = session
140140

141+
# Also register with the client's session registry for consistent behavior
142+
try:
143+
import agentops.client.client
144+
agentops.client.client._active_session = session
145+
except Exception:
146+
pass
147+
141148
return session
142149

143-
144150
def _set_span_attributes(span: Any, attributes: Dict[str, Any]) -> None:
145151
"""
146152
Helper to set attributes on a span.
@@ -202,6 +208,18 @@ def end_session(session_or_status: Any = None, **kwargs) -> None:
202208
logger.debug("Ignoring end_session call - TracingCore not initialized")
203209
return
204210

211+
# Clear client active session reference
212+
try:
213+
import agentops.client.client
214+
if session_or_status is None and kwargs:
215+
if _current_session is agentops.client.client._active_session:
216+
agentops.client.client._active_session = None
217+
elif hasattr(session_or_status, 'span'):
218+
if session_or_status is agentops.client.client._active_session:
219+
agentops.client.client._active_session = None
220+
except Exception:
221+
pass
222+
205223
# In some old implementations, and in crew < 0.10.5 `end_session` will be
206224
# called with a single string as a positional argument like: "Success"
207225

@@ -216,24 +234,45 @@ def end_session(session_or_status: Any = None, **kwargs) -> None:
216234
# )
217235
if session_or_status is None and kwargs:
218236
if _current_session is not None:
219-
_set_span_attributes(_current_session.span, kwargs)
220-
_finalize_span(_current_session.span, _current_session.token)
221-
_flush_span_processors()
222-
_current_session = None
237+
try:
238+
_set_span_attributes(_current_session.span, kwargs)
239+
_finalize_span(_current_session.span, _current_session.token)
240+
_flush_span_processors()
241+
_current_session = None
242+
except Exception as e:
243+
logger.warning(f"Error ending current session: {e}")
244+
# Fallback: try direct span ending
245+
try:
246+
if hasattr(_current_session.span, "end"):
247+
_current_session.span.end()
248+
_current_session = None
249+
except:
250+
pass
223251
return
224252

225253
# Handle the standard pattern and CrewAI >= 0.105.0 pattern where a Session object is passed.
226254
# In both cases, we call _finalize_span with the span and token from the Session.
227255
# This is the most direct and precise way to end a specific session.
228256
if hasattr(session_or_status, 'span') and hasattr(session_or_status, 'token'):
229-
# Set attributes and finalize the span
230-
_set_span_attributes(session_or_status.span, kwargs)
231-
_finalize_span(session_or_status.span, session_or_status.token)
232-
_flush_span_processors()
233-
234-
# Clear the global session reference if this is the current session
235-
if _current_session is session_or_status:
236-
_current_session = None
257+
try:
258+
# Set attributes and finalize the span
259+
_set_span_attributes(session_or_status.span, kwargs)
260+
_finalize_span(session_or_status.span, session_or_status.token)
261+
_flush_span_processors()
262+
263+
# Clear the global session reference if this is the current session
264+
if _current_session is session_or_status:
265+
_current_session = None
266+
except Exception as e:
267+
logger.warning(f"Error ending session object: {e}")
268+
# Fallback: try direct span ending
269+
try:
270+
if hasattr(session_or_status.span, "end"):
271+
session_or_status.span.end()
272+
if _current_session is session_or_status:
273+
_current_session = None
274+
except:
275+
pass
237276

238277

239278
def end_all_sessions():

agentops/sdk/core.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from opentelemetry.sdk.resources import Resource
1919
from opentelemetry.sdk.trace import SpanProcessor, TracerProvider
2020
from opentelemetry.sdk.trace.export import BatchSpanProcessor
21+
from opentelemetry import context as context_api
2122

2223
from agentops.exceptions import AgentOpsClientNotInitializedException
2324
from agentops.logging import logger
@@ -169,6 +170,9 @@ def setup_telemetry(
169170
meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
170171
metrics.set_meter_provider(meter_provider)
171172

173+
# Initialize root context
174+
context_api.get_current()
175+
172176
logger.debug("Telemetry system initialized")
173177

174178
return provider, meter_provider

agentops/sdk/decorators/utility.py

Lines changed: 51 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -164,10 +164,6 @@ def _make_span(
164164
- context is the span context
165165
- token is the context token needed for detaching
166166
"""
167-
# Log before we do anything
168-
before_span = _get_current_span_info()
169-
logger.debug(f"[DEBUG] BEFORE _make_span {operation_name}.{span_kind} - Current context: {before_span}")
170-
171167
# Create span with proper naming convention
172168
span_name = f"{operation_name}.{span_kind}"
173169

@@ -186,20 +182,19 @@ def _make_span(
186182
if version is not None:
187183
attributes[SpanAttributes.OPERATION_VERSION] = version
188184

189-
# Get current context explicitly
190185
current_context = context_api.get_current()
191186

192-
# Create the span with explicit context
193-
span = tracer.start_span(span_name, context=current_context, attributes=attributes)
194-
195-
# Set as current context and get token for later detachment
187+
# Create the span with proper context management
188+
if span_kind == SpanKind.SESSION:
189+
# For session spans, create as a root span
190+
span = tracer.start_span(span_name, attributes=attributes)
191+
else:
192+
# For other spans, use the current context
193+
span = tracer.start_span(span_name, context=current_context, attributes=attributes)
194+
195+
# Set as current context and get token for detachment
196196
ctx = trace.set_span_in_context(span)
197197
token = context_api.attach(ctx)
198-
199-
# Log after span creation
200-
if hasattr(span, "get_span_context"):
201-
span_ctx = span.get_span_context()
202-
logger.debug(f"[DEBUG] CREATED _make_span {span_name} - span_id: {span_ctx.span_id:x}, parent: {before_span.get('span_id', 'None')}")
203198

204199
return span, ctx, token
205200

@@ -232,19 +227,49 @@ def _record_entity_output(span: trace.Span, result: Any) -> None:
232227

233228

234229
def _finalize_span(span: trace.Span, token: Any) -> None:
    """
    Finalizes a span and cleans up its context.

    This function performs three tasks needed for proper span lifecycle management:
    1. Ends the span to mark it complete
    2. Detaches the context token to prevent memory leaks and maintain proper context hierarchy
    3. Requests an immediate flush from the tracer provider rather than waiting for batch processing

    Use cases:
    - Session span termination: Ensures root spans are properly ended and exported
    - Shutdown handling: Ensures spans are flushed during application termination
    - Async operations: Finalizes spans from asynchronous execution contexts

    Each step is best-effort: an error in one step is caught and logged, and
    the remaining steps still run.

    Args:
        span: The span to finalize; skipped when None
        token: The context token to detach; skipped when None
    """
    # End the span
    if span is not None:
        try:
            span.end()
        except Exception as e:
            logger.warning(f"Error ending span: {e}")

    # Detach context token if provided
    if token is not None:
        try:
            context_api.detach(token)
        except Exception as e:
            # Detach can fail (e.g. token from a different context); not fatal,
            # but record it instead of swallowing silently.
            logger.debug(f"Error detaching context token: {e}")

    # Try to flush span processors.
    # force_flush() might not be available, e.g. when the active provider does
    # not implement it or during application shutdown when the provider may be
    # partially destroyed, so look it up defensively.
    try:
        tracer_provider = trace.get_tracer_provider()
        force_flush = getattr(tracer_provider, "force_flush", None)
        if callable(force_flush):
            force_flush()
    except Exception as e:
        logger.debug(f"Error flushing span processors: {e}")

0 commit comments

Comments
 (0)