Skip to content

Commit aae4e34

Browse files
committed
fix linter
Signed-off-by: Musilah <nataleigh.nk@gmail.com>
1 parent ba85983 commit aae4e34

File tree

19 files changed

+229
-144
lines changed

19 files changed

+229
-144
lines changed

cmd/main.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ const (
3636

3737
coapWithoutDTLS = "MPROXY_COAP_WITHOUT_DTLS_"
3838
coapWithDTLS = "MPROXY_COAP_WITH_DTLS_"
39+
40+
defaultTargetHost = "localhost"
3941
)
4042

4143
func main() {
@@ -136,7 +138,7 @@ func startMQTTProxy(g *errgroup.Group, ctx context.Context, envPrefix string, ha
136138
}
137139

138140
if cfg.TargetHost == "" {
139-
cfg.TargetHost = "localhost"
141+
cfg.TargetHost = defaultTargetHost
140142
}
141143

142144
if cfg.TargetPort == "" {
@@ -187,7 +189,7 @@ func startWebSocketProxy(g *errgroup.Group, ctx context.Context, envPrefix strin
187189
}
188190

189191
if cfg.TargetHost == "" {
190-
cfg.TargetHost = "localhost"
192+
cfg.TargetHost = defaultTargetHost
191193
}
192194

193195
if cfg.TargetPort == "" {
@@ -245,7 +247,7 @@ func startHTTPProxy(g *errgroup.Group, ctx context.Context, envPrefix string, ha
245247
}
246248

247249
if cfg.TargetHost == "" {
248-
cfg.TargetHost = "localhost"
250+
cfg.TargetHost = defaultTargetHost
249251
}
250252

251253
if cfg.TargetPort == "" {
@@ -300,7 +302,7 @@ func startCoAPProxy(g *errgroup.Group, ctx context.Context, envPrefix string, ha
300302
}
301303

302304
if cfg.TargetHost == "" {
303-
cfg.TargetHost = "localhost"
305+
cfg.TargetHost = defaultTargetHost
304306
}
305307

306308
if cfg.TargetPort == "" {

cmd/production/handlers.go

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"github.com/absmach/mproxy/pkg/ratelimit"
1414
)
1515

16+
const protocolMQTT = "mqtt"
17+
1618
// RateLimitedHandler wraps a handler with rate limiting.
1719
type RateLimitedHandler struct {
1820
handler handler.Handler
@@ -97,13 +99,10 @@ type InstrumentedHandler struct {
9799
func (h *InstrumentedHandler) AuthConnect(ctx context.Context, hctx *handler.Context) error {
98100
start := time.Now()
99101
h.metrics.AuthAttempts.WithLabelValues(hctx.Protocol, "connect").Inc()
100-
101102
err := h.handler.AuthConnect(ctx, hctx)
102-
103103
if err != nil {
104104
h.metrics.AuthFailures.WithLabelValues(hctx.Protocol, "connect", "unauthorized").Inc()
105105
}
106-
107106
duration := time.Since(start).Seconds()
108107
h.metrics.RequestDuration.WithLabelValues(hctx.Protocol, "connect").Observe(duration)
109108

@@ -114,17 +113,13 @@ func (h *InstrumentedHandler) AuthConnect(ctx context.Context, hctx *handler.Con
114113
func (h *InstrumentedHandler) AuthPublish(ctx context.Context, hctx *handler.Context, topic *string, payload *[]byte) error {
115114
start := time.Now()
116115
h.metrics.AuthAttempts.WithLabelValues(hctx.Protocol, "publish").Inc()
117-
118116
if payload != nil {
119117
h.metrics.RequestSize.WithLabelValues(hctx.Protocol).Observe(float64(len(*payload)))
120118
}
121-
122119
err := h.handler.AuthPublish(ctx, hctx, topic, payload)
123-
124120
if err != nil {
125121
h.metrics.AuthFailures.WithLabelValues(hctx.Protocol, "publish", "unauthorized").Inc()
126122
}
127-
128123
duration := time.Since(start).Seconds()
129124
h.metrics.RequestDuration.WithLabelValues(hctx.Protocol, "publish").Observe(duration)
130125

@@ -141,13 +136,10 @@ func (h *InstrumentedHandler) AuthPublish(ctx context.Context, hctx *handler.Con
141136
func (h *InstrumentedHandler) AuthSubscribe(ctx context.Context, hctx *handler.Context, topics *[]string) error {
142137
start := time.Now()
143138
h.metrics.AuthAttempts.WithLabelValues(hctx.Protocol, "subscribe").Inc()
144-
145139
err := h.handler.AuthSubscribe(ctx, hctx, topics)
146-
147140
if err != nil {
148141
h.metrics.AuthFailures.WithLabelValues(hctx.Protocol, "subscribe", "unauthorized").Inc()
149142
}
150-
151143
duration := time.Since(start).Seconds()
152144
h.metrics.RequestDuration.WithLabelValues(hctx.Protocol, "subscribe").Observe(duration)
153145

@@ -170,7 +162,7 @@ func (h *InstrumentedHandler) OnConnect(ctx context.Context, hctx *handler.Conte
170162

171163
// OnPublish implements handler.Handler with metrics.
172164
func (h *InstrumentedHandler) OnPublish(ctx context.Context, hctx *handler.Context, topic string, payload []byte) error {
173-
if hctx.Protocol == "mqtt" {
165+
if hctx.Protocol == protocolMQTT {
174166
h.metrics.MQTTPackets.WithLabelValues("publish", "upstream").Inc()
175167
}
176168

@@ -179,7 +171,7 @@ func (h *InstrumentedHandler) OnPublish(ctx context.Context, hctx *handler.Conte
179171

180172
// OnSubscribe implements handler.Handler with metrics.
181173
func (h *InstrumentedHandler) OnSubscribe(ctx context.Context, hctx *handler.Context, topics []string) error {
182-
if hctx.Protocol == "mqtt" {
174+
if hctx.Protocol == protocolMQTT {
183175
h.metrics.MQTTPackets.WithLabelValues("subscribe", "upstream").Inc()
184176
}
185177

@@ -188,7 +180,7 @@ func (h *InstrumentedHandler) OnSubscribe(ctx context.Context, hctx *handler.Con
188180

189181
// OnUnsubscribe implements handler.Handler with metrics.
190182
func (h *InstrumentedHandler) OnUnsubscribe(ctx context.Context, hctx *handler.Context, topics []string) error {
191-
if hctx.Protocol == "mqtt" {
183+
if hctx.Protocol == protocolMQTT {
192184
h.metrics.MQTTPackets.WithLabelValues("unsubscribe", "upstream").Inc()
193185
}
194186

cmd/production/main.go

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,27 +30,29 @@ import (
3030
"golang.org/x/sync/errgroup"
3131
)
3232

33+
const logError = "error"
34+
3335
// Config holds the application configuration.
3436
type Config struct {
3537
// Observability
36-
MetricsPort int `env:"METRICS_PORT" envDefault:"9090"`
37-
HealthPort int `env:"HEALTH_PORT" envDefault:"8080"`
38-
LogLevel string `env:"LOG_LEVEL" envDefault:"info"`
39-
LogFormat string `env:"LOG_FORMAT" envDefault:"json"`
38+
MetricsPort int `env:"METRICS_PORT" envDefault:"9090"`
39+
HealthPort int `env:"HEALTH_PORT" envDefault:"8080"`
40+
LogLevel string `env:"LOG_LEVEL" envDefault:"info"`
41+
LogFormat string `env:"LOG_FORMAT" envDefault:"json"`
4042

4143
// Resource Limits
42-
MaxConnections int `env:"MAX_CONNECTIONS" envDefault:"10000"`
43-
MaxGoroutines int `env:"MAX_GOROUTINES" envDefault:"50000"`
44+
MaxConnections int `env:"MAX_CONNECTIONS" envDefault:"10000"`
45+
MaxGoroutines int `env:"MAX_GOROUTINES" envDefault:"50000"`
4446

4547
// Connection Pooling
4648
PoolMaxIdle int `env:"POOL_MAX_IDLE" envDefault:"100"`
4749
PoolMaxActive int `env:"POOL_MAX_ACTIVE" envDefault:"1000"`
4850
PoolIdleTimeout time.Duration `env:"POOL_IDLE_TIMEOUT" envDefault:"5m"`
4951

5052
// Circuit Breaker
51-
BreakerMaxFailures int `env:"BREAKER_MAX_FAILURES" envDefault:"5"`
52-
BreakerResetTimeout time.Duration `env:"BREAKER_RESET_TIMEOUT" envDefault:"60s"`
53-
BreakerTimeout time.Duration `env:"BREAKER_TIMEOUT" envDefault:"30s"`
53+
BreakerMaxFailures int `env:"BREAKER_MAX_FAILURES" envDefault:"5"`
54+
BreakerResetTimeout time.Duration `env:"BREAKER_RESET_TIMEOUT" envDefault:"60s"`
55+
BreakerTimeout time.Duration `env:"BREAKER_TIMEOUT" envDefault:"30s"`
5456

5557
// Rate Limiting
5658
RateLimitCapacity int64 `env:"RATE_LIMIT_CAPACITY" envDefault:"100"`
@@ -70,14 +72,20 @@ type Config struct {
7072
}
7173

7274
func main() {
75+
if err := run(); err != nil {
76+
fmt.Fprintln(os.Stderr, err)
77+
os.Exit(1)
78+
}
79+
}
80+
81+
func run() error {
7382
// Load configuration
7483
cfg := Config{}
7584
if err := godotenv.Load(); err != nil {
7685
// .env file is optional
7786
}
7887
if err := env.Parse(&cfg); err != nil {
79-
fmt.Fprintf(os.Stderr, "Failed to parse config: %v\n", err)
80-
os.Exit(1)
88+
return fmt.Errorf("failed to parse config: %w", err)
8189
}
8290

8391
// Setup logger
@@ -216,7 +224,7 @@ func main() {
216224

217225
mqttProxy, err := proxy.NewMQTT(mqttProxyConfig, instrumentedHandler)
218226
if err != nil {
219-
logger.Error("Failed to create MQTT proxy", slog.String("error", err.Error()))
227+
logger.Error("Failed to create MQTT proxy", slog.String(logError, err.Error()))
220228
} else {
221229
g.Go(func() error {
222230
address := net.JoinHostPort(mqttProxyConfig.Host, mqttProxyConfig.Port)
@@ -246,21 +254,22 @@ func main() {
246254
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), cfg.ShutdownTimeout)
247255
defer shutdownCancel()
248256

249-
done := make(chan error)
257+
done := make(chan error, 1)
250258
go func() {
251259
done <- g.Wait()
252260
}()
253261

254262
select {
255263
case err := <-done:
256264
if err != nil {
257-
logger.Error("Shutdown error", slog.String("error", err.Error()))
258-
os.Exit(1)
265+
logger.Error("Shutdown error", slog.String(logError, err.Error()))
266+
return err
259267
}
260268
logger.Info("Graceful shutdown completed")
269+
return nil
261270
case <-shutdownCtx.Done():
262271
logger.Warn("Shutdown timeout exceeded, forcing exit")
263-
os.Exit(1)
272+
return shutdownCtx.Err()
264273
}
265274
}
266275

@@ -274,7 +283,7 @@ func setupLogger(level, format string) *slog.Logger {
274283
logLevel = slog.LevelInfo
275284
case "warn":
276285
logLevel = slog.LevelWarn
277-
case "error":
286+
case logError:
278287
logLevel = slog.LevelError
279288
default:
280289
logLevel = slog.LevelInfo
@@ -311,7 +320,7 @@ func startMetricsServer(port int, logger *slog.Logger) {
311320
}
312321

313322
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
314-
logger.Error("Metrics server error", slog.String("error", err.Error()))
323+
logger.Error("Metrics server error", slog.String(logError, err.Error()))
315324
}
316325
}
317326

@@ -334,6 +343,6 @@ func startHealthServer(port int, checker *health.Checker, logger *slog.Logger) {
334343
}
335344

336345
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
337-
logger.Error("Health server error", slog.String("error", err.Error()))
346+
logger.Error("Health server error", slog.String(logError, err.Error()))
338347
}
339348
}

pkg/breaker/breaker.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,8 @@ import (
1010
"time"
1111
)
1212

13-
var (
14-
// ErrCircuitOpen is returned when the circuit breaker is open.
15-
ErrCircuitOpen = errors.New("circuit breaker is open")
16-
)
13+
// ErrCircuitOpen is returned when the circuit breaker is open.
14+
var ErrCircuitOpen = errors.New("circuit breaker is open")
1715

1816
// State represents the circuit breaker state.
1917
type State int
@@ -51,14 +49,14 @@ type Config struct {
5149

5250
// CircuitBreaker implements the circuit breaker pattern.
5351
type CircuitBreaker struct {
54-
mu sync.RWMutex
55-
config Config
56-
state State
57-
failures int
58-
successes int
59-
lastFailureTime time.Time
60-
lastStateChange time.Time
61-
onStateChange func(from, to State)
52+
mu sync.RWMutex
53+
config Config
54+
state State
55+
failures int
56+
successes int
57+
lastFailureTime time.Time
58+
lastStateChange time.Time
59+
onStateChange func(from, to State)
6260
}
6361

6462
// New creates a new circuit breaker.

pkg/errors/errors.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"fmt"
1010
)
1111

12-
// Common error types
12+
// Common error types.
1313
var (
1414
// ErrUnauthorized indicates authentication or authorization failure.
1515
ErrUnauthorized = errors.New("unauthorized")
@@ -41,11 +41,11 @@ var (
4141

4242
// ProxyError wraps an error with additional context.
4343
type ProxyError struct {
44-
Op string // Operation that failed
45-
Protocol string // Protocol (mqtt, http, coap, websocket)
46-
SessionID string // Session identifier
44+
Op string // Operation that failed
45+
Protocol string // Protocol (mqtt, http, coap, websocket)
46+
SessionID string // Session identifier
4747
RemoteAddr string // Client address
48-
Err error // Underlying error
48+
Err error // Underlying error
4949
}
5050

5151
// Error implements the error interface.
@@ -67,11 +67,11 @@ func New(op, protocol, sessionID, remoteAddr string, err error) error {
6767
return nil
6868
}
6969
return &ProxyError{
70-
Op: op,
71-
Protocol: protocol,
72-
SessionID: sessionID,
70+
Op: op,
71+
Protocol: protocol,
72+
SessionID: sessionID,
7373
RemoteAddr: remoteAddr,
74-
Err: err,
74+
Err: err,
7575
}
7676
}
7777

pkg/health/health.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -120,13 +120,13 @@ func (c *Checker) HTTPHandler() http.HandlerFunc {
120120
w.Header().Set("Content-Type", "application/json")
121121
if status == StatusUnhealthy {
122122
w.WriteHeader(http.StatusServiceUnavailable)
123-
} else if status == StatusDegraded {
124-
w.WriteHeader(http.StatusOK) // Still accept traffic
125123
} else {
126-
w.WriteHeader(http.StatusOK)
124+
w.WriteHeader(http.StatusOK) // Still accept traffic
127125
}
128126

129-
json.NewEncoder(w).Encode(response)
127+
if err := json.NewEncoder(w).Encode(response); err != nil {
128+
http.Error(w, err.Error(), http.StatusInternalServerError)
129+
}
130130
}
131131
}
132132

@@ -135,9 +135,11 @@ func LivenessHandler() http.HandlerFunc {
135135
return func(w http.ResponseWriter, r *http.Request) {
136136
w.Header().Set("Content-Type", "application/json")
137137
w.WriteHeader(http.StatusOK)
138-
json.NewEncoder(w).Encode(map[string]string{
138+
if err := json.NewEncoder(w).Encode(map[string]string{
139139
"status": "alive",
140-
})
140+
}); err != nil {
141+
http.Error(w, err.Error(), http.StatusInternalServerError)
142+
}
141143
}
142144
}
143145

@@ -161,6 +163,8 @@ func (c *Checker) ReadinessHandler() http.HandlerFunc {
161163
w.WriteHeader(http.StatusOK)
162164
}
163165

164-
json.NewEncoder(w).Encode(response)
166+
if err := json.NewEncoder(w).Encode(response); err != nil {
167+
http.Error(w, err.Error(), http.StatusInternalServerError)
168+
}
165169
}
166170
}

0 commit comments

Comments
 (0)