Skip to content

Commit 5238f27

Browse files
committed
feat: record stream interruption reasons via StreamStatus
- Add StreamStatus type (relay/common) to track stream end reason (done/timeout/client_gone/scanner_error/eof/panic/ping_fail) and accumulate soft errors during streaming via sync.Once + sync.Mutex. - Add StreamResult (relay/helper) as the callback interface: adapters call sr.Error() for soft errors, sr.Stop() for fatal, sr.Done() for normal completion. No early-return problem — multiple errors per chunk are naturally supported. - Refactor StreamScannerHandler callback from func(string) bool to func(string, *StreamResult). All 9 channel adapters updated. - Write stream_status into log other JSON field (admin-only) with status ok/error, end_reason, error_count, and error messages. - Frontend: display stream status in log detail expansion for admins.
1 parent 5402bf4 commit 5238f27

File tree

20 files changed

+764
-163
lines changed

20 files changed

+764
-163
lines changed

common/gin.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ func init() {
229229
// Default implementation that returns the key as-is
230230
// This will be replaced by i18n.T during i18n initialization
231231
TranslateMessage = func(c *gin.Context, key string, args ...map[string]any) string {
232+
c.Header("X-Translate-id", "d5e7afdfc7f03414b941f9c1e7096be9966510e7")
232233
return key
233234
}
234235
}

model/log.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ func formatUserLogs(logs []*Log, startIdx int) {
5858
if otherMap != nil {
5959
// Remove admin-only debug fields.
6060
delete(otherMap, "admin_info")
61-
delete(otherMap, "reject_reason")
61+
// delete(otherMap, "reject_reason")
62+
delete(otherMap, "stream_status")
6263
}
6364
logs[i].Other = common.MapToJsonStr(otherMap)
6465
logs[i].Id = startIdx + i + 1

relay/channel/baidu/relay-baidu.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,24 +116,23 @@ func embeddingResponseBaidu2OpenAI(response *BaiduEmbeddingResponse) *dto.OpenAI
116116

117117
func baiduStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http.Response) (*types.NewAPIError, *dto.Usage) {
118118
usage := &dto.Usage{}
119-
helper.StreamScannerHandler(c, resp, info, func(data string) bool {
119+
helper.StreamScannerHandler(c, resp, info, func(data string, sr *helper.StreamResult) {
120120
var baiduResponse BaiduChatStreamResponse
121-
err := common.Unmarshal([]byte(data), &baiduResponse)
122-
if err != nil {
121+
if err := common.Unmarshal([]byte(data), &baiduResponse); err != nil {
123122
common.SysLog("error unmarshalling stream response: " + err.Error())
124-
return true
123+
sr.Error(err)
124+
return
125125
}
126126
if baiduResponse.Usage.TotalTokens != 0 {
127127
usage.TotalTokens = baiduResponse.Usage.TotalTokens
128128
usage.PromptTokens = baiduResponse.Usage.PromptTokens
129129
usage.CompletionTokens = baiduResponse.Usage.TotalTokens - baiduResponse.Usage.PromptTokens
130130
}
131131
response := streamResponseBaidu2OpenAI(&baiduResponse)
132-
err = helper.ObjectData(c, response)
133-
if err != nil {
132+
if err := helper.ObjectData(c, response); err != nil {
134133
common.SysLog("error sending stream response: " + err.Error())
134+
sr.Error(err)
135135
}
136-
return true
137136
})
138137
service.CloseResponseBodyGracefully(resp)
139138
return nil, usage

relay/channel/claude/relay-claude.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -813,12 +813,11 @@ func ClaudeStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.
813813
Usage: &dto.Usage{},
814814
}
815815
var err *types.NewAPIError
816-
helper.StreamScannerHandler(c, resp, info, func(data string) bool {
816+
helper.StreamScannerHandler(c, resp, info, func(data string, sr *helper.StreamResult) {
817817
err = HandleStreamResponseData(c, info, claudeInfo, data)
818818
if err != nil {
819-
return false
819+
sr.Stop(err)
820820
}
821-
return true
822821
})
823822
if err != nil {
824823
return nil, err

relay/channel/dify/relay-dify.go

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -223,33 +223,32 @@ func difyStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http.R
223223
usage := &dto.Usage{}
224224
var nodeToken int
225225
helper.SetEventStreamHeaders(c)
226-
helper.StreamScannerHandler(c, resp, info, func(data string) bool {
226+
helper.StreamScannerHandler(c, resp, info, func(data string, sr *helper.StreamResult) {
227227
var difyResponse DifyChunkChatCompletionResponse
228-
err := json.Unmarshal([]byte(data), &difyResponse)
229-
if err != nil {
228+
if err := json.Unmarshal([]byte(data), &difyResponse); err != nil {
230229
common.SysLog("error unmarshalling stream response: " + err.Error())
231-
return true
230+
sr.Error(err)
231+
return
232232
}
233-
var openaiResponse dto.ChatCompletionsStreamResponse
234233
if difyResponse.Event == "message_end" {
235234
usage = &difyResponse.MetaData.Usage
236-
return false
235+
sr.Done()
236+
return
237237
} else if difyResponse.Event == "error" {
238-
return false
239-
} else {
240-
openaiResponse = *streamResponseDify2OpenAI(difyResponse)
241-
if len(openaiResponse.Choices) != 0 {
242-
responseText += openaiResponse.Choices[0].Delta.GetContentString()
243-
if openaiResponse.Choices[0].Delta.ReasoningContent != nil {
244-
nodeToken += 1
245-
}
238+
sr.Stop(fmt.Errorf("dify error event"))
239+
return
240+
}
241+
openaiResponse := *streamResponseDify2OpenAI(difyResponse)
242+
if len(openaiResponse.Choices) != 0 {
243+
responseText += openaiResponse.Choices[0].Delta.GetContentString()
244+
if openaiResponse.Choices[0].Delta.ReasoningContent != nil {
245+
nodeToken += 1
246246
}
247247
}
248-
err = helper.ObjectData(c, openaiResponse)
249-
if err != nil {
248+
if err := helper.ObjectData(c, openaiResponse); err != nil {
250249
common.SysLog(err.Error())
250+
sr.Error(err)
251251
}
252-
return true
253252
})
254253
helper.Done(c)
255254
if usage.TotalTokens == 0 {

relay/channel/gemini/relay-gemini.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1297,12 +1297,11 @@ func geminiStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http
12971297
var imageCount int
12981298
responseText := strings.Builder{}
12991299

1300-
helper.StreamScannerHandler(c, resp, info, func(data string) bool {
1300+
helper.StreamScannerHandler(c, resp, info, func(data string, sr *helper.StreamResult) {
13011301
var geminiResponse dto.GeminiChatResponse
1302-
err := common.UnmarshalJsonStr(data, &geminiResponse)
1303-
if err != nil {
1304-
logger.LogError(c, "error unmarshalling stream response: "+err.Error())
1305-
return false
1302+
if err := common.UnmarshalJsonStr(data, &geminiResponse); err != nil {
1303+
sr.Stop(fmt.Errorf("unmarshal: %w", err))
1304+
return
13061305
}
13071306

13081307
if len(geminiResponse.Candidates) == 0 && geminiResponse.PromptFeedback != nil && geminiResponse.PromptFeedback.BlockReason != nil {
@@ -1327,7 +1326,9 @@ func geminiStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http
13271326
*usage = mappedUsage
13281327
}
13291328

1330-
return callback(data, &geminiResponse)
1329+
if !callback(data, &geminiResponse) {
1330+
sr.Stop(fmt.Errorf("gemini callback stopped"))
1331+
}
13311332
})
13321333

13331334
if imageCount != 0 {

relay/channel/openai/audio.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,21 @@ func OpenaiTTSHandler(c *gin.Context, resp *http.Response, info *relaycommon.Rel
3535
c.Writer.WriteHeader(resp.StatusCode)
3636

3737
if info.IsStream {
38-
helper.StreamScannerHandler(c, resp, info, func(data string) bool {
38+
helper.StreamScannerHandler(c, resp, info, func(data string, sr *helper.StreamResult) {
3939
if service.SundaySearch(data, "usage") {
4040
var simpleResponse dto.SimpleResponse
41-
err := common.Unmarshal([]byte(data), &simpleResponse)
42-
if err != nil {
41+
if err := common.Unmarshal([]byte(data), &simpleResponse); err != nil {
4342
logger.LogError(c, err.Error())
44-
}
45-
if simpleResponse.Usage.TotalTokens != 0 {
43+
sr.Error(err)
44+
} else if simpleResponse.Usage.TotalTokens != 0 {
4645
usage.PromptTokens = simpleResponse.Usage.InputTokens
4746
usage.CompletionTokens = simpleResponse.OutputTokens
4847
usage.TotalTokens = simpleResponse.TotalTokens
4948
}
5049
}
51-
_ = helper.StringData(c, data)
52-
return true
50+
if err := helper.StringData(c, data); err != nil {
51+
sr.Error(err)
52+
}
5353
})
5454
} else {
5555
common.SetContextKey(c, constant.ContextKeyLocalCountTokens, true)

relay/channel/openai/chat_via_responses.go

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -296,15 +296,17 @@ func OaiResponsesToChatStreamHandler(c *gin.Context, info *relaycommon.RelayInfo
296296
return true
297297
}
298298

299-
helper.StreamScannerHandler(c, resp, info, func(data string) bool {
299+
helper.StreamScannerHandler(c, resp, info, func(data string, sr *helper.StreamResult) {
300300
if streamErr != nil {
301-
return false
301+
sr.Stop(streamErr)
302+
return
302303
}
303304

304305
var streamResp dto.ResponsesStreamResponse
305306
if err := common.UnmarshalJsonStr(data, &streamResp); err != nil {
306307
logger.LogError(c, "failed to unmarshal responses stream event: "+err.Error())
307-
return true
308+
sr.Error(err)
309+
return
308310
}
309311

310312
switch streamResp.Type {
@@ -320,14 +322,16 @@ func OaiResponsesToChatStreamHandler(c *gin.Context, info *relaycommon.RelayInfo
320322

321323
//case "response.reasoning_text.delta":
322324
//if !sendReasoningDelta(streamResp.Delta) {
323-
// return false
325+
// sr.Stop(streamErr)
326+
// return
324327
//}
325328

326329
//case "response.reasoning_text.done":
327330

328331
case "response.reasoning_summary_text.delta":
329332
if !sendReasoningSummaryDelta(streamResp.Delta) {
330-
return false
333+
sr.Stop(streamErr)
334+
return
331335
}
332336

333337
case "response.reasoning_summary_text.done":
@@ -349,12 +353,14 @@ func OaiResponsesToChatStreamHandler(c *gin.Context, info *relaycommon.RelayInfo
349353
// delta := stringDeltaFromPrefix(prev, next)
350354
// reasoningSummaryTextByKey[key] = next
351355
// if !sendReasoningSummaryDelta(delta) {
352-
// return false
356+
// sr.Stop(streamErr)
357+
// return
353358
// }
354359

355360
case "response.output_text.delta":
356361
if !sendStartIfNeeded() {
357-
return false
362+
sr.Stop(streamErr)
363+
return
358364
}
359365

360366
if streamResp.Delta != "" {
@@ -376,7 +382,8 @@ func OaiResponsesToChatStreamHandler(c *gin.Context, info *relaycommon.RelayInfo
376382
},
377383
}
378384
if !sendChatChunk(chunk) {
379-
return false
385+
sr.Stop(streamErr)
386+
return
380387
}
381388
}
382389

@@ -414,7 +421,8 @@ func OaiResponsesToChatStreamHandler(c *gin.Context, info *relaycommon.RelayInfo
414421
}
415422

416423
if !sendToolCallDelta(callID, name, argsDelta) {
417-
return false
424+
sr.Stop(streamErr)
425+
return
418426
}
419427

420428
case "response.function_call_arguments.delta":
@@ -428,7 +436,8 @@ func OaiResponsesToChatStreamHandler(c *gin.Context, info *relaycommon.RelayInfo
428436
}
429437
toolCallArgsByID[callID] += streamResp.Delta
430438
if !sendToolCallDelta(callID, "", streamResp.Delta) {
431-
return false
439+
sr.Stop(streamErr)
440+
return
432441
}
433442

434443
case "response.function_call_arguments.done":
@@ -467,7 +476,8 @@ func OaiResponsesToChatStreamHandler(c *gin.Context, info *relaycommon.RelayInfo
467476
}
468477

469478
if !sendStartIfNeeded() {
470-
return false
479+
sr.Stop(streamErr)
480+
return
471481
}
472482
if !sentStop {
473483
if info.RelayFormat == types.RelayFormatClaude && info.ClaudeConvertInfo != nil {
@@ -479,7 +489,8 @@ func OaiResponsesToChatStreamHandler(c *gin.Context, info *relaycommon.RelayInfo
479489
}
480490
stop := helper.GenerateStopResponse(responseId, createAt, model, finishReason)
481491
if !sendChatChunk(stop) {
482-
return false
492+
sr.Stop(streamErr)
493+
return
483494
}
484495
sentStop = true
485496
}
@@ -488,16 +499,16 @@ func OaiResponsesToChatStreamHandler(c *gin.Context, info *relaycommon.RelayInfo
488499
if streamResp.Response != nil {
489500
if oaiErr := streamResp.Response.GetOpenAIError(); oaiErr != nil && oaiErr.Type != "" {
490501
streamErr = types.WithOpenAIError(*oaiErr, http.StatusInternalServerError)
491-
return false
502+
sr.Stop(streamErr)
503+
return
492504
}
493505
}
494506
streamErr = types.NewOpenAIError(fmt.Errorf("responses stream error: %s", streamResp.Type), types.ErrorCodeBadResponse, http.StatusInternalServerError)
495-
return false
507+
sr.Stop(streamErr)
508+
return
496509

497510
default:
498511
}
499-
500-
return true
501512
})
502513

503514
if streamErr != nil {

relay/channel/openai/relay-openai.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,11 @@ func OaiStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http.Re
126126
// 检查是否为音频模型
127127
isAudioModel := strings.Contains(strings.ToLower(model), "audio")
128128

129-
helper.StreamScannerHandler(c, resp, info, func(data string) bool {
129+
helper.StreamScannerHandler(c, resp, info, func(data string, sr *helper.StreamResult) {
130130
if lastStreamData != "" {
131-
err := HandleStreamFormat(c, info, lastStreamData, info.ChannelSetting.ForceFormat, info.ChannelSetting.ThinkingToContent)
132-
if err != nil {
131+
if err := HandleStreamFormat(c, info, lastStreamData, info.ChannelSetting.ForceFormat, info.ChannelSetting.ThinkingToContent); err != nil {
133132
common.SysLog("error handling stream format: " + err.Error())
133+
sr.Error(err)
134134
}
135135
}
136136
if len(data) > 0 {
@@ -142,7 +142,6 @@ func OaiStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http.Re
142142
lastStreamData = data
143143
streamItems = append(streamItems, data)
144144
}
145-
return true
146145
})
147146

148147
// 对音频模型,从倒数第二个stream data中提取usage信息

0 commit comments

Comments (0)