Skip to content

Commit 653b9b4

Browse files
authored
[fix][prompt] fix ptaas trace (#171)
fix the issue of ptaas trace prompt_executor span's output content probability loss. Change-Id: I1447f2e267c60cdd7a5cdcfedd127cd8eaf8d859
1 parent 1f2a854 commit 653b9b4

File tree

2 files changed

+21
-23
lines changed

2 files changed

+21
-23
lines changed

backend/modules/prompt/application/openapi.go

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -424,8 +424,11 @@ func (p *PromptOpenAPIApplicationImpl) doExecuteStreaming(ctx context.Context, r
424424

425425
// 执行prompt流式调用
426426
resultStream := make(chan *entity.Reply)
427-
errChan := make(chan error)
428-
replyResultChan := make(chan *entity.Reply, 1) // 用于接收aggregatedReply,避免数据竞争
427+
type replyResult struct {
428+
Reply *entity.Reply
429+
Err error
430+
}
431+
replyResultChan := make(chan replyResult) // 用于接收aggregatedReply, error,避免数据竞争
429432
goroutine.GoSafe(ctx, func() {
430433
var executeErr error
431434
var localAggregatedReply *entity.Reply
@@ -436,12 +439,10 @@ func (p *PromptOpenAPIApplicationImpl) doExecuteStreaming(ctx context.Context, r
436439
}
437440
// 确保errChan和resultStream被关闭
438441
close(resultStream)
439-
if executeErr != nil {
440-
errChan <- executeErr
441-
} else {
442-
replyResultChan <- localAggregatedReply
442+
replyResultChan <- replyResult{
443+
Reply: localAggregatedReply,
444+
Err: executeErr,
443445
}
444-
close(errChan)
445446
close(replyResultChan)
446447
}()
447448

@@ -482,23 +483,19 @@ func (p *PromptOpenAPIApplicationImpl) doExecuteStreaming(ctx context.Context, r
482483
return promptDO, nil, err
483484
}
484485
}
485-
var ok bool
486486
select { //nolint:staticcheck
487-
case err, ok = <-errChan:
488-
if !ok {
487+
case result := <-replyResultChan:
488+
if result.Err == nil {
489489
logs.CtxInfo(ctx, "execute streaming finished")
490+
return promptDO, result.Reply, nil
490491
} else {
491-
if st, ok := status.FromError(err); ok && st.Code() == codes.Canceled {
492-
err = nil
492+
if st, ok := status.FromError(result.Err); ok && st.Code() == codes.Canceled {
493493
logs.CtxWarn(ctx, "execute streaming canceled")
494494
} else {
495-
logs.CtxError(ctx, "execute streaming failed, err=%v", err)
495+
logs.CtxError(ctx, "execute streaming failed, err=%v", result.Err)
496496
}
497+
return promptDO, nil, result.Err
497498
}
498-
return promptDO, aggregatedReply, err
499-
case aggregatedReply = <-replyResultChan:
500-
logs.CtxInfo(ctx, "execute streaming finished")
501-
return promptDO, aggregatedReply, nil
502499
}
503500
}
504501

backend/modules/prompt/domain/component/trace/trace_span_convertor.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,12 @@ func MessageToSpanMessage(message *entity.Message) *tracespec.ModelMessage {
6666
return nil
6767
}
6868
return &tracespec.ModelMessage{
69-
Role: RoleToSpanRole(message.Role),
70-
Content: ptr.From(message.Content),
71-
Parts: ContentPartsToSpanParts(message.Parts),
72-
ToolCalls: ToolCallsToSpanToolCalls(message.ToolCalls),
73-
ToolCallID: ptr.From(message.ToolCallID),
69+
Role: RoleToSpanRole(message.Role),
70+
Content: ptr.From(message.Content),
71+
ReasoningContent: ptr.From(message.ReasoningContent),
72+
Parts: ContentPartsToSpanParts(message.Parts),
73+
ToolCalls: ToolCallsToSpanToolCalls(message.ToolCalls),
74+
ToolCallID: ptr.From(message.ToolCallID),
7475
}
7576
}
7677

@@ -125,7 +126,7 @@ func ContentTypeToSpanPartType(partType entity.ContentType) tracespec.ModelMessa
125126
switch partType {
126127
case entity.ContentTypeText:
127128
return tracespec.ModelMessagePartTypeText
128-
case entity.ContentTypeImageURL:
129+
case entity.ContentTypeImageURL, entity.ContentTypeBase64Data:
129130
return tracespec.ModelMessagePartTypeImage
130131
case entity.ContentTypeMultiPartVariable:
131132
return "multi_part_variable"

0 commit comments

Comments
 (0)