Skip to content

Commit 78dffea

Browse files
committed
rewrite connection shutdown sequence
to avoid race conditions under high concurrency load
1 parent a856eda commit 78dffea

File tree

2 files changed

+187
-118
lines changed

2 files changed

+187
-118
lines changed

src/Suave/ConnectionFacade.fs

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ open ConnectionHealthChecker
2020

2121
type ConnectionFacade(connection: Connection, runtime: HttpRuntime, connectionPool: ConcurrentPool<ConnectionFacade>, tracker: ActiveConnectionTracker<ConnectionFacade>, cancellationToken: CancellationToken, webpart: WebPart) =
2222

23+
static let mutable connectionIdCounter = 0L
24+
let connectionId = Interlocked.Increment(&connectionIdCounter)
25+
2326
let httpOutput = new HttpOutput(connection,runtime)
2427

2528
let reader = connection.reader
@@ -234,6 +237,7 @@ type ConnectionFacade(connection: Connection, runtime: HttpRuntime, connectionPo
234237

235238
member val Connection = connection with get,set
236239
member val Runtime = runtime with get,set
240+
member val ConnectionId = connectionId with get
237241

238242
member this.parsePostData maxContentLength (contentLengthHeader : Choice<string,_>) (contentTypeHeader:Choice<string,_>) : SocketOp<unit> =
239243
socket {
@@ -259,8 +263,6 @@ type ConnectionFacade(connection: Connection, runtime: HttpRuntime, connectionPo
259263
| Choice2Of2 _ -> return ()
260264
}
261265

262-
/// Process the request, reading as it goes from the incoming 'stream', yielding a HttpRequest
263-
/// when done
264266
member this.readRequest () = socket {
265267

266268
let! firstLine = reader.readLine()
@@ -337,14 +339,17 @@ type ConnectionFacade(connection: Connection, runtime: HttpRuntime, connectionPo
337339
}
338340

339341
member this.shutdown() =
340-
connection.pipe.Reader.CancelPendingRead()
342+
reader.cancelPendingReads()
341343
// Shutdown transport FIRST to unblock any waiting reads in readLoop
342344
// This prevents the readLoop from being stuck in transport.read() when we set running=false
343345
connection.transport.shutdown()
344346
reader.stop()
347+
348+
member private this.recycleConnection() =
345349
// Clear the line buffer to prevent data leakage and ensure clean state for reuse
346350
Array.Clear(connection.lineBuffer)
347351
connection.lineBufferCount <- 0
352+
try connection.pipe.Writer.Complete() with _ -> ()
348353
try connection.pipe.Reader.Complete() with _ -> ()
349354
try connection.pipe.Reset() with _ -> ()
350355
// Note: Push() now notifies the tracker that connection is being returned
@@ -377,25 +382,39 @@ type ConnectionFacade(connection: Connection, runtime: HttpRuntime, connectionPo
377382
member this.accept(binding) = task{
378383
let clientIp = (binding.ip.ToString())
379384
if Globals.verbose then
380-
Console.WriteLine("{0} connected. Now has {1} connected", clientIp, tracker.ActiveConnectionCount)
385+
Console.WriteLine("[Conn:{0}] accept: {1} connected. Now has {2} connected", connectionId, clientIp, tracker.ActiveConnectionCount)
381386
connection.socketBinding <- binding
387+
// Set the connection ID on the reader for consistent logging
388+
let mutable readTask : Task<Result<unit,Error>> = null
382389
try
383-
try
384-
// Start read loop in background - use _ignore to suppress async warning
385390
reader.init()
386-
let readTask = reader.readLoop()
391+
readTask <- reader.readLoop()
387392
let! loopRes = this.requestLoop()
388393
match loopRes with
389394
| Ok () -> ()
390395
| Result.Error err ->
391396
if Globals.verbose then
392-
do Console.WriteLine(sprintf "Error: %A" err)
393-
with
394-
| ex ->
395-
if Globals.verbose then
396-
do Console.WriteLine("Error: " + ex.Message)
397+
do Console.WriteLine(sprintf "[Conn:%d] accept: Error: %A" connectionId err)
397398
finally
398-
do this.shutdown()
399-
if Globals.verbose then
400-
do Console.WriteLine("Disconnected {0}. {1} connected.", clientIp, tracker.ActiveConnectionCount)
399+
// First phase: stop reader and transport (this unblocks readTask)
400+
this.shutdown()
401+
402+
// Wait for readTask to complete BEFORE recycling the connection
403+
// This is critical: we must ensure readLoop has finished (and called pipe.Writer.Complete())
404+
// before we reset the pipe and push the connection back to the pool
405+
if readTask <> null then
406+
try
407+
// Use a timeout to avoid hanging forever if something goes wrong
408+
let! completed = Task.WhenAny(readTask, Task.Delay(1000))
409+
if not (Object.ReferenceEquals(completed, readTask)) then
410+
if Globals.verbose then
411+
Console.WriteLine("[Conn:{0}] accept: readTask did not complete within timeout", connectionId)
412+
with ex ->
413+
if Globals.verbose then
414+
Console.WriteLine("[Conn:{0}] accept: error waiting for readTask: {1}", connectionId, ex.Message)
415+
416+
// Second phase: reset pipe and recycle connection (only after readTask is done)
417+
this.recycleConnection()
418+
if Globals.verbose then
419+
do Console.WriteLine("[Conn:{0}] accept:Disconnected {1}. {2} connected.", connectionId, clientIp, tracker.ActiveConnectionCount)
401420
}

0 commit comments

Comments
 (0)