Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 63 additions & 16 deletions cmd/maxx/main.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package main

import (
"context"
"flag"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"

"github.com/awsl-project/maxx/internal/adapter/client"
Expand All @@ -18,9 +21,9 @@ import (
"github.com/awsl-project/maxx/internal/handler"
"github.com/awsl-project/maxx/internal/repository/cached"
"github.com/awsl-project/maxx/internal/repository/sqlite"
"github.com/awsl-project/maxx/internal/stats"
"github.com/awsl-project/maxx/internal/router"
"github.com/awsl-project/maxx/internal/service"
"github.com/awsl-project/maxx/internal/stats"
"github.com/awsl-project/maxx/internal/version"
"github.com/awsl-project/maxx/internal/waiter"
)
Expand Down Expand Up @@ -118,6 +121,12 @@ func main() {
} else if count > 0 {
log.Printf("Marked %d stale requests as failed", count)
}
// Also mark stale upstream attempts as failed
if count, err := attemptRepo.MarkStaleAttemptsFailed(); err != nil {
log.Printf("Warning: Failed to mark stale attempts: %v", err)
} else if count > 0 {
log.Printf("Marked %d stale upstream attempts as failed", count)
}

// Create cached repositories
cachedProviderRepo := cached.NewProviderRepository(providerRepo)
Expand Down Expand Up @@ -228,6 +237,7 @@ func main() {
responseModelRepo,
*addr,
r, // Router implements ProviderAdapterRefresher interface
wsHub,
)

// Create backup service
Expand Down Expand Up @@ -257,8 +267,12 @@ func main() {
log.Println("Proxy token authentication is enabled")
}

// Create request tracker for graceful shutdown
requestTracker := core.NewRequestTracker()

// Create handlers
proxyHandler := handler.NewProxyHandler(clientAdapter, exec, cachedSessionRepo, tokenAuthMiddleware)
proxyHandler.SetRequestTracker(requestTracker)
adminHandler := handler.NewAdminHandler(adminService, backupService, logPath)
authHandler := handler.NewAuthHandler(authMiddleware)
antigravityHandler := handler.NewAntigravityHandler(adminService, antigravityQuotaRepo, wsHub)
Expand Down Expand Up @@ -309,22 +323,55 @@ func main() {
// Wrap with logging middleware
loggedMux := handler.LoggingMiddleware(mux)

// Start server
// Create HTTP server
server := &http.Server{
Addr: *addr,
Handler: loggedMux,
}

// Start server in goroutine
log.Printf("Starting Maxx server %s on %s", version.Info(), *addr)
log.Printf("Data directory: %s", dataDirPath)
log.Printf(" Database: %s", dbPath)
log.Printf(" Log file: %s", logPath)
log.Printf("Admin API: http://localhost%s/api/admin/", *addr)
log.Printf("WebSocket: ws://localhost%s/ws", *addr)
log.Printf("Proxy endpoints:")
log.Printf(" Claude: http://localhost%s/v1/messages", *addr)
log.Printf(" OpenAI: http://localhost%s/v1/chat/completions", *addr)
log.Printf(" Codex: http://localhost%s/v1/responses", *addr)
log.Printf(" Gemini: http://localhost%s/v1beta/models/{model}:generateContent", *addr)
log.Printf("Project proxy: http://localhost%s/{project-slug}/v1/messages (etc.)", *addr)

if err := http.ListenAndServe(*addr, loggedMux); err != nil {
log.Printf("Server error: %v", err)
os.Exit(1)

go func() {
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Printf("Server error: %v", err)
os.Exit(1)
}
}()

// Wait for interrupt signal (SIGINT or SIGTERM)
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigCh
log.Printf("Received signal %v, initiating graceful shutdown...", sig)

// Step 1: Wait for active proxy requests to complete
activeCount := requestTracker.ActiveCount()
if activeCount > 0 {
log.Printf("Waiting for %d active proxy requests to complete...", activeCount)
completed := requestTracker.GracefulShutdown(core.GracefulShutdownTimeout)
if !completed {
log.Printf("Graceful shutdown timeout, some requests may be interrupted")
} else {
log.Printf("All proxy requests completed successfully")
}
} else {
// Mark as shutting down to reject new requests
requestTracker.GracefulShutdown(0)
log.Printf("No active proxy requests")
}

// Step 2: Shutdown HTTP server
shutdownCtx, cancel := context.WithTimeout(context.Background(), core.HTTPShutdownTimeout)
defer cancel()

if err := server.Shutdown(shutdownCtx); err != nil {
log.Printf("HTTP server graceful shutdown failed: %v, forcing close", err)
if closeErr := server.Close(); closeErr != nil {
log.Printf("Force close error: %v", closeErr)
}
}

log.Printf("Server stopped")
}
64 changes: 64 additions & 0 deletions coverage.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
mode: set
github.com/awsl-project/maxx/internal/stats/aggregator.go:14.90,18.2 1 0
github.com/awsl-project/maxx/internal/stats/aggregator.go:21.46,23.2 1 0
github.com/awsl-project/maxx/internal/stats/pure.go:35.93,37.11 2 1
github.com/awsl-project/maxx/internal/stats/pure.go:38.32,39.33 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:40.30,41.31 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:42.29,43.66 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:44.30,47.19 2 1
github.com/awsl-project/maxx/internal/stats/pure.go:47.19,49.4 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:50.3,50.78 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:51.31,52.60 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:53.30,54.52 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:55.10,56.31 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:63.90,64.23 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:64.23,66.3 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:68.2,79.28 3 1
github.com/awsl-project/maxx/internal/stats/pure.go:79.28,93.21 4 1
github.com/awsl-project/maxx/internal/stats/pure.go:93.21,95.4 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:96.3,96.17 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:96.17,98.4 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:100.3,100.33 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:100.33,110.4 9 1
github.com/awsl-project/maxx/internal/stats/pure.go:110.9,130.4 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:133.2,134.29 2 1
github.com/awsl-project/maxx/internal/stats/pure.go:134.29,136.3 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:137.2,137.15 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:143.105,144.21 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:144.21,146.3 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:148.2,159.26 3 1
github.com/awsl-project/maxx/internal/stats/pure.go:159.26,172.40 3 1
github.com/awsl-project/maxx/internal/stats/pure.go:172.40,182.4 9 1
github.com/awsl-project/maxx/internal/stats/pure.go:182.9,202.4 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:205.2,206.29 2 1
github.com/awsl-project/maxx/internal/stats/pure.go:206.29,208.3 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:209.2,209.15 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:214.73,227.34 3 1
github.com/awsl-project/maxx/internal/stats/pure.go:227.34,228.27 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:228.27,240.39 2 1
github.com/awsl-project/maxx/internal/stats/pure.go:240.39,250.5 9 1
github.com/awsl-project/maxx/internal/stats/pure.go:250.10,254.5 2 1
github.com/awsl-project/maxx/internal/stats/pure.go:258.2,259.27 2 1
github.com/awsl-project/maxx/internal/stats/pure.go:259.27,261.3 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:262.2,262.15 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:268.140,269.26 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:269.26,278.3 8 1
github.com/awsl-project/maxx/internal/stats/pure.go:279.2,279.8 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:284.83,287.26 2 1
github.com/awsl-project/maxx/internal/stats/pure.go:287.26,288.24 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:288.24,289.12 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:292.3,292.47 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:292.47,301.4 8 1
github.com/awsl-project/maxx/internal/stats/pure.go:301.9,313.4 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:317.2,317.28 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:317.28,318.27 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:318.27,320.4 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:323.2,323.15 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:327.97,329.26 2 1
github.com/awsl-project/maxx/internal/stats/pure.go:329.26,330.25 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:330.25,332.4 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:334.2,334.15 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:339.95,341.26 2 1
github.com/awsl-project/maxx/internal/stats/pure.go:341.26,342.62 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:342.62,344.4 1 1
github.com/awsl-project/maxx/internal/stats/pure.go:346.2,346.15 1 1
26 changes: 16 additions & 10 deletions internal/adapter/provider/antigravity/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ func (a *AntigravityAdapter) Execute(ctx context.Context, w http.ResponseWriter,

// Transform request based on client type
var geminiBody []byte
if clientType == domain.ClientTypeClaude {
switch clientType {
case domain.ClientTypeClaude:
// Use direct transformation (no converter dependency)
// This combines cache control cleanup, thinking filter, tool loop recovery,
// system instruction building, content transformation, tool building, and generation config
Expand All @@ -127,10 +128,10 @@ func (a *AntigravityAdapter) Execute(ctx context.Context, w http.ResponseWriter,

// Apply minimal post-processing for features not yet fully integrated
geminiBody = applyClaudePostProcess(geminiBody, sessionID, hasThinking, requestBody, mappedModel)
} else if clientType == domain.ClientTypeOpenAI {
case domain.ClientTypeOpenAI:
// TODO: Implement OpenAI transformation in the future
return domain.NewProxyErrorWithMessage(domain.ErrFormatConversion, true, "OpenAI transformation not yet implemented")
} else {
default:
// For Gemini, unwrap CLI envelope if present
geminiBody = unwrapGeminiCLIEnvelope(requestBody)
}
Expand Down Expand Up @@ -439,12 +440,9 @@ func applyClaudePostProcess(geminiBody []byte, sessionID string, hasThinking boo
return geminiBody
}

modified := false
modified := InjectToolConfig(request)

// 1. Inject toolConfig with VALIDATED mode when tools exist
if InjectToolConfig(request) {
modified = true
}

// 2. Process contents for additional signature validation
if contents, ok := request["contents"].([]interface{}); ok {
Expand Down Expand Up @@ -545,16 +543,17 @@ func (a *AntigravityAdapter) handleNonStreamResponse(ctx context.Context, w http
var responseBody []byte

// Transform response based on client type
if clientType == domain.ClientTypeClaude {
switch clientType {
case domain.ClientTypeClaude:
requestModel := ctxutil.GetRequestModel(ctx)
responseBody, err = convertGeminiToClaudeResponse(unwrappedBody, requestModel)
if err != nil {
return domain.NewProxyErrorWithMessage(domain.ErrFormatConversion, false, "failed to transform response")
}
} else if clientType == domain.ClientTypeOpenAI {
case domain.ClientTypeOpenAI:
// TODO: Implement OpenAI response transformation
return domain.NewProxyErrorWithMessage(domain.ErrFormatConversion, false, "OpenAI response transformation not yet implemented")
} else {
default:
// Gemini native
responseBody = unwrappedBody
}
Expand Down Expand Up @@ -648,6 +647,7 @@ func (a *AntigravityAdapter) handleStreamResponse(ctx context.Context, w http.Re
// Read chunks and accumulate until we have complete lines
var lineBuffer bytes.Buffer
buf := make([]byte, 4096)
firstChunkSent := false // Track TTFT

for {
// Check context before reading
Expand Down Expand Up @@ -700,6 +700,12 @@ func (a *AntigravityAdapter) handleStreamResponse(ctx context.Context, w http.Re
return domain.NewProxyErrorWithMessage(writeErr, false, "client disconnected")
}
flusher.Flush()

// Track TTFT: send first token time on first successful write
if !firstChunkSent {
firstChunkSent = true
eventChan.SendFirstToken(time.Now().UnixMilli())
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,9 @@ func PostProcessClaudeRequest(geminiBody []byte, sessionID string, hasThinking b
return geminiBody
}

modified := false
modified := injectAntigravityIdentity(request)

// 1. Inject Antigravity identity into system instruction (like Antigravity-Manager)
if injectAntigravityIdentity(request) {
modified = true
}

// 2. Clean tool input schemas for Gemini compatibility (like Antigravity-Manager)
if cleanToolInputSchemas(request) {
Expand Down
1 change: 0 additions & 1 deletion internal/adapter/provider/antigravity/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,6 @@ func convertGeminiToClaudeResponse(geminiBody []byte, requestModel string) ([]by
"thinking": "",
"signature": trailingSignature,
})
trailingSignature = ""
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/adapter/provider/antigravity/retry_delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func ParseRetryInfo(statusCode int, body []byte) *RetryInfo {
bodyStr := string(body)

// Parse reason
reason := RateLimitReasonUnknown
var reason RateLimitReason
if statusCode == 429 {
reason = parseRateLimitReason(bodyStr)
} else {
Expand Down
2 changes: 1 addition & 1 deletion internal/adapter/provider/antigravity/transform_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func removeTrailingUnsignedThinking(messages *[]ClaudeMessage) {
}

blocks := parseContentBlocks((*messages)[i].Content)
if blocks == nil || len(blocks) == 0 {
if len(blocks) == 0 {
continue
}

Expand Down
2 changes: 1 addition & 1 deletion internal/adapter/provider/antigravity/transform_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// buildTools converts Claude tools to Gemini tools format
// Reference: Antigravity-Manager's build_tools
func buildTools(claudeReq *ClaudeRequest) interface{} {
if claudeReq.Tools == nil || len(claudeReq.Tools) == 0 {
if len(claudeReq.Tools) == 0 {
return nil
}

Expand Down
10 changes: 8 additions & 2 deletions internal/adapter/provider/custom/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func (a *CustomAdapter) handleStreamResponse(ctx context.Context, w http.Respons
// Use buffer-based approach to handle incomplete lines properly
var lineBuffer bytes.Buffer
buf := make([]byte, 4096)
firstChunkSent := false // Track TTFT

for {
// Check context before reading
Expand Down Expand Up @@ -361,6 +362,12 @@ func (a *CustomAdapter) handleStreamResponse(ctx context.Context, w http.Respons
return domain.NewProxyErrorWithMessage(writeErr, false, "client disconnected")
}
flusher.Flush()

// Track TTFT: send first token time on first successful write
if !firstChunkSent {
firstChunkSent = true
eventChan.SendFirstToken(time.Now().UnixMilli())
}
}
}
}
Expand Down Expand Up @@ -579,7 +586,7 @@ func copyResponseHeaders(dst, src http.Header) {
// Supports multiple API formats: OpenAI, Anthropic, Gemini, etc.
func parseRateLimitInfo(resp *http.Response, body []byte, clientType domain.ClientType) *domain.RateLimitInfo {
var resetTime time.Time
var rateLimitType string = "rate_limit_exceeded"
var rateLimitType = "rate_limit_exceeded"

// Method 1: Parse Retry-After header
if retryAfter := resp.Header.Get("Retry-After"); retryAfter != "" {
Expand Down Expand Up @@ -698,7 +705,6 @@ func extractResponseModel(body []byte, targetType domain.ClientType) string {
return ""
}


// extractResponseModelFromSSE extracts the model name from SSE content based on target type
func extractResponseModelFromSSE(sseContent string, targetType domain.ClientType) string {
var lastModel string
Expand Down
Loading