Skip to content

Commit 16e1fa0

Browse files
authored
refactor: 将格式转换逻辑从 Adapter 移至 Executor 层 (#120)
将请求/响应格式转换职责从各 Provider Adapter 集中到 Executor 层: - 在 Executor 中检测客户端类型与 Provider 支持的类型是否匹配 - 新增 ConvertingResponseWriter 用于流式/非流式响应的格式转换 - 简化 CustomAdapter,移除其格式转换逻辑 - 更新 AntigravityAdapter 支持的客户端类型(移除 OpenAI,由 Executor 转换处理) - 新增 OriginalClientType 上下文字段以追踪原始客户端类型 - 实时处理 Adapter 事件以提升请求可见性
1 parent 151c076 commit 16e1fa0

File tree

6 files changed

+394
-79
lines changed

6 files changed

+394
-79
lines changed

internal/adapter/provider/antigravity/adapter.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,9 @@ func NewAdapter(p *domain.Provider) (provider.ProviderAdapter, error) {
4949
}
5050

5151
func (a *AntigravityAdapter) SupportedClientTypes() []domain.ClientType {
52-
// Antigravity natively supports Claude, OpenAI, and Gemini by converting to Gemini/v1internal API
53-
return []domain.ClientType{domain.ClientTypeClaude, domain.ClientTypeOpenAI, domain.ClientTypeGemini}
52+
// Antigravity natively supports Claude and Gemini by converting to Gemini/v1internal API
53+
// OpenAI requests will be converted to Claude format by Executor before reaching this adapter
54+
return []domain.ClientType{domain.ClientTypeClaude, domain.ClientTypeGemini}
5455
}
5556

5657
func (a *AntigravityAdapter) Execute(ctx context.Context, w http.ResponseWriter, req *http.Request, provider *domain.Provider) error {

internal/adapter/provider/custom/adapter.go

Lines changed: 29 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414

1515
"github.com/awsl-project/maxx/internal/adapter/provider"
1616
ctxutil "github.com/awsl-project/maxx/internal/context"
17-
"github.com/awsl-project/maxx/internal/converter"
1817
"github.com/awsl-project/maxx/internal/domain"
1918
"github.com/awsl-project/maxx/internal/usage"
2019
)
@@ -24,17 +23,15 @@ func init() {
2423
}
2524

2625
type CustomAdapter struct {
27-
provider *domain.Provider
28-
converter *converter.Registry
26+
provider *domain.Provider
2927
}
3028

3129
func NewAdapter(p *domain.Provider) (provider.ProviderAdapter, error) {
3230
if p.Config == nil || p.Config.Custom == nil {
3331
return nil, fmt.Errorf("provider %s missing custom config", p.Name)
3432
}
3533
return &CustomAdapter{
36-
provider: p,
37-
converter: converter.NewRegistry(),
34+
provider: p,
3835
}, nil
3936
}
4037

@@ -50,22 +47,12 @@ func (a *CustomAdapter) Execute(ctx context.Context, w http.ResponseWriter, req
5047
// Determine if streaming
5148
stream := isStreamRequest(requestBody)
5249

53-
// Determine target client type for the provider
54-
// If provider supports the client's type natively, use it directly
55-
// Otherwise, find a supported type and convert
56-
targetType := clientType
57-
needsConversion := false
58-
if !a.supportsClientType(clientType) {
59-
// Find a supported type (prefer OpenAI as it's most common)
60-
for _, supported := range a.provider.SupportedClientTypes {
61-
targetType = supported
62-
break
63-
}
64-
needsConversion = true
65-
}
50+
// Note: Format conversion is now handled by Executor layer
51+
// The clientType in context is already the correct type that this provider supports
52+
// We use clientType directly for URL building and auth header selection
6653

6754
// Build upstream URL
68-
baseURL := a.getBaseURL(targetType)
55+
baseURL := a.getBaseURL(clientType)
6956
requestURI := ctxutil.GetRequestURI(ctx)
7057

7158
// For Gemini, update model in URL path if mapping is configured
@@ -87,7 +74,7 @@ func (a *CustomAdapter) Execute(ctx context.Context, w http.ResponseWriter, req
8774

8875
// Override auth headers with provider's credentials
8976
if a.provider.Config.Custom.APIKey != "" {
90-
setAuthHeader(upstreamReq, targetType, a.provider.Config.Custom.APIKey)
77+
setAuthHeader(upstreamReq, clientType, a.provider.Config.Custom.APIKey)
9178
}
9279

9380
// Send request info via EventChannel
@@ -100,12 +87,14 @@ func (a *CustomAdapter) Execute(ctx context.Context, w http.ResponseWriter, req
10087
})
10188
}
10289

103-
// Execute request
104-
client := &http.Client{}
90+
// Execute request with reasonable timeout
91+
client := &http.Client{
92+
Timeout: 10 * time.Minute, // Long timeout for LLM requests
93+
}
10594
resp, err := client.Do(upstreamReq)
10695
if err != nil {
10796
proxyErr := domain.NewProxyErrorWithMessage(domain.ErrUpstreamError, true, "failed to connect to upstream")
108-
proxyErr.IsNetworkError = true // Mark as network error (connection timeout, DNS failure, etc.)
97+
proxyErr.IsNetworkError = true
10998
return proxyErr
11099
}
111100
defer resp.Body.Close()
@@ -144,10 +133,12 @@ func (a *CustomAdapter) Execute(ctx context.Context, w http.ResponseWriter, req
144133
}
145134

146135
// Handle response
136+
// Note: Response format conversion is handled by Executor's ConvertingResponseWriter
137+
// Adapters simply pass through the upstream response
147138
if stream {
148-
return a.handleStreamResponse(ctx, w, resp, clientType, targetType, needsConversion)
139+
return a.handleStreamResponse(ctx, w, resp, clientType)
149140
}
150-
return a.handleNonStreamResponse(ctx, w, resp, clientType, targetType, needsConversion)
141+
return a.handleNonStreamResponse(ctx, w, resp, clientType)
151142
}
152143

153144
func (a *CustomAdapter) supportsClientType(ct domain.ClientType) bool {
@@ -167,7 +158,7 @@ func (a *CustomAdapter) getBaseURL(clientType domain.ClientType) string {
167158
return config.BaseURL
168159
}
169160

170-
func (a *CustomAdapter) handleNonStreamResponse(ctx context.Context, w http.ResponseWriter, resp *http.Response, clientType, targetType domain.ClientType, needsConversion bool) error {
161+
func (a *CustomAdapter) handleNonStreamResponse(ctx context.Context, w http.ResponseWriter, resp *http.Response, clientType domain.ClientType) error {
171162
body, err := io.ReadAll(resp.Body)
172163
if err != nil {
173164
return domain.NewProxyErrorWithMessage(domain.ErrUpstreamError, true, "failed to read upstream response")
@@ -197,28 +188,21 @@ func (a *CustomAdapter) handleNonStreamResponse(ctx context.Context, w http.Resp
197188
}
198189

199190
// Extract and send responseModel
200-
if responseModel := extractResponseModel(body, targetType); responseModel != "" {
191+
if responseModel := extractResponseModel(body, clientType); responseModel != "" {
201192
eventChan.SendResponseModel(responseModel)
202193
}
203194

204-
var responseBody []byte
205-
if needsConversion {
206-
responseBody, err = a.converter.TransformResponse(targetType, clientType, body)
207-
if err != nil {
208-
return domain.NewProxyErrorWithMessage(domain.ErrFormatConversion, false, "failed to transform response")
209-
}
210-
} else {
211-
responseBody = body
212-
}
195+
// Note: Response format conversion is handled by Executor's ConvertingResponseWriter
196+
// Adapter simply passes through the upstream response body
213197

214198
// Copy upstream headers (except those we override)
215199
copyResponseHeaders(w.Header(), resp.Header)
216200
w.WriteHeader(resp.StatusCode)
217-
_, _ = w.Write(responseBody)
201+
_, _ = w.Write(body)
218202
return nil
219203
}
220204

221-
func (a *CustomAdapter) handleStreamResponse(ctx context.Context, w http.ResponseWriter, resp *http.Response, clientType, targetType domain.ClientType, needsConversion bool) error {
205+
func (a *CustomAdapter) handleStreamResponse(ctx context.Context, w http.ResponseWriter, resp *http.Response, clientType domain.ClientType) error {
222206
eventChan := ctxutil.GetEventChan(ctx)
223207

224208
// Send initial response info (for streaming, we only capture status and headers)
@@ -251,10 +235,8 @@ func (a *CustomAdapter) handleStreamResponse(ctx context.Context, w http.Respons
251235
return domain.NewProxyErrorWithMessage(domain.ErrUpstreamError, false, "streaming not supported")
252236
}
253237

254-
var state *converter.TransformState
255-
if needsConversion {
256-
state = converter.NewTransformState()
257-
}
238+
// Note: Response format conversion is handled by Executor's ConvertingResponseWriter
239+
// Adapter simply passes through the upstream SSE data
258240

259241
// Collect all SSE events for response body and token extraction
260242
var sseBuffer strings.Builder
@@ -285,7 +267,7 @@ func (a *CustomAdapter) handleStreamResponse(ctx context.Context, w http.Respons
285267
}
286268

287269
// Extract and send responseModel
288-
if responseModel := extractResponseModelFromSSE(sseBuffer.String(), targetType); responseModel != "" {
270+
if responseModel := extractResponseModelFromSSE(sseBuffer.String(), clientType); responseModel != "" {
289271
eventChan.SendResponseModel(responseModel)
290272
}
291273
}
@@ -369,20 +351,10 @@ func (a *CustomAdapter) handleStreamResponse(ctx context.Context, w http.Respons
369351
}
370352
}
371353

372-
var output []byte
373-
if needsConversion {
374-
// Transform the chunk
375-
transformed, transformErr := a.converter.TransformStreamChunk(targetType, clientType, []byte(line), state)
376-
if transformErr != nil {
377-
continue // Skip malformed chunks
378-
}
379-
output = transformed
380-
} else {
381-
output = []byte(line)
382-
}
383-
384-
if len(output) > 0 {
385-
_, writeErr := w.Write(output)
354+
// Note: Response format conversion is handled by Executor's ConvertingResponseWriter
355+
// Adapter simply passes through the upstream SSE data
356+
if len(line) > 0 {
357+
_, writeErr := w.Write([]byte(line))
386358
if writeErr != nil {
387359
// Client disconnected
388360
sendFinalEvents()

internal/context/context.go

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,28 +11,33 @@ import (
1111
type contextKey string
1212

1313
const (
14-
CtxKeyClientType contextKey = "client_type"
15-
CtxKeySessionID contextKey = "session_id"
16-
CtxKeyProjectID contextKey = "project_id"
17-
CtxKeyRequestModel contextKey = "request_model"
18-
CtxKeyMappedModel contextKey = "mapped_model"
19-
CtxKeyResponseModel contextKey = "response_model"
20-
CtxKeyProxyRequest contextKey = "proxy_request"
21-
CtxKeyRequestBody contextKey = "request_body"
22-
CtxKeyUpstreamAttempt contextKey = "upstream_attempt"
23-
CtxKeyRequestHeaders contextKey = "request_headers"
24-
CtxKeyRequestURI contextKey = "request_uri"
25-
CtxKeyBroadcaster contextKey = "broadcaster"
26-
CtxKeyIsStream contextKey = "is_stream"
27-
CtxKeyAPITokenID contextKey = "api_token_id"
28-
CtxKeyEventChan contextKey = "event_chan"
14+
CtxKeyClientType contextKey = "client_type"
15+
CtxKeyOriginalClientType contextKey = "original_client_type" // Original client type before format conversion
16+
CtxKeySessionID contextKey = "session_id"
17+
CtxKeyProjectID contextKey = "project_id"
18+
CtxKeyRequestModel contextKey = "request_model"
19+
CtxKeyMappedModel contextKey = "mapped_model"
20+
CtxKeyResponseModel contextKey = "response_model"
21+
CtxKeyProxyRequest contextKey = "proxy_request"
22+
CtxKeyRequestBody contextKey = "request_body"
23+
CtxKeyUpstreamAttempt contextKey = "upstream_attempt"
24+
CtxKeyRequestHeaders contextKey = "request_headers"
25+
CtxKeyRequestURI contextKey = "request_uri"
26+
CtxKeyBroadcaster contextKey = "broadcaster"
27+
CtxKeyIsStream contextKey = "is_stream"
28+
CtxKeyAPITokenID contextKey = "api_token_id"
29+
CtxKeyEventChan contextKey = "event_chan"
2930
)
3031

3132
// Setters
3233
func WithClientType(ctx context.Context, ct domain.ClientType) context.Context {
3334
return context.WithValue(ctx, CtxKeyClientType, ct)
3435
}
3536

37+
func WithOriginalClientType(ctx context.Context, ct domain.ClientType) context.Context {
38+
return context.WithValue(ctx, CtxKeyOriginalClientType, ct)
39+
}
40+
3641
func WithSessionID(ctx context.Context, sid string) context.Context {
3742
return context.WithValue(ctx, CtxKeySessionID, sid)
3843
}
@@ -81,6 +86,13 @@ func GetClientType(ctx context.Context) domain.ClientType {
8186
return ""
8287
}
8388

89+
func GetOriginalClientType(ctx context.Context) domain.ClientType {
90+
if v, ok := ctx.Value(CtxKeyOriginalClientType).(domain.ClientType); ok {
91+
return v
92+
}
93+
return ""
94+
}
95+
8496
func GetSessionID(ctx context.Context) string {
8597
if v, ok := ctx.Value(CtxKeySessionID).(string); ok {
8698
return v

0 commit comments

Comments
 (0)