Skip to content

Commit 20985d1

Browse files
committed
Refactor executor error handling and usage reporting
- Updated the Execute methods in various executors (GeminiCLIExecutor, GeminiExecutor, IFlowExecutor, OpenAICompatExecutor, QwenExecutor) to return a response and error as named return values for improved clarity. - Enhanced error handling by deferring failure tracking in usage reporters, ensuring that failures are reported correctly. - Improved response body handling by ensuring proper closure and error logging for HTTP responses across all executors. - Added failure tracking and reporting in the usage reporter to capture unsuccessful requests. - Updated the usage logging structure to include a 'Failed' field for better tracking of request outcomes. - Adjusted the logic in the RequestStatistics and Record methods to accommodate the new failure tracking mechanism.
1 parent 67f5538 commit 20985d1

File tree

11 files changed

+390
-215
lines changed

11 files changed

+390
-215
lines changed

internal/api/handlers/management/usage.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,8 @@ func (h *Handler) GetUsageStatistics(c *gin.Context) {
1313
if h != nil && h.usageStats != nil {
1414
snapshot = h.usageStats.Snapshot()
1515
}
16-
c.JSON(http.StatusOK, gin.H{"usage": snapshot})
16+
c.JSON(http.StatusOK, gin.H{
17+
"usage": snapshot,
18+
"failed_requests": snapshot.FailureCount,
19+
})
1720
}

internal/runtime/executor/claude_executor.go

Lines changed: 48 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,14 @@ func (e *ClaudeExecutor) Identifier() string { return "claude" }
3636

3737
func (e *ClaudeExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
3838

39-
func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
39+
func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
4040
apiKey, baseURL := claudeCreds(auth)
4141

4242
if baseURL == "" {
4343
baseURL = "https://api.anthropic.com"
4444
}
4545
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
46+
defer reporter.trackFailure(ctx, &err)
4647
from := opts.SourceFormat
4748
to := sdktranslator.FromString("claude")
4849
// Use streaming translation to preserve function calling, except for claude.
@@ -56,7 +57,7 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
5657
url := fmt.Sprintf("%s/v1/messages?beta=true", baseURL)
5758
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
5859
if err != nil {
59-
return cliproxyexecutor.Response{}, err
60+
return resp, err
6061
}
6162
applyClaudeHeaders(httpReq, apiKey, false)
6263
var authID, authLabel, authType, authValue string
@@ -78,38 +79,39 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
7879
})
7980

8081
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
81-
resp, err := httpClient.Do(httpReq)
82+
httpResp, err := httpClient.Do(httpReq)
8283
if err != nil {
8384
recordAPIResponseError(ctx, e.cfg, err)
84-
return cliproxyexecutor.Response{}, err
85+
return resp, err
8586
}
8687
defer func() {
87-
if errClose := resp.Body.Close(); errClose != nil {
88+
if errClose := httpResp.Body.Close(); errClose != nil {
8889
log.Errorf("response body close error: %v", errClose)
8990
}
9091
}()
91-
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
92-
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
93-
b, _ := io.ReadAll(resp.Body)
92+
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
93+
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
94+
b, _ := io.ReadAll(httpResp.Body)
9495
appendAPIResponseChunk(ctx, e.cfg, b)
95-
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
96-
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)}
96+
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
97+
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
98+
return resp, err
9799
}
98-
reader := io.Reader(resp.Body)
100+
reader := io.Reader(httpResp.Body)
99101
var decoder *zstd.Decoder
100-
if hasZSTDEcoding(resp.Header.Get("Content-Encoding")) {
101-
decoder, err = zstd.NewReader(resp.Body)
102+
if hasZSTDEcoding(httpResp.Header.Get("Content-Encoding")) {
103+
decoder, err = zstd.NewReader(httpResp.Body)
102104
if err != nil {
103105
recordAPIResponseError(ctx, e.cfg, err)
104-
return cliproxyexecutor.Response{}, fmt.Errorf("failed to initialize zstd decoder: %w", err)
106+
return resp, fmt.Errorf("failed to initialize zstd decoder: %w", err)
105107
}
106108
reader = decoder
107109
defer decoder.Close()
108110
}
109111
data, err := io.ReadAll(reader)
110112
if err != nil {
111113
recordAPIResponseError(ctx, e.cfg, err)
112-
return cliproxyexecutor.Response{}, err
114+
return resp, err
113115
}
114116
appendAPIResponseChunk(ctx, e.cfg, data)
115117
if stream {
@@ -124,16 +126,18 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
124126
}
125127
var param any
126128
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, &param)
127-
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
129+
resp = cliproxyexecutor.Response{Payload: []byte(out)}
130+
return resp, nil
128131
}
129132

130-
func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
133+
func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
131134
apiKey, baseURL := claudeCreds(auth)
132135

133136
if baseURL == "" {
134137
baseURL = "https://api.anthropic.com"
135138
}
136139
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
140+
defer reporter.trackFailure(ctx, &err)
137141
from := opts.SourceFormat
138142
to := sdktranslator.FromString("claude")
139143
body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true)
@@ -164,27 +168,35 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
164168
})
165169

166170
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
167-
resp, err := httpClient.Do(httpReq)
171+
httpResp, err := httpClient.Do(httpReq)
168172
if err != nil {
169173
recordAPIResponseError(ctx, e.cfg, err)
170174
return nil, err
171175
}
172-
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
173-
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
174-
defer func() { _ = resp.Body.Close() }()
175-
b, _ := io.ReadAll(resp.Body)
176+
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
177+
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
178+
b, _ := io.ReadAll(httpResp.Body)
176179
appendAPIResponseChunk(ctx, e.cfg, b)
177-
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
178-
return nil, statusErr{code: resp.StatusCode, msg: string(b)}
180+
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
181+
if errClose := httpResp.Body.Close(); errClose != nil {
182+
log.Errorf("response body close error: %v", errClose)
183+
}
184+
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
185+
return nil, err
179186
}
180187
out := make(chan cliproxyexecutor.StreamChunk)
188+
stream = out
181189
go func() {
182190
defer close(out)
183-
defer func() { _ = resp.Body.Close() }()
191+
defer func() {
192+
if errClose := httpResp.Body.Close(); errClose != nil {
193+
log.Errorf("response body close error: %v", errClose)
194+
}
195+
}()
184196

185197
// If from == to (Claude → Claude), directly forward the SSE stream without translation
186198
if from == to {
187-
scanner := bufio.NewScanner(resp.Body)
199+
scanner := bufio.NewScanner(httpResp.Body)
188200
buf := make([]byte, 20_971_520)
189201
scanner.Buffer(buf, 20_971_520)
190202
for scanner.Scan() {
@@ -199,15 +211,16 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
199211
cloned[len(line)] = '\n'
200212
out <- cliproxyexecutor.StreamChunk{Payload: cloned}
201213
}
202-
if err = scanner.Err(); err != nil {
203-
recordAPIResponseError(ctx, e.cfg, err)
204-
out <- cliproxyexecutor.StreamChunk{Err: err}
214+
if errScan := scanner.Err(); errScan != nil {
215+
recordAPIResponseError(ctx, e.cfg, errScan)
216+
reporter.publishFailure(ctx)
217+
out <- cliproxyexecutor.StreamChunk{Err: errScan}
205218
}
206219
return
207220
}
208221

209222
// For other formats, use translation
210-
scanner := bufio.NewScanner(resp.Body)
223+
scanner := bufio.NewScanner(httpResp.Body)
211224
buf := make([]byte, 20_971_520)
212225
scanner.Buffer(buf, 20_971_520)
213226
var param any
@@ -222,12 +235,13 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
222235
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
223236
}
224237
}
225-
if err = scanner.Err(); err != nil {
226-
recordAPIResponseError(ctx, e.cfg, err)
227-
out <- cliproxyexecutor.StreamChunk{Err: err}
238+
if errScan := scanner.Err(); errScan != nil {
239+
recordAPIResponseError(ctx, e.cfg, errScan)
240+
reporter.publishFailure(ctx)
241+
out <- cliproxyexecutor.StreamChunk{Err: errScan}
228242
}
229243
}()
230-
return out, nil
244+
return stream, nil
231245
}
232246

233247
func (e *ClaudeExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {

internal/runtime/executor/codex_executor.go

Lines changed: 47 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,14 @@ func (e *CodexExecutor) Identifier() string { return "codex" }
3939

4040
func (e *CodexExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
4141

42-
func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
42+
func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
4343
apiKey, baseURL := codexCreds(auth)
4444

4545
if baseURL == "" {
4646
baseURL = "https://chatgpt.com/backend-api/codex"
4747
}
4848
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
49+
defer reporter.trackFailure(ctx, &err)
4950

5051
from := opts.SourceFormat
5152
to := sdktranslator.FromString("codex")
@@ -101,7 +102,7 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
101102
url := strings.TrimSuffix(baseURL, "/") + "/responses"
102103
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
103104
if err != nil {
104-
return cliproxyexecutor.Response{}, err
105+
return resp, err
105106
}
106107
applyCodexHeaders(httpReq, auth, apiKey)
107108
var authID, authLabel, authType, authValue string
@@ -125,23 +126,28 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
125126
AuthValue: authValue,
126127
})
127128
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
128-
resp, err := httpClient.Do(httpReq)
129+
httpResp, err := httpClient.Do(httpReq)
129130
if err != nil {
130131
recordAPIResponseError(ctx, e.cfg, err)
131-
return cliproxyexecutor.Response{}, err
132+
return resp, err
132133
}
133-
defer func() { _ = resp.Body.Close() }()
134-
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
135-
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
136-
b, _ := io.ReadAll(resp.Body)
134+
defer func() {
135+
if errClose := httpResp.Body.Close(); errClose != nil {
136+
log.Errorf("codex executor: close response body error: %v", errClose)
137+
}
138+
}()
139+
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
140+
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
141+
b, _ := io.ReadAll(httpResp.Body)
137142
appendAPIResponseChunk(ctx, e.cfg, b)
138-
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
139-
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)}
143+
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
144+
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
145+
return resp, err
140146
}
141-
data, err := io.ReadAll(resp.Body)
147+
data, err := io.ReadAll(httpResp.Body)
142148
if err != nil {
143149
recordAPIResponseError(ctx, e.cfg, err)
144-
return cliproxyexecutor.Response{}, err
150+
return resp, err
145151
}
146152
appendAPIResponseChunk(ctx, e.cfg, data)
147153

@@ -162,18 +168,21 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
162168

163169
var param any
164170
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, line, &param)
165-
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
171+
resp = cliproxyexecutor.Response{Payload: []byte(out)}
172+
return resp, nil
166173
}
167-
return cliproxyexecutor.Response{}, statusErr{code: 408, msg: "stream error: stream disconnected before completion: stream closed before response.completed"}
174+
err = statusErr{code: 408, msg: "stream error: stream disconnected before completion: stream closed before response.completed"}
175+
return resp, err
168176
}
169177

170-
func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
178+
func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
171179
apiKey, baseURL := codexCreds(auth)
172180

173181
if baseURL == "" {
174182
baseURL = "https://chatgpt.com/backend-api/codex"
175183
}
176184
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
185+
defer reporter.trackFailure(ctx, &err)
177186

178187
from := opts.SourceFormat
179188
to := sdktranslator.FromString("codex")
@@ -253,28 +262,36 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
253262
})
254263

255264
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
256-
resp, err := httpClient.Do(httpReq)
265+
httpResp, err := httpClient.Do(httpReq)
257266
if err != nil {
258267
recordAPIResponseError(ctx, e.cfg, err)
259268
return nil, err
260269
}
261-
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
262-
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
263-
defer func() { _ = resp.Body.Close() }()
264-
b, readErr := io.ReadAll(resp.Body)
270+
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
271+
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
272+
data, readErr := io.ReadAll(httpResp.Body)
273+
if errClose := httpResp.Body.Close(); errClose != nil {
274+
log.Errorf("codex executor: close response body error: %v", errClose)
275+
}
265276
if readErr != nil {
266277
recordAPIResponseError(ctx, e.cfg, readErr)
267278
return nil, readErr
268279
}
269-
appendAPIResponseChunk(ctx, e.cfg, b)
270-
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
271-
return nil, statusErr{code: resp.StatusCode, msg: string(b)}
280+
appendAPIResponseChunk(ctx, e.cfg, data)
281+
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(data))
282+
err = statusErr{code: httpResp.StatusCode, msg: string(data)}
283+
return nil, err
272284
}
273285
out := make(chan cliproxyexecutor.StreamChunk)
286+
stream = out
274287
go func() {
275288
defer close(out)
276-
defer func() { _ = resp.Body.Close() }()
277-
scanner := bufio.NewScanner(resp.Body)
289+
defer func() {
290+
if errClose := httpResp.Body.Close(); errClose != nil {
291+
log.Errorf("codex executor: close response body error: %v", errClose)
292+
}
293+
}()
294+
scanner := bufio.NewScanner(httpResp.Body)
278295
buf := make([]byte, 20_971_520)
279296
scanner.Buffer(buf, 20_971_520)
280297
var param any
@@ -296,12 +313,13 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
296313
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
297314
}
298315
}
299-
if err = scanner.Err(); err != nil {
300-
recordAPIResponseError(ctx, e.cfg, err)
301-
out <- cliproxyexecutor.StreamChunk{Err: err}
316+
if errScan := scanner.Err(); errScan != nil {
317+
recordAPIResponseError(ctx, e.cfg, errScan)
318+
reporter.publishFailure(ctx)
319+
out <- cliproxyexecutor.StreamChunk{Err: errScan}
302320
}
303321
}()
304-
return out, nil
322+
return stream, nil
305323
}
306324

307325
func (e *CodexExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {

0 commit comments

Comments
 (0)