Skip to content

Commit e558893

Browse files
committed
feat: 流式输出性能优化
1 parent f83cb83 commit e558893

File tree

1 file changed

+49
-26
lines changed

1 file changed

+49
-26
lines changed

internal/proxy/server.go

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
package proxy
33

44
import (
5+
"bufio"
56
"bytes"
67
"context"
78
"encoding/json"
@@ -24,6 +25,8 @@ import (
2425
var ignorableStreamErrors = []string{
2526
"context canceled",
2627
"connection reset by peer",
28+
"broken pipe",
29+
"use of closed network connection",
2730
}
2831

2932
// isIgnorableStreamError checks if the error is a common, non-critical error that can occur
@@ -59,7 +62,7 @@ func NewProxyServer(keyManager types.KeyManager, configManager types.ConfigManag
5962
MaxIdleConnsPerHost: 20,
6063
MaxConnsPerHost: 0,
6164
IdleConnTimeout: time.Duration(openaiConfig.IdleConnTimeout) * time.Second,
62-
TLSHandshakeTimeout: time.Duration(openaiConfig.ResponseTimeout) * time.Second,
65+
TLSHandshakeTimeout: 15 * time.Second,
6366
ExpectContinueTimeout: 1 * time.Second,
6467
DisableCompression: !perfConfig.EnableGzip,
6568
ForceAttemptHTTP2: true,
@@ -73,12 +76,12 @@ func NewProxyServer(keyManager types.KeyManager, configManager types.ConfigManag
7376
MaxIdleConnsPerHost: 40,
7477
MaxConnsPerHost: 0,
7578
IdleConnTimeout: time.Duration(openaiConfig.IdleConnTimeout) * time.Second,
76-
TLSHandshakeTimeout: time.Duration(openaiConfig.ResponseTimeout) * time.Second,
79+
TLSHandshakeTimeout: 15 * time.Second,
7780
ExpectContinueTimeout: 1 * time.Second,
78-
DisableCompression: true, // Always disable compression for streaming
81+
DisableCompression: true,
7982
ForceAttemptHTTP2: true,
80-
WriteBufferSize: 64 * 1024,
81-
ReadBufferSize: 64 * 1024,
83+
WriteBufferSize: 0,
84+
ReadBufferSize: 0,
8285
ResponseHeaderTimeout: time.Duration(openaiConfig.ResponseTimeout) * time.Second,
8386
}
8487

@@ -132,6 +135,11 @@ func (ps *ProxyServer) HandleProxy(c *gin.Context) {
132135

133136
// isStreamRequest determines if this is a streaming request
134137
func (ps *ProxyServer) isStreamRequest(bodyBytes []byte, c *gin.Context) bool {
138+
// Check for Gemini streaming endpoint
139+
if strings.HasSuffix(c.Request.URL.Path, ":streamGenerateContent") {
140+
return true
141+
}
142+
135143
// Check Accept header
136144
if strings.Contains(c.GetHeader("Accept"), "text/event-stream") {
137145
return true
@@ -144,9 +152,11 @@ func (ps *ProxyServer) isStreamRequest(bodyBytes []byte, c *gin.Context) bool {
144152

145153
// Check stream parameter in request body
146154
if len(bodyBytes) > 0 {
147-
if strings.Contains(string(bodyBytes), `"stream":true`) ||
148-
strings.Contains(string(bodyBytes), `"stream": true`) {
149-
return true
155+
var bodyJSON map[string]interface{}
156+
if err := json.Unmarshal(bodyBytes, &bodyJSON); err == nil {
157+
if stream, ok := bodyJSON["stream"].(bool); ok && stream {
158+
return true
159+
}
150160
}
151161
}
152162

@@ -398,13 +408,16 @@ func (ps *ProxyServer) executeRequestWithRetry(c *gin.Context, startTime time.Ti
398408
}
399409
}
400410

411+
var newline = []byte("\n")
412+
401413
// handleStreamingResponse handles streaming responses
402414
func (ps *ProxyServer) handleStreamingResponse(c *gin.Context, resp *http.Response) {
403415
// Set headers for streaming
404416
c.Header("Cache-Control", "no-cache")
405417
c.Header("Connection", "keep-alive")
418+
c.Header("Content-Type", "text/event-stream")
419+
c.Header("X-Accel-Buffering", "no")
406420

407-
// Stream response directly
408421
flusher, ok := c.Writer.(http.Flusher)
409422
if !ok {
410423
logrus.Error("Streaming unsupported")
@@ -415,26 +428,36 @@ func (ps *ProxyServer) handleStreamingResponse(c *gin.Context, resp *http.Respon
415428
return
416429
}
417430

418-
// Copy streaming data with optimized buffer size
419-
buffer := make([]byte, 32*1024) // 32KB buffer for better performance
420-
for {
421-
n, err := resp.Body.Read(buffer)
422-
if n > 0 {
423-
if _, writeErr := c.Writer.Write(buffer[:n]); writeErr != nil {
424-
logrus.Errorf("Failed to write streaming data: %v", writeErr)
425-
break
431+
scanner := bufio.NewScanner(resp.Body)
432+
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
433+
434+
for scanner.Scan() {
435+
lineBytes := scanner.Bytes()
436+
if _, err := c.Writer.Write(lineBytes); err != nil {
437+
if isIgnorableStreamError(err) {
438+
logrus.Debugf("Stream closed by client: %v", err)
439+
} else {
440+
logrus.Errorf("Failed to write streaming data: %v", err)
426441
}
427-
flusher.Flush()
442+
return
428443
}
429-
if err != nil {
430-
if err != io.EOF {
431-
if isIgnorableStreamError(err) {
432-
logrus.Debugf("Stream closed by client or network: %v", err)
433-
} else {
434-
logrus.Errorf("Error reading streaming response: %v", err)
435-
}
444+
if _, err := c.Writer.Write(newline); err != nil {
445+
if isIgnorableStreamError(err) {
446+
logrus.Debugf("Stream closed by client: %v", err)
447+
} else {
448+
logrus.Errorf("Failed to write streaming data: %v", err)
436449
}
437-
break
450+
return
451+
}
452+
453+
flusher.Flush()
454+
}
455+
456+
if err := scanner.Err(); err != nil {
457+
if isIgnorableStreamError(err) {
458+
logrus.Debugf("Stream closed by client or network: %v", err)
459+
} else {
460+
logrus.Errorf("Error reading streaming response: %v", err)
438461
}
439462
}
440463
}

0 commit comments

Comments
 (0)