@@ -54,6 +54,7 @@ def __init__(self):
5454 self ._restart_requested = False
5555 self ._first_run = True # Track if this is the first SDK run or a mid-session restart
5656 self ._otel_initialized = False # Track if OpenTelemetry has been initialized
57+ self ._otel_init_lock = asyncio .Lock () # Thread-safe OTEL initialization
5758
5859 async def initialize (self , context : RunnerContext ):
5960 """Initialize the adapter with context."""
@@ -66,6 +67,43 @@ async def initialize(self, context: RunnerContext):
6667 # Validate prerequisite files exist for phase-based commands
6768 await self ._validate_prerequisites ()
6869
70+ @staticmethod
71+ def _sanitize_user_context (user_id : str , user_name : str ) -> tuple [str , str ]:
72+ """Validate and sanitize user context fields to prevent injection attacks.
73+
74+ Returns:
75+ Tuple of (sanitized_user_id, sanitized_user_name)
76+ """
77+ # Validate user_id: alphanumeric, dash, underscore, at sign only
78+ # Max 255 characters (email addresses can be up to 254 chars)
79+ if user_id :
80+ user_id = str (user_id ).strip ()
81+ if len (user_id ) > 255 :
82+ logging .warning (f"User ID exceeds max length (255), truncating: { len (user_id )} chars" )
83+ user_id = user_id [:255 ]
84+ # Remove any characters that could cause injection issues
85+ import re
86+ sanitized_id = re .sub (r'[^a-zA-Z0-9@._-]' , '' , user_id )
87+ if sanitized_id != user_id :
88+ logging .warning (f"User ID contained invalid characters, sanitized from { len (user_id )} to { len (sanitized_id )} chars" )
89+ user_id = sanitized_id
90+
91+ # Validate user_name: printable ASCII, no control characters
92+ # Max 255 characters
93+ if user_name :
94+ user_name = str (user_name ).strip ()
95+ if len (user_name ) > 255 :
96+ logging .warning (f"User name exceeds max length (255), truncating: { len (user_name )} chars" )
97+ user_name = user_name [:255 ]
98+ # Remove control characters and non-printable characters
99+ import re
100+ sanitized_name = re .sub (r'[\x00-\x1f\x7f-\x9f]' , '' , user_name )
101+ if sanitized_name != user_name :
102+ logging .warning (f"User name contained control characters, sanitized from { len (user_name )} to { len (sanitized_name )} chars" )
103+ user_name = sanitized_name
104+
105+ return user_id , user_name
106+
69107 async def run (self ):
70108 """Run the Claude Code CLI session."""
71109 try :
@@ -191,9 +229,11 @@ async def run(self):
191229 async def _run_claude_agent_sdk (self , prompt : str ):
192230 """Execute the Claude Code SDK with the given prompt."""
193231 try :
194- # Extract user context for observability (used by both Langfuse and OTLP)
195- user_id = os .getenv ('USER_ID' , '' ).strip ()
196- user_name = os .getenv ('USER_NAME' , '' ).strip ()
232+ # Extract and sanitize user context for observability (used by both Langfuse and OTLP)
233+ # This prevents trace poisoning, log injection, and other security issues
234+ raw_user_id = os .getenv ('USER_ID' , '' ).strip ()
235+ raw_user_name = os .getenv ('USER_NAME' , '' ).strip ()
236+ user_id , user_name = self ._sanitize_user_context (raw_user_id , raw_user_name )
197237
198238 # Initialize Langfuse for observability if configured
199239 langfuse_client = None
@@ -209,29 +249,39 @@ async def _run_claude_agent_sdk(self, prompt: str):
209249 )
210250
211251 # Start a span for this session (Langfuse SDK 3.x)
252+ # Note: User tracking is done via OTLP span attributes (enduser.id, user.name)
253+ # Langfuse SDK context managers don't support update_trace()
212254 langfuse_session_span = langfuse_client .start_as_current_span (
213255 name = "claude_agent_session" ,
214256 input = {"prompt" : prompt },
215257 metadata = {
216258 "session_id" : self .context .session_id ,
217259 "namespace" : self .context .get_env ('AGENTIC_SESSION_NAMESPACE' , 'unknown' ),
260+ "user_id" : user_id if user_id else None ,
218261 "user_name" : user_name if user_name else None ,
219262 },
220263 )
221264
222- # Set userId on the trace (trace-level attribute, not span-level)
223265 if user_id :
224- try :
225- langfuse_session_span .update_trace (user_id = user_id )
226- logging .info (f"Langfuse: Tracking session for user { user_name } ({ user_id } )" )
227- except Exception as e :
228- logging .warning (f"Failed to set user_id on trace: { e } " )
266+ logging .info (f"Langfuse: Tracking session for user { user_name } ({ user_id } )" )
229267
230268 logging .info (f"Langfuse tracing enabled for session" )
231269 except Exception as e :
232- logging .warning (f"Failed to initialize Langfuse: { e } " )
233- import traceback
234- logging .warning (traceback .format_exc ())
270+ # Sanitize error message to prevent API key leakage
271+ # Replace any occurrence of public_key or secret_key with [REDACTED]
272+ error_msg = str (e )
273+ public_key = os .getenv ('LANGFUSE_PUBLIC_KEY' , '' )
274+ secret_key = os .getenv ('LANGFUSE_SECRET_KEY' , '' )
275+ if public_key :
276+ error_msg = error_msg .replace (public_key , '[REDACTED_PUBLIC_KEY]' )
277+ if secret_key :
278+ error_msg = error_msg .replace (secret_key , '[REDACTED_SECRET_KEY]' )
279+
280+ # Log sanitized error without full traceback
281+ logging .error (f"Failed to initialize Langfuse observability: { error_msg } " )
282+ logging .debug (f"Langfuse initialization error type: { type (e ).__name__ } " )
283+
284+ # Continue without Langfuse - don't fail the session
235285 langfuse_client = None
236286 langfuse_session_span = None
237287
@@ -291,13 +341,20 @@ async def _run_claude_agent_sdk(self, prompt: str):
291341 otel_provider .add_span_processor (BatchSpanProcessor (otlp_exporter ))
292342
293343 # Set as global tracer provider (only on first run)
294- # This prevents "Overriding of current TracerProvider" warnings on restarts
295- if not self ._otel_initialized :
296- trace .set_tracer_provider (otel_provider )
297- self ._otel_initialized = True
298- logging .info ("Initialized OpenTelemetry TracerProvider" )
299- else :
300- logging .debug ("Reusing existing OpenTelemetry TracerProvider from previous run" )
344+ # Use async lock for thread-safe initialization to prevent race conditions
345+ async with self ._otel_init_lock :
346+ if not self ._otel_initialized :
347+ # Check if a TracerProvider is already set (prevents override warning)
348+ current_provider = trace .get_tracer_provider ()
349+ # If no provider set, or it's the default no-op provider, set ours
350+ if current_provider is None or isinstance (current_provider , trace .ProxyTracerProvider ):
351+ trace .set_tracer_provider (otel_provider )
352+ logging .info ("Initialized OpenTelemetry TracerProvider" )
353+ else :
354+ logging .debug ("TracerProvider already set, reusing existing provider" )
355+ self ._otel_initialized = True
356+ else :
357+ logging .debug ("Reusing existing OpenTelemetry TracerProvider from previous run" )
301358
302359 # Get tracer (works whether we just set it or it was already set)
303360 otel_tracer = trace .get_tracer (__name__ )
@@ -326,9 +383,12 @@ async def _run_claude_agent_sdk(self, prompt: str):
326383
327384 logging .info (f"OpenTelemetry tracing enabled (endpoint: { otel_endpoint } )" )
328385 except Exception as e :
329- logging .warning (f"Failed to initialize OpenTelemetry: { e } " )
330- import traceback
331- logging .warning (traceback .format_exc ())
386+ # Log OTEL initialization failure for operator visibility
387+ logging .error (f"Failed to initialize OpenTelemetry observability: { e } " )
388+ logging .debug (f"OTEL initialization error type: { type (e ).__name__ } " )
389+ logging .debug (f"OTEL endpoint was: { otel_endpoint } " )
390+
391+ # Continue without OTEL - don't fail the session
332392 otel_tracer = None
333393 otel_span = None
334394
@@ -936,16 +996,36 @@ async def process_one_prompt(text: str):
936996 }
937997 except Exception as e :
938998 logging .error (f"Failed to run Claude Code SDK: { e } " )
939- # End Langfuse session span with error if available
999+
1000+ # Clean up observability spans on error path
1001+ # 1. End Langfuse session span with error if available
9401002 if 'langfuse_session_span' in locals () and langfuse_session_span and 'langfuse_client' in locals () and langfuse_client :
9411003 try :
9421004 langfuse_session_span .end (
9431005 level = "ERROR" ,
9441006 status_message = str (e )
9451007 )
9461008 langfuse_client .flush ()
947- except Exception :
948- pass
1009+ except Exception as cleanup_err :
1010+ logging .debug (f"Failed to cleanup Langfuse span: { cleanup_err } " )
1011+
1012+ # 2. End OTEL span with error if available
1013+ if 'otel_span' in locals () and otel_span :
1014+ try :
1015+ otel_span .set_status (trace .Status (trace .StatusCode .ERROR , str (e )))
1016+ otel_span .end ()
1017+ logging .debug ("OTEL span ended with error status" )
1018+ except Exception as cleanup_err :
1019+ logging .debug (f"Failed to cleanup OTEL span: { cleanup_err } " )
1020+
1021+ # 3. Force flush OTEL provider if available
1022+ if 'otel_provider' in locals () and otel_provider :
1023+ try :
1024+ otel_provider .force_flush (timeout_millis = 5000 )
1025+ logging .debug ("OTEL spans flushed on error path" )
1026+ except Exception as cleanup_err :
1027+ logging .debug (f"Failed to flush OTEL provider: { cleanup_err } " )
1028+
9491029 return {
9501030 "success" : False ,
9511031 "error" : str (e )
0 commit comments