Skip to content

Commit d9c2c7a

Browse files
committed
Add connection pooling. Track active connections. Warn users if connections close to limit. Allow max-connections override.
1 parent c36975b commit d9c2c7a

File tree

3 files changed

+63
-13
lines changed

3 files changed

+63
-13
lines changed

pkg/cmd/listen.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ import (
2828
)
2929

3030
type listenCmd struct {
31-
cmd *cobra.Command
32-
noWSS bool
33-
path string
31+
cmd *cobra.Command
32+
noWSS bool
33+
path string
34+
maxConnections int
3435
}
3536

3637
// Map --cli-path to --path
@@ -95,6 +96,7 @@ Destination CLI path will be "/". To set the CLI path, use the "--path" flag.`,
9596
lc.cmd.Flags().MarkHidden("no-wss")
9697

9798
lc.cmd.Flags().StringVar(&lc.path, "path", "", "Sets the path to which events are forwarded e.g., /webhooks or /api/stripe")
99+
lc.cmd.Flags().IntVar(&lc.maxConnections, "max-connections", 50, "Maximum concurrent connections to local endpoint (default: 50, increase for high-volume testing)")
98100

99101
// --cli-path is an alias for
100102
lc.cmd.Flags().SetNormalizeFunc(normalizeCliPathFlag)
@@ -162,7 +164,8 @@ func (lc *listenCmd) runListenCmd(cmd *cobra.Command, args []string) error {
162164
}
163165

164166
return listen.Listen(url, sourceQuery, connectionQuery, listen.Flags{
165-
NoWSS: lc.noWSS,
166-
Path: lc.path,
167+
NoWSS: lc.noWSS,
168+
Path: lc.path,
169+
MaxConnections: lc.maxConnections,
167170
}, &Config)
168171
}

pkg/listen/listen.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ import (
3131
)
3232

3333
type Flags struct {
34-
NoWSS bool
35-
Path string
34+
NoWSS bool
35+
Path string
36+
MaxConnections int
3637
}
3738

3839
// listenCmd represents the listen command
@@ -139,6 +140,7 @@ Specify a single destination to update the path. For example, pass a connection
139140
URL: URL,
140141
Log: log.StandardLogger(),
141142
Insecure: config.Insecure,
143+
MaxConnections: flags.MaxConnections,
142144
}, connections)
143145

144146
err = p.Run(context.Background())

pkg/proxy/proxy.go

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"os/signal"
1515
"strconv"
1616
"strings"
17+
"sync/atomic"
1718
"syscall"
1819
"time"
1920

@@ -50,6 +51,13 @@ type Config struct {
5051
// Force use of unencrypted ws:// protocol instead of wss://
5152
NoWSS bool
5253
Insecure bool
54+
// MaxConnections allows tuning the maximum concurrent connections per host.
55+
// Default: 50 concurrent connections
56+
// This can be increased for high-volume testing scenarios where the local
57+
// endpoint can handle more concurrent requests.
58+
// Example: Set to 100+ when load testing with many parallel webhooks.
59+
// Warning: Setting this too high may cause resource exhaustion.
60+
MaxConnections int
5361
}
5462

5563
// A Proxy opens a websocket connection with Hookdeck, listens for incoming
@@ -61,6 +69,9 @@ type Proxy struct {
6169
webSocketClient *websocket.Client
6270
connectionTimer *time.Timer
6371
httpClient *http.Client
72+
transport *http.Transport
73+
activeRequests int32
74+
maxConnWarned bool // Track if we've warned about connection limit
6475
}
6576

6677
func withSIGTERMCancel(ctx context.Context, onCancel func()) context.Context {
@@ -260,6 +271,26 @@ func (p *Proxy) processAttempt(msg websocket.IncomingMessage) {
260271
timeout = 1000 * 30
261272
}
262273

274+
// Track active requests
275+
atomic.AddInt32(&p.activeRequests, 1)
276+
defer atomic.AddInt32(&p.activeRequests, -1)
277+
278+
activeCount := atomic.LoadInt32(&p.activeRequests)
279+
280+
// Warn when approaching connection limit
281+
if activeCount > 40 && !p.maxConnWarned {
282+
p.maxConnWarned = true
283+
color := ansi.Color(os.Stdout)
284+
fmt.Printf("\n%s High connection load detected (%d active requests)\n",
285+
color.Yellow("⚠ WARNING:"), activeCount)
286+
fmt.Printf(" The CLI is limited to %d concurrent connections per host.\n", p.transport.MaxConnsPerHost)
287+
fmt.Printf(" Consider reducing request rate or increasing connection limit.\n")
288+
fmt.Printf(" Run with --max-connections=100 to increase the limit.\n\n")
289+
} else if activeCount < 30 && p.maxConnWarned {
290+
// Reset warning flag when load decreases
291+
p.maxConnWarned = false
292+
}
293+
263294
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Millisecond)
264295
defer cancel()
265296

@@ -288,7 +319,8 @@ func (p *Proxy) processAttempt(msg websocket.IncomingMessage) {
288319
color := ansi.Color(os.Stdout)
289320
localTime := time.Now().Format(timeLayout)
290321

291-
errStr := fmt.Sprintf("%s [%s] Failed to %s: %v",
322+
// Use the original error message
323+
errStr := fmt.Sprintf("%s [%s] Failed to %s: %s",
292324
color.Faint(localTime),
293325
color.Red("ERROR"),
294326
webhookEvent.Body.Request.Method,
@@ -305,8 +337,11 @@ func (p *Proxy) processAttempt(msg websocket.IncomingMessage) {
305337
},
306338
}})
307339
} else {
308-
defer res.Body.Close()
340+
// Process the response (this reads the entire body)
309341
p.processEndpointResponse(webhookEvent, res)
342+
343+
// Close the body - connection can be reused since body was fully read
344+
res.Body.Close()
310345
}
311346
}
312347
}
@@ -363,20 +398,30 @@ func New(cfg *Config, connections []*hookdecksdk.Connection) *Proxy {
363398
cfg.Log = &log.Logger{Out: ioutil.Discard}
364399
}
365400

401+
// Default to 50 connections if not specified
402+
maxConns := cfg.MaxConnections
403+
if maxConns <= 0 {
404+
maxConns = 50
405+
}
406+
366407
// Create a shared HTTP transport with connection pooling
367408
tr := &http.Transport{
368409
TLSClientConfig: &tls.Config{InsecureSkipVerify: cfg.Insecure},
369-
// Connection pool settings for better performance
370-
MaxIdleConns: 100,
371-
MaxIdleConnsPerHost: 10,
372-
IdleConnTimeout: 90 * time.Second,
410+
// Connection pool settings - sensible defaults for typical usage
411+
MaxIdleConns: 20, // Total idle connections across all hosts
412+
MaxIdleConnsPerHost: 10, // Keep some idle connections for reuse
413+
IdleConnTimeout: 30 * time.Second, // Clean up idle connections
373414
DisableKeepAlives: false,
415+
// Limit concurrent connections to prevent resource exhaustion
416+
MaxConnsPerHost: maxConns, // User-configurable (default: 50)
417+
ResponseHeaderTimeout: 60 * time.Second,
374418
}
375419

376420
p := &Proxy{
377421
cfg: cfg,
378422
connections: connections,
379423
connectionTimer: time.NewTimer(0), // Defaults to no delay
424+
transport: tr,
380425
httpClient: &http.Client{
381426
Transport: tr,
382427
// Default timeout can be overridden per request

0 commit comments

Comments
 (0)