diff --git a/.gitignore b/.gitignore index 525987b6b..adddd2e9e 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,13 @@ output/* # Vscode files .vscode + +# Trae files +.trae/ + +# Claude files +.claude/ + +# Data files +*.jsonl +*.csv \ No newline at end of file diff --git a/callbacks/cozeloop/adk_data_parser_test.go b/callbacks/cozeloop/adk_data_parser_test.go new file mode 100644 index 000000000..a800090a1 --- /dev/null +++ b/callbacks/cozeloop/adk_data_parser_test.go @@ -0,0 +1,548 @@ +/* + * Copyright 2026 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cozeloop + +import ( + "context" + "errors" + "testing" + + "github.com/cloudwego/eino/adk" + "github.com/cloudwego/eino/callbacks" + "github.com/cloudwego/eino/schema" + "github.com/coze-dev/cozeloop-go/spec/tracespec" + "github.com/smartystreets/goconvey/convey" +) + +func TestDefaultDataParser_ParseInput_Agent_Run(t *testing.T) { + convey.Convey("TestDefaultDataParser_ParseInput_Agent_Run", t, func() { + parser := &defaultDataParser{} + ctx := context.Background() + info := &callbacks.RunInfo{ + Name: "test-agent", + Type: "ChatModel", + Component: adk.ComponentOfAgent, + } + + input := &adk.AgentCallbackInput{ + Input: &adk.AgentInput{ + Messages: []*schema.Message{ + {Role: schema.User, Content: "hello"}, + }, + EnableStreaming: true, + }, + } + + tags := parser.ParseInput(ctx, info, input) + convey.So(tags, convey.ShouldNotBeNil) + convey.So(tags[attrKeyRunMode], convey.ShouldEqual, "run") + convey.So(tags[attrKeyAgentName], convey.ShouldEqual, "test-agent") + convey.So(tags[tracespec.Stream], convey.ShouldEqual, true) + convey.So(tags[tracespec.Input], convey.ShouldNotBeNil) + }) +} + +func TestDefaultDataParser_ParseInput_Agent_Resume(t *testing.T) { + convey.Convey("TestDefaultDataParser_ParseInput_Agent_Resume", t, func() { + parser := &defaultDataParser{} + ctx := context.Background() + info := &callbacks.RunInfo{ + Name: "test-agent", + Type: "ChatModel", + Component: adk.ComponentOfAgent, + } + + input := &adk.AgentCallbackInput{ + ResumeInfo: &adk.ResumeInfo{ + EnableStreaming: false, + ResumeData: map[string]any{"key": "value"}, + InterruptState: map[string]any{"state": "interrupted"}, + }, + } + + tags := parser.ParseInput(ctx, info, input) + convey.So(tags, convey.ShouldNotBeNil) + convey.So(tags[attrKeyRunMode], convey.ShouldEqual, "resume") + convey.So(tags[attrKeyAgentName], convey.ShouldEqual, "test-agent") + convey.So(tags[tracespec.Stream], convey.ShouldEqual, false) + convey.So(tags[tracespec.Input], convey.ShouldNotBeNil) + }) +} + +func TestDefaultDataParser_ParseInput_Agent_NilInput(t *testing.T) { + convey.Convey("TestDefaultDataParser_ParseInput_Agent_NilInput", t, func() { + parser := &defaultDataParser{} + ctx := context.Background() + + convey.Convey("nil info", func() { + tags := parser.ParseInput(ctx, nil, &adk.AgentCallbackInput{}) + convey.So(tags, convey.ShouldBeNil) + }) + }) +} + +func TestDefaultDataParser_ParseOutput_Agent_MultipleEvents(t *testing.T) { + convey.Convey("TestDefaultDataParser_ParseOutput_Agent_MultipleEvents", t, func() { + parser := &defaultDataParser{} + ctx := context.Background() + info := &callbacks.RunInfo{ + Name: "test-agent", + Component: adk.ComponentOfAgent, + } + + iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]() + go func() { + defer gen.Close() + gen.Send(&adk.AgentEvent{ + Output: &adk.AgentOutput{ + MessageOutput: &adk.MessageVariant{ + IsStreaming: false, + Message: &schema.Message{Role: schema.Assistant, Content: "first"}, + }, + }, + }) + gen.Send(&adk.AgentEvent{ + Output: &adk.AgentOutput{ + MessageOutput: &adk.MessageVariant{ + IsStreaming: false, + Message: &schema.Message{Role: schema.Assistant, Content: "second"}, + }, + }, + }) + }() + + output := &adk.AgentCallbackOutput{Events: iter} + tags := parser.ParseOutput(ctx, info, output) + convey.So(tags, convey.ShouldNotBeNil) + convey.So(tags[tracespec.Output], convey.ShouldNotBeNil) + }) +} + +func TestDefaultDataParser_ParseOutput_Agent_WithError(t *testing.T) { + convey.Convey("TestDefaultDataParser_ParseOutput_Agent_WithError", t, func() { + parser := &defaultDataParser{} + ctx := context.Background() + info := &callbacks.RunInfo{ + Name: "test-agent", + Component: adk.ComponentOfAgent, + } + + iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]() + go func() { + defer gen.Close() + gen.Send(&adk.AgentEvent{ + Err: errors.New("test error"), + }) + }() + + output := &adk.AgentCallbackOutput{Events: iter} + tags := parser.ParseOutput(ctx, info, output) + convey.So(tags, convey.ShouldNotBeNil) + convey.So(tags[tracespec.Output], convey.ShouldNotBeNil) + }) +} + +func TestDefaultDataParser_ParseOutput_Agent_NilOutput(t *testing.T) { + convey.Convey("TestDefaultDataParser_ParseOutput_Agent_NilOutput", t, func() { + parser := &defaultDataParser{} + ctx := context.Background() + + convey.Convey("nil info", func() { + tags := parser.ParseOutput(ctx, nil, &adk.AgentCallbackOutput{}) + convey.So(tags, convey.ShouldBeNil) + }) + + convey.Convey("nil events", func() { + info := &callbacks.RunInfo{Name: "test", Component: adk.ComponentOfAgent} + tags := parser.ParseOutput(ctx, info, &adk.AgentCallbackOutput{Events: nil}) + convey.So(tags, convey.ShouldNotBeNil) + convey.So(tags[tracespec.Output], convey.ShouldBeNil) + }) + }) +} + +func TestDefaultDataParser_ParseOutput_Agent_StreamingMessage(t *testing.T) { + convey.Convey("TestDefaultDataParser_ParseOutput_Agent_StreamingMessage", t, func() { + parser := &defaultDataParser{} + ctx := context.Background() + info := &callbacks.RunInfo{ + Name: "test-agent", + Component: adk.ComponentOfAgent, + } + + msgReader, msgWriter := schema.Pipe[*schema.Message](3) + go func() { + defer msgWriter.Close() + msgWriter.Send(&schema.Message{Role: schema.Assistant, Content: "hello"}, nil) + msgWriter.Send(&schema.Message{Role: schema.Assistant, Content: "world"}, nil) + }() + + iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]() + go func() { + defer gen.Close() + gen.Send(&adk.AgentEvent{ + Output: &adk.AgentOutput{ + MessageOutput: &adk.MessageVariant{ + IsStreaming: true, + MessageStream: msgReader, + }, + }, + }) + }() + + output := &adk.AgentCallbackOutput{Events: iter} + tags := parser.ParseOutput(ctx, info, output) + convey.So(tags, convey.ShouldNotBeNil) + convey.So(tags[tracespec.Output], convey.ShouldNotBeNil) + }) +} + +func TestDefaultDataParser_ParseOutput_Agent_WithAction(t *testing.T) { + convey.Convey("TestDefaultDataParser_ParseOutput_Agent_WithAction", t, func() { + parser := &defaultDataParser{} + ctx := context.Background() + info := &callbacks.RunInfo{ + Name: "test-agent", + Component: adk.ComponentOfAgent, + } + + convey.Convey("exit action", func() { + iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]() + go func() { + defer gen.Close() + gen.Send(&adk.AgentEvent{ + Action: &adk.AgentAction{ + Exit: true, + }, + }) + }() + + output := &adk.AgentCallbackOutput{Events: iter} + tags := parser.ParseOutput(ctx, info, output) + convey.So(tags, convey.ShouldNotBeNil) + convey.So(tags[tracespec.Output], convey.ShouldNotBeNil) + }) + + convey.Convey("transfer to agent action", func() { + iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]() + go func() { + defer gen.Close() + gen.Send(&adk.AgentEvent{ + Action: &adk.AgentAction{ + TransferToAgent: &adk.TransferToAgentAction{ + DestAgentName: "other-agent", + }, + }, + }) + }() + + output := &adk.AgentCallbackOutput{Events: iter} + tags := parser.ParseOutput(ctx, info, output) + convey.So(tags, convey.ShouldNotBeNil) + convey.So(tags[tracespec.Output], convey.ShouldNotBeNil) + }) + + convey.Convey("break loop action", func() { + iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]() + go func() { + defer gen.Close() + gen.Send(&adk.AgentEvent{ + Action: &adk.AgentAction{ + BreakLoop: &adk.BreakLoopAction{}, + }, + }) + }() + + output := &adk.AgentCallbackOutput{Events: iter} + tags := parser.ParseOutput(ctx, info, output) + convey.So(tags, convey.ShouldNotBeNil) + convey.So(tags[tracespec.Output], convey.ShouldNotBeNil) + }) + + convey.Convey("customized action", func() { + iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]() + go func() { + defer gen.Close() + gen.Send(&adk.AgentEvent{ + Action: &adk.AgentAction{ + CustomizedAction: map[string]any{"custom": "action"}, + }, + }) + }() + + output := &adk.AgentCallbackOutput{Events: iter} + tags := parser.ParseOutput(ctx, info, output) + convey.So(tags, convey.ShouldNotBeNil) + convey.So(tags[tracespec.Output], convey.ShouldNotBeNil) + }) + }) +} + +func TestDefaultDataParser_ParseOutput_Agent_WithInterrupt(t *testing.T) { + convey.Convey("TestDefaultDataParser_ParseOutput_Agent_WithInterrupt", t, func() { + parser := &defaultDataParser{} + ctx := context.Background() + info := &callbacks.RunInfo{ + Name: "test-agent", + Component: adk.ComponentOfAgent, + } + + iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]() + go func() { + defer gen.Close() + gen.Send(&adk.AgentEvent{ + Action: &adk.AgentAction{ + Interrupted: &adk.InterruptInfo{ + Data: map[string]any{"interrupt": "data"}, + InterruptContexts: []*adk.InterruptCtx{ + { + ID: "ctx-1", + Info: map[string]any{"info": "data"}, + IsRootCause: true, + }, + }, + }, + }, + }) + }() + + output := &adk.AgentCallbackOutput{Events: iter} + tags := parser.ParseOutput(ctx, info, output) + convey.So(tags, convey.ShouldNotBeNil) + convey.So(tags[tracespec.Output], convey.ShouldNotBeNil) + }) +} + +func TestDefaultDataParser_ParseOutput_Agent_WithAgentName(t *testing.T) { + convey.Convey("TestDefaultDataParser_ParseOutput_Agent_WithAgentName", t, func() { + parser := &defaultDataParser{} + ctx := context.Background() + info := &callbacks.RunInfo{ + Name: "test-agent", + Component: adk.ComponentOfAgent, + } + + iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]() + go func() { + defer gen.Close() + gen.Send(&adk.AgentEvent{ + AgentName: "nested-agent", + Output: &adk.AgentOutput{ + MessageOutput: &adk.MessageVariant{ + IsStreaming: false, + Message: &schema.Message{Role: schema.Assistant, Content: "nested output"}, + }, + }, + }) + }() + + output := &adk.AgentCallbackOutput{Events: iter} + tags := parser.ParseOutput(ctx, info, output) + convey.So(tags, convey.ShouldNotBeNil) + convey.So(tags[tracespec.Output], convey.ShouldNotBeNil) + }) +} + +func TestDefaultDataParser_ParseOutput_Agent_WithCustomizedOutput(t *testing.T) { + convey.Convey("TestDefaultDataParser_ParseOutput_Agent_WithCustomizedOutput", t, func() { + parser := &defaultDataParser{} + ctx := context.Background() + info := &callbacks.RunInfo{ + Name: "test-agent", + Component: adk.ComponentOfAgent, + } + + iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]() + go func() { + defer gen.Close() + gen.Send(&adk.AgentEvent{ + Output: &adk.AgentOutput{ + CustomizedOutput: map[string]any{"custom": "output"}, + }, + }) + }() + + output := &adk.AgentCallbackOutput{Events: iter} + tags := parser.ParseOutput(ctx, info, output) + convey.So(tags, convey.ShouldNotBeNil) + convey.So(tags[tracespec.Output], convey.ShouldNotBeNil) + }) +} + +func TestParseSpanTypeFromComponent_Agent(t *testing.T) { + convey.Convey("TestParseSpanTypeFromComponent_Agent", t, func() { + convey.So(parseSpanTypeFromComponent(adk.ComponentOfAgent), convey.ShouldEqual, spanTypeAgent) + }) +} + +func TestSerializeRunPath(t *testing.T) { + convey.Convey("TestSerializeRunPath", t, func() { + convey.Convey("empty run path", func() { + result := serializeRunPath(nil) + convey.So(result, convey.ShouldEqual, "") + }) + + convey.Convey("empty slice", func() { + result := serializeRunPath([]adk.RunStep{}) + convey.So(result, convey.ShouldEqual, "") + }) + }) +} + +func TestSerializeAgentAction(t *testing.T) { + convey.Convey("TestSerializeAgentAction", t, func() { + convey.Convey("nil action", func() { + result := serializeAgentAction(nil) + convey.So(result, convey.ShouldBeNil) + }) + + convey.Convey("exit action", func() { + action := &adk.AgentAction{Exit: true} + result := serializeAgentAction(action) + convey.So(result, convey.ShouldNotBeNil) + convey.So(result["exit"], convey.ShouldEqual, true) + }) + + convey.Convey("transfer to agent action", func() { + action := &adk.AgentAction{ + TransferToAgent: &adk.TransferToAgentAction{DestAgentName: "target-agent"}, + } + result := serializeAgentAction(action) + convey.So(result, convey.ShouldNotBeNil) + convey.So(result["transfer_to_agent"], convey.ShouldEqual, "target-agent") + }) + + convey.Convey("break loop action", func() { + action := &adk.AgentAction{BreakLoop: &adk.BreakLoopAction{}} + result := serializeAgentAction(action) + convey.So(result, convey.ShouldNotBeNil) + convey.So(result["break_loop"], convey.ShouldEqual, true) + }) + + convey.Convey("customized action", func() { + action := &adk.AgentAction{CustomizedAction: map[string]any{"key": "value"}} + result := serializeAgentAction(action) + convey.So(result, convey.ShouldNotBeNil) + convey.So(result["customized_action"], convey.ShouldNotBeNil) + }) + }) +} + +func TestSerializeInterruptInfo(t *testing.T) { + convey.Convey("TestSerializeInterruptInfo", t, func() { + convey.Convey("nil info", func() { + result := serializeInterruptInfo(nil) + convey.So(result, convey.ShouldBeNil) + }) + + convey.Convey("with data", func() { + info := &adk.InterruptInfo{ + Data: map[string]any{"key": "value"}, + } + result := serializeInterruptInfo(info) + convey.So(result, convey.ShouldNotBeNil) + convey.So(result["data"], convey.ShouldNotBeNil) + }) + + convey.Convey("with byte data (should be skipped)", func() { + info := &adk.InterruptInfo{ + Data: []byte("binary data"), + } + result := serializeInterruptInfo(info) + convey.So(result, convey.ShouldNotBeNil) + convey.So(result["data"], convey.ShouldBeNil) + }) + + convey.Convey("with interrupt contexts", func() { + info := &adk.InterruptInfo{ + InterruptContexts: []*adk.InterruptCtx{ + {ID: "ctx-1", IsRootCause: true}, + {ID: "ctx-2"}, + }, + } + result := serializeInterruptInfo(info) + convey.So(result, convey.ShouldNotBeNil) + convey.So(result["interrupt_contexts"], convey.ShouldNotBeNil) + }) + }) +} + +func TestSerializeInterruptCtx(t *testing.T) { + convey.Convey("TestSerializeInterruptCtx", t, func() { + convey.Convey("nil ctx", func() { + result := serializeInterruptCtx(nil) + convey.So(result, convey.ShouldBeNil) + }) + + convey.Convey("with all fields", func() { + ctx := &adk.InterruptCtx{ + ID: "ctx-1", + Info: map[string]any{"key": "value"}, + IsRootCause: true, + Parent: &adk.InterruptCtx{ + ID: "parent-ctx", + }, + } + result := serializeInterruptCtx(ctx) + convey.So(result, convey.ShouldNotBeNil) + convey.So(result["id"], convey.ShouldEqual, "ctx-1") + convey.So(result["info"], convey.ShouldNotBeNil) + convey.So(result["is_root_cause"], convey.ShouldEqual, true) + convey.So(result["parent"], convey.ShouldNotBeNil) + }) + + convey.Convey("with non-empty id", func() { + ctx := &adk.InterruptCtx{ + ID: "ctx-1", + } + result := serializeInterruptCtx(ctx) + convey.So(result, convey.ShouldNotBeNil) + convey.So(result["id"], convey.ShouldEqual, "ctx-1") + }) + }) +} + +func TestSerializeAgentEvent(t *testing.T) { + convey.Convey("TestSerializeAgentEvent", t, func() { + ctx := context.Background() + + convey.Convey("nil event", func() { + result := serializeAgentEvent(ctx, nil) + convey.So(result, convey.ShouldBeNil) + }) + + convey.Convey("empty event", func() { + event := &adk.AgentEvent{} + result := serializeAgentEvent(ctx, event) + convey.So(result, convey.ShouldBeNil) + }) + + convey.Convey("event with agent name", func() { + event := &adk.AgentEvent{AgentName: "test-agent"} + result := serializeAgentEvent(ctx, event) + convey.So(result, convey.ShouldNotBeNil) + convey.So(result["agent_name"], convey.ShouldEqual, "test-agent") + }) + + convey.Convey("event with error", func() { + event := &adk.AgentEvent{Err: errors.New("test error")} + result := serializeAgentEvent(ctx, event) + convey.So(result, convey.ShouldNotBeNil) + convey.So(result["error"], convey.ShouldEqual, "test error") + }) + }) +} diff --git a/callbacks/cozeloop/adk_trace_handler_test.go b/callbacks/cozeloop/adk_trace_handler_test.go new file mode 100644 index 000000000..67e75ba35 --- /dev/null +++ b/callbacks/cozeloop/adk_trace_handler_test.go @@ -0,0 +1,252 @@ +/* + * Copyright 2026 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cozeloop + +import ( + "context" + "os" + "testing" + + "github.com/bytedance/mockey" + "github.com/cloudwego/eino/adk" + "github.com/cloudwego/eino/callbacks" + "github.com/cloudwego/eino/schema" + "github.com/coze-dev/cozeloop-go" + "github.com/coze-dev/cozeloop-go/spec/tracespec" + "github.com/smartystreets/goconvey/convey" +) + +func TestEinoTracer_OnAgentStart(t *testing.T) { + os.Setenv(cozeloop.EnvWorkspaceID, "1234567890") + os.Setenv(cozeloop.EnvApiToken, "xxxx") + mockey.PatchConvey("test einoTracer OnStart with Agent component", t, func() { + client, err := cozeloop.NewClient() + if err != nil { + return + } + runtime := &tracespec.Runtime{} + + tracer := &einoTracer{ + client: client, + parser: &defaultDataParser{}, + runtime: runtime, + logger: cozeloop.GetLogger(), + } + + info := &callbacks.RunInfo{ + Name: "test-agent", + Type: "ChatModel", + Component: adk.ComponentOfAgent, + } + + mockey.PatchConvey("test OnStart with AgentInput (run)", func() { + input := &adk.AgentCallbackInput{ + Input: &adk.AgentInput{ + Messages: []*schema.Message{ + {Role: schema.User, Content: "hello"}, + }, + EnableStreaming: false, + }, + } + + ctx := tracer.OnStart(context.Background(), info, input) + convey.So(ctx, convey.ShouldNotBeNil) + }) + + mockey.PatchConvey("test OnStart with ResumeInfo (resume)", func() { + input := &adk.AgentCallbackInput{ + ResumeInfo: &adk.ResumeInfo{ + EnableStreaming: true, + ResumeData: map[string]any{"key": "value"}, + InterruptState: map[string]any{"state": "interrupted"}, + }, + } + + ctx := tracer.OnStart(context.Background(), info, input) + convey.So(ctx, convey.ShouldNotBeNil) + }) + + mockey.PatchConvey("test OnStart with nil info", func() { + input := &adk.AgentCallbackInput{ + Input: &adk.AgentInput{ + Messages: []*schema.Message{ + {Role: schema.User, Content: "hello"}, + }, + }, + } + + ctx := tracer.OnStart(context.Background(), nil, input) + convey.So(ctx, convey.ShouldNotBeNil) + }) + }) +} + +func TestEinoTracer_OnAgentEnd(t *testing.T) { + os.Setenv(cozeloop.EnvWorkspaceID, "1234567890") + os.Setenv(cozeloop.EnvApiToken, "xxxx") + mockey.PatchConvey("test einoTracer OnEnd with Agent component", t, func() { + client, err := cozeloop.NewClient() + if err != nil { + return + } + runtime := &tracespec.Runtime{} + + tracer := &einoTracer{ + client: client, + parser: &defaultDataParser{}, + runtime: runtime, + logger: cozeloop.GetLogger(), + } + + info := &callbacks.RunInfo{ + Name: "test-agent", + Type: "ChatModel", + Component: adk.ComponentOfAgent, + } + + mockey.PatchConvey("test OnEnd with multiple events", func() { + ctx := tracer.OnStart(context.Background(), info, &adk.AgentCallbackInput{ + Input: &adk.AgentInput{ + Messages: []*schema.Message{{Role: schema.User, Content: "hello"}}, + }, + }) + + iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]() + go func() { + defer gen.Close() + gen.Send(&adk.AgentEvent{ + Output: &adk.AgentOutput{ + MessageOutput: &adk.MessageVariant{ + IsStreaming: false, + Message: &schema.Message{Role: schema.Assistant, Content: "response"}, + }, + }, + }) + }() + + output := &adk.AgentCallbackOutput{Events: iter} + result := tracer.OnEnd(ctx, info, output) + convey.So(result, convey.ShouldNotBeNil) + }) + + mockey.PatchConvey("test OnEnd with nil info", func() { + ctx := context.Background() + iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]() + go func() { + gen.Close() + }() + + output := &adk.AgentCallbackOutput{Events: iter} + result := tracer.OnEnd(ctx, nil, output) + convey.So(result, convey.ShouldNotBeNil) + }) + }) +} + +func TestEinoTracer_OnStartWithStreamInput_Agent(t *testing.T) { + os.Setenv(cozeloop.EnvWorkspaceID, "1234567890") + os.Setenv(cozeloop.EnvApiToken, "xxxx") + mockey.PatchConvey("test einoTracer OnStartWithStreamInput with Agent component", t, func() { + client, err := cozeloop.NewClient() + if err != nil { + return + } + runtime := &tracespec.Runtime{} + + tracer := &einoTracer{ + client: client, + parser: &defaultDataParser{}, + runtime: runtime, + logger: cozeloop.GetLogger(), + } + + info := &callbacks.RunInfo{ + Name: "test-agent", + Type: "ChatModel", + Component: adk.ComponentOfAgent, + } + + mockey.PatchConvey("test OnStartWithStreamInput", func() { + reader, writer := schema.Pipe[callbacks.CallbackInput](2) + go func() { + defer writer.Close() + writer.Send(&adk.AgentCallbackInput{ + Input: &adk.AgentInput{ + Messages: []*schema.Message{{Role: schema.User, Content: "hello"}}, + }, + }, nil) + }() + + ctx := tracer.OnStartWithStreamInput(context.Background(), info, reader) + convey.So(ctx, convey.ShouldNotBeNil) + }) + + mockey.PatchConvey("test OnStartWithStreamInput with nil info", func() { + reader, writer := schema.Pipe[callbacks.CallbackInput](2) + go func() { + defer writer.Close() + }() + + ctx := tracer.OnStartWithStreamInput(context.Background(), nil, reader) + convey.So(ctx, convey.ShouldNotBeNil) + }) + }) +} + +func TestNewTraceCallbackHandler_WithCustomParser(t *testing.T) { + os.Setenv(cozeloop.EnvWorkspaceID, "1234567890") + os.Setenv(cozeloop.EnvApiToken, "xxxx") + mockey.PatchConvey("test newTraceCallbackHandler with custom parser", t, func() { + client, err := cozeloop.NewClient() + if err != nil { + return + } + + customParser := &mockDataParser{} + + handler := newTraceCallbackHandler(client, &options{ + parser: customParser, + }) + convey.So(handler, convey.ShouldNotBeNil) + + tracer, ok := handler.(*einoTracer) + convey.So(ok, convey.ShouldBeTrue) + convey.So(tracer.parser, convey.ShouldEqual, customParser) + }) +} + +func TestNewTraceCallbackHandler_WithDefaultParser(t *testing.T) { + os.Setenv(cozeloop.EnvWorkspaceID, "1234567890") + os.Setenv(cozeloop.EnvApiToken, "xxxx") + mockey.PatchConvey("test newTraceCallbackHandler with default parser", t, func() { + client, err := cozeloop.NewClient() + if err != nil { + return + } + + handler := newTraceCallbackHandler(client, &options{}) + convey.So(handler, convey.ShouldNotBeNil) + + tracer, ok := handler.(*einoTracer) + convey.So(ok, convey.ShouldBeTrue) + convey.So(tracer.parser, convey.ShouldNotBeNil) + }) +} + +type mockDataParser struct { + defaultDataParser +} diff --git a/callbacks/cozeloop/data_parser.go b/callbacks/cozeloop/data_parser.go index 3d64a86a5..a827d8fbe 100644 --- a/callbacks/cozeloop/data_parser.go +++ b/callbacks/cozeloop/data_parser.go @@ -21,11 +21,13 @@ import ( "encoding/json" "io" "reflect" + "strings" "time" "github.com/cloudwego/eino-ext/callbacks/cozeloop/internal/consts" "github.com/coze-dev/cozeloop-go/spec/tracespec" + "github.com/cloudwego/eino/adk" "github.com/cloudwego/eino/callbacks" "github.com/cloudwego/eino/components" "github.com/cloudwego/eino/components/embedding" @@ -132,6 +134,15 @@ func (d defaultDataParser) ParseInput(ctx context.Context, info *callbacks.RunIn case compose.ComponentOfLambda: tags.set(tracespec.Input, parseAny(ctx, input, false)) + case adk.ComponentOfAgent: + agentInput := adk.ConvAgentCallbackInput(input) + if agentInput != nil { + agentTags := d.parseAgentInput(ctx, info, agentInput) + for k, v := range agentTags { + tags.set(k, v) + } + } + default: messages, ok := input.([]*schema.Message) if ok && level == 1 { @@ -239,6 +250,16 @@ func (d defaultDataParser) ParseOutput(ctx context.Context, info *callbacks.RunI collectOutput.addMessages(iterSliceWithCtx(ctx, iterSlice(messages, convertModelMessage), addToolName)...) } tags.set(tracespec.Output, parseAny(ctx, output, false)) + + case adk.ComponentOfAgent: + agentOutput := adk.ConvAgentCallbackOutput(output) + if agentOutput != nil { + agentTags := d.parseAgentOutput(ctx, info, agentOutput) + for k, v := range agentTags { + tags.set(k, v) + } + } + default: if level == 1 && d.enableAggrMessageOutput { tags.set(tracespec.Output, collectOutput) @@ -555,7 +576,211 @@ func parseSpanTypeFromComponent(c components.Component) string { case compose.ComponentOfGraph: return "graph" + case adk.ComponentOfAgent: + return spanTypeAgent + default: return string(c) } } + +const ( + spanTypeAgent = "Agent" + attrKeyAgentName = "agent_name" + attrKeyRunMode = "run_mode" +) + +func (d defaultDataParser) parseAgentInput(ctx context.Context, info *callbacks.RunInfo, input *adk.AgentCallbackInput) map[string]any { + if info == nil || input == nil { + return nil + } + + tags := make(spanTags) + + if input.Input != nil { + tags.set(attrKeyRunMode, "run") + tags.set(tracespec.Input, parseAny(ctx, input.Input.Messages, false)) + tags.set(tracespec.Stream, input.Input.EnableStreaming) + } else if input.ResumeInfo != nil { + tags.set(attrKeyRunMode, "resume") + tags.set(tracespec.Input, input.ResumeInfo) + tags.set(tracespec.Stream, input.ResumeInfo.EnableStreaming) + } + + tags.set(attrKeyAgentName, info.Name) + + return tags +} + +func (d defaultDataParser) parseAgentOutput(ctx context.Context, info *callbacks.RunInfo, output *adk.AgentCallbackOutput) map[string]any { + if info == nil || output == nil || output.Events == nil { + return nil + } + + var events []map[string]any + + for { + event, ok := output.Events.Next() + if !ok { + break + } + + eventData := serializeAgentEvent(ctx, event) + if eventData != nil { + events = append(events, eventData) + } + } + + tags := make(spanTags) + if len(events) != 0 { + tags.set(tracespec.Output, events) + } + + return tags +} + +func serializeAgentEvent(_ context.Context, event *adk.AgentEvent) map[string]any { + if event == nil { + return nil + } + + result := make(map[string]any) + + if event.AgentName != "" { + result["agent_name"] = event.AgentName + } + + if len(event.RunPath) > 0 { + result["run_path"] = serializeRunPath(event.RunPath) + } + + if event.Output != nil { + if event.Output.MessageOutput != nil { + msg, _, err := adk.GetMessage(event) + if err != nil { + result["message_error"] = err.Error() + } else if msg != nil { + result["message"] = msg + } + } + if event.Output.CustomizedOutput != nil { + result["customized_output"] = event.Output.CustomizedOutput + } + } + + if event.Action != nil { + result["action"] = serializeAgentAction(event.Action) + } + + if event.Err != nil { + result["error"] = event.Err.Error() + } + + if len(result) == 0 { + return nil + } + + return result +} + +func serializeRunPath(runPath []adk.RunStep) string { + if len(runPath) == 0 { + return "" + } + + var parts []string + for _, step := range runPath { + parts = append(parts, step.String()) + } + + return strings.Join(parts, " -> ") +} + +func serializeAgentAction(action *adk.AgentAction) map[string]any { + if action == nil { + return nil + } + + result := make(map[string]any) + + if action.Exit { + result["exit"] = true + } + + if action.Interrupted != nil { + result["interrupted"] = serializeInterruptInfo(action.Interrupted) + } + + if action.TransferToAgent != nil { + result["transfer_to_agent"] = action.TransferToAgent.DestAgentName + } + + if action.BreakLoop != nil { + result["break_loop"] = true + } + + if action.CustomizedAction != nil { + result["customized_action"] = action.CustomizedAction + } + + return result +} + +func serializeInterruptInfo(info *adk.InterruptInfo) map[string]any { + if info == nil { + return nil + } + + result := make(map[string]any) + + if info.Data != nil { + rv := reflect.ValueOf(info.Data) + if rv.Kind() != reflect.Slice || rv.Type().Elem().Kind() != reflect.Uint8 { + result["data"] = info.Data + } + } + + if len(info.InterruptContexts) > 0 { + var contexts []map[string]any + for _, ctx := range info.InterruptContexts { + if ctx != nil { + contexts = append(contexts, serializeInterruptCtx(ctx)) + } + } + if len(contexts) > 0 { + result["interrupt_contexts"] = contexts + } + } + + return result +} + +func serializeInterruptCtx(ctx *adk.InterruptCtx) map[string]any { + if ctx == nil { + return nil + } + + result := make(map[string]any) + + if ctx.ID != "" { + result["id"] = ctx.ID + } + + if len(ctx.Address) > 0 { + result["address"] = ctx.Address.String() + } + + if ctx.Info != nil { + result["info"] = ctx.Info + } + + if ctx.IsRootCause { + result["is_root_cause"] = true + } + + if ctx.Parent != nil { + result["parent"] = serializeInterruptCtx(ctx.Parent) + } + + return result +} diff --git a/callbacks/cozeloop/go.mod b/callbacks/cozeloop/go.mod index 3959852a7..6b49407e1 100644 --- a/callbacks/cozeloop/go.mod +++ b/callbacks/cozeloop/go.mod @@ -4,9 +4,9 @@ go 1.23.0 require ( github.com/bytedance/mockey v1.2.14 - github.com/bytedance/sonic v1.14.1 - github.com/cloudwego/eino v0.7.21 - github.com/coze-dev/cozeloop-go v0.1.17 + github.com/bytedance/sonic v1.15.0 + github.com/cloudwego/eino v0.8.0 + github.com/coze-dev/cozeloop-go v0.1.20 github.com/coze-dev/cozeloop-go/spec v0.1.8 github.com/smartystreets/goconvey v1.8.1 github.com/stretchr/testify v1.10.0 @@ -17,7 +17,7 @@ require ( github.com/bluele/gcache v0.0.2 // indirect github.com/buger/jsonparser v1.1.1 // indirect github.com/bytedance/gopkg v0.1.3 // indirect - github.com/bytedance/sonic/loader v0.3.0 // indirect + github.com/bytedance/sonic/loader v0.5.0 // indirect github.com/cloudwego/base64x v0.1.6 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dustin/go-humanize v1.0.1 // indirect diff --git a/callbacks/cozeloop/go.sum b/callbacks/cozeloop/go.sum index 689e17254..01a900ac5 100644 --- a/callbacks/cozeloop/go.sum +++ b/callbacks/cozeloop/go.sum @@ -17,15 +17,19 @@ github.com/bytedance/mockey v1.2.14 h1:KZaFgPdiUwW+jOWFieo3Lr7INM1P+6adO3hxZhDsw github.com/bytedance/mockey v1.2.14/go.mod h1:1BPHF9sol5R1ud/+0VEHGQq/+i2lN+GTsr3O2Q9IENY= github.com/bytedance/sonic v1.14.1 h1:FBMC0zVz5XUmE4z9wF4Jey0An5FueFvOsTKKKtwIl7w= github.com/bytedance/sonic v1.14.1/go.mod h1:gi6uhQLMbTdeP0muCnrjHLeCUPyb70ujhnNlhOylAFc= +github.com/bytedance/sonic v1.15.0/go.mod h1:tFkWrPz0/CUCLEF4ri4UkHekCIcdnkqXw9VduqpJh0k= github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA= github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= +github.com/bytedance/sonic/loader v0.5.0/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo= github.com/certifi/gocertifi v0.0.0-20190105021004-abcd57078448/go.mod h1:GJKEexRPVJrBSOjoqN5VNOIKJ5Q3RViH6eu3puDRwx4= github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M= github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU= -github.com/cloudwego/eino v0.7.21 h1:kkq7hlHYzwkGOAMbY4ffym4oBT7e9g5hXpJTsZbhsik= -github.com/cloudwego/eino v0.7.21/go.mod h1:nA8Vacmuqv3pqKBQbTWENBLQ8MmGmPt/WqiyLeB8ohQ= -github.com/coze-dev/cozeloop-go v0.1.17 h1:oUd5B7O7BuaFgp8V9JtLui9YfIsDw0jFit8BsII4/qM= -github.com/coze-dev/cozeloop-go v0.1.17/go.mod h1:lM7cmUEZlnAlQYdwfk4Li0SC3RdZ++QMHX75nvKceSc= +github.com/cloudwego/eino v0.8.0-alpha.14 h1:7rwp6aIWyUK6KOyXeKqW7/th6spL7jimlMR2nRdPYz4= +github.com/cloudwego/eino v0.8.0-alpha.14/go.mod h1:R/UcCjduEJKpdX4rSYPmBd61NP/NN8gdqQxh7skdv4c= +github.com/cloudwego/eino v0.8.0 h1:DLbrgEAloA+l7aR2qim7qQocQB48DjPrb8LzG3PYMHY= +github.com/cloudwego/eino v0.8.0/go.mod h1:+2N4nsMPxA6kGBHpH+75JuTfEcGprAMTdsZESrShKpU= +github.com/coze-dev/cozeloop-go v0.1.20 h1:RO/o5cm8Nu71hG+7yoveA53oY9Q24u7NJBYZ9KBEDIo= +github.com/coze-dev/cozeloop-go v0.1.20/go.mod h1:lM7cmUEZlnAlQYdwfk4Li0SC3RdZ++QMHX75nvKceSc= github.com/coze-dev/cozeloop-go/spec v0.1.8 h1:hFVBj/C1B6mUNGH/q52kO2n1pXuTomG578RbKlfYLGk= github.com/coze-dev/cozeloop-go/spec v0.1.8/go.mod h1:/f3BrWehffwXIpd4b5rYIqktLd/v5dlLBw0h9F/LQIU= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -123,6 +127,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/callbacks/cozeloop/trace_handler.go b/callbacks/cozeloop/trace_handler.go index abc331c14..3823398ad 100644 --- a/callbacks/cozeloop/trace_handler.go +++ b/callbacks/cozeloop/trace_handler.go @@ -23,6 +23,7 @@ import ( "github.com/cloudwego/eino-ext/callbacks/cozeloop/internal/async" "github.com/cloudwego/eino-ext/callbacks/cozeloop/internal/consts" + "github.com/cloudwego/eino/adk" "github.com/cloudwego/eino/callbacks" "github.com/cloudwego/eino/schema" "github.com/coze-dev/cozeloop-go" @@ -102,6 +103,10 @@ func (l *einoTracer) OnStart(ctx context.Context, info *callbacks.RunInfo, input l.setRunInfo(ctx, span, info) l.setSpanContext(ctx, span) + if info.Component == adk.ComponentOfAgent { + span.SetBaggage(ctx, map[string]string{attrKeyAgentName: info.Name}) + } + if l.parser != nil { span.SetTags(ctx, l.parser.ParseInput(ctx, info, input)) } @@ -122,6 +127,25 @@ func (l *einoTracer) OnEnd(ctx context.Context, info *callbacks.RunInfo, output return ctx } + if info.Component == adk.ComponentOfAgent { + if l.parser != nil { + go func() { + defer func() { + if e := recover(); e != nil { + l.logger.CtxWarnf(ctx, "[einoTracer][OnEnd] recovered: %s", e) + } + span.Finish(ctx) + }() + + tags := l.parser.ParseOutput(ctx, info, output) + span.SetTags(ctx, tags) + }() + } else { + span.Finish(ctx) + } + return ctx + } + var tags map[string]any if l.parser != nil { tags = l.parser.ParseOutput(ctx, info, output) @@ -183,6 +207,10 @@ func (l *einoTracer) OnStartWithStreamInput(ctx context.Context, info *callbacks l.setRunInfo(ctx, span, info) l.setSpanContext(ctx, span) + if info.Component == adk.ComponentOfAgent { + span.SetBaggage(ctx, map[string]string{attrKeyAgentName: info.Name}) + } + if l.parser != nil { go func() { defer func() {