From ed3997e2bc48ef2eea1a14ea3b62444e183d1e3f Mon Sep 17 00:00:00 2001 From: Aurora Gaffney Date: Tue, 29 Apr 2025 16:33:16 -0400 Subject: [PATCH] fix: require explicit muxer start before handshake This removes a race condition that caused random failures when the remote peer sent a handshake message before we have registered the protocol Signed-off-by: Aurora Gaffney --- connection.go | 1 + muxer/muxer.go | 32 ++++++++++++++++++++------------ 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/connection.go b/connection.go index 58a2e5ad..2efb6baf 100644 --- a/connection.go +++ b/connection.go @@ -341,6 +341,7 @@ func (c *Connection) setupConnection() error { } else { c.handshake.Client.Start() } + c.muxer.StartOnce() // Wait for handshake completion or error select { case <-c.doneChan: diff --git a/muxer/muxer.go b/muxer/muxer.go index 4097f79a..f8998ec1 100644 --- a/muxer/muxer.go +++ b/muxer/muxer.go @@ -111,6 +111,15 @@ func (m *Muxer) Start() { } } +// StartOnce unblocks the read loop for one iteration. This is generally used to perform the handshake before registering +// additional protocols and calling Start +func (m *Muxer) StartOnce() { + select { + case m.startChan <- false: + default: + } +} + // Stop shuts down the muxer func (m *Muxer) Stop() { m.onceStop.Do(func() { @@ -262,6 +271,17 @@ func (m *Muxer) readLoop() { return default: } + // Wait until the muxer is started to continue + if !started { + select { + case <-m.doneChan: + // Break out of read loop if we're shutting down + return + case v := <-m.startChan: + // We block again on the next iteration of we get 'false' from startChan + started = v + } + } header := SegmentHeader{} if err := binary.Read(m.conn, binary.BigEndian, &header); err != nil { if errors.Is(err, io.ErrClosedPipe) { @@ -334,17 +354,5 @@ func (m *Muxer) readLoop() { return } recvChan <- msg - - // Wait until the muxer is started to continue - // We don't want to read more than one segment until the handshake is complete - if !started { - select { - case <-m.doneChan: - // Break out of read loop if we're shutting down - return - case <-m.startChan: - started = true - } - } } }