Skip to content

Commit e7c5a2c

Browse files
Ambient Code Botclaude
authored andcommitted
Simplify Langfuse instrumentation - use SDK's authoritative turn numbers only
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> Signed-off-by: sallyom <somalley@redhat.com>
1 parent 39aab05 commit e7c5a2c

File tree

7 files changed

+283
-59
lines changed

7 files changed

+283
-59
lines changed

components/operator/internal/handlers/sessions.go

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -140,15 +140,29 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error {
140140
log.Printf("Successfully deleted job %s for stopped session", jobName)
141141
}
142142

143-
// IMPORTANT: Do NOT explicitly delete pods here
144-
// Job deletion with Foreground propagation will automatically cascade to pods
145-
// Explicit pod deletion bypasses TerminationGracePeriodSeconds
146-
//
147-
// Kubernetes will:
148-
// 1. Send SIGTERM to container for graceful shutdown
149-
// 2. Wait up to TerminationGracePeriodSeconds (30s default) for graceful exit
150-
// 3. Send SIGKILL if still running after grace period
151-
log.Printf("Pods for job %s will be deleted automatically by Kubernetes", jobName)
143+
// Then, explicitly delete all pods for this job (by job-name label)
144+
podSelector := fmt.Sprintf("job-name=%s", jobName)
145+
log.Printf("Deleting pods with job-name selector: %s", podSelector)
146+
err = config.K8sClient.CoreV1().Pods(sessionNamespace).DeleteCollection(context.TODO(), v1.DeleteOptions{}, v1.ListOptions{
147+
LabelSelector: podSelector,
148+
})
149+
if err != nil && !errors.IsNotFound(err) {
150+
log.Printf("Failed to delete pods for job %s: %v (continuing anyway)", jobName, err)
151+
} else {
152+
log.Printf("Successfully deleted pods for job %s", jobName)
153+
}
154+
155+
// Also delete any pods labeled with this session (in case owner refs are lost)
156+
sessionPodSelector := fmt.Sprintf("agentic-session=%s", name)
157+
log.Printf("Deleting pods with agentic-session selector: %s", sessionPodSelector)
158+
err = config.K8sClient.CoreV1().Pods(sessionNamespace).DeleteCollection(context.TODO(), v1.DeleteOptions{}, v1.ListOptions{
159+
LabelSelector: sessionPodSelector,
160+
})
161+
if err != nil && !errors.IsNotFound(err) {
162+
log.Printf("Failed to delete session-labeled pods: %v (continuing anyway)", err)
163+
} else {
164+
log.Printf("Successfully deleted session-labeled pods")
165+
}
152166
} else {
153167
log.Printf("Job %s already completed (Succeeded: %d, Failed: %d), no cleanup needed", jobName, job.Status.Succeeded, job.Status.Failed)
154168
}
@@ -299,9 +313,9 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error {
299313

300314
// Hardcoded secret names (convention over configuration)
301315
const runnerSecretsName = "ambient-runner-secrets" // ANTHROPIC_API_KEY only (ignored when Vertex enabled)
302-
const integrationSecretsName = "ambient-non-vertex-integrations" // GIT_*, JIRA_*, custom keys (optional, NO Langfuse keys)
316+
const integrationSecretsName = "ambient-non-vertex-integrations" // GIT_*, JIRA_*, custom keys (optional)
303317

304-
// Check if integration secrets exist (user-provided integrations like GIT_TOKEN, JIRA_*)
318+
// Check if integration secrets exist (optional)
305319
integrationSecretsExist := false
306320
if _, err := config.K8sClient.CoreV1().Secrets(sessionNamespace).Get(context.TODO(), integrationSecretsName, v1.GetOptions{}); err == nil {
307321
integrationSecretsExist = true
@@ -383,8 +397,6 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error {
383397
},
384398
Spec: corev1.PodSpec{
385399
RestartPolicy: corev1.RestartPolicyNever,
386-
// Default grace period for graceful shutdown
387-
TerminationGracePeriodSeconds: int64Ptr(30),
388400
// Explicitly set service account for pod creation permissions
389401
AutomountServiceAccountToken: boolPtr(false),
390402
Volumes: []corev1.Volume{
@@ -611,7 +623,7 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error {
611623
}(),
612624

613625
// Import secrets as environment variables
614-
// - integrationSecretsName: Only if exists (GIT_TOKEN, JIRA_*, custom keys - NO Langfuse keys)
626+
// - integrationSecretsName: Only if exists (GIT_TOKEN, JIRA_*, custom keys)
615627
// - runnerSecretsName: Only when Vertex disabled (ANTHROPIC_API_KEY)
616628
// - ambient-langfuse-keys: Platform-wide Langfuse observability (LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST, LANGFUSE_ENABLED)
617629
EnvFrom: func() []corev1.EnvFromSource {
@@ -641,10 +653,6 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error {
641653
log.Printf("Skipping runner secrets '%s' for session %s (Vertex enabled)", runnerSecretsName, name)
642654
}
643655

644-
// Note: Platform-wide Langfuse observability keys are injected via explicit Env entries above
645-
// (LANGFUSE_* env vars from ambient-admin-langfuse-secret Secret, platform-admin managed)
646-
// EnvFrom is intentionally NOT used here to prevent automatic exposure of future secret keys
647-
648656
return sources
649657
}(),
650658

components/runners/claude-code-runner/observability.py

Lines changed: 41 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44
Provides Langfuse LLM observability for Claude sessions with trace structure:
55
66
1. Turn Traces (top-level generations):
7-
- Each turn is a separate trace
8-
- Named: claude_turn_1, claude_turn_2, etc.
9-
- Contains authoritative usage data from Claude SDK
7+
- ONE trace per turn (SDK sends multiple AssistantMessages during streaming, but guard prevents duplicates)
8+
- Named: "claude_interaction" (turn number stored in metadata)
9+
- First AssistantMessage creates trace, subsequent ones ignored until end_turn() clears it
10+
- Final trace contains authoritative turn number and usage data from ResultMessage
1011
- Canonical format with separate cache token tracking for accurate cost
11-
- All turns grouped by session_id via propagate_attributes()
12+
- All traces grouped by session_id via propagate_attributes()
1213
1314
2. Tool Spans (observations within turn traces):
1415
- Named: tool_Read, tool_Write, tool_Bash, etc.
@@ -18,21 +19,20 @@
1819
1920
Architecture:
2021
- Session-based grouping via propagate_attributes() with session_id and user_id
21-
- Each turn is an independent trace (not nested under a session trace)
22+
- Each turn creates ONE independent trace (not nested under session)
2223
- Langfuse automatically aggregates tokens and costs across all traces with same session_id
23-
- Filter by session_id, user_id, or model in Langfuse UI
24+
- Filter by session_id, user_id, model, or metadata.turn in Langfuse UI
25+
- Sessions can be paused/resumed: each turn creates a trace regardless of session lifecycle
2426
2527
Trace Hierarchy:
26-
claude_turn_1 (trace - generation)
28+
claude_interaction (trace - generation, metadata: {turn: 1})
2729
├── tool_Read (observation - span)
2830
└── tool_Write (observation - span)
2931
30-
claude_turn_2 (trace - generation)
32+
claude_interaction (trace - generation, metadata: {turn: 2})
3133
└── tool_Bash (observation - span)
3234
33-
claude_turn_3 (trace - generation)
34-
35-
Usage Format (turn-level only):
35+
Usage Format:
3636
{
3737
"input": int, # Regular input tokens
3838
"output": int, # Output tokens
@@ -183,71 +183,73 @@ async def initialize(self, prompt: str, namespace: str, model: str = None) -> bo
183183
self._propagate_ctx = None
184184
return False
185185

186-
def start_turn(self, turn_count: int, model: str, user_input: str | None = None) -> None:
186+
def start_turn(self, model: str, user_input: str | None = None) -> None:
187187
"""Start tracking a new turn as a top-level trace.
188188
189189
Creates the turn generation as a TRACE (not an observation) so that each turn
190190
appears as a separate trace in Langfuse. Tools will be observations within the trace.
191191
192-
Note: Cannot use 'with' context managers due to async streaming architecture.
192+
Prevents duplicate traces when SDK sends multiple AssistantMessages per turn during
193+
streaming. Only the first AssistantMessage creates a trace; subsequent ones are ignored
194+
until end_turn() clears the current trace.
195+
196+
Cannot use 'with' context managers due to async streaming architecture.
193197
Messages arrive asynchronously (AssistantMessage → ToolUseBlocks → ResultMessage)
194198
and the turn context must stay open across multiple async loop iterations.
195199
196200
Args:
197-
turn_count: Current turn number
198201
model: Model name (e.g., "claude-3-5-sonnet-20241022")
199202
user_input: Optional actual user input/prompt (if available)
200203
"""
201204
if not self.langfuse_client:
202205
return
203206

204-
# Guard: Don't create a new turn if one is already active
205-
# This prevents duplicate traces when AssistantMessage arrives multiple times
207+
# Guard: Prevent creating duplicate traces for the same turn
208+
# SDK sends multiple AssistantMessages during streaming - only create trace once
206209
if self._current_turn_generation:
207-
logging.debug(f"Langfuse: Turn already active, skipping start_turn for turn {turn_count}")
210+
logging.debug("Langfuse: Trace already active for current turn, skipping duplicate start_turn")
208211
return
209212

210213
try:
211-
# Build metadata
212-
metadata = {"turn": turn_count}
213-
214214
# Use pending initial prompt for turn 1 if available
215-
if user_input is None and turn_count == 1 and self._pending_initial_prompt:
215+
if user_input is None and self._pending_initial_prompt:
216216
user_input = self._pending_initial_prompt
217217
self._pending_initial_prompt = None # Clear after use
218-
logging.debug("Langfuse: Using pending initial prompt for turn 1")
218+
logging.debug("Langfuse: Using pending initial prompt")
219219

220-
# Use actual user input if provided, otherwise use placeholder
220+
# Use actual user input if provided, otherwise use generic placeholder
221221
if user_input:
222222
input_content = [{"role": "user", "content": user_input}]
223-
logging.info(f"Langfuse: Starting turn {turn_count} trace with model={model} and actual user input")
223+
logging.info(f"Langfuse: Starting turn trace with model={model} and actual user input")
224224
else:
225-
input_content = [{"role": "user", "content": f"Turn {turn_count}"}]
226-
logging.info(f"Langfuse: Starting turn {turn_count} trace with model={model}")
225+
input_content = [{"role": "user", "content": "User input"}]
226+
logging.info(f"Langfuse: Starting turn trace with model={model}")
227227

228228
# Create generation as a TRACE using start_as_current_observation()
229-
# This makes claude_turn_X a top-level trace, not an observation
229+
# Name doesn't include turn number - that will be added to metadata in end_turn()
230+
# This makes the trace a top-level observation, not nested
230231
# Tools will automatically become child observations of this trace
231232
self._current_turn_ctx = self.langfuse_client.start_as_current_observation(
232233
as_type="generation",
233-
name=f"claude_turn_{turn_count}",
234+
name="claude_interaction", # Generic name, turn number added in metadata
234235
input=input_content,
235236
model=model,
236-
metadata=metadata,
237+
metadata={}, # Turn number will be added in end_turn()
237238
)
238239
self._current_turn_generation = self._current_turn_ctx.__enter__()
239-
logging.debug(f"Langfuse: Turn {turn_count} trace started, ready for tool spans")
240+
logging.info(f"Langfuse: Created new trace (model={model})")
240241

241242
except Exception as e:
242243
logging.error(f"Langfuse: Failed to start turn: {e}", exc_info=True)
243244

244245
def end_turn(self, turn_count: int, message: Any, usage: dict | None = None) -> None:
245246
"""Complete turn tracking with output and usage data (called when ResultMessage arrives).
246247
247-
Updates the turn generation with the assistant's output and usage metrics, then closes it.
248+
Updates the turn generation with the assistant's output, usage metrics, and SDK's
249+
authoritative turn number in metadata, then closes it.
248250
249251
Args:
250-
turn_count: Current turn number
252+
turn_count: Current turn number (from SDK's authoritative num_turns in ResultMessage)
251253
message: AssistantMessage from Claude SDK
252254
usage: Usage dict from ResultMessage with input_tokens, output_tokens, cache tokens, etc.
253255
"""
@@ -291,8 +293,12 @@ def end_turn(self, turn_count: int, message: Any, usage: dict | None = None) ->
291293
if cache_creation > 0:
292294
usage_details_dict["cache_creation_input_tokens"] = cache_creation
293295

294-
# Update with output and usage_details (SDK v3 requires 'usage_details' parameter)
295-
update_params = {"output": output_text}
296+
# Update with output, usage_details, and turn number in metadata
297+
# SDK v3 requires 'usage_details' parameter for usage tracking
298+
update_params = {
299+
"output": output_text,
300+
"metadata": {"turn": turn_count} # Add SDK's authoritative turn number
301+
}
296302
if usage_details_dict:
297303
update_params["usage_details"] = usage_details_dict
298304
self._current_turn_generation.update(**update_params)
@@ -310,7 +316,7 @@ def end_turn(self, turn_count: int, message: Any, usage: dict | None = None) ->
310316
if self.langfuse_client:
311317
try:
312318
self.langfuse_client.flush()
313-
logging.debug(f"Langfuse: Flushed turn {turn_count} data")
319+
logging.info(f"Langfuse: Flushed turn {turn_count} data")
314320
except Exception as e:
315321
logging.warning(f"Langfuse: Flush failed after turn {turn_count}: {e}")
316322

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
"""Unit tests for duplicate turn prevention in observability module."""
2+
3+
import pytest
4+
from unittest.mock import Mock, patch, MagicMock
5+
from observability import ObservabilityManager
6+
7+
8+
class TestDuplicateTurnPrevention:
9+
"""Tests for preventing duplicate trace creation."""
10+
11+
@pytest.mark.asyncio
12+
async def test_multiple_assistant_messages_same_turn_no_duplicates(self):
13+
"""Test that multiple AssistantMessages for the same turn don't create duplicate traces."""
14+
manager = ObservabilityManager(
15+
session_id="test-session", user_id="user-1", user_name="Test User"
16+
)
17+
18+
# Mock Langfuse client
19+
mock_client = Mock()
20+
mock_ctx = Mock()
21+
mock_generation = Mock()
22+
mock_ctx.__enter__ = Mock(return_value=mock_generation)
23+
mock_ctx.__exit__ = Mock()
24+
mock_client.start_as_current_observation = Mock(return_value=mock_ctx)
25+
26+
manager.langfuse_client = mock_client
27+
28+
# Simulate first AssistantMessage - should create trace
29+
manager.start_turn("claude-sonnet-4-5")
30+
assert manager._current_turn_generation is not None
31+
assert mock_client.start_as_current_observation.call_count == 1
32+
33+
# Simulate second AssistantMessage for SAME turn - should skip
34+
manager.start_turn("claude-sonnet-4-5")
35+
# Should still be 1 call (no new trace created)
36+
assert mock_client.start_as_current_observation.call_count == 1
37+
38+
# Simulate third AssistantMessage for SAME turn - should skip
39+
manager.start_turn("claude-sonnet-4-5")
40+
# Should still be 1 call
41+
assert mock_client.start_as_current_observation.call_count == 1
42+
43+
@pytest.mark.asyncio
44+
async def test_sequential_turns_create_separate_traces(self):
45+
"""Test that sequential turns create separate traces."""
46+
manager = ObservabilityManager(
47+
session_id="test-session", user_id="user-1", user_name="Test User"
48+
)
49+
50+
# Mock Langfuse client
51+
mock_client = Mock()
52+
mock_ctx = Mock()
53+
mock_generation = Mock()
54+
mock_ctx.__enter__ = Mock(return_value=mock_generation)
55+
mock_ctx.__exit__ = Mock()
56+
mock_client.start_as_current_observation = Mock(return_value=mock_ctx)
57+
58+
manager.langfuse_client = mock_client
59+
60+
# Turn 1
61+
manager.start_turn("claude-sonnet-4-5")
62+
assert manager._current_turn_generation is not None
63+
assert mock_client.start_as_current_observation.call_count == 1
64+
65+
# End turn 1 (clear generation)
66+
manager._current_turn_generation = None
67+
manager._current_turn_ctx = None
68+
69+
# Turn 2 - should create new trace
70+
manager.start_turn("claude-sonnet-4-5")
71+
assert manager._current_turn_generation is not None
72+
assert mock_client.start_as_current_observation.call_count == 2
73+
74+
# End turn 2
75+
manager._current_turn_generation = None
76+
manager._current_turn_ctx = None
77+
78+
# Turn 3 - should create new trace
79+
manager.start_turn("claude-sonnet-4-5")
80+
assert manager._current_turn_generation is not None
81+
assert mock_client.start_as_current_observation.call_count == 3
82+
83+
@pytest.mark.asyncio
84+
async def test_end_turn_adds_turn_number_to_metadata(self):
85+
"""Test that end_turn adds SDK's authoritative turn number to metadata."""
86+
manager = ObservabilityManager(
87+
session_id="test-session", user_id="user-1", user_name="Test User"
88+
)
89+
90+
# Mock Langfuse client and generation
91+
mock_client = Mock()
92+
mock_generation = Mock()
93+
mock_ctx = Mock()
94+
mock_ctx.__exit__ = Mock()
95+
96+
manager.langfuse_client = mock_client
97+
manager._current_turn_generation = mock_generation
98+
manager._current_turn_ctx = mock_ctx
99+
100+
# Create mock AssistantMessage
101+
mock_message = MagicMock()
102+
mock_message.content = []
103+
104+
# End turn with SDK's turn number
105+
manager.end_turn(5, mock_message, usage={"input_tokens": 100, "output_tokens": 50})
106+
107+
# Check that update was called with turn number in metadata
108+
mock_generation.update.assert_called_once()
109+
call_kwargs = mock_generation.update.call_args[1]
110+
assert "metadata" in call_kwargs
111+
assert call_kwargs["metadata"]["turn"] == 5
112+
113+
@pytest.mark.asyncio
114+
async def test_no_prediction_just_sdk_turn_count(self):
115+
"""Test that we use SDK's authoritative turn count, not predictions."""
116+
manager = ObservabilityManager(
117+
session_id="test-session", user_id="user-1", user_name="Test User"
118+
)
119+
120+
# Mock Langfuse client
121+
mock_client = Mock()
122+
mock_ctx = Mock()
123+
mock_generation = Mock()
124+
mock_ctx.__enter__ = Mock(return_value=mock_generation)
125+
mock_ctx.__exit__ = Mock()
126+
mock_client.start_as_current_observation = Mock(return_value=mock_ctx)
127+
mock_client.flush = Mock()
128+
129+
manager.langfuse_client = mock_client
130+
131+
# Start turn without specifying turn number
132+
manager.start_turn("claude-sonnet-4-5")
133+
assert manager._current_turn_generation is not None
134+
assert mock_client.start_as_current_observation.call_count == 1
135+
136+
# Second AssistantMessage arrives
137+
manager.start_turn("claude-sonnet-4-5")
138+
# Should be skipped - turn already active
139+
assert mock_client.start_as_current_observation.call_count == 1
140+
141+
# SDK ResultMessage arrives with authoritative num_turns=2
142+
mock_message = MagicMock()
143+
mock_message.content = []
144+
145+
manager.end_turn(2, mock_message, usage={"input_tokens": 100, "output_tokens": 50})
146+
147+
# Check turn number was added to metadata
148+
call_kwargs = mock_generation.update.call_args[1]
149+
assert call_kwargs["metadata"]["turn"] == 2
150+
151+
# Should have called flush
152+
assert mock_client.flush.call_count == 1

0 commit comments

Comments
 (0)