From 3fa1300eca3cd8f9700f14a6932e5ab3119bf72c Mon Sep 17 00:00:00 2001 From: ekarlso Date: Thu, 23 Oct 2025 22:50:32 +0200 Subject: [PATCH 1/2] Switch to use slog and pass references of services --- Dockerfile | 17 ++---- Makefile | 24 +++++++- cmd/binance-proxy/main.go | 96 ++++++++++++++++++------------- internal/handler/exchangeinfo.go | 1 + internal/handler/handler.go | 98 ++++++++++++++++++-------------- internal/handler/kline.go | 23 ++++---- internal/handler/ticker.go | 8 +-- internal/logcache/logcache.go | 6 ++ internal/service/ban_detector.go | 13 +++-- internal/service/common.go | 1 + internal/service/depth.go | 31 +++++----- internal/service/exchangeinfo.go | 34 ++++++----- internal/service/kline.go | 47 ++++++++------- internal/service/service.go | 26 +++++---- internal/service/ticker.go | 29 ++++++---- 15 files changed, 268 insertions(+), 186 deletions(-) diff --git a/Dockerfile b/Dockerfile index 0732cf7..622cdb0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,15 +1,8 @@ -# build stage -FROM golang:1.24 AS builder -WORKDIR /app -COPY . . -RUN go mod download -RUN go mod vendor -RUN go mod tidy -RUN CGO_ENABLED=0 go build -o binance-proxy ./cmd/binance-proxy/main.go - -# target stage FROM alpine -COPY --from=builder /app/binance-proxy /go/bin/binance-proxy + +COPY bin/binance-proxy /binance-proxy + EXPOSE 8090 EXPOSE 8091 -ENTRYPOINT ["/go/bin/binance-proxy"] \ No newline at end of file + +ENTRYPOINT ["/binance-proxy"] diff --git a/Makefile b/Makefile index adb33d1..9a71628 100644 --- a/Makefile +++ b/Makefile @@ -7,6 +7,15 @@ LD_FLAGS := -X main.Version='$(GOLDFLAGS_VERSION)' -X main.Buildtime='$(GOLD SOURCE_FILES ?= ./internal/... ./pkg/... ./cmd/... UNAME := $(uname -s) +BIN_DIR := bin +TOOLS_BIN_DIR := $(shell pwd)/$(BIN_DIR) +$(TOOLS_BIN_DIR): + mkdir -p $(TOOLS_BIN_DIR) + +GOLANGCI_VER := 2.5.0 +GOLANGCI_BIN := golangci +GOLANGCI := $(TOOLS_BIN_DIR)/$(GOLANGCI_BIN)-$(GOLANGCI_VER) + $(info GOLDFLAGS_VERSION=$(GOLDFLAGS_VERSION)) $(info GOLDFLAGS_BUILD_TIME=$(GOLDFLAGS_BUILD_TIME)) $(info LD_FLAGS=$(LD_FLAGS)) @@ -63,7 +72,9 @@ vet: ### Vet ### Lint .PHONY: lint -lint: fmt vet +lint: $(GOLANGCI) fmt vet + $(GOLANGCI) run -c .golangci.yml -v + ### Clean test .PHONY: test-clean @@ -77,3 +88,14 @@ test: lint ### Run tests .PHONY: cover cover: test ### Run tests and generate coverage @go tool cover -html=cover.out -o=cover.html + + +$(GOLANGCI): $(TOOLS_BIN_DIR) +ifeq (,$(wildcard $(GOLANGCI))) + mkdir -p /tmp/golangci && \ + cd /tmp/golangci && \ + curl -L https://github.com/golangci/golangci-lint/releases/download/v$(GOLANGCI_VER)/golangci-lint-$(GOLANGCI_VER)-linux-amd64.tar.gz -o golangci-lint-$(GOLANGCI_VER)-linux-amd64.tar.gz && \ + tar xvf golangci-lint-$(GOLANGCI_VER)-linux-amd64.tar.gz && \ + mv golangci-lint-$(GOLANGCI_VER)-linux-amd64/golangci-lint $(GOLANGCI) && \ + ln -sf $(GOLANGCI) $(TOOLS_BIN_DIR)/$(GOLANGCI_BIN) +endif \ No newline at end of file diff --git a/cmd/binance-proxy/main.go b/cmd/binance-proxy/main.go index b4b2a7b..4e6d04b 100644 --- a/cmd/binance-proxy/main.go +++ b/cmd/binance-proxy/main.go @@ -5,8 +5,10 @@ import ( "binance-proxy/internal/logcache" "binance-proxy/internal/service" "context" + "errors" "fmt" stdlog "log" + "log/slog" "net/http" "os" "os/signal" @@ -16,13 +18,12 @@ import ( _ "net/http/pprof" "github.com/jessevdk/go-flags" - log "github.com/sirupsen/logrus" ) -func startProxy(ctx context.Context, port int, class service.Class, disablefakekline bool, alwaysshowforwards bool) { +func startProxy(ctx context.Context, logger *slog.Logger, bd *service.BanDetector, port int, class service.Class, disablefakekline bool, alwaysshowforwards bool, errChan chan<- error) { mux := http.NewServeMux() address := fmt.Sprintf(":%d", port) - mux.HandleFunc("/", handler.NewHandler(ctx, class, !disablefakekline, alwaysshowforwards)) + mux.HandleFunc("/", handler.NewHandler(ctx, logger, bd, class, !disablefakekline, alwaysshowforwards)) // Create an HTTP server with a custom ErrorLog that suppresses repeated lines srv := &http.Server{ @@ -38,9 +39,10 @@ func startProxy(ctx context.Context, port int, class service.Class, disablefakek ), } - log.Infof("%s websocket proxy starting on port %d.", class, port) - if err := srv.ListenAndServe(); err != nil { - log.Fatalf("%s websocket proxy start failed (error: %s).", class, err) + logger.Info("websocket proxy starting", "class", class, "port", port) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Error("websocket proxy start failed", "class", class, "error", err) + errChan <- fmt.Errorf("%s proxy failed: %w", class, err) } } @@ -74,22 +76,19 @@ var ( ) func main() { - log.SetFormatter(&log.TextFormatter{ - DisableColors: true, - FullTimestamp: true, - }) + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) + slog.SetDefault(logger) - // Route logcache output through logrus for consistent formatting/levels logcache.SetLoggerHook(func(level, msg string) { switch level { case "warn": - log.Warn(msg) + logger.Warn(msg) case "error": - log.Error(msg) + logger.Error(msg) case "info": - log.Info(msg) + logger.Info(msg) default: - log.Print(msg) + logger.Info(msg) } }) logcache.SetWriterHook(func(msg string) { @@ -97,52 +96,73 @@ func main() { if len(msg) > 0 && msg[len(msg)-1] == '\n' { msg = msg[:len(msg)-1] } - log.Warnf("http: %s", msg) + + logger.Warn("http request", "msg", msg) }) - log.Infof("Binance proxy version %s, build time %s", Version, Buildtime) + logger.Info("Binance proxy version", "version", Version, "build", Buildtime) if _, err := parser.Parse(); err != nil { if flagsErr, ok := err.(*flags.Error); ok && flagsErr.Type == flags.ErrHelp { os.Exit(0) } else { - log.Fatalf("%s - %s", err, flagsErr.Type) + logger.Error("failed parsing flags", "error", err, "type", flagsErr.Type) } } - - if len(config.Verbose) >= 2 { - log.SetLevel(log.TraceLevel) - } else if len(config.Verbose) == 1 { - log.SetLevel(log.DebugLevel) - } else { - log.SetLevel(log.InfoLevel) - } - - if log.GetLevel() > log.InfoLevel { - log.Infof("Set level to %s", log.GetLevel()) - } - if config.DisableSpot && config.DisableFutures { - log.Fatal("can't start if both SPOT and FUTURES are disabled!") + logger.Error("can't start if both SPOT and FUTURES are disabled!") + os.Exit(1) } if !config.DisableFakeKline { - log.Infof("Fake candles are enabled for faster processing, the feature can be disabled with --disable-fake-candles or -c") + logger.Info("Fake candles are enabled for faster processing, the feature can be disabled with --disable-fake-candles or -c") } if config.AlwaysShowForwards { - log.Infof("Always show forwards is enabled, all API requests, that can't be served from websockets cached will be logged.") + logger.Info("Always show forwards is enabled, all API requests, that can't be served from websockets cached will be logged.") } go handleSignal() + // Channel to collect errors from proxy goroutines + errChan := make(chan error, 2) // Buffer for up to 2 proxies + var proxyCount int + + banDetector := service.NewBanDetector(logger) + if !config.DisableSpot { - go startProxy(ctx, config.SpotAddress, service.SPOT, config.DisableFakeKline, config.AlwaysShowForwards) + proxyCount++ + go startProxy(ctx, logger, banDetector, config.SpotAddress, service.SPOT, config.DisableFakeKline, config.AlwaysShowForwards, errChan) } + if !config.DisableFutures { - go startProxy(ctx, config.FuturesAddress, service.FUTURES, config.DisableFakeKline, config.AlwaysShowForwards) + proxyCount++ + go startProxy(ctx, logger, banDetector, config.FuturesAddress, service.FUTURES, config.DisableFakeKline, config.AlwaysShowForwards, errChan) } - <-ctx.Done() - log.Info("SIGINT received, aborting ...") + // Wait for either context cancellation or errors from proxies + var collectedErrors []error + done := false + + for !done { + select { + case <-ctx.Done(): + logger.Info("SIGINT received, aborting ...") + done = true + case err := <-errChan: + if err != nil { + collectedErrors = append(collectedErrors, err) + // If all proxies have failed, exit + if len(collectedErrors) >= proxyCount { + done = true + } + } + } + } + + // Log any collected errors + if len(collectedErrors) > 0 { + combinedErr := errors.Join(collectedErrors...) + logger.Error("Proxy errors occurred", "error", combinedErr) + } } diff --git a/internal/handler/exchangeinfo.go b/internal/handler/exchangeinfo.go index 6219109..bc48d96 100644 --- a/internal/handler/exchangeinfo.go +++ b/internal/handler/exchangeinfo.go @@ -13,5 +13,6 @@ func (s *Handler) exchangeInfo(w http.ResponseWriter) { w.Header().Set("Content-Type", "application/json") w.Header().Set("Data-Source", "cache") + w.Write(data) } diff --git a/internal/handler/handler.go b/internal/handler/handler.go index 97cedec..be0b55c 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -8,13 +8,13 @@ import ( "encoding/json" "fmt" "io" + "log/slog" "net/http" "net/http/httputil" "net/url" + "os" "sync" "time" - - log "github.com/sirupsen/logrus" ) // bufferPool implements httputil.BufferPool interface @@ -43,9 +43,11 @@ func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) } -func NewHandler(ctx context.Context, class service.Class, enableFakeKline bool, alwaysShowForwards bool) func(w http.ResponseWriter, r *http.Request) { +func NewHandler(ctx context.Context, logger *slog.Logger, bd *service.BanDetector, class service.Class, enableFakeKline bool, alwaysShowForwards bool) func(w http.ResponseWriter, r *http.Request) { handler := &Handler{ - srv: service.NewService(ctx, class), + logger: logger, + srv: service.NewService(ctx, logger, bd, class), + banDetector: bd, class: class, enableFakeKline: enableFakeKline, alwaysShowForwards: alwaysShowForwards, @@ -56,8 +58,10 @@ func NewHandler(ctx context.Context, class service.Class, enableFakeKline bool, } type Handler struct { - ctx context.Context - cancel context.CancelFunc + logger *slog.Logger + banDetector *service.BanDetector + ctx context.Context + cancel context.CancelFunc class service.Class srv *service.Service @@ -93,8 +97,16 @@ func (s *Handler) Router(w http.ResponseWriter, r *http.Request) { default: s.reverseProxy(w, r) } + duration := time.Since(start) - log.Debugf("%s request %s %s from %s served in %s", s.class, r.Method, r.RequestURI, r.RemoteAddr, duration) + + s.logger.Debug("request served", + "duration", duration, + "method", r.Method, + "uri", r.RequestURI, + "remote", r.RemoteAddr, + "class", s.class, + ) } // HTTP client with connection pooling for reverse proxy @@ -103,7 +115,7 @@ var ( proxyHTTPClient *http.Client ) -func getProxyHTTPClient() *http.Client { +func getProxyHTTPClient(logger *slog.Logger) *http.Client { proxyHTTPClientOnce.Do(func() { // Create a new transport each time to avoid concurrent modification issues transport := &http.Transport{ @@ -122,7 +134,7 @@ func getProxyHTTPClient() *http.Client { } if proxyHTTPClient == nil { - log.Errorf("Failed to create HTTP client") + logger.Error("Failed to create HTTP client") proxyHTTPClient = &http.Client{ Transport: http.DefaultTransport, Timeout: 60 * time.Second, @@ -130,13 +142,14 @@ func getProxyHTTPClient() *http.Client { } if proxyHTTPClient.Transport == nil { - log.Errorf("Created HTTP client has nil transport, using default transport") + logger.Error("Created HTTP client has nil transport, using default transport") proxyHTTPClient.Transport = http.DefaultTransport } }) if proxyHTTPClient == nil { - log.Errorf("HTTP client is nil after sync.Once, creating emergency default client") + logger.Error("HTTP client is nil after sync.Once, creating emergency default client") + return &http.Client{ Transport: http.DefaultTransport, Timeout: 60 * time.Second, @@ -145,7 +158,7 @@ func getProxyHTTPClient() *http.Client { // Double-check transport is not nil and clone it to avoid concurrent modification if proxyHTTPClient.Transport == nil { - log.Errorf("HTTP client transport is nil, fixing with default transport") + logger.Error("HTTP client transport is nil, fixing with default transport") proxyHTTPClient.Transport = http.DefaultTransport } @@ -162,20 +175,22 @@ func getProxyHTTPClient() *http.Client { } func (s *Handler) reverseProxy(w http.ResponseWriter, r *http.Request) { + logger := s.logger.With("method", r.Method, "uri", r.RequestURI, "remote", r.RemoteAddr, "class", s.class) + // Validate handler state if s == nil { - log.Errorf("Handler is nil in reverseProxy") + logger.Error("Handler is nil in reverseProxy") http.Error(w, "Internal server error", http.StatusInternalServerError) return } if w == nil { - log.Errorf("ResponseWriter is nil in reverseProxy") + logger.Error("ResponseWriter is nil in reverseProxy") return } if r == nil { - log.Errorf("Request is nil in reverseProxy") + logger.Error("Request is nil in reverseProxy") http.Error(w, "Internal server error", http.StatusInternalServerError) return } @@ -193,9 +208,8 @@ func (s *Handler) reverseProxy(w http.ResponseWriter, r *http.Request) { } // Check if API is banned - banDetector := service.GetBanDetector() - if banDetector != nil && banDetector.IsBanned(s.class) { - banned, recoveryTime := banDetector.GetBanStatus(s.class) + if s.banDetector != nil && s.banDetector.IsBanned(s.class) { + banned, recoveryTime := s.banDetector.GetBanStatus(s.class) if banned { msg := fmt.Sprintf("%s API is banned, returning empty response. Recovery time: %v", s.class, recoveryTime) logcache.LogOncePerDuration("warn", msg) @@ -204,11 +218,11 @@ func (s *Handler) reverseProxy(w http.ResponseWriter, r *http.Request) { } } - msg := fmt.Sprintf("%s request %s %s from %s is not cachable", s.class, r.Method, r.RequestURI, r.RemoteAddr) + msg := "Request was not cachable (forwarded to upstream)" if s.alwaysShowForwards { - log.Info(msg) + logger.Info(msg) } else { - log.Trace(msg) + logger.Debug(msg) } service.RateWait(s.ctx, s.class, r.Method, r.URL.Path, r.URL.Query()) @@ -231,7 +245,7 @@ func (s *Handler) reverseProxy(w http.ResponseWriter, r *http.Request) { } // Use custom HTTP client with connection pooling - httpClient := getProxyHTTPClient() + httpClient := getProxyHTTPClient(logger) if httpClient == nil { logcache.LogOncePerDuration("error", "HTTP client is nil, cannot create proxy") http.Error(w, "Internal server error", http.StatusInternalServerError) @@ -272,7 +286,7 @@ func (s *Handler) reverseProxy(w http.ResponseWriter, r *http.Request) { proxy := &httputil.ReverseProxy{ Director: func(req *http.Request) { if req == nil { - log.Errorf("Request is nil in proxy director") + logger.Error("Request is nil in proxy director") return } @@ -287,8 +301,7 @@ func (s *Handler) reverseProxy(w http.ResponseWriter, r *http.Request) { Transport: contextAwareTransport, BufferPool: &bufferPool{}, ModifyResponse: func(resp *http.Response) error { - bd := service.GetBanDetector() - if bd != nil && bd.CheckResponse(s.class, resp, nil) { + if s.banDetector.CheckResponse(s.class, resp, nil) { if resp.Body != nil { resp.Body.Close() } @@ -311,7 +324,7 @@ func (s *Handler) reverseProxy(w http.ResponseWriter, r *http.Request) { resp.StatusCode = http.StatusTooManyRequests resp.Status = "429 Too Many Requests" // Populate Retry-After based on ban detector recovery time if available - if banned, until := bd.GetBanStatus(s.class); banned { + if banned, until := s.banDetector.GetBanStatus(s.class); banned { secs := int(time.Until(until).Seconds()) if secs < 1 { secs = 30 @@ -332,8 +345,7 @@ func (s *Handler) reverseProxy(w http.ResponseWriter, r *http.Request) { logcache.LogOncePerDuration("error", fmt.Sprintf("%s proxy transport error: %v", s.class, err)) // If ban detector suggests a backoff, reuse the synthetic empty path - bd := service.GetBanDetector() - if bd != nil && bd.CheckResponse(s.class, nil, err) { + if s.banDetector.CheckResponse(s.class, nil, err) { logcache.LogOncePerDuration("warn", fmt.Sprintf("%s API transport error treated as ban", s.class)) s.returnEmptyResponse(rw, req) return @@ -358,8 +370,9 @@ func (s *Handler) reverseProxy(w http.ResponseWriter, r *http.Request) { // Add panic recovery for the proxy ServeHTTP call defer func() { if panicVal := recover(); panicVal != nil { + logger.Error("Panic recovered in reverseProxy.ServeHTTP", "error", panicVal) logcache.LogOncePerDuration("error", fmt.Sprintf("Panic recovered in reverseProxy.ServeHTTP for %s %s: %v", r.Method, r.URL.Path, panicVal)) - defer func() { recover() }() + defer func() { _ = recover() }() http.Error(w, "Internal server error", http.StatusInternalServerError) } }() @@ -367,14 +380,14 @@ func (s *Handler) reverseProxy(w http.ResponseWriter, r *http.Request) { // Create a copy of the request to avoid concurrent modification issues reqCopy := r.Clone(r.Context()) if reqCopy == nil { - log.Errorf("Failed to clone request") + logger.Error("Failed to clone request") http.Error(w, "Internal server error", http.StatusInternalServerError) return } - log.Debugf("About to call proxy.ServeHTTP for %s %s", reqCopy.Method, reqCopy.URL.Path) + logger.Debug("Proxying request to upstream") proxy.ServeHTTP(w, reqCopy) - log.Debugf("Completed proxy.ServeHTTP for %s %s", reqCopy.Method, reqCopy.URL.Path) + logger.Debug("Request proxied to upstream") } func (s *Handler) returnEmptyResponse(w http.ResponseWriter, r *http.Request) { @@ -384,8 +397,9 @@ func (s *Handler) returnEmptyResponse(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Proxy-Empty", "1") // Set backoff headers if we have a recovery time - if bd := service.GetBanDetector(); bd != nil { - if banned, until := bd.GetBanStatus(s.class); banned { + // Set backoff headers if we have a recovery time + if s.banDetector != nil { + if banned, until := s.banDetector.GetBanStatus(s.class); banned { secs := int(time.Until(until).Seconds()) if secs < 1 { secs = 30 @@ -394,7 +408,6 @@ func (s *Handler) returnEmptyResponse(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Backoff-Until", until.Format(time.RFC3339)) } } - var response []byte switch r.URL.Path { case "/api/v3/klines", "/fapi/v1/klines": @@ -416,7 +429,7 @@ func (s *Handler) status(w http.ResponseWriter) { // Check if context is still valid select { case <-s.ctx.Done(): - log.Warnf("Status endpoint called but context is canceled") + s.logger.Warn("Status endpoint called but context is canceled") w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusServiceUnavailable) w.Write([]byte(`{"error": "service shutting down", "status": "unavailable"}`)) @@ -433,8 +446,7 @@ func (s *Handler) status(w http.ResponseWriter) { status := statusTracker.GetStatus() // Add ban information from the existing ban detector - banDetector := service.GetBanDetector() - isBanned, recoveryTime := banDetector.GetBanStatus(s.class) + isBanned, recoveryTime := s.banDetector.GetBanStatus(s.class) // Create response with both status and ban info response := map[string]interface{}{ "proxy_status": status, @@ -466,7 +478,7 @@ func (s *Handler) restart(w http.ResponseWriter, r *http.Request) { return } - log.Warnf("RESTART requested from %s for class %s", r.RemoteAddr, s.class) + s.logger.Warn("RESTART initiated for class", "class", s.class, "remote", r.RemoteAddr) // Send immediate response before restart w.Header().Set("Content-Type", "application/json") @@ -479,7 +491,7 @@ func (s *Handler) restart(w http.ResponseWriter, r *http.Request) { } if err := json.NewEncoder(w).Encode(response); err != nil { - log.Errorf("Failed to encode restart response: %v", err) + s.logger.Error("Failed to encode restart response", "err", err) } // Flush the response to ensure it's sent @@ -490,13 +502,15 @@ func (s *Handler) restart(w http.ResponseWriter, r *http.Request) { // Give the response time to be sent go func() { time.Sleep(2 * time.Second) - log.Warnf("Executing restart for class %s...", s.class) + s.logger.Warn("Executing restart for class", "class", s.class) // Cancel the context to trigger graceful shutdown s.cancel() // Give some time for graceful shutdown, then force exit time.Sleep(3 * time.Second) - log.Fatalf("Force restart for class %s", s.class) + s.logger.Error("Force restart for class", "class", s.class) + + os.Exit(0) }() } diff --git a/internal/handler/kline.go b/internal/handler/kline.go index 03558ff..90e95ae 100644 --- a/internal/handler/kline.go +++ b/internal/handler/kline.go @@ -1,20 +1,23 @@ package handler import ( - "binance-proxy/internal/service" "encoding/json" "net/http" "strconv" "time" - - log "github.com/sirupsen/logrus" ) func (s *Handler) klines(w http.ResponseWriter, r *http.Request) { + logger := s.logger.With( + "class", s.class, + "method", r.Method, + "uri", r.RequestURI, + "remote", r.RemoteAddr, + ) + // Check if API is banned - banDetector := service.GetBanDetector() - if banDetector.IsBanned(s.class) { - log.Debugf("%s klines request returning empty due to API ban", s.class) + if s.banDetector.IsBanned(s.class) { + logger.Debug("klines request returning empty due to API ban") w.Header().Set("Content-Type", "application/json") w.Header().Set("Data-Source", "ban-protection") w.Write([]byte("[]")) @@ -32,14 +35,14 @@ func (s *Handler) klines(w http.ResponseWriter, r *http.Request) { switch { case err != nil, limitInt <= 0, limitInt > 1000, r.URL.Query().Get("startTime") != "", r.URL.Query().Get("endTime") != "", symbol == "", interval == "": - log.Tracef("%s %s@%s kline proxying via REST", s.class, symbol, interval) + logger.Debug("Proxying via rest api", "symbol", symbol, "interval", interval, "limit", limit) s.reverseProxy(w, r) return } data := s.srv.Klines(symbol, interval) if data == nil { - log.Tracef("%s %s@%s kline proxying via REST", s.class, symbol, interval) + logger.Debug("Proxying via rest api", "symbol", symbol, "interval", interval, "limit", limit) s.reverseProxy(w, r) return } @@ -76,11 +79,11 @@ func (s *Handler) klines(w http.ResponseWriter, r *http.Request) { currentTime := time.Now().UnixNano() / 1e6 if dataLen > 0 && currentTime > data[dataLen-1].CloseTime { fakeKlineTimestampOpen = data[dataLen-1].CloseTime + 1 - log.Tracef("%s %s@%s kline requested for %s but not yet received", s.class, symbol, interval, strconv.FormatInt(fakeKlineTimestampOpen, 10)) + logger.Debug("Kline requested for future timestamp", "symbol", symbol, "interval", interval, "timestamp", strconv.FormatInt(fakeKlineTimestampOpen, 10)) } if s.enableFakeKline && dataLen > 0 && currentTime > data[dataLen-1].CloseTime { - log.Tracef("%s %s@%s kline faking candle for timestamp %s", s.class, symbol, interval, strconv.FormatInt(fakeKlineTimestampOpen, 10)) + logger.Debug("Kline faking candle for timestamp", "symbol", symbol, "interval", interval, "timestamp", strconv.FormatInt(fakeKlineTimestampOpen, 10)) lastData := data[dataLen-1] fakeKline := []interface{}{ lastData.CloseTime + 1, diff --git a/internal/handler/ticker.go b/internal/handler/ticker.go index abe4e23..cea6c37 100644 --- a/internal/handler/ticker.go +++ b/internal/handler/ticker.go @@ -3,26 +3,24 @@ package handler import ( "encoding/json" "net/http" - - log "github.com/sirupsen/logrus" ) func (s *Handler) ticker(w http.ResponseWriter, r *http.Request) { symbol := r.URL.Query().Get("symbol") if symbol == "" { - log.Tracef("%s ticker24hr without symbol request proxying via REST", s.class) + s.logger.Debug("Ticker24hr request without symbol, proxying via REST", "class", s.class) s.reverseProxy(w, r) return } ticker := s.srv.Ticker(symbol) if ticker == nil { - log.Tracef("%s ticker24hr for %s proxying via REST", s.class, symbol) + s.logger.Debug("Ticker24hr data not available, proxying via REST", "class", s.class, "symbol", symbol) s.reverseProxy(w, r) return } else { - log.Tracef("%s ticker24hr for %s delivering via websocket cache", s.class, symbol) + s.logger.Debug("Ticker24hr data available, delivering via websocket cache", "class", s.class, "symbol", symbol) } w.Header().Set("Content-Type", "application/json") diff --git a/internal/logcache/logcache.go b/internal/logcache/logcache.go index ac4c737..d28d3b6 100644 --- a/internal/logcache/logcache.go +++ b/internal/logcache/logcache.go @@ -35,15 +35,18 @@ func LogOncePerDuration(level, msg string) { key := Normalize(msg) cacheLock.Lock() defer cacheLock.Unlock() + last, found := cache[key] if found && time.Since(last) < SuppressDuration { return } + cache[key] = time.Now() if loggerHook != nil { loggerHook(level, msg) return } + // Default to standard logger if no hook set switch level { case "warn": @@ -78,15 +81,18 @@ func (w *suppressingWriter) Write(p []byte) (int, error) { // Pretend we wrote it to avoid backpressure; drop the line. return len(p), nil } + cache[key] = time.Now() cacheLock.Unlock() if writerHook != nil { writerHook(msg) return len(p), nil } + if w.next != nil { return w.next.Write(p) } + // Nothing to write to but not an error; pretend success return len(p), nil } diff --git a/internal/service/ban_detector.go b/internal/service/ban_detector.go index af088fc..5ce6641 100644 --- a/internal/service/ban_detector.go +++ b/internal/service/ban_detector.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "log/slog" "net/http" "regexp" "strconv" @@ -27,6 +28,8 @@ var bufferPool = sync.Pool{ type BanDetector struct { mu sync.RWMutex + logger *slog.Logger + // Ban status per class spotBanned bool futuresBanned bool @@ -54,10 +57,12 @@ type BanDetector struct { futuresBackoffCount int } -var globalBanDetector = &BanDetector{} - -func GetBanDetector() *BanDetector { - return globalBanDetector +func NewBanDetector(logger *slog.Logger) *BanDetector { + return &BanDetector{ + spotWeightReset: time.Now().Truncate(time.Minute).Add(time.Minute), + futuresWeightReset: time.Now().Truncate(time.Minute).Add(time.Minute), + logger: logger, + } } func (bd *BanDetector) IsBanned(class Class) bool { diff --git a/internal/service/common.go b/internal/service/common.go index 236a333..39323e1 100644 --- a/internal/service/common.go +++ b/internal/service/common.go @@ -25,6 +25,7 @@ type symbolInterval struct { Symbol string Interval string } + type Class string var SPOT Class = "SPOT" diff --git a/internal/service/depth.go b/internal/service/depth.go index 777840b..8d91227 100644 --- a/internal/service/depth.go +++ b/internal/service/depth.go @@ -3,12 +3,11 @@ package service import ( "binance-proxy/internal/tool" "context" + "log/slog" "strings" "sync" "time" - log "github.com/sirupsen/logrus" - spot "github.com/adshao/go-binance/v2" futures "github.com/adshao/go-binance/v2/futures" ) @@ -16,9 +15,9 @@ import ( type DepthSrv struct { rw sync.RWMutex - ctx context.Context - cancel context.CancelFunc - + ctx context.Context + cancel context.CancelFunc + logger *slog.Logger initCtx context.Context initDone context.CancelFunc @@ -34,8 +33,8 @@ type Depth struct { Asks []futures.Ask } -func NewDepthSrv(ctx context.Context, si *symbolInterval) *DepthSrv { - s := &DepthSrv{si: si} +func NewDepthSrv(ctx context.Context, logger *slog.Logger, si *symbolInterval) *DepthSrv { + s := &DepthSrv{si: si, logger: logger} s.ctx, s.cancel = context.WithCancel(ctx) s.initCtx, s.initDone = context.WithCancel(context.Background()) @@ -51,11 +50,12 @@ func (s *DepthSrv) Start() { doneC, stopC, err := s.connect() if err != nil { - log.Errorf("%s %s depth websocket connection error: %s.", s.si.Class, s.si.Symbol, err) + s.logger.Error("Depth websocket connection error", "class", s.si.Class, "symbol", s.si.Symbol, "error", err) continue } - log.Debugf("%s %s depth websocket connected.", s.si.Class, s.si.Symbol) + s.logger.Info("Depth websocket connected", "class", s.si.Class, "symbol", s.si.Symbol) + // Reset the reconnect backoff now that we have a successful connection d.Reset() select { @@ -65,7 +65,7 @@ func (s *DepthSrv) Start() { case <-doneC: } - log.Warnf("%s %s depth websocket disconnected, trying to reconnect.", s.si.Class, s.si.Symbol) + s.logger.Warn("Depth websocket disconnected, trying to reconnect", "class", s.si.Class, "symbol", s.si.Symbol) } }() } @@ -105,7 +105,8 @@ func (s *DepthSrv) wsHandlerFutures(event *futures.WsDepthEvent) { Bids: event.Bids, Asks: event.Asks, } - log.Tracef("%s %s depth websocket message received", s.si.Class, s.si.Symbol) + + s.logger.Debug("Depth websocket message received", "class", s.si.Class, "symbol", s.si.Symbol, "lastUpdateID", event.LastUpdateID) } func (s *DepthSrv) wsHandler(event *spot.WsPartialDepthEvent) { @@ -123,19 +124,19 @@ func (s *DepthSrv) wsHandler(event *spot.WsPartialDepthEvent) { Bids: event.Bids, Asks: event.Asks, } - log.Tracef("%s %s depth websocket message received", s.si.Class, s.si.Symbol) + s.logger.Debug("Depth websocket message received", "class", s.si.Class, "symbol", s.si.Symbol, "lastUpdateID", event.LastUpdateID) } func (s *DepthSrv) errHandler(err error) { msg := err.Error() switch { case strings.Contains(msg, "context canceled"): - log.Warnf("%s %s depth websocket context canceled, will restart connection.", s.si.Class, s.si.Symbol) + s.logger.Warn("Depth websocket context canceled, will restart connection", "class", s.si.Class, "symbol", s.si.Symbol) case strings.Contains(msg, "use of closed network connection"): // This commonly indicates a normal remote close/rotation; treat as info/debug to reduce noise - log.Infof("%s %s depth websocket closed by peer; reconnecting.", s.si.Class, s.si.Symbol) + s.logger.Info("Depth websocket closed by peer; reconnecting", "class", s.si.Class, "symbol", s.si.Symbol) default: - log.Errorf("%s %s depth websocket connection error: %s.", s.si.Class, s.si.Symbol, err) + s.logger.Error("Depth websocket connection error", "class", s.si.Class, "symbol", s.si.Symbol, "error", err) } } diff --git a/internal/service/exchangeinfo.go b/internal/service/exchangeinfo.go index 8cc59b2..8e150b7 100644 --- a/internal/service/exchangeinfo.go +++ b/internal/service/exchangeinfo.go @@ -2,14 +2,14 @@ package service import ( "context" - "io/ioutil" + "log/slog" "net/http" "sync" "time" "binance-proxy/internal/tool" - log "github.com/sirupsen/logrus" + "io" ) type ExchangeInfoSrv struct { @@ -18,6 +18,9 @@ type ExchangeInfoSrv struct { ctx context.Context cancel context.CancelFunc + banDetector *BanDetector + logger *slog.Logger + initCtx context.Context initDone context.CancelFunc @@ -50,12 +53,16 @@ func getHTTPClient() *http.Client { return httpClient } -func NewExchangeInfoSrv(ctx context.Context, si *symbolInterval) *ExchangeInfoSrv { +func NewExchangeInfoSrv(ctx context.Context, logger *slog.Logger, bd *BanDetector, si *symbolInterval) *ExchangeInfoSrv { s := &ExchangeInfoSrv{ - si: si, - refreshDur: 60 * time.Second, + banDetector: bd, + si: si, + refreshDur: 60 * time.Second, + logger: logger, } - log.Tracef("%s exchangeInfo initialization with refresh of %.0fs.", s.si.Class, s.refreshDur.Seconds()) + + logger.Debug("ExchangeInfoSrv initialization", "class", s.si.Class, "refreshDur", s.refreshDur.Seconds()) + s.ctx, s.cancel = context.WithCancel(ctx) s.initCtx, s.initDone = context.WithCancel(context.Background()) @@ -102,9 +109,8 @@ func (s *ExchangeInfoSrv) reTryRefreshExchangeInfo() { func (s *ExchangeInfoSrv) refreshExchangeInfo() error { // Check if API is banned - banDetector := GetBanDetector() - if banDetector.IsBanned(s.si.Class) { - log.Debugf("%s exchangeInfo refresh skipped due to API ban", s.si.Class) + if s.banDetector.IsBanned(s.si.Class) { + s.logger.Debug("ExchangeInfo refresh skipped due to API ban", "class", s.si.Class) return nil // Don't retry during ban } @@ -121,14 +127,14 @@ func (s *ExchangeInfoSrv) refreshExchangeInfo() error { client := getHTTPClient() req, err := http.NewRequestWithContext(s.ctx, http.MethodGet, url, nil) if err != nil { - log.Errorf("%s exchangeInfo request creation failed, error: %s.", s.si.Class, err) + s.logger.Error("ExchangeInfo request creation failed", "class", s.si.Class, "error", err) return err } resp, err := client.Do(req) // Check for bans - if banDetector.CheckResponse(s.si.Class, resp, err) { + if s.banDetector.CheckResponse(s.si.Class, resp, err) { if resp != nil { resp.Body.Close() } @@ -136,12 +142,12 @@ func (s *ExchangeInfoSrv) refreshExchangeInfo() error { } if err != nil { - log.Errorf("%s exchangeInfo refresh failed, error: %s.", s.si.Class, err) + s.logger.Error("ExchangeInfo refresh failed", "class", s.si.Class, "error", err) return err } defer resp.Body.Close() - data, err := ioutil.ReadAll(resp.Body) + data, err := io.ReadAll(resp.Body) if err != nil { return err } @@ -155,7 +161,7 @@ func (s *ExchangeInfoSrv) refreshExchangeInfo() error { s.exchangeInfo = data - log.Debugf("%s exchangeInfo refreshed sucessfully.", s.si.Class) + s.logger.Debug("ExchangeInfo refreshed successfully", "class", s.si.Class) return nil } diff --git a/internal/service/kline.go b/internal/service/kline.go index 9b966df..d9d35dc 100644 --- a/internal/service/kline.go +++ b/internal/service/kline.go @@ -4,13 +4,12 @@ import ( "binance-proxy/internal/tool" "container/list" "context" + "log/slog" "net/http" "net/url" "strings" "sync" - log "github.com/sirupsen/logrus" - spot "github.com/adshao/go-binance/v2" futures "github.com/adshao/go-binance/v2/futures" ) @@ -35,16 +34,20 @@ type KlinesSrv struct { ctx context.Context cancel context.CancelFunc + logger *slog.Logger initCtx context.Context initDone context.CancelFunc - si *symbolInterval - klinesList *list.List - klinesArr []*Kline + banDetector *BanDetector + si *symbolInterval + klinesList *list.List + klinesArr []*Kline } -func NewKlinesSrv(ctx context.Context, si *symbolInterval) *KlinesSrv { - s := &KlinesSrv{si: si} +func NewKlinesSrv(ctx context.Context, logger *slog.Logger, bd *BanDetector, si *symbolInterval) *KlinesSrv { + logger = logger.With("symbol", si.Symbol, "interval", si.Interval, "class", si.Class) + + s := &KlinesSrv{logger: logger, si: si, banDetector: bd} s.ctx, s.cancel = context.WithCancel(ctx) s.initCtx, s.initDone = context.WithCancel(context.Background()) @@ -60,18 +63,18 @@ func (s *KlinesSrv) Start() { doneC, stopC, err := s.connect() if err != nil { - log.Errorf("%s %s@%s kline websocket connection error: %s.", s.si.Class, s.si.Symbol, s.si.Interval, err) + s.logger.Error("Kline websocket connection error", "error", err) continue } - log.Debugf("%s %s@%s kline websocket connected.", s.si.Class, s.si.Symbol, s.si.Interval) + s.logger.Debug("Kline websocket connected") select { case <-s.ctx.Done(): stopC <- struct{}{} return case <-doneC: } - log.Warnf("%s %s@%s kline websocket disconnected, trying to reconnect.", s.si.Class, s.si.Symbol, s.si.Interval) + s.logger.Warn("Kline websocket disconnected, trying to reconnect") } }() } @@ -82,9 +85,9 @@ func (s *KlinesSrv) Stop() { func (s *KlinesSrv) errHandler(err error) { if strings.Contains(err.Error(), "context canceled") { - log.Warnf("%s %s@%s kline websocket context canceled, will restart connection.", s.si.Class, s.si.Symbol, s.si.Interval) + s.logger.Warn("Kline websocket context canceled, will restart connection.") } else { - log.Errorf("%s %s@%s kline websocket connection error: %s connected.", s.si.Class, s.si.Symbol, s.si.Interval, err) + s.logger.Error("Kline websocket connection error", "err", err) } } @@ -106,9 +109,8 @@ func (s *KlinesSrv) connect() (doneC, stopC chan struct{}, err error) { func (s *KlinesSrv) initKlineData() { // Check if API is banned - banDetector := GetBanDetector() - if banDetector.IsBanned(s.si.Class) { - log.Debugf("%s %s@%s kline initialization skipped due to API ban", s.si.Class, s.si.Symbol, s.si.Interval) + if s.banDetector.IsBanned(s.si.Class) { + s.logger.Debug("Kline initialization skipped due to API ban") // Create empty klines list to prevent repeated initialization attempts s.klinesList = list.New() @@ -118,11 +120,11 @@ func (s *KlinesSrv) initKlineData() { var klines interface{} var err error - log.Debugf("%s %s@%s kline initialization through REST.", s.si.Class, s.si.Symbol, s.si.Interval) + s.logger.Debug("Kline initialization through REST.") for d := tool.NewDelayIterator(); ; d.Delay() { // Check ban status before each attempt - if banDetector.IsBanned(s.si.Class) { - log.Debugf("%s %s@%s kline initialization stopped due to API ban", s.si.Class, s.si.Symbol, s.si.Interval) + if s.banDetector.IsBanned(s.si.Class) { + s.logger.Debug("Kline initialization stopped due to API ban") s.klinesList = list.New() defer s.initDone() return @@ -148,15 +150,15 @@ func (s *KlinesSrv) initKlineData() { } // Check for bans (resp might be nil for SDK calls, so we check err) - if banDetector.CheckResponse(s.si.Class, resp, err) { - log.Debugf("%s %s@%s kline initialization stopped due to detected ban", s.si.Class, s.si.Symbol, s.si.Interval) + if s.banDetector.CheckResponse(s.si.Class, resp, err) { + s.logger.Debug("Kline initialization stopped due to detected ban") s.klinesList = list.New() defer s.initDone() return } if err != nil { - log.Errorf("%s %s@%s kline initialization via REST failed, error: %s.", s.si.Class, s.si.Symbol, s.si.Interval, err) + s.logger.Error("Kline initialization via REST retrying...") continue } @@ -206,6 +208,7 @@ func (s *KlinesSrv) initKlineData() { } func (s *KlinesSrv) wsHandler(event interface{}) { + if s.klinesList == nil { s.initKlineData() } @@ -242,7 +245,7 @@ func (s *KlinesSrv) wsHandler(event interface{}) { } } - log.Tracef("%s %s@%s kline websocket message received for open timestamp %d", s.si.Class, s.si.Symbol, s.si.Interval, k.OpenTime) + s.logger.Debug("Kline websocket message received", "openTime", k.OpenTime) if s.klinesList.Back().Value.(*Kline).OpenTime < k.OpenTime { s.klinesList.PushBack(k) diff --git a/internal/service/service.go b/internal/service/service.go index 2b943db..d036505 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -2,16 +2,18 @@ package service import ( "context" + "log/slog" "sync" "time" - - log "github.com/sirupsen/logrus" ) type Service struct { ctx context.Context cancel context.CancelFunc + logger *slog.Logger + + banDetector *BanDetector class Class exchangeInfoSrv *ExchangeInfoSrv klinesSrv sync.Map // map[symbolInterval]*Klines @@ -23,10 +25,10 @@ type Service struct { lastGetTicker sync.Map // map[symbolInterval]time.Time } -func NewService(ctx context.Context, class Class) *Service { - s := &Service{class: class} +func NewService(ctx context.Context, logger *slog.Logger, bd *BanDetector, class Class) *Service { + s := &Service{class: class, logger: logger} s.ctx, s.cancel = context.WithCancel(ctx) - s.exchangeInfoSrv = NewExchangeInfoSrv(s.ctx, NewSymbolInterval(s.class, "", "")) + s.exchangeInfoSrv = NewExchangeInfoSrv(s.ctx, logger, bd, NewSymbolInterval(s.class, "", "")) s.exchangeInfoSrv.Start() go func() { @@ -57,7 +59,8 @@ func (s *Service) autoRemoveExpired() { if t, ok := s.lastGetKlines.Load(si); ok { expiry := 2 * INTERVAL_2_DURATION[si.Interval] if now.Sub(t.(time.Time)) > expiry { - log.Debugf("%s %s@%s kline websocket closed after being idle for %.0fs.", si.Class, si.Symbol, si.Interval, expiry.Seconds()) + s.logger.Debug("Kline websocket closed after being idle", "class", si.Class, "symbol", si.Symbol, "interval", si.Interval, "duration", expiry.Seconds()) + // Remove from all caches s.lastGetKlines.Delete(si) s.klinesSrv.Delete(si) srv.Stop() @@ -74,7 +77,7 @@ func (s *Service) autoRemoveExpired() { if t, ok := s.lastGetDepth.Load(si); ok { expiry := 2 * time.Minute if now.Sub(t.(time.Time)) > expiry { - log.Debugf("%s %s depth websocket closed after being idle for %.0fs.", si.Class, si.Symbol, expiry.Seconds()) + s.logger.Debug("Depth websocket closed after being idle", "class", si.Class, "symbol", si.Symbol, "duration", expiry.Seconds()) s.lastGetDepth.Delete(si) s.depthSrv.Delete(si) srv.Stop() @@ -91,7 +94,8 @@ func (s *Service) autoRemoveExpired() { if t, ok := s.lastGetTicker.Load(si); ok { expiry := 2 * time.Minute if now.Sub(t.(time.Time)) > expiry { - log.Debugf("%s %s ticker24hr websocket closed after being idle for %.0fs.", si.Class, si.Symbol, expiry.Seconds()) + s.logger.Debug("Ticker websocket closed after being idle", "class", si.Class, "symbol", si.Symbol, "duration", expiry.Seconds()) + // Remove from all caches s.lastGetTicker.Delete(si) s.tickerSrv.Delete(si) srv.Stop() @@ -107,7 +111,7 @@ func (s *Service) Ticker(symbol string) *Ticker24hr { si := NewSymbolInterval(s.class, symbol, "") srv, loaded := s.tickerSrv.Load(*si) if !loaded { - if srv, loaded = s.tickerSrv.LoadOrStore(*si, NewTickerSrv(s.ctx, si)); !loaded { + if srv, loaded = s.tickerSrv.LoadOrStore(*si, NewTickerSrv(s.ctx, s.logger, si)); !loaded { srv.(*TickerSrv).Start() } } @@ -124,7 +128,7 @@ func (s *Service) Klines(symbol, interval string) []*Kline { si := NewSymbolInterval(s.class, symbol, interval) srv, loaded := s.klinesSrv.Load(*si) if !loaded { - if srv, loaded = s.klinesSrv.LoadOrStore(*si, NewKlinesSrv(s.ctx, si)); !loaded { + if srv, loaded = s.klinesSrv.LoadOrStore(*si, NewKlinesSrv(s.ctx, s.logger, s.banDetector, si)); !loaded { srv.(*KlinesSrv).Start() } } @@ -137,7 +141,7 @@ func (s *Service) Depth(symbol string) *Depth { si := NewSymbolInterval(s.class, symbol, "") srv, loaded := s.depthSrv.Load(*si) if !loaded { - if srv, loaded = s.depthSrv.LoadOrStore(*si, NewDepthSrv(s.ctx, si)); !loaded { + if srv, loaded = s.depthSrv.LoadOrStore(*si, NewDepthSrv(s.ctx, s.logger, si)); !loaded { srv.(*DepthSrv).Start() } } diff --git a/internal/service/ticker.go b/internal/service/ticker.go index 2b66f49..a01be9e 100644 --- a/internal/service/ticker.go +++ b/internal/service/ticker.go @@ -3,11 +3,10 @@ package service import ( "binance-proxy/internal/tool" "context" + "log/slog" "strings" "sync" - log "github.com/sirupsen/logrus" - spot "github.com/adshao/go-binance/v2" ) @@ -17,6 +16,8 @@ type TickerSrv struct { ctx context.Context cancel context.CancelFunc + logger *slog.Logger + initCtx context.Context initDone context.CancelFunc @@ -55,8 +56,10 @@ type Ticker24hr struct { Count int64 `json:"count"` } -func NewTickerSrv(ctx context.Context, si *symbolInterval) *TickerSrv { - s := &TickerSrv{si: si} +func NewTickerSrv(ctx context.Context, logger *slog.Logger, si *symbolInterval) *TickerSrv { + logger = logger.With("symbol", si.Symbol, "class", si.Class) + + s := &TickerSrv{si: si, logger: logger} s.ctx, s.cancel = context.WithCancel(ctx) s.initCtx, s.initDone = context.WithCancel(context.Background()) @@ -73,18 +76,19 @@ func (s *TickerSrv) Start() { ticker24hrDoneC, ticker24hrstopC, err := s.connectTicker24hr() if err != nil { - log.Errorf("%s %s ticker24hr websocket connection error: %s.", s.si.Class, s.si.Symbol, err) + s.logger.Error("Ticker24hr websocket connection error", "error", err) continue } bookDoneC, bookStopC, err := s.connectTickerBook() if err != nil { bookStopC <- struct{}{} - log.Errorf("%s %s bookTicker websocket connection error: %s.", s.si.Class, s.si.Symbol, err) + s.logger.Error("BookTicker websocket connection error", "error", err) continue } - log.Debugf("%s %s ticker24hr and bookTicker websocket connected.", s.si.Class, s.si.Symbol) + s.logger.Info("Ticker24hr and BookTicker websocket connected") + select { case <-s.ctx.Done(): bookStopC <- struct{}{} @@ -96,7 +100,7 @@ func (s *TickerSrv) Start() { bookStopC <- struct{}{} } - log.Warnf("%s %s ticker24hr or bookTicker websocket disconnected, trying to reconnect.", s.si.Class, s.si.Symbol) + s.logger.Warn("Ticker24hr or BookTicker websocket disconnected, trying to reconnect.") } }() } @@ -159,7 +163,8 @@ func (s *TickerSrv) wsHandlerBookTicker(event *spot.WsBookTickerEvent) { AskPrice: event.BestAskPrice, AskQuantity: event.BestAskQty, } - log.Tracef("%s %s bookTicker websocket message received", s.si.Class, s.si.Symbol) + + s.logger.Debug("BookTicker websocket message received", "bidPrice", event.BestBidPrice, "askPrice", event.BestAskPrice) } func (s *TickerSrv) wsHandlerTicker24hr(event *spot.WsMarketStatEvent) { @@ -191,13 +196,13 @@ func (s *TickerSrv) wsHandlerTicker24hr(event *spot.WsMarketStatEvent) { LastID: event.LastID, Count: event.Count, } - log.Tracef("%s %s ticker24hr websocket message received", s.si.Class, s.si.Symbol) + s.logger.Debug("Ticker24hr websocket message received", "lastPrice", event.LastPrice) } func (s *TickerSrv) errHandler(err error) { if strings.Contains(err.Error(), "context canceled") { - log.Warnf("%s %s ticker websocket context canceled, will restart connection.", s.si.Class, s.si.Symbol) + s.logger.Warn("Ticker websocket context canceled, will restart connection.", "error", err) } else { - log.Errorf("%s %s ticker24hr websocket connection error: %s.", s.si.Class, s.si.Symbol, err) + s.logger.Error("Ticker24hr websocket connection error", "error", err) } } From 4e185d443fae013ed66ab3e7601663a0aade861c Mon Sep 17 00:00:00 2001 From: ekarlso Date: Sat, 25 Oct 2025 19:54:55 +0200 Subject: [PATCH 2/2] Fix linter issues --- .golangci.yml | 105 +++++++++++++++++++++++++++++++ README.md | 3 +- cmd/binance-proxy/main.go | 9 ++- go.mod | 7 +-- go.sum | 8 --- internal/handler/depth.go | 5 +- internal/handler/exchangeinfo.go | 5 +- internal/handler/handler.go | 83 +++++++++++++----------- internal/handler/intern.go | 16 ++--- internal/handler/kline.go | 11 +++- internal/handler/pools.go | 6 +- internal/handler/ticker.go | 5 +- internal/logcache/logcache.go | 2 +- internal/service/ban_detector.go | 47 +++++++------- internal/service/depth.go | 2 +- internal/service/exchangeinfo.go | 17 ++--- internal/service/kline.go | 17 +++-- internal/service/limiter.go | 104 ++++++++++++++++++------------ internal/service/status.go | 20 +++--- internal/service/ticker.go | 2 +- 20 files changed, 315 insertions(+), 159 deletions(-) create mode 100644 .golangci.yml diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..f02f1d1 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,105 @@ +version: "2" +linters: + default: none + enable: + - asciicheck + - bodyclose + - copyloopvar + - depguard + - dogsled + - errcheck + - goconst + - gocritic + - gocyclo + - godot + - goprintffuncname + - gosec + - govet + - importas + - ineffassign + - misspell + - nakedret + - nilerr + - noctx + - nolintlint + - prealloc + - predeclared + - revive + - rowserrcheck + - staticcheck + - thelper + - unconvert + - unparam + - unused + - whitespace + settings: + depguard: + rules: + main: + allow: + - $gostd + - go.trollit.tech/binance-proxy + - github.com/adshao/go-binance + - github.com/jessevdk/go-flags + - golang.org/x/time/rate + - github.com/samber/slog-multi + - google.golang.org/grpc + - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp + - github.com/go-slog/otelslog + - go.opentelemetry.io/otel + - github.com/stash86/binance-proxy + godot: + scope: toplevel + exclude: + - ^ \+.* + - ^ ANCHOR.* + gosec: + excludes: + - G307 + - G108 + importas: + no-unaliased: true + nolintlint: + require-specific: true + revive: + rules: + - name: exported + arguments: + - disableStutteringCheck + - name: unused-parameter + disabled: true + tagliatelle: + case: + rules: + json: goCamel + exclusions: + generated: lax + presets: + - comments + - common-false-positives + - legacy + - std-error-handling + paths: + - third_party$ + - builtin$ + - examples$ +formatters: + enable: + - gci + - gofmt + - goimports + settings: + gci: + sections: + - standard + - default + - prefix(go.trollit.tech/binance-proxy) + goimports: + local-prefixes: + - go.trollit.tech/binance-proxy + exclusions: + generated: lax + paths: + - third_party$ + - builtin$ + - examples$ diff --git a/README.md b/README.md index d8cb38e..4b85e95 100644 --- a/README.md +++ b/README.md @@ -125,7 +125,7 @@ The status endpoint provides comprehensive information about the proxy service: ```json { "proxy_status": { - "service": "binance-proxy", + "service": "github.com/stash86/binance-proxy", "healthy": true, "start_time": "2025-06-15T10:30:00Z", "uptime": "2h15m30s", @@ -535,7 +535,6 @@ By submitting a pull request to this project, you agree to license your contribu - [go-binance](https://github.com/adshao/go-binance/blob/master/LICENSE) - [go-flags](https://github.com/jessevdk/go-flags/blob/master/LICENSE) -- [logrus](https://github.com/sirupsen/logrus/blob/master/LICENSE) - [go-time](https://cs.opensource.google/go/x/time/+/master:LICENSE) - [go-simplejson](https://github.com/bitly/go-simplejson/blob/master/LICENSE) - [websocket](https://github.com/gorilla/websocket/blob/master/LICENSE) diff --git a/cmd/binance-proxy/main.go b/cmd/binance-proxy/main.go index 4e6d04b..11132b6 100644 --- a/cmd/binance-proxy/main.go +++ b/cmd/binance-proxy/main.go @@ -1,23 +1,22 @@ package main import ( - "binance-proxy/internal/handler" - "binance-proxy/internal/logcache" - "binance-proxy/internal/service" "context" "errors" "fmt" stdlog "log" "log/slog" "net/http" + _ "net/http/pprof" "os" "os/signal" "syscall" "time" - _ "net/http/pprof" - "github.com/jessevdk/go-flags" + "github.com/stash86/binance-proxy/internal/handler" + "github.com/stash86/binance-proxy/internal/logcache" + "github.com/stash86/binance-proxy/internal/service" ) func startProxy(ctx context.Context, logger *slog.Logger, bd *service.BanDetector, port int, class service.Class, disablefakekline bool, alwaysshowforwards bool, errChan chan<- error) { diff --git a/go.mod b/go.mod index da6d342..5363eee 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,10 @@ -module binance-proxy +module github.com/stash86/binance-proxy -go 1.23.0 - -toolchain go1.24.4 +go 1.25.3 require ( github.com/adshao/go-binance/v2 v2.8.2 github.com/jessevdk/go-flags v1.6.1 - github.com/sirupsen/logrus v1.9.3 golang.org/x/time v0.12.0 ) diff --git a/go.sum b/go.sum index 29db16f..a349a0b 100644 --- a/go.sum +++ b/go.sum @@ -2,7 +2,6 @@ github.com/adshao/go-binance/v2 v2.8.2 h1:cpMaoBnrg9g7aTNEAeMRIIMwVZ8S/oR5Fca+Py github.com/adshao/go-binance/v2 v2.8.2/go.mod h1:XkkuecSyJKPolaCGf/q4ovJYB3t0P+7RUYTbGr+LMGM= github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pgdoow= github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= @@ -19,20 +18,13 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= -github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= -github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/handler/depth.go b/internal/handler/depth.go index 7b851e5..e12206b 100644 --- a/internal/handler/depth.go +++ b/internal/handler/depth.go @@ -74,5 +74,8 @@ func (s *Handler) depth(w http.ResponseWriter, r *http.Request) { return } - w.Write(buf.Bytes()) + if _, err := w.Write(buf.Bytes()); err != nil { + http.Error(w, "Failed to write response", http.StatusInternalServerError) + return + } } diff --git a/internal/handler/exchangeinfo.go b/internal/handler/exchangeinfo.go index bc48d96..888b1d2 100644 --- a/internal/handler/exchangeinfo.go +++ b/internal/handler/exchangeinfo.go @@ -14,5 +14,8 @@ func (s *Handler) exchangeInfo(w http.ResponseWriter) { w.Header().Set("Content-Type", "application/json") w.Header().Set("Data-Source", "cache") - w.Write(data) + if _, err := w.Write(data); err != nil { + http.Error(w, "Failed to write response", http.StatusInternalServerError) + return + } } diff --git a/internal/handler/handler.go b/internal/handler/handler.go index be0b55c..e332016 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -1,8 +1,6 @@ package handler import ( - "binance-proxy/internal/logcache" - "binance-proxy/internal/service" "bytes" "context" "encoding/json" @@ -15,9 +13,22 @@ import ( "os" "sync" "time" + + "github.com/stash86/binance-proxy/internal/logcache" + "github.com/stash86/binance-proxy/internal/service" +) + +const ( + APIKlinesPath = "/api/v3/klines" + FAPIKlinesPath = "/fapi/v1/klines" + APIDepthPath = "/api/v3/depth" + FAPIDepthPath = "/fapi/v1/depth" + APITickerPath = "/api/v3/ticker/24hr" + APIExchangePath = "/api/v3/exchangeInfo" + FAPIExchangePath = "/fapi/v1/exchangeInfo" ) -// bufferPool implements httputil.BufferPool interface +// bufferPool implements httputil.BufferPool interface. type bufferPool struct { pool sync.Pool } @@ -25,7 +36,7 @@ type bufferPool struct { func (bp *bufferPool) Get() []byte { if bp.pool.New == nil { bp.pool.New = func() interface{} { - buf := make([]byte, 32*1024) // 32KB buffer + buf := make([]byte, 32*1024) // 32KB buffer. return &buf } } @@ -109,7 +120,7 @@ func (s *Handler) Router(w http.ResponseWriter, r *http.Request) { ) } -// HTTP client with connection pooling for reverse proxy +// HTTP client with connection pooling for reverse proxy. var ( proxyHTTPClientOnce sync.Once proxyHTTPClient *http.Client @@ -117,20 +128,20 @@ var ( func getProxyHTTPClient(logger *slog.Logger) *http.Client { proxyHTTPClientOnce.Do(func() { - // Create a new transport each time to avoid concurrent modification issues + // Create a new transport each time to avoid concurrent modification issues. transport := &http.Transport{ MaxIdleConns: 200, MaxIdleConnsPerHost: 20, IdleConnTimeout: 90 * time.Second, DisableCompression: false, ForceAttemptHTTP2: true, - // Connection pooling settings for high throughput + // Connection pooling settings for high throughput. MaxConnsPerHost: 50, } proxyHTTPClient = &http.Client{ Transport: transport, - Timeout: 60 * time.Second, // Longer timeout for proxy requests + Timeout: 60 * time.Second, // Longer timeout for proxy requests. } if proxyHTTPClient == nil { @@ -156,13 +167,13 @@ func getProxyHTTPClient(logger *slog.Logger) *http.Client { } } - // Double-check transport is not nil and clone it to avoid concurrent modification + // Double-check transport is not nil and clone it to avoid concurrent modification. if proxyHTTPClient.Transport == nil { logger.Error("HTTP client transport is nil, fixing with default transport") proxyHTTPClient.Transport = http.DefaultTransport } - // Return a copy of the client with a cloned transport to avoid concurrent modifications + // Return a copy of the client with a cloned transport to avoid concurrent modifications. transport := proxyHTTPClient.Transport if ht, ok := transport.(*http.Transport); ok { transport = ht.Clone() @@ -177,24 +188,6 @@ func getProxyHTTPClient(logger *slog.Logger) *http.Client { func (s *Handler) reverseProxy(w http.ResponseWriter, r *http.Request) { logger := s.logger.With("method", r.Method, "uri", r.RequestURI, "remote", r.RemoteAddr, "class", s.class) - // Validate handler state - if s == nil { - logger.Error("Handler is nil in reverseProxy") - http.Error(w, "Internal server error", http.StatusInternalServerError) - return - } - - if w == nil { - logger.Error("ResponseWriter is nil in reverseProxy") - return - } - - if r == nil { - logger.Error("Request is nil in reverseProxy") - http.Error(w, "Internal server error", http.StatusInternalServerError) - return - } - // Check if context is cancelled if s.ctx != nil { select { @@ -203,7 +196,7 @@ func (s *Handler) reverseProxy(w http.ResponseWriter, r *http.Request) { http.Error(w, "Service unavailable", http.StatusServiceUnavailable) return default: - // Context is still valid, continue + // Context is still valid, continue. } } @@ -225,7 +218,11 @@ func (s *Handler) reverseProxy(w http.ResponseWriter, r *http.Request) { logger.Debug(msg) } - service.RateWait(s.ctx, s.class, r.Method, r.URL.Path, r.URL.Query()) + if err := service.RateWait(s.ctx, s.class, r.Method, r.URL.Path, r.URL.Query()); err != nil { + logcache.LogOncePerDuration("error", fmt.Sprintf("Rate wait failed for %s: %v", s.class, err)) + http.Error(w, "Service temporarily unavailable", http.StatusServiceUnavailable) + return + } // Use hardcoded endpoints (current working version) var u *url.URL @@ -307,11 +304,11 @@ func (s *Handler) reverseProxy(w http.ResponseWriter, r *http.Request) { } var body []byte switch resp.Request.URL.Path { - case "/api/v3/klines", "/fapi/v1/klines": + case APIKlinesPath, FAPIKlinesPath: body = []byte("[]") - case "/api/v3/depth", "/fapi/v1/depth": + case APIDepthPath, FAPIDepthPath: body = []byte(`{"lastUpdateId":0,"bids":[],"asks":[]}`) - case "/api/v3/ticker/24hr": + case APITickerPath: body = []byte("{}") default: body = []byte("{}") @@ -422,7 +419,7 @@ func (s *Handler) returnEmptyResponse(w http.ResponseWriter, r *http.Request) { // Return 429 to signal clients to slow down/back off w.WriteHeader(http.StatusTooManyRequests) - w.Write(response) + _, _ = w.Write(response) } func (s *Handler) status(w http.ResponseWriter) { @@ -432,7 +429,12 @@ func (s *Handler) status(w http.ResponseWriter) { s.logger.Warn("Status endpoint called but context is canceled") w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusServiceUnavailable) - w.Write([]byte(`{"error": "service shutting down", "status": "unavailable"}`)) + + _, err := w.Write([]byte(`{"error": "service shutting down", "status": "unavailable"}`)) + if err != nil { + s.logger.Error("Failed to write shutdown status response", "err", err) + } + return default: // Context is still valid, proceed normally @@ -466,7 +468,9 @@ func (s *Handler) status(w http.ResponseWriter) { } w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(response) + if err := json.NewEncoder(w).Encode(response); err != nil { + s.logger.Error("Failed to encode status response", "err", err) + } } func (s *Handler) restart(w http.ResponseWriter, r *http.Request) { @@ -474,7 +478,12 @@ func (s *Handler) restart(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusMethodNotAllowed) - w.Write([]byte(`{"error": "only GET method allowed", "status": "failed"}`)) + + _, err := w.Write([]byte(`{"error": "only GET method allowed", "status": "failed"}`)) + if err != nil { + s.logger.Error("Failed to write method not allowed response", "err", err) + } + return } diff --git a/internal/handler/intern.go b/internal/handler/intern.go index d3b719a..22c6611 100644 --- a/internal/handler/intern.go +++ b/internal/handler/intern.go @@ -4,10 +4,10 @@ import ( "sync" ) -// Optimized for exactly 30 trading pairs +// Optimized for exactly 30 trading pairs. var ( - symbolIntern = newStringInterner(35) // 30 pairs + 5 buffer for growth - intervalIntern = newStringInterner(15) // Standard intervals + symbolIntern = newStringInterner(35) // 30 pairs + 5 buffer for growth. + intervalIntern = newStringInterner(15) // Standard intervals. ) type stringInterner struct { @@ -26,7 +26,7 @@ func (si *stringInterner) intern(s string) string { return s } - // Fast path: read-only lookup (most common case) + // Fast path: read-only lookup (most common case). si.mu.RLock() if interned, exists := si.cache[s]; exists { si.mu.RUnlock() @@ -34,21 +34,21 @@ func (si *stringInterner) intern(s string) string { } si.mu.RUnlock() - // Slow path: add new string (rare after startup) + // Slow path: add new string (rare after startup). si.mu.Lock() defer si.mu.Unlock() - // Double-check after acquiring write lock + // Double-check after acquiring write lock. if interned, exists := si.cache[s]; exists { return interned } - // Store the string + // Store the string. si.cache[s] = s return s } -// Public API +// Public API. func InternSymbol(symbol string) string { return symbolIntern.intern(symbol) } diff --git a/internal/handler/kline.go b/internal/handler/kline.go index 90e95ae..a0158ca 100644 --- a/internal/handler/kline.go +++ b/internal/handler/kline.go @@ -20,7 +20,12 @@ func (s *Handler) klines(w http.ResponseWriter, r *http.Request) { logger.Debug("klines request returning empty due to API ban") w.Header().Set("Content-Type", "application/json") w.Header().Set("Data-Source", "ban-protection") - w.Write([]byte("[]")) + + _, err := w.Write([]byte("[]")) + if err != nil { + s.logger.Error("Failed to write ban protection response", "err", err) + } + return } @@ -122,5 +127,7 @@ func (s *Handler) klines(w http.ResponseWriter, r *http.Request) { return } - w.Write(buf.Bytes()) + if _, err := w.Write(buf.Bytes()); err != nil { + logger.Error("Failed to write response", "err", err) + } } diff --git a/internal/handler/pools.go b/internal/handler/pools.go index 5456d48..4a09287 100644 --- a/internal/handler/pools.go +++ b/internal/handler/pools.go @@ -5,19 +5,19 @@ import ( "sync" ) -// Shared buffer pool for all handlers to reduce memory overhead +// Shared buffer pool for all handlers to reduce memory overhead. var BufferPool = sync.Pool{ New: func() interface{} { return &bytes.Buffer{} }, } -// GetBuffer gets a buffer from the shared pool +// GetBuffer gets a buffer from the shared pool. func GetBuffer() *bytes.Buffer { return BufferPool.Get().(*bytes.Buffer) } -// PutBuffer returns a buffer to the shared pool after resetting it +// PutBuffer returns a buffer to the shared pool after resetting it. func PutBuffer(buf *bytes.Buffer) { if buf != nil { buf.Reset() diff --git a/internal/handler/ticker.go b/internal/handler/ticker.go index cea6c37..98586ff 100644 --- a/internal/handler/ticker.go +++ b/internal/handler/ticker.go @@ -37,5 +37,8 @@ func (s *Handler) ticker(w http.ResponseWriter, r *http.Request) { return } - w.Write(buf.Bytes()) + if _, err := w.Write(buf.Bytes()); err != nil { + s.logger.Error("Failed to write response", "error", err) + return + } } diff --git a/internal/logcache/logcache.go b/internal/logcache/logcache.go index d28d3b6..fc44f67 100644 --- a/internal/logcache/logcache.go +++ b/internal/logcache/logcache.go @@ -18,7 +18,7 @@ var ( timestampRegexp = regexp.MustCompile(`\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:?\d{2})?`) quotedRegexp = regexp.MustCompile(`"[^"]*"`) - // Optional hooks for unified logging backends + // Optional hooks for unified logging backends. loggerHook func(level, msg string) writerHook func(msg string) ) diff --git a/internal/service/ban_detector.go b/internal/service/ban_detector.go index 5ce6641..0dfd254 100644 --- a/internal/service/ban_detector.go +++ b/internal/service/ban_detector.go @@ -13,15 +13,14 @@ import ( "sync" "time" - "binance-proxy/internal/logcache" - - log "github.com/sirupsen/logrus" + "github.com/stash86/binance-proxy/internal/logcache" ) -// Buffer pool for reusing byte buffers to reduce GC pressure +// Buffer pool for reusing byte buffers to reduce GC pressure. var bufferPool = sync.Pool{ - New: func() interface{} { - return make([]byte, 0, 1024) // Start with 1KB capacity + New: func() any { + buf := make([]byte, 0, 1024) // Start with 1KB capacity + return &buf }, } @@ -77,7 +76,7 @@ func (bd *BanDetector) IsBanned(class Class) bool { } else if bd.spotBanned && now.After(bd.spotRecoveryTime) { // Recovery time passed, clear ban bd.spotBanned = false - log.Infof("%s API ban lifted, resuming normal operation", class) + bd.logger.Info("%s API ban lifted, resuming normal operation", "class", class) } } else { if bd.futuresBanned && now.Before(bd.futuresRecoveryTime) { @@ -85,7 +84,7 @@ func (bd *BanDetector) IsBanned(class Class) bool { } else if bd.futuresBanned && now.After(bd.futuresRecoveryTime) { // Recovery time passed, clear ban bd.futuresBanned = false - log.Infof("%s API ban lifted, resuming normal operation", class) + bd.logger.Info("%s API ban lifted, resuming normal operation", "class", class) } } @@ -123,9 +122,9 @@ func (bd *BanDetector) CheckResponse(class Class, resp *http.Response, err error if banUntil.IsZero() { // If both methods fail, use 10 minutes default banUntil = now.Add(10 * time.Minute) - log.Errorf("%s API IP banned (418), no expiry found, suspending requests for 10 minutes until %v", class, banUntil) + bd.logger.Error("API Banned (418), no expiry found, suspending requests for 10 minutes until", "class", class, "banUntil", banUntil) } else { - log.Errorf("%s API IP banned (418), suspending requests until %v", class, banUntil) + bd.logger.Error("API IP banned (418), suspending requests until", "class", class, "banUntil", banUntil) } bd.setBanned(class, banUntil) bd.resetBackoffCount(class) // Reset backoff on explicit ban @@ -135,16 +134,16 @@ func (bd *BanDetector) CheckResponse(class Class, resp *http.Response, err error if banUntil.IsZero() { // Fallback to 1 minute default banUntil = now.Add(1 * time.Minute) - log.Warnf("%s API rate limited (429), no Retry-After header, suspending requests for 1 minute until %v", class, banUntil) + bd.logger.Warn("API rate limited (429), no Retry-After header, suspending requests for 1 minute until", "class", class, "banUntil", banUntil) } else { - log.Warnf("%s API rate limited (429), suspending requests until %v", class, banUntil) + bd.logger.Warn("API rate limited (429), suspending requests until", "class", class, "banUntil", banUntil) } bd.setBanned(class, banUntil) bd.resetBackoffCount(class) // Reset backoff on explicit rate limit return true case 403: // Forbidden bd.setBanned(class, now.Add(5*time.Minute)) - log.Warnf("%s API access forbidden (403), suspending requests until %v", class, bd.getRecoveryTime(class)) + bd.logger.Warn("API access forbidden (403), suspending requests until", "class", class, "banUntil", bd.getRecoveryTime(class)) return true } } @@ -155,7 +154,6 @@ func (bd *BanDetector) CheckResponse(class Class, resp *http.Response, err error if strings.Contains(errorMsg, "connection refused") || strings.Contains(errorMsg, "timeout") || strings.Contains(errorMsg, "no route to host") { - bd.incrementErrorCount(class, now) // If too many errors in short time, use exponential backoff @@ -164,7 +162,7 @@ func (bd *BanDetector) CheckResponse(class Class, resp *http.Response, err error backoffDuration := bd.getExponentialBackoff(class) bd.setBanned(class, now.Add(backoffDuration)) bd.resetErrorCount(class) - log.Warnf("%s API connection issues detected (%d errors), suspending requests for %v until %v", class, errorCount, backoffDuration, bd.getRecoveryTime(class)) + bd.logger.Warn("API connection issues detected, suspending requests", "class", class, "errorCount", errorCount, "backoffDuration", backoffDuration, "banUntil", bd.getRecoveryTime(class)) return true } } @@ -183,12 +181,11 @@ func (bd *BanDetector) parseBanExpiryNonDestructive(resp *http.Response) time.Ti if resp == nil || resp.Body == nil { return time.Time{} } - // Get buffer from pool - buf := bufferPool.Get().([]byte) + bufPtr := bufferPool.Get().(*[]byte) defer func() { - buf = buf[:0] // Reset length but keep capacity - bufferPool.Put(buf) + *bufPtr = (*bufPtr)[:0] // Reset length but keep capacity + bufferPool.Put(bufPtr) }() // Read response body without consuming it @@ -214,7 +211,7 @@ func (bd *BanDetector) parseBanExpiryNonDestructive(resp *http.Response) time.Ti if timestamp, err := strconv.ParseInt(matches[1], 10, 64); err == nil { // Convert milliseconds to seconds if needed if timestamp > 9999999999 { - timestamp = timestamp / 1000 + timestamp /= 1000 } return time.Unix(timestamp, 0) } @@ -297,7 +294,15 @@ func (bd *BanDetector) getExponentialBackoff(class Class) time.Duration { } // Exponential backoff: 2^n seconds, max 10 minutes - duration := time.Duration(1< 30 { // Prevent overflow in bit shift + backoffCount = 30 + } + + duration := time.Duration(1< maxDuration { duration = maxDuration diff --git a/internal/service/depth.go b/internal/service/depth.go index 8d91227..322ce11 100644 --- a/internal/service/depth.go +++ b/internal/service/depth.go @@ -1,7 +1,6 @@ package service import ( - "binance-proxy/internal/tool" "context" "log/slog" "strings" @@ -10,6 +9,7 @@ import ( spot "github.com/adshao/go-binance/v2" futures "github.com/adshao/go-binance/v2/futures" + "github.com/stash86/binance-proxy/internal/tool" ) type DepthSrv struct { diff --git a/internal/service/exchangeinfo.go b/internal/service/exchangeinfo.go index 8e150b7..34dc13d 100644 --- a/internal/service/exchangeinfo.go +++ b/internal/service/exchangeinfo.go @@ -2,14 +2,13 @@ package service import ( "context" + "io" "log/slog" "net/http" "sync" "time" - "binance-proxy/internal/tool" - - "io" + "github.com/stash86/binance-proxy/internal/tool" ) type ExchangeInfoSrv struct { @@ -29,7 +28,7 @@ type ExchangeInfoSrv struct { exchangeInfo []byte } -// HTTP client pool for connection reuse +// HTTP client pool for connection reuse. var ( httpClientOnce sync.Once httpClient *http.Client @@ -88,7 +87,7 @@ func (s *ExchangeInfoSrv) Start() { }() } -// Nothing to do +// Nothing to do. func (s *ExchangeInfoSrv) Stop() {} func (s *ExchangeInfoSrv) GetExchangeInfo() []byte { @@ -117,10 +116,14 @@ func (s *ExchangeInfoSrv) refreshExchangeInfo() error { var url string if s.si.Class == SPOT { url = "https://api.binance.com/api/v3/exchangeInfo" - RateWait(s.ctx, s.si.Class, http.MethodGet, "/api/v3/exchangeInfo", nil) + if err := RateWait(s.ctx, s.si.Class, http.MethodGet, "/api/v3/exchangeInfo", nil); err != nil { + return err + } } else { url = "https://fapi.binance.com/fapi/v1/exchangeInfo" - RateWait(s.ctx, s.si.Class, http.MethodGet, "/fapi/v1/exchangeInfo", nil) + if err := RateWait(s.ctx, s.si.Class, http.MethodGet, "/fapi/v1/exchangeInfo", nil); err != nil { + return err + } } // Use pooled HTTP client instead of http.Get() diff --git a/internal/service/kline.go b/internal/service/kline.go index d9d35dc..9710e04 100644 --- a/internal/service/kline.go +++ b/internal/service/kline.go @@ -1,7 +1,6 @@ package service import ( - "binance-proxy/internal/tool" "container/list" "context" "log/slog" @@ -12,6 +11,7 @@ import ( spot "github.com/adshao/go-binance/v2" futures "github.com/adshao/go-binance/v2/futures" + "github.com/stash86/binance-proxy/internal/tool" ) type Kline struct { @@ -132,17 +132,23 @@ func (s *KlinesSrv) initKlineData() { var resp *http.Response if s.si.Class == SPOT { - RateWait(s.ctx, s.si.Class, http.MethodGet, "/api/v3/klines", url.Values{ + if err := RateWait(s.ctx, s.si.Class, http.MethodGet, "/api/v3/klines", url.Values{ "limit": []string{"1000"}, - }) + }); err != nil { + s.logger.Error("Rate limit wait failed", "error", err) + continue + } client := spot.NewClient("", "") klines, err = client.NewKlinesService(). Symbol(s.si.Symbol).Interval(s.si.Interval).Limit(1000). Do(s.ctx) } else { - RateWait(s.ctx, s.si.Class, http.MethodGet, "/fapi/v1/klines", url.Values{ + if err := RateWait(s.ctx, s.si.Class, http.MethodGet, "/fapi/v1/klines", url.Values{ "limit": []string{"1000"}, - }) + }); err != nil { + s.logger.Error("Rate limit wait failed", "error", err) + continue + } client := futures.NewClient("", "") klines, err = client.NewKlinesService(). Symbol(s.si.Symbol).Interval(s.si.Interval).Limit(1000). @@ -208,7 +214,6 @@ func (s *KlinesSrv) initKlineData() { } func (s *KlinesSrv) wsHandler(event interface{}) { - if s.klinesList == nil { s.initKlineData() } diff --git a/internal/service/limiter.go b/internal/service/limiter.go index 030efea..361a61f 100644 --- a/internal/service/limiter.go +++ b/internal/service/limiter.go @@ -14,63 +14,89 @@ var ( FuturesLimiter = rate.NewLimiter(40, 2400) ) -func RateWait(ctx context.Context, class Class, method, path string, query url.Values) { - weight := 1 +func getKlinesWeight(limit int) int { + switch { + case limit >= 1 && limit < 100: + return 1 + case limit >= 100 && limit < 500: + return 2 + case limit >= 500 && limit <= 1000: + return 5 + case limit > 1000 && limit <= 1500: + return 10 + default: + return 5 + } +} + +func getSpotDepthWeight(limit int) int { + switch { + case limit >= 5 && limit <= 100: + return 1 + case limit >= 100 && limit < 500: + return 2 + case limit == 500: + return 5 + case limit == 1000: + return 10 + case limit == 5000: + return 50 + default: + return 1 + } +} + +func getFuturesDepthWeight(limit int) int { + switch { + case limit >= 5 && limit <= 50: + return 2 + case limit == 100: + return 5 + case limit == 500: + return 10 + case limit == 1000: + return 20 + default: + return 2 + } +} + +func calculateWeight(method, path string, query url.Values) int { switch path { case "/fapi/v1/klines": - weight = 5 limitInt, _ := strconv.Atoi(query.Get("limit")) - if limitInt >= 1 && limitInt < 100 { - weight = 1 - } else if limitInt >= 100 && limitInt < 500 { - weight = 2 - } else if limitInt >= 500 && limitInt <= 1000 { - weight = 5 - } else if limitInt > 1000 && limitInt <= 1500 { - weight = 10 - } + return getKlinesWeight(limitInt) case "/api/v3/depth": limitInt, _ := strconv.Atoi(query.Get("limit")) - if limitInt >= 5 && limitInt <= 100 { - weight = 1 - } else if limitInt >= 100 && limitInt < 500 { - weight = 2 - } else if limitInt == 500 { - weight = 5 - } else if limitInt == 1000 { - weight = 10 - } else if limitInt == 5000 { - weight = 50 - } + return getSpotDepthWeight(limitInt) case "/fapi/v1/depth": limitInt, _ := strconv.Atoi(query.Get("limit")) - if limitInt >= 5 && limitInt <= 50 { - weight = 2 - } else if limitInt == 100 { - weight = 5 - } else if limitInt == 500 { - weight = 10 - } else if limitInt == 1000 { - weight = 20 - } + return getFuturesDepthWeight(limitInt) case "/api/v3/ticker/24hr", "/fapi/v1/ticker/24hr": if query.Get("symbol") == "" { - weight = 40 + return 40 } + return 1 case "/api/v3/exchangeInfo", "/fapi/v1/exchangeInfo", "/api/v3/account", "/api/v3/myTrades": - weight = 10 + return 10 case "/api/v3/order": if method == http.MethodGet { - weight = 2 + return 2 } + return 1 case "/fapi/v1/userTrades", "/fapi/v2/account": - weight = 5 - + return 5 + default: + return 1 } +} + +func RateWait(ctx context.Context, class Class, method, path string, query url.Values) error { + weight := calculateWeight(method, path, query) if class == SPOT { - SpotLimiter.WaitN(ctx, weight) + return SpotLimiter.WaitN(ctx, weight) } else { - FuturesLimiter.WaitN(ctx, weight) + return FuturesLimiter.WaitN(ctx, weight) } } diff --git a/internal/service/status.go b/internal/service/status.go index f13833c..6bdc126 100644 --- a/internal/service/status.go +++ b/internal/service/status.go @@ -5,7 +5,7 @@ import ( "time" ) -// StatusTracker tracks the overall status of the proxy service +// StatusTracker tracks the overall status of the proxy service. type StatusTracker struct { mu sync.RWMutex startTime time.Time @@ -21,7 +21,7 @@ var ( statusTrackerOnce sync.Once ) -// GetStatusTracker returns the global status tracker instance +// GetStatusTracker returns the global status tracker instance. func GetStatusTracker() *StatusTracker { statusTrackerOnce.Do(func() { statusTracker = &StatusTracker{ @@ -32,7 +32,7 @@ func GetStatusTracker() *StatusTracker { return statusTracker } -// Status represents the current status of the proxy +// Status represents the current status of the proxy. type Status struct { Service string `json:"service"` Healthy bool `json:"healthy"` @@ -46,7 +46,7 @@ type Status struct { Timestamp time.Time `json:"timestamp"` } -// GetStatus returns the current status +// GetStatus returns the current status. func (st *StatusTracker) GetStatus() Status { st.mu.RLock() defer st.mu.RUnlock() @@ -58,7 +58,7 @@ func (st *StatusTracker) GetStatus() Status { } status := Status{ - Service: "binance-proxy", + Service: "github.com/stash86/binance-proxy", Healthy: st.isHealthy, StartTime: st.startTime, Uptime: uptime.String(), @@ -76,14 +76,14 @@ func (st *StatusTracker) GetStatus() Status { return status } -// RecordRequest increments the request counter +// RecordRequest increments the request counter. func (st *StatusTracker) RecordRequest() { st.mu.Lock() defer st.mu.Unlock() st.requests++ } -// RecordError increments the error counter and records the error +// RecordError increments the error counter and records the error. func (st *StatusTracker) RecordError(err error) { st.mu.Lock() defer st.mu.Unlock() @@ -91,20 +91,20 @@ func (st *StatusTracker) RecordError(err error) { st.lastError = err st.lastErrorAt = time.Now() - // Consider service unhealthy if error rate is too high + // Consider service unhealthy if error rate is too high. if st.requests > 100 && float64(st.errors)/float64(st.requests) > 0.1 { st.isHealthy = false } } -// SetHealthy manually sets the health status +// SetHealthy manually sets the health status. func (st *StatusTracker) SetHealthy(healthy bool) { st.mu.Lock() defer st.mu.Unlock() st.isHealthy = healthy } -// Reset resets all counters (useful for testing) +// Reset resets all counters (useful for testing). func (st *StatusTracker) Reset() { st.mu.Lock() defer st.mu.Unlock() diff --git a/internal/service/ticker.go b/internal/service/ticker.go index a01be9e..371b34a 100644 --- a/internal/service/ticker.go +++ b/internal/service/ticker.go @@ -1,13 +1,13 @@ package service import ( - "binance-proxy/internal/tool" "context" "log/slog" "strings" "sync" spot "github.com/adshao/go-binance/v2" + "github.com/stash86/binance-proxy/internal/tool" ) type TickerSrv struct {