Skip to content

Commit 908f990

Browse files
authored
Merge pull request #153 from hookdeck/fix/too-many-open-files
Fix/too many open files
2 parents 72238a1 + 6805cc5 commit 908f990

File tree

3 files changed

+83
-17
lines changed

3 files changed

+83
-17
lines changed

pkg/cmd/listen.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ import (
2828
)
2929

3030
type listenCmd struct {
31-
cmd *cobra.Command
32-
noWSS bool
33-
path string
31+
cmd *cobra.Command
32+
noWSS bool
33+
path string
34+
maxConnections int
3435
}
3536

3637
// Map --cli-path to --path
@@ -95,6 +96,7 @@ Destination CLI path will be "/". To set the CLI path, use the "--path" flag.`,
9596
lc.cmd.Flags().MarkHidden("no-wss")
9697

9798
lc.cmd.Flags().StringVar(&lc.path, "path", "", "Sets the path to which events are forwarded e.g., /webhooks or /api/stripe")
99+
lc.cmd.Flags().IntVar(&lc.maxConnections, "max-connections", 50, "Maximum concurrent connections to local endpoint (default: 50, increase for high-volume testing)")
98100

99101
// --cli-path is an alias for
100102
lc.cmd.Flags().SetNormalizeFunc(normalizeCliPathFlag)
@@ -162,7 +164,8 @@ func (lc *listenCmd) runListenCmd(cmd *cobra.Command, args []string) error {
162164
}
163165

164166
return listen.Listen(url, sourceQuery, connectionQuery, listen.Flags{
165-
NoWSS: lc.noWSS,
166-
Path: lc.path,
167+
NoWSS: lc.noWSS,
168+
Path: lc.path,
169+
MaxConnections: lc.maxConnections,
167170
}, &Config)
168171
}

pkg/listen/listen.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ import (
3131
)
3232

3333
type Flags struct {
34-
NoWSS bool
35-
Path string
34+
NoWSS bool
35+
Path string
36+
MaxConnections int
3637
}
3738

3839
// listenCmd represents the listen command
@@ -139,6 +140,7 @@ Specify a single destination to update the path. For example, pass a connection
139140
URL: URL,
140141
Log: log.StandardLogger(),
141142
Insecure: config.Insecure,
143+
MaxConnections: flags.MaxConnections,
142144
}, connections)
143145

144146
err = p.Run(context.Background())

pkg/proxy/proxy.go

Lines changed: 71 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"os/signal"
1515
"strconv"
1616
"strings"
17+
"sync/atomic"
1718
"syscall"
1819
"time"
1920

@@ -50,6 +51,13 @@ type Config struct {
5051
// Force use of unencrypted ws:// protocol instead of wss://
5152
NoWSS bool
5253
Insecure bool
54+
// MaxConnections allows tuning the maximum concurrent connections per host.
55+
// Default: 50 concurrent connections
56+
// This can be increased for high-volume testing scenarios where the local
57+
// endpoint can handle more concurrent requests.
58+
// Example: Set to 100+ when load testing with many parallel webhooks.
59+
// Warning: Setting this too high may cause resource exhaustion.
60+
MaxConnections int
5361
}
5462

5563
// A Proxy opens a websocket connection with Hookdeck, listens for incoming
@@ -60,6 +68,10 @@ type Proxy struct {
6068
connections []*hookdecksdk.Connection
6169
webSocketClient *websocket.Client
6270
connectionTimer *time.Timer
71+
httpClient *http.Client
72+
transport *http.Transport
73+
activeRequests int32
74+
maxConnWarned bool // Track if we've warned about connection limit
6375
}
6476

6577
func withSIGTERMCancel(ctx context.Context, onCancel func()) context.Context {
@@ -252,21 +264,42 @@ func (p *Proxy) processAttempt(msg websocket.IncomingMessage) {
252264
fmt.Println(webhookEvent.Body.Request.DataString)
253265
} else {
254266
url := p.cfg.URL.Scheme + "://" + p.cfg.URL.Host + p.cfg.URL.Path + webhookEvent.Body.Path
255-
tr := &http.Transport{
256-
TLSClientConfig: &tls.Config{InsecureSkipVerify: p.cfg.Insecure},
257-
}
258267

268+
// Create request with context for timeout control
259269
timeout := webhookEvent.Body.Request.Timeout
260270
if timeout == 0 {
261271
timeout = 1000 * 30
262272
}
263273

264-
client := &http.Client{
265-
Timeout: time.Duration(timeout) * time.Millisecond,
266-
Transport: tr,
274+
// Track active requests
275+
atomic.AddInt32(&p.activeRequests, 1)
276+
defer atomic.AddInt32(&p.activeRequests, -1)
277+
278+
activeCount := atomic.LoadInt32(&p.activeRequests)
279+
280+
// Calculate warning thresholds proportionally to max connections
281+
maxConns := int32(p.transport.MaxConnsPerHost)
282+
warningThreshold := int32(float64(maxConns) * 0.8) // Warn at 80% capacity
283+
resetThreshold := int32(float64(maxConns) * 0.6) // Reset warning at 60% capacity
284+
285+
// Warn when approaching connection limit
286+
if activeCount > warningThreshold && !p.maxConnWarned {
287+
p.maxConnWarned = true
288+
color := ansi.Color(os.Stdout)
289+
fmt.Printf("\n%s High connection load detected (%d active requests)\n",
290+
color.Yellow("⚠ WARNING:"), activeCount)
291+
fmt.Printf(" The CLI is limited to %d concurrent connections per host.\n", p.transport.MaxConnsPerHost)
292+
fmt.Printf(" Consider reducing request rate or increasing connection limit.\n")
293+
fmt.Printf(" Run with --max-connections=%d to increase the limit.\n\n", maxConns*2)
294+
} else if activeCount < resetThreshold && p.maxConnWarned {
295+
// Reset warning flag when load decreases
296+
p.maxConnWarned = false
267297
}
268298

269-
req, err := http.NewRequest(webhookEvent.Body.Request.Method, url, nil)
299+
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Millisecond)
300+
defer cancel()
301+
302+
req, err := http.NewRequestWithContext(ctx, webhookEvent.Body.Request.Method, url, nil)
270303
if err != nil {
271304
fmt.Printf("Error: %s\n", err)
272305
return
@@ -286,13 +319,13 @@ func (p *Proxy) processAttempt(msg websocket.IncomingMessage) {
286319
req.Body = ioutil.NopCloser(strings.NewReader(webhookEvent.Body.Request.DataString))
287320
req.ContentLength = int64(len(webhookEvent.Body.Request.DataString))
288321

289-
res, err := client.Do(req)
290-
322+
res, err := p.httpClient.Do(req)
291323
if err != nil {
292324
color := ansi.Color(os.Stdout)
293325
localTime := time.Now().Format(timeLayout)
294326

295-
errStr := fmt.Sprintf("%s [%s] Failed to %s: %v",
327+
// Use the original error message
328+
errStr := fmt.Sprintf("%s [%s] Failed to %s: %s",
296329
color.Faint(localTime),
297330
color.Red("ERROR"),
298331
webhookEvent.Body.Request.Method,
@@ -309,7 +342,11 @@ func (p *Proxy) processAttempt(msg websocket.IncomingMessage) {
309342
},
310343
}})
311344
} else {
345+
// Process the response (this reads the entire body)
312346
p.processEndpointResponse(webhookEvent, res)
347+
348+
// Close the body - connection can be reused since body was fully read
349+
res.Body.Close()
313350
}
314351
}
315352
}
@@ -366,10 +403,34 @@ func New(cfg *Config, connections []*hookdecksdk.Connection) *Proxy {
366403
cfg.Log = &log.Logger{Out: ioutil.Discard}
367404
}
368405

406+
// Default to 50 connections if not specified
407+
maxConns := cfg.MaxConnections
408+
if maxConns <= 0 {
409+
maxConns = 50
410+
}
411+
412+
// Create a shared HTTP transport with connection pooling
413+
tr := &http.Transport{
414+
TLSClientConfig: &tls.Config{InsecureSkipVerify: cfg.Insecure},
415+
// Connection pool settings - sensible defaults for typical usage
416+
MaxIdleConns: 20, // Total idle connections across all hosts
417+
MaxIdleConnsPerHost: 10, // Keep some idle connections for reuse
418+
IdleConnTimeout: 30 * time.Second, // Clean up idle connections
419+
DisableKeepAlives: false,
420+
// Limit concurrent connections to prevent resource exhaustion
421+
MaxConnsPerHost: maxConns, // User-configurable (default: 50)
422+
ResponseHeaderTimeout: 60 * time.Second,
423+
}
424+
369425
p := &Proxy{
370426
cfg: cfg,
371427
connections: connections,
372428
connectionTimer: time.NewTimer(0), // Defaults to no delay
429+
transport: tr,
430+
httpClient: &http.Client{
431+
Transport: tr,
432+
// Timeout is controlled per-request via context in processAttempt
433+
},
373434
}
374435

375436
return p

0 commit comments

Comments
 (0)