Skip to content
7 changes: 5 additions & 2 deletions cmd/maxx/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/awsl-project/maxx/internal/core"
"github.com/awsl-project/maxx/internal/executor"
"github.com/awsl-project/maxx/internal/handler"
"github.com/awsl-project/maxx/internal/health"
"github.com/awsl-project/maxx/internal/repository/cached"
"github.com/awsl-project/maxx/internal/repository/sqlite"
"github.com/awsl-project/maxx/internal/router"
Expand Down Expand Up @@ -192,8 +193,10 @@ func main() {
}
log.Printf("[Startup] Caches loaded (%v)", time.Since(startupStep))

providerHealthTracker := health.NewTracker()

// Create router
r := router.NewRouter(cachedRouteRepo, cachedProviderRepo, cachedRoutingStrategyRepo, cachedRetryConfigRepo, cachedProjectRepo)
r := router.NewRouter(cachedRouteRepo, cachedProviderRepo, cachedRoutingStrategyRepo, cachedRetryConfigRepo, cachedProjectRepo, providerHealthTracker)

// Initialize provider adapters
startupStep = time.Now()
Expand Down Expand Up @@ -279,7 +282,7 @@ func main() {
statsAggregator := stats.NewStatsAggregator(usageStatsRepo)

// Create executor
requestExecutor := executor.NewExecutor(r, proxyRequestRepo, attemptRepo, cachedRetryConfigRepo, cachedSessionRepo, cachedModelMappingRepo, settingRepo, wsHub, projectWaiter, instanceID, statsAggregator)
requestExecutor := executor.NewExecutor(r, proxyRequestRepo, attemptRepo, cachedRetryConfigRepo, cachedSessionRepo, cachedModelMappingRepo, settingRepo, wsHub, projectWaiter, instanceID, statsAggregator, providerHealthTracker)

// Create client adapter
clientAdapter := client.NewAdapter()
Expand Down
2 changes: 2 additions & 0 deletions internal/adapter/provider/antigravity/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ func (a *AntigravityAdapter) Execute(c *flow.Ctx, provider *domain.Provider) err
proxyErr.IsNetworkError = true // Mark as network error (connection timeout, DNS failure, etc.)
return proxyErr
}
resp.Body = flow.WrapResponseBody(c, resp.Body)

// Check for 401 (token expired) and retry once
if resp.StatusCode == http.StatusUnauthorized {
Expand Down Expand Up @@ -273,6 +274,7 @@ func (a *AntigravityAdapter) Execute(c *flow.Ctx, provider *domain.Provider) err
proxyErr.IsNetworkError = true // Mark as network error
return proxyErr
}
resp.Body = flow.WrapResponseBody(c, resp.Body)
}

// Check for error response
Expand Down
3 changes: 2 additions & 1 deletion internal/adapter/provider/claude/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (a *ClaudeAdapter) Execute(c *flow.Ctx, provider *domain.Provider) error {
proxyErr.IsNetworkError = true
return proxyErr
}
resp.Body = flow.WrapResponseBody(c, resp.Body)
defer resp.Body.Close()

// Handle 401 (token expired) - refresh and retry once
Expand Down Expand Up @@ -165,6 +166,7 @@ func (a *ClaudeAdapter) Execute(c *flow.Ctx, provider *domain.Provider) error {
proxyErr.IsNetworkError = true
return proxyErr
}
resp.Body = flow.WrapResponseBody(c, resp.Body)
defer resp.Body.Close()
}

Expand Down Expand Up @@ -691,7 +693,6 @@ func extractModelFromResponse(body []byte) string {
return ""
}


var claudeFilteredHeaders = map[string]bool{
// Hop-by-hop headers
"connection": true,
Expand Down
3 changes: 3 additions & 0 deletions internal/adapter/provider/cliproxyapi_antigravity/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ func (a *CLIProxyAPIAntigravityAdapter) executeNonStream(c *flow.Ctx, w http.Res
if c.Request != nil {
ctx = c.Request.Context()
}
// CPA 非流式接口会缓冲完整响应后一次性返回,无法暴露真实上游首字节。
// 这类路径退回到 TotalTimeout 控制,避免把大响应误判成 first-byte timeout。
flow.DisableFirstByteTimeout(c)

resp, err := a.executor.Execute(ctx, a.authObj, execReq, execOpts)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion internal/adapter/provider/cliproxyapi_codex/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import (
"github.com/awsl-project/maxx/internal/domain"
"github.com/awsl-project/maxx/internal/flow"
"github.com/awsl-project/maxx/internal/usage"
"github.com/tidwall/sjson"
"github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
"github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor"
"github.com/router-for-me/CLIProxyAPI/v6/sdk/exec"
"github.com/router-for-me/CLIProxyAPI/v6/sdk/translator"
"github.com/tidwall/sjson"
)

// TokenCache caches access tokens
Expand Down Expand Up @@ -271,6 +271,9 @@ func (a *CLIProxyAPICodexAdapter) executeNonStream(c *flow.Ctx, w http.ResponseW
if c.Request != nil {
ctx = c.Request.Context()
}
// CPA 非流式接口会缓冲完整响应后一次性返回,无法暴露真实上游首字节。
// 这类路径退回到 TotalTimeout 控制,避免把大响应误判成 first-byte timeout。
flow.DisableFirstByteTimeout(c)

resp, err := a.executor.Execute(ctx, a.authObj, execReq, execOpts)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion internal/adapter/provider/codex/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func (a *CodexAdapter) Execute(c *flow.Ctx, provider *domain.Provider) error {
proxyErr.IsNetworkError = true
return proxyErr
}
resp.Body = flow.WrapResponseBody(c, resp.Body)
defer resp.Body.Close()

// Handle 401 (token expired) - refresh and retry once
Expand Down Expand Up @@ -207,6 +208,7 @@ func (a *CodexAdapter) Execute(c *flow.Ctx, provider *domain.Provider) error {
proxyErr.IsNetworkError = true
return proxyErr
}
resp.Body = flow.WrapResponseBody(c, resp.Body)
defer resp.Body.Close()
}

Expand Down Expand Up @@ -688,7 +690,6 @@ func extractModelFromResponse(body []byte) string {
return ""
}


// applyCodexHeaders applies headers for Codex API requests
// It follows the CLIProxyAPI pattern: passthrough client headers, use defaults only when missing
func (a *CodexAdapter) applyCodexHeaders(upstreamReq, clientReq *http.Request, accessToken, accountID string, stream bool, cacheID string) {
Expand Down
1 change: 1 addition & 0 deletions internal/adapter/provider/custom/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func (a *CustomAdapter) Execute(c *flow.Ctx, provider *domain.Provider) error {
proxyErr.IsNetworkError = true
return proxyErr
}
resp.Body = flow.WrapResponseBody(c, resp.Body)
defer resp.Body.Close()

// Check for error response
Expand Down
2 changes: 2 additions & 0 deletions internal/adapter/provider/kiro/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (a *KiroAdapter) Execute(c *flow.Ctx, provider *domain.Provider) error {
proxyErr.IsNetworkError = true
return proxyErr
}
resp.Body = flow.WrapResponseBody(c, resp.Body)
defer resp.Body.Close()

// Check for 401 (token expired) and retry once
Expand Down Expand Up @@ -171,6 +172,7 @@ func (a *KiroAdapter) Execute(c *flow.Ctx, provider *domain.Provider) error {
proxyErr.IsNetworkError = true
return proxyErr
}
resp.Body = flow.WrapResponseBody(c, resp.Body)
defer resp.Body.Close()
}

Expand Down
4 changes: 4 additions & 0 deletions internal/core/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/awsl-project/maxx/internal/event"
"github.com/awsl-project/maxx/internal/executor"
"github.com/awsl-project/maxx/internal/handler"
"github.com/awsl-project/maxx/internal/health"
"github.com/awsl-project/maxx/internal/pricing"
"github.com/awsl-project/maxx/internal/repository"
"github.com/awsl-project/maxx/internal/repository/cached"
Expand Down Expand Up @@ -254,12 +255,14 @@ func InitializeServerComponents(
}

log.Printf("[Core] Creating router")
providerHealthTracker := health.NewTracker()
r := router.NewRouter(
repos.CachedRouteRepo,
repos.CachedProviderRepo,
repos.CachedRoutingStrategyRepo,
repos.CachedRetryConfigRepo,
repos.CachedProjectRepo,
providerHealthTracker,
)

log.Printf("[Core] Initializing provider adapters")
Expand Down Expand Up @@ -322,6 +325,7 @@ func InitializeServerComponents(
projectWaiter,
instanceID,
statsAggregator,
providerHealthTracker,
)

log.Printf("[Core] Creating client adapter")
Expand Down
1 change: 1 addition & 0 deletions internal/domain/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type BackupRoute struct {
ClientType ClientType `json:"clientType"`
ProviderName string `json:"providerName"`
Position int `json:"position"`
Weight int `json:"weight"`
RetryConfigName string `json:"retryConfigName"` // empty = default
}

Expand Down
3 changes: 3 additions & 0 deletions internal/domain/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,9 @@ type Route struct {
// 位置,数字越小越优先
Position int `json:"position"`

// 权重,仅用于 weighted_random 策略,<=0 视为 1
Weight int `json:"weight"`

// 重试配置,0 表示使用系统默认
RetryConfigID uint64 `json:"retryConfigID"`
}
Expand Down
57 changes: 57 additions & 0 deletions internal/executor/attempt_budget.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package executor

import "time"

type AttemptBudget struct {
RequestTimeout time.Duration
TotalTimeout time.Duration
FirstByteTimeout time.Duration
StreamIdleTimeout time.Duration
MaxRetryAfter time.Duration
MaxRetryWait time.Duration
}

func DefaultAttemptBudget() AttemptBudget {
return AttemptBudget{
RequestTimeout: 45 * time.Second,
TotalTimeout: 30 * time.Second,
FirstByteTimeout: 10 * time.Second,
StreamIdleTimeout: 15 * time.Second,
MaxRetryAfter: 30 * time.Second,
MaxRetryWait: 10 * time.Second,
}
}

func (b AttemptBudget) ClampRetryWait(wait time.Duration) time.Duration {
if wait <= 0 {
return 0
}
if b.MaxRetryWait > 0 && wait > b.MaxRetryWait {
return b.MaxRetryWait
}
return wait
}

func (b AttemptBudget) RemainingSince(start time.Time) time.Duration {
if b.TotalTimeout <= 0 {
return 0
}

remaining := b.TotalTimeout - time.Since(start)
if remaining < 0 {
return 0
}
return remaining
}

func (b AttemptBudget) RequestRemainingSince(start time.Time) time.Duration {
if b.RequestTimeout <= 0 {
return 0
}

remaining := b.RequestTimeout - time.Since(start)
if remaining < 0 {
return 0
}
return remaining
}
79 changes: 79 additions & 0 deletions internal/executor/attempt_budget_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package executor

import (
"testing"
"time"
)

func TestDefaultAttemptBudgetProvidesPositiveCaps(t *testing.T) {
budget := DefaultAttemptBudget()

if budget.RequestTimeout <= 0 {
t.Fatalf("RequestTimeout = %v, want > 0", budget.RequestTimeout)
}
if budget.TotalTimeout <= 0 {
t.Fatalf("TotalTimeout = %v, want > 0", budget.TotalTimeout)
}
if budget.FirstByteTimeout <= 0 {
t.Fatalf("FirstByteTimeout = %v, want > 0", budget.FirstByteTimeout)
}
if budget.StreamIdleTimeout <= 0 {
t.Fatalf("StreamIdleTimeout = %v, want > 0", budget.StreamIdleTimeout)
}
if budget.MaxRetryAfter <= 0 {
t.Fatalf("MaxRetryAfter = %v, want > 0", budget.MaxRetryAfter)
}
if budget.MaxRetryWait <= 0 {
t.Fatalf("MaxRetryWait = %v, want > 0", budget.MaxRetryWait)
}
}

func TestAttemptBudgetCapsRetryWait(t *testing.T) {
budget := AttemptBudget{
MaxRetryWait: 5 * time.Second,
}

got := budget.ClampRetryWait(30 * time.Second)

if got != 5*time.Second {
t.Fatalf("ClampRetryWait(30s) = %v, want 5s", got)
}
}

func TestAttemptBudgetKeepsShorterRetryWait(t *testing.T) {
budget := AttemptBudget{
MaxRetryWait: 5 * time.Second,
}

got := budget.ClampRetryWait(2 * time.Second)

if got != 2*time.Second {
t.Fatalf("ClampRetryWait(2s) = %v, want 2s", got)
}
}

func TestAttemptBudgetReturnsZeroWhenRemainingBudgetExhausted(t *testing.T) {
budget := AttemptBudget{
TotalTimeout: 2 * time.Second,
}

start := time.Now().Add(-3 * time.Second)
got := budget.RemainingSince(start)

if got != 0 {
t.Fatalf("RemainingSince(exhausted) = %v, want 0", got)
}
}

func TestAttemptBudgetReturnsZeroWhenRequestBudgetExhausted(t *testing.T) {
budget := AttemptBudget{
RequestTimeout: 2 * time.Second,
}

start := time.Now().Add(-3 * time.Second)
got := budget.RequestRemainingSince(start)

if got != 0 {
t.Fatalf("RequestRemainingSince(exhausted) = %v, want 0", got)
}
}
Loading