Skip to content

Commit 8da533f

Browse files
authored
Merge pull request #27 from MaurUppi/broken-pipe-patch
fix(control): exit stale UDP convoy after failed delete
2 parents 5fa0e0f + 11a966c commit 8da533f

File tree

3 files changed

+74
-1
lines changed

3 files changed

+74
-1
lines changed

.github/workflows/udp-taskpool-tdd.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ jobs:
6464
shell: bash
6565
run: |
6666
set +e
67-
go test -v -race -timeout 30s ./control/... -run "Test(UdpTaskPool|CompareAndDelete|NoGoroutineLeak|ConvoyExitAfterFailedDelete|HighConcurrencyStress)" \
67+
go test -v -race -timeout 30s ./control/... -run "Test(UdpTaskPool|CompareAndDelete|NoGoroutineLeak|ConvoyExitAfterFailedDelete|ConvoyExitsWhenQueueMappingDeletedBeforeSelfDelete|HighConcurrencyStress)" \
6868
2>&1 | tee tdd-artifacts/taskpool-race.log
6969
status=${PIPESTATUS[0]}
7070
set -e

control/udp_task_pool.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,14 @@ func (q *UdpTaskQueue) convoy() {
178178
q.p.queueChPool.Put(q.ch)
179179
return
180180
}
181+
182+
// If mapping no longer points to this queue, this convoy became stale.
183+
// Exit to avoid spinning forever outside the queue map.
184+
if current, ok := q.p.queues.Load(q.key); !ok || current.(*UdpTaskQueue) != q {
185+
q.p.queueChPool.Put(q.ch)
186+
return
187+
}
188+
181189
q.draining.Store(false)
182190
q.safeTimerReset(timer)
183191
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* SPDX-License-Identifier: AGPL-3.0-only
3+
* Copyright (c) 2022-2025, daeuniverse Organization <dae@daeuniverse.org>
4+
*/
5+
6+
package control
7+
8+
import (
9+
"net/netip"
10+
"testing"
11+
"time"
12+
)
13+
14+
// Regression guard:
15+
// When queue mapping is removed before convoy self-delete, the old convoy
16+
// must exit instead of looping forever outside the queue map.
17+
func TestConvoyExitsWhenQueueMappingDeletedBeforeSelfDelete(t *testing.T) {
18+
oldAging := UdpTaskPoolAgingTime
19+
UdpTaskPoolAgingTime = 20 * time.Millisecond
20+
defer func() { UdpTaskPoolAgingTime = oldAging }()
21+
22+
pool := NewUdpTaskPool()
23+
key := netip.MustParseAddrPort("198.51.100.10:443")
24+
q := newTestQueue(pool, key)
25+
q.agingTime = UdpTaskPoolAgingTime
26+
pool.queues.Store(key, q)
27+
28+
done := make(chan struct{})
29+
go func() {
30+
q.convoy()
31+
close(done)
32+
}()
33+
34+
// Wait until convoy reaches draining state.
35+
deadline := time.Now().Add(2 * time.Second)
36+
for !q.draining.Load() && time.Now().Before(deadline) {
37+
time.Sleep(1 * time.Millisecond)
38+
}
39+
if !q.draining.Load() {
40+
t.Fatal("convoy did not enter draining state in time")
41+
}
42+
43+
// Simulate concurrent path deleting map entry first (acquireQueue draining path).
44+
if !pool.tryDeleteQueue(key, q) {
45+
t.Fatal("expected initial delete to succeed")
46+
}
47+
if got := pool.Count(); got != 0 {
48+
t.Fatalf("expected queue map to be empty, got count=%d", got)
49+
}
50+
51+
// Old convoy should exit within one cleanup cycle.
52+
select {
53+
case <-done:
54+
// Expected: clean exit after failed self-delete with stale/absent mapping.
55+
case <-time.After(UdpTaskPoolAgingTime + 120*time.Millisecond):
56+
// Cleanup to avoid leaking goroutine into other tests, then fail.
57+
pool.queues.Store(key, q)
58+
select {
59+
case <-done:
60+
case <-time.After(1 * time.Second):
61+
t.Fatal("convoy did not exit during cleanup after timeout")
62+
}
63+
t.Fatal("convoy did not exit after mapping was deleted before self-delete")
64+
}
65+
}

0 commit comments

Comments
 (0)