Skip to content

Commit 77e2974

Browse files
authored
{agent, telemetry}: add output for invoke_agent span (#419)
1 parent e05cec8 commit 77e2974

File tree

7 files changed

+89
-234
lines changed

7 files changed

+89
-234
lines changed

agent/llmagent/extras_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"github.com/stretchr/testify/assert"
1717
"github.com/stretchr/testify/require"
18+
"go.opentelemetry.io/otel/trace/noop"
1819

1920
"trpc.group/trpc-go/trpc-agent-go/agent"
2021
"trpc.group/trpc-go/trpc-agent-go/event"
@@ -133,7 +134,7 @@ func TestLLMAgent_AfterCb(t *testing.T) {
133134
inv := &agent.Invocation{InvocationID: "id", AgentName: "agent"}
134135

135136
llm := &LLMAgent{agentCallbacks: cb}
136-
wrapped := llm.wrapEventChannel(context.Background(), inv, orig)
137+
wrapped := llm.wrapEventChannel(context.Background(), inv, orig, noop.Span{})
137138

138139
var objs []string
139140
for e := range wrapped {
@@ -157,7 +158,7 @@ func TestLLMAgent_AfterCbNoResp(t *testing.T) {
157158
inv := &agent.Invocation{InvocationID: "id2", AgentName: "agent2"}
158159

159160
llm := &LLMAgent{}
160-
wrapped := llm.wrapEventChannel(context.Background(), inv, orig)
161+
wrapped := llm.wrapEventChannel(context.Background(), inv, orig, noop.Span{})
161162

162163
// Expect exactly one event propagated from original channel and no extras.
163164
count := 0

agent/llmagent/llm_agent.go

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616
"reflect"
1717
"sync"
1818

19+
sdktrace "go.opentelemetry.io/otel/trace"
20+
1921
"trpc.group/trpc-go/trpc-agent-go/agent"
2022
"trpc.group/trpc-go/trpc-agent-go/agent/llmagent/internal/jsonschema"
2123
"trpc.group/trpc-go/trpc-agent-go/codeexecutor"
@@ -562,16 +564,22 @@ func registerTools(options *Options) []tool.Tool {
562564

563565
// Run implements the agent.Agent interface.
564566
// It executes the LLM agent flow and returns a channel of events.
565-
func (a *LLMAgent) Run(ctx context.Context, invocation *agent.Invocation) (<-chan *event.Event, error) {
566-
ctx, span := trace.Tracer.Start(ctx, fmt.Sprintf("%s [%s]", itelemetry.SpanNamePrefixAgentRun, a.name))
567-
defer span.End()
567+
func (a *LLMAgent) Run(ctx context.Context, invocation *agent.Invocation) (e <-chan *event.Event, err error) {
568+
ctx, span := trace.Tracer.Start(ctx, fmt.Sprintf("invoke_agent {%s}", a.name))
569+
itelemetry.TraceBeforeInvokeAgent(span, invocation)
570+
defer func() {
571+
if err != nil {
572+
span.End()
573+
}
574+
}()
568575

569576
// Setup invocation
570577
a.setupInvocation(invocation)
571578

572579
// Run before agent callbacks if they exist.
573580
if a.agentCallbacks != nil {
574-
customResponse, err := a.agentCallbacks.RunBeforeAgent(ctx, invocation)
581+
var customResponse *model.Response
582+
customResponse, err = a.agentCallbacks.RunBeforeAgent(ctx, invocation)
575583
if err != nil {
576584
return nil, fmt.Errorf("before agent callback failed: %w", err)
577585
}
@@ -591,13 +599,7 @@ func (a *LLMAgent) Run(ctx context.Context, invocation *agent.Invocation) (<-cha
591599
if err != nil {
592600
return nil, err
593601
}
594-
595-
// If we have after agent callbacks, we need to wrap the event channel.
596-
if a.agentCallbacks != nil {
597-
return a.wrapEventChannel(ctx, invocation, flowEventChan), nil
598-
}
599-
600-
return flowEventChan, nil
602+
return a.wrapEventChannel(ctx, invocation, flowEventChan, span), nil
601603
}
602604

603605
// setupInvocation sets up the invocation
@@ -621,14 +623,25 @@ func (a *LLMAgent) wrapEventChannel(
621623
ctx context.Context,
622624
invocation *agent.Invocation,
623625
originalChan <-chan *event.Event,
626+
span sdktrace.Span,
624627
) <-chan *event.Event {
625628
wrappedChan := make(chan *event.Event, 256) // Use default buffer size
626629

627630
go func() {
628-
defer close(wrappedChan)
631+
var fullResponse *model.Response
632+
defer func() {
633+
if fullResponse != nil {
634+
itelemetry.TraceAfterInvokeAgent(span, fullResponse)
635+
}
636+
span.End()
637+
close(wrappedChan)
638+
}()
629639

630640
// Forward all events from the original channel
631641
for evt := range originalChan {
642+
if evt != nil && evt.Response != nil && !evt.Response.IsPartial {
643+
fullResponse = evt.Response
644+
}
632645
if err := event.EmitEvent(ctx, wrappedChan, evt); err != nil {
633646
return
634647
}
@@ -647,6 +660,7 @@ func (a *LLMAgent) wrapEventChannel(
647660
err.Error(),
648661
)
649662
} else if customResponse != nil {
663+
fullResponse = customResponse
650664
// Create an event from the custom response.
651665
evt = event.NewResponseEvent(invocation.InvocationID, invocation.AgentName, customResponse)
652666
}

internal/telemetry/telemetry.go

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,10 @@ const (
3535

3636
SpanNameCallLLM = "call_llm"
3737
SpanNamePrefixExecuteTool = "execute_tool"
38-
SpanNamePrefixAgentRun = "agent_run"
39-
SpanNameInvocation = "invocation"
4038

4139
OperationExecuteTool = "execute_tool"
4240
OperationCallLLM = "call_llm"
43-
OperationRunRunner = "run_runner" // attribute of SpanNameInvocation
41+
OperationInvokeAgent = "invoke_agent"
4442
)
4543

4644
const (
@@ -75,11 +73,19 @@ var (
7573
KeyToolID = "trpc.go.agent.tool_id"
7674

7775
// GenAI operation attributes
78-
KeyGenAIOperationName = "gen_ai.operation.name"
79-
KeyGenAISystem = "gen_ai.system"
80-
KeyGenAIToolName = "gen_ai.tool.name"
81-
KeyGenAIToolDesc = "gen_ai.tool.description"
82-
KeyGenAIRequestModel = "gen_ai.request.model"
76+
KeyGenAIOperationName = "gen_ai.operation.name"
77+
KeyGenAISystem = "gen_ai.system"
78+
KeyGenAIToolName = "gen_ai.tool.name"
79+
KeyGenAIToolDesc = "gen_ai.tool.description"
80+
KeyGenAIRequestModel = "gen_ai.request.model"
81+
KeyGenAIInputMessages = "gen_ai.input.messages"
82+
KeyGenAIOutputMessages = "gen_ai.output.messages"
83+
KeyGenAIAgentName = "gen_ai.agent.name"
84+
KeyGenAIConversationID = "gen_ai.conversation.id"
85+
KeyGenAIResponseModel = "gen_ai.response.model"
86+
KeyGenAIUsageOutputTokens = "gen_ai.usage.output_tokens"
87+
KeyGenAIResponseID = "gen_ai.response.id"
88+
KeyGenAIUsageInputTokens = "gen_ai.usage.input_tokens"
8389

8490
// System value
8591
SystemTRPCGoAgent = "trpc.go.agent"
@@ -142,24 +148,45 @@ func TraceMergedToolCalls(span trace.Span, rspEvent *event.Event) {
142148
)
143149
}
144150

145-
// TraceRunner traces the invocation of a runner.
146-
func TraceRunner(span trace.Span, appName string, invoke *agent.Invocation, message model.Message) {
147-
if bts, err := json.Marshal(&model.Request{Messages: []model.Message{message}}); err == nil {
151+
// TraceBeforeInvokeAgent traces the before invocation of an agent.
152+
func TraceBeforeInvokeAgent(span trace.Span, invoke *agent.Invocation) {
153+
if bts, err := json.Marshal(&model.Request{Messages: []model.Message{invoke.Message}}); err == nil {
148154
span.SetAttributes(
149-
attribute.String(KeyRunnerInput, string(bts)),
155+
attribute.String(KeyGenAIInputMessages, string(bts)),
150156
)
151157
} else {
152-
span.SetAttributes(attribute.String(KeyRunnerInput, "<not json serializable>"))
158+
span.SetAttributes(attribute.String(KeyGenAIInputMessages, "<not json serializable>"))
153159
}
154-
155160
span.SetAttributes(
156161
attribute.String(KeyGenAISystem, SystemTRPCGoAgent),
157-
attribute.String(KeyGenAIOperationName, OperationRunRunner),
158-
attribute.String(KeyRunnerName, fmt.Sprintf("[trpc-go-agent]: %s/%s", appName, invoke.AgentName)),
162+
attribute.String(KeyGenAIOperationName, OperationInvokeAgent),
163+
attribute.String(KeyGenAIAgentName, invoke.AgentName),
159164
attribute.String(KeyInvocationID, invoke.InvocationID),
160-
attribute.String(KeyRunnerSessionID, invoke.Session.ID),
161-
attribute.String(KeyRunnerUserID, invoke.Session.UserID),
162165
)
166+
if invoke.Session != nil {
167+
span.SetAttributes(
168+
attribute.String(KeyRunnerUserID, invoke.Session.UserID),
169+
attribute.String(KeyGenAIConversationID, invoke.Session.ID),
170+
)
171+
}
172+
}
173+
174+
// TraceAfterInvokeAgent traces the after invocation of an agent.
175+
func TraceAfterInvokeAgent(span trace.Span, rsp *model.Response) {
176+
if len(rsp.Choices) > 0 {
177+
if bts, err := json.Marshal(rsp.Choices[0].Message); err == nil {
178+
span.SetAttributes(
179+
attribute.String(KeyGenAIOutputMessages, string(bts)),
180+
)
181+
}
182+
}
183+
span.SetAttributes(attribute.String(KeyGenAIResponseModel, rsp.Model))
184+
if rsp.Usage != nil {
185+
span.SetAttributes(attribute.Int(KeyGenAIUsageInputTokens, rsp.Usage.PromptTokens))
186+
span.SetAttributes(attribute.Int(KeyGenAIUsageOutputTokens, rsp.Usage.CompletionTokens))
187+
}
188+
span.SetAttributes(attribute.String(KeyGenAIResponseID, rsp.ID))
189+
163190
}
164191

165192
// TraceCallLLM traces the invocation of an LLM call.

knowledge/vectorstore/inmemory/inmemory_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func TestVectorStore_DeleteByFilter(t *testing.T) {
201201
// Test delete all
202202
require.NoError(t, store.Add(ctx, doc1, embedding))
203203
require.NoError(t, store.Add(ctx, doc2, embedding))
204-
204+
205205
err = store.DeleteByFilter(ctx, vectorstore.WithDeleteAll(true))
206206
require.NoError(t, err)
207207

@@ -313,9 +313,9 @@ func TestVectorStore_GetMetadata(t *testing.T) {
313313
metadata, err = store.GetMetadata(ctx, vectorstore.WithGetMetadataLimit(1), vectorstore.WithGetMetadataOffset(1))
314314
require.NoError(t, err)
315315
require.Equal(t, 1, len(metadata))
316-
316+
317317
// Get metadata with combined filters
318-
metadata, err = store.GetMetadata(ctx,
318+
metadata, err = store.GetMetadata(ctx,
319319
vectorstore.WithGetMetadataFilter(map[string]any{"type": "test"}),
320320
vectorstore.WithGetMetadataLimit(10),
321321
vectorstore.WithGetMetadataOffset(0))
@@ -344,7 +344,7 @@ func TestVectorStore_SearchModes(t *testing.T) {
344344
},
345345
}
346346
doc2 := &document.Document{
347-
ID: "doc2",
347+
ID: "doc2",
348348
Content: "bonjour monde",
349349
Metadata: map[string]any{
350350
"lang": "fr",
@@ -358,10 +358,10 @@ func TestVectorStore_SearchModes(t *testing.T) {
358358

359359
// Test vector search mode
360360
result, err := store.Search(ctx, &vectorstore.SearchQuery{
361-
Vector: []float64{0.9, 0.1, 0.2},
362-
Limit: 5,
363-
SearchMode: vectorstore.SearchModeVector,
364-
MinScore: 0.5,
361+
Vector: []float64{0.9, 0.1, 0.2},
362+
Limit: 5,
363+
SearchMode: vectorstore.SearchModeVector,
364+
MinScore: 0.5,
365365
})
366366
require.NoError(t, err)
367367
require.Len(t, result.Results, 1)
@@ -383,10 +383,10 @@ func TestVectorStore_SearchModes(t *testing.T) {
383383

384384
// Test hybrid search mode (falls back to vector search)
385385
result, err = store.Search(ctx, &vectorstore.SearchQuery{
386-
Vector: []float64{0.1, 0.9, 0.2},
387-
SearchMode: vectorstore.SearchModeHybrid,
388-
Limit: 5,
389-
MinScore: 0.5,
386+
Vector: []float64{0.1, 0.9, 0.2},
387+
SearchMode: vectorstore.SearchModeHybrid,
388+
Limit: 5,
389+
MinScore: 0.5,
390390
})
391391
require.NoError(t, err)
392392
require.Len(t, result.Results, 1)
@@ -395,8 +395,8 @@ func TestVectorStore_SearchModes(t *testing.T) {
395395

396396
// Test keyword search mode (falls back to filter search)
397397
result, err = store.Search(ctx, &vectorstore.SearchQuery{
398-
Query: "bonjour",
399-
SearchMode: vectorstore.SearchModeKeyword,
398+
Query: "bonjour",
399+
SearchMode: vectorstore.SearchModeKeyword,
400400
Filter: &vectorstore.SearchFilter{
401401
Metadata: map[string]any{"lang": "fr"},
402402
},

runner/runner.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,11 @@ import (
2020
"trpc.group/trpc-go/trpc-agent-go/agent"
2121
"trpc.group/trpc-go/trpc-agent-go/artifact"
2222
"trpc.group/trpc-go/trpc-agent-go/event"
23-
itelemetry "trpc.group/trpc-go/trpc-agent-go/internal/telemetry"
2423
"trpc.group/trpc-go/trpc-agent-go/log"
2524
"trpc.group/trpc-go/trpc-agent-go/memory"
2625
"trpc.group/trpc-go/trpc-agent-go/model"
2726
"trpc.group/trpc-go/trpc-agent-go/session"
2827
"trpc.group/trpc-go/trpc-agent-go/session/inmemory"
29-
"trpc.group/trpc-go/trpc-agent-go/telemetry/trace"
3028
)
3129

3230
// Author types for events.
@@ -190,8 +188,6 @@ func (r *runner) Run(
190188
// transfer_to_agent that rely on agent.InvocationFromContext(ctx).
191189
ctx = agent.NewInvocationContext(ctx, invocation)
192190

193-
ctx, span := trace.Tracer.Start(ctx, itelemetry.SpanNameInvocation)
194-
defer span.End()
195191
// Run the agent and get the event channel.
196192
agentEventCh, err := r.agent.Run(ctx, invocation)
197193
if err != nil {
@@ -255,8 +251,6 @@ func (r *runner) Run(
255251
agent.EmitEvent(ctx, invocation, processedEventCh, runnerCompletionEvent)
256252
}()
257253

258-
itelemetry.TraceRunner(span, r.appName, invocation, message)
259-
260254
return processedEventCh, nil
261255
}
262256

telemetry/langfuse/exporter.go

Lines changed: 1 addition & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,7 @@ func transformSpan(span *tracepb.Span) {
108108
transformCallLLM(span)
109109
case itelemetry.OperationExecuteTool:
110110
transformExecuteTool(span)
111-
case itelemetry.OperationRunRunner:
112-
transformRunRunner(span)
111+
default:
113112
}
114113
}
115114

@@ -247,63 +246,6 @@ func transformExecuteTool(span *tracepb.Span) {
247246
span.Attributes = newAttributes
248247
}
249248

250-
// transformRunRunner transforms runner spans for Langfuse
251-
func transformRunRunner(span *tracepb.Span) {
252-
var newAttributes []*commonpb.KeyValue
253-
254-
newAttributes = append(newAttributes, &commonpb.KeyValue{
255-
Key: observationType,
256-
Value: &commonpb.AnyValue{
257-
Value: &commonpb.AnyValue_StringValue{StringValue: "agent"},
258-
},
259-
})
260-
// Process existing attributes
261-
for _, attr := range span.Attributes {
262-
switch attr.Key {
263-
case itelemetry.KeyRunnerInput:
264-
if attr.Value != nil {
265-
newAttributes = append(newAttributes, &commonpb.KeyValue{
266-
Key: observationInput,
267-
Value: &commonpb.AnyValue{
268-
Value: &commonpb.AnyValue_StringValue{StringValue: attr.Value.GetStringValue()},
269-
},
270-
})
271-
} else {
272-
newAttributes = append(newAttributes, &commonpb.KeyValue{
273-
Key: observationInput,
274-
Value: &commonpb.AnyValue{
275-
Value: &commonpb.AnyValue_StringValue{StringValue: "N/A"},
276-
},
277-
})
278-
}
279-
// Skip this attribute (delete it)
280-
case itelemetry.KeyRunnerOutput:
281-
if attr.Value != nil {
282-
newAttributes = append(newAttributes, &commonpb.KeyValue{
283-
Key: observationOutput,
284-
Value: &commonpb.AnyValue{
285-
Value: &commonpb.AnyValue_StringValue{StringValue: attr.Value.GetStringValue()},
286-
},
287-
})
288-
} else {
289-
newAttributes = append(newAttributes, &commonpb.KeyValue{
290-
Key: observationOutput,
291-
Value: &commonpb.AnyValue{
292-
Value: &commonpb.AnyValue_StringValue{StringValue: "N/A"},
293-
},
294-
})
295-
}
296-
// Skip this attribute (delete it)
297-
default:
298-
// Keep other attributes
299-
newAttributes = append(newAttributes, attr)
300-
}
301-
}
302-
303-
// Replace span attributes
304-
span.Attributes = newAttributes
305-
}
306-
307249
func (e *exporter) Shutdown(ctx context.Context) error {
308250
e.mu.RLock()
309251
started := e.started

0 commit comments

Comments
 (0)