Skip to content

Commit d6b2eb9

Browse files
authored
Merge pull request #12 from MaurUppi/broken-pipe-fix
fix(udp): two-phase dialer demotion and endpoint cleanup on broken-pipe
2 parents f7ed762 + 7bae9c5 commit d6b2eb9

File tree

5 files changed

+288
-41
lines changed

5 files changed

+288
-41
lines changed

.plan/code_audit_trace-back-4th_broken-pipe.md

Lines changed: 70 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
# 非 DNS 相关修复实施计划(Broken Pipe)
22

33
> 分支: `main` → 创建新分支 `broken-pipe-fix`
4+
>
45
> 来源: `code_audit_trace-back-4th.md` 优先级 1(F6/S5 归属迁移后)
6+
>
57
> 修复原则: 根因在 dae 原始代码,在 main 上创建分支修复
68
79
## 0. 执行策略
@@ -21,6 +23,7 @@
2123
- `ue.WriteTo()` 失败后有重试(`goto getNew`, MaxRetry=2),但重试时 `dialerGroup.Select()` 仍可能选中同一个断裂节点
2224
- broken pipe 信息未反馈到 dialer 健康模型
2325
- 同一源端口最多反复报错 8 次
26+
- `handlePkt` fast path(复用已有 endpoint)写失败会直接返回,绕过健康反馈与重建
2427

2528
### F6/S5 归属迁移结论(2026-02-20)
2629

@@ -33,6 +36,10 @@
3336

3437
```
3538
handlePkt() (udp.go:64)
39+
L68: ue, ueExists := DefaultUdpEndpointPool.Get(realSrc)
40+
L69-95: fast path (ueExists && ue.SniffedDomain != "")
41+
L91: ue.WriteTo(data, dialTarget)
42+
L92-94: 写失败直接 return(当前缺少 ReportUnavailable/Remove/重试)
3643
L149: routingResult.Must > 0 → isDns=false
3744
L171: retry := 0
3845
L191: getNew: 标签
@@ -62,15 +69,15 @@ func (d *Dialer) ReportUnavailable(typ *NetworkType, err error) {
6269
- 后续 `dialerGroup.Select()` 不会选中该 dialer(除非是 FixedPolicy 或只有 1 个 dialer)
6370
- 定时健康检查成功后自动恢复 `Alive=true`
6471

65-
## T1: broken pipe 后调用 ReportUnavailable 标记 dialer 不健康
72+
## T1: 两阶段故障处理(先清理重建,再按重复失败标记 dialer 不健康
6673

67-
**目标**: 解决 F1 和 F2 — broken pipe 后 dialer 应被标记为不健康,重试时避开
74+
**目标**: 解决 F1 和 F2,同时避免把“单条隧道连接正常关闭”误判为“节点整体不健康”。
6875

6976
**修改文件**: `control/udp.go`
7077

7178
**实现**:
7279

73-
L285-303 的 WriteTo 失败处理中,增加 `ReportUnavailable` 调用:
80+
### A. slow path(L285-303)改为两阶段降级
7481

7582
```go
7683
// 现有代码 (L285-303):
@@ -94,23 +101,42 @@ if err != nil {
94101
// ... debug fields ...
95102
}).Debugln("Failed to write UDP packet request. Try to remove old UDP endpoint and retry.")
96103
}
97-
// 将 broken pipe / connection reset 等错误反馈到 dialer 健康状态
98-
ue.Dialer.ReportUnavailable(networkType, err)
104+
// 阶段 1: 首次失败只清理 endpoint 并重试,不立即降级 dialer。
105+
// 阶段 2: 同一请求内重复失败(retry > 0)才将错误反馈到 dialer 健康状态。
106+
if retry > 0 {
107+
ue.Dialer.ReportUnavailable(networkType, err)
108+
}
99109
_ = DefaultUdpEndpointPool.Remove(realSrc, ue)
100110
retry++
101111
goto getNew
102112
}
103113
```
104114

115+
### B. fast path(L69-95)补齐失败处理,不再直接 return
116+
117+
当前 fast path(`ueExists && ue.SniffedDomain != ""`)在 `ue.WriteTo` 失败后直接 `return err`,需要改为:
118+
119+
```go
120+
_, err = ue.WriteTo(data, dialTarget)
121+
if err == nil {
122+
return nil
123+
}
124+
// fast path 失败先只清理 endpoint(阶段 1),不立即 ReportUnavailable。
125+
_ = DefaultUdpEndpointPool.Remove(realSrc, ue)
126+
// 不直接 return,继续走后续 slow path 的 GetOrCreate + retry 逻辑
127+
```
128+
105129
**注意事项**:
106-
1. `ReportUnavailable`**所有**写入错误都调用(不仅限 broken pipe),因为 connection refused/timeout 等也说明 dialer 不可用
107-
2. 这不需要 `isBrokenPipe()` helper — `ReportUnavailable` 接受任意 error
108-
3. `networkType` 已在 L172-176 定义: `&dialer.NetworkType{L4Proto: "udp", IpVersion: ..., IsDns: false}`
109-
4. 调用后 `Alive=false`,下次 `getNew` 时 L266 的 `!ue.Dialer.MustGetAlive(networkType)` 会生效(对于已有 endpoint)
110-
5. 对于新建 endpoint(`isNew=true`),L266 检查不触发,但 `dialerGroup.Select()` 内部也会避开 not-alive dialer
111-
6. **风险评估**: `ReportUnavailable` 会将 dialer 的 `Alive` 设为 false 并 append `Timeout` 延迟。如果仅一次偶发网络抖动就标记 dialer 为 not alive,可能导致流量不必要地切换。但考虑到:(a) 定时健康检查会快速恢复; (b) 当前的问题是**完全不反馈**导致 15 分钟持续写入断裂隧道,过度反馈远好于不反馈。
130+
1. `ReportUnavailable` 触发条件改为“同一请求内重复写失败(retry > 0)”,不是首次失败即触发。
131+
2. 首次失败优先按 endpoint 维度处理(Remove + 重建),符合“单连接失效不等于节点失效”的语义。
132+
3. 这不需要 `isBrokenPipe()` helper — `ReportUnavailable` 接受任意 error;是否触发由“两阶段策略”控制。
133+
4. slow path 的 `networkType` 已在 L172-176 定义: `&dialer.NetworkType{L4Proto: "udp", IpVersion: ..., IsDns: false}`
134+
5. 调用后 `Alive=false`,下次 `getNew` 时 L266 的 `!ue.Dialer.MustGetAlive(networkType)` 会生效(对于已有 endpoint)
135+
6. 对于新建 endpoint(`isNew=true`),L266 检查不触发,但 `dialerGroup.Select()` 内部也会避开 not-alive dialer
136+
7. **风险评估**: 两阶段策略降低了“偶发单连接关闭”导致的误降级风险;同时在重复失败时仍可快速熔断,避免持续打到坏节点。
112137

113138
**关键文件**:
139+
- `control/udp.go:68-95` (fast path 写失败处理)
114140
- `control/udp.go:285-303` (WriteTo 失败处理)
115141
- `component/outbound/dialer/connectivity_check.go:564-568` (ReportUnavailable, 只读引用)
116142
- `component/outbound/dialer/alive_dialer_set.go:144-204` (NotifyLatencyChange, 只读引用)
@@ -119,9 +145,10 @@ if err != nil {
119145
1. 部署修复后,运行 `dae_triage_unified_v5.sh --service dae --enable-tcpdump --enable-strace --peer-ip 163.177.58.13`
120146
2. 等待自然 IEPL 断连(或手动关闭一个 IEPL 节点)
121147
3. **预期**:
122-
- broken pipe 后日志显示 `[ALIVE → NOT ALIVE]`(来自 NotifyLatencyChange L186-189)
148+
- 首次失败仅 endpoint 重建;重复失败才出现 `[ALIVE → NOT ALIVE]`(来自 NotifyLatencyChange L186-189)
123149
- 重试时选择其他健康节点
124150
- 同一源端口的 broken pipe 次数 ≤2(MaxRetry 内)
151+
- fast path 失败不再直接返回,能够进入重建与重试路径
125152
4. **成功标准**:
126153
- triage 中同一源端口重复事件从 8 次降至 ≤2
127154
- IEPL 节点断连后恢复时间 ≈ check_interval(而非持续 15+ 分钟)
@@ -199,38 +226,39 @@ func sendPkt(log *logrus.Logger, data []byte, from netip.AddrPort, realTo, to ne
199226
}
200227
```
201228

202-
改为:
229+
改为(仅在 `EADDRINUSE` 场景触发 fallback):
203230
```go
204231
func sendPkt(log *logrus.Logger, data []byte, from netip.AddrPort, realTo, to netip.AddrPort, lConn *net.UDPConn) (err error) {
205232
uConn, _, err := DefaultAnyfromPool.GetOrCreate(from.String(), AnyfromTimeout)
206233
if err != nil {
207-
// Fallback: if bind fails (e.g., address already in use when from == dae's own
208-
// DNS listen address), use the main UDP listener to send the response.
209-
if lConn != nil {
234+
// Only fallback on bind address conflict at dae's own listener address.
235+
if errors.Is(err, syscall.EADDRINUSE) && lConn != nil && isConnLocalAddr(lConn, from) {
210236
_, err = lConn.WriteToUDPAddrPort(data, realTo)
211237
return err
212238
}
213-
return
239+
return err
214240
}
215241
_, err = uConn.WriteToUDPAddrPort(data, realTo)
216242
return err
217243
}
218244
```
219245

220246
**注意事项**:
221-
1. fallback 使用 `lConn`(主 UDP listener)回写。源地址将是 dae 的监听地址(即 `from`),因为 `lConn` 绑定在该地址上
222-
2. 需要确认 `lConn.WriteToUDPAddrPort` 是否需要 `SO_TRANSPARENT` 权限(`lConn` 已设置,应该可以)
223-
3. 这是一个 graceful degradation:首选 AnyfromPool(精确源地址匹配),失败时 fallback 到主 listener
247+
1. fallback 仅用于 `bind: address already in use``EADDRINUSE`)且 `from == lConn.LocalAddr()` 的场景,避免掩盖其他错误(如权限、资源耗尽)
248+
2. fallback 使用 `lConn`(主 UDP listener)回写。源地址将是 dae 的监听地址(即 `from`),因为 `lConn` 绑定在该地址上
249+
3. 需要确认 `lConn.WriteToUDPAddrPort` 是否需要 `SO_TRANSPARENT` 权限(`lConn` 已设置,应该可以)
250+
4. 这是一个受限的 graceful degradation:首选 AnyfromPool(精确源地址匹配),仅在地址冲突场景 fallback 到主 listener
251+
5. 需要在 `udp.go` 增加 `errors``syscall` 引入,以及 `isConnLocalAddr(lConn, from)` 辅助函数(比较 listener 本地地址与 `from`
224252

225253
**关键文件**:
226254
- `control/udp.go:54-62` (sendPkt)
227255

228256
**测试方法**:
229257
1. 部署修复后,监控 `journalctl -u dae | grep "address already in use"` 30 分钟
230-
2. **成功标准**: 30 分钟内零 bind 错误
258+
2. **成功标准**: `EADDRINUSE` 不再导致响应失败,且日志中其他类型的 `GetOrCreate` 错误仍原样暴露
231259
3. **回归检查**: DNS 响应正常到达客户端(检查 `dig` 成功率不下降)
232260

233-
## T4: UdpEndpoint.start() 静默退出改进
261+
## T4: UdpEndpoint.start() 静默退出改进(含可观测性)
234262

235263
**目标**: 解决 F5 — endpoint 失效时主动从池中清除(不等 NAT 超时)
236264

@@ -261,20 +289,22 @@ func (ue *UdpEndpoint) start() {
261289
}
262290
```
263291

264-
改为(通过 `Reset(0)` 触发立即清理):
292+
改为(通过 `Reset(0)` 触发立即清理 + 增加退出日志):
265293
```go
266294
func (ue *UdpEndpoint) start() {
267295
buf := pool.GetFullCap(consts.EthernetMtu)
268296
defer pool.Put(buf)
269297
for {
270298
n, from, err := ue.conn.ReadFrom(buf[:])
271299
if err != nil {
300+
logrus.WithError(err).Warnln("UdpEndpoint read loop exited")
272301
break
273302
}
274303
ue.mu.Lock()
275304
ue.deadlineTimer.Reset(ue.NatTimeout)
276305
ue.mu.Unlock()
277306
if err = ue.handler(buf[:n], from); err != nil {
307+
logrus.WithError(err).Warnln("UdpEndpoint handler error, scheduling immediate cleanup")
278308
break
279309
}
280310
}
@@ -300,25 +330,30 @@ ue.deadlineTimer = time.AfterFunc(createOption.NatTimeout, func() {
300330
})
301331
```
302332

303-
`Reset(0)` 替代原来的 `Stop()` 会立即触发此回调,无需为 `start()` 额外引用 log 或 pool key。
333+
`Reset(0)` 替代原来的 `Stop()` 会立即触发此回调,无需为 `start()` 额外引用 pool key;新增日志用于满足 S4 可观测性目标
304334

305335
**关键文件**:
306336
- `control/udp_endpoint_pool.go:40-58` (start)
307337
- `control/udp_endpoint_pool.go:146-160` (deadline timer 回调)
308338

309339
**测试方法**:
310340
1. 部署后在 broken pipe 场景观察 endpoint 是否被立即清除
311-
2. **成功标准**: endpoint 错误后立即从池中消失(不等 NatTimeout)
341+
2. **成功标准**:
342+
- endpoint 错误后立即从池中消失(不等 NatTimeout)
343+
- 日志出现 `UdpEndpoint handler error, scheduling immediate cleanup``UdpEndpoint read loop exited`
312344
3. **回归检查**: 正常 UDP 流量不受影响
313345

314-
## T5(承接主报告 S5): CLOSE-WAIT 堆积治理与验收
346+
## T5(承接主报告 S5): CLOSE-WAIT 堆积治理与验收(门禁任务)
315347

316348
**目标**: 以 non-DNS 代理路径修复承接 F6/S5,验证 CLOSE-WAIT 显著下降。
349+
**性质**: 验收门禁任务(T5 不直接引入代码修复;修复来自 T1 两阶段策略 + T4 的完整落地)
317350

318351
**实施方式**:
319-
1. 以 T1(`ReportUnavailable`)+ T4(endpoint 及时清理)作为 CLOSE-WAIT 治理主路径。
320-
2. 不修改 dns_fix 的 `DoTCP.Close()`/`newDnsForwarder` 路径,避免误修复。
321-
3. 将 CLOSE-WAIT 指标纳入 broken-pipe 分支验收门禁。
352+
1. 以 T1(先 endpoint 重建、重复失败再 `ReportUnavailable`)+ T4(endpoint 及时清理)作为 CLOSE-WAIT 治理主路径。
353+
2. T1 必须覆盖两条写路径:`udp.go:68-95` fast path + `udp.go:285-303` slow path。
354+
3. T4 必须同时满足:立即清理 + 退出日志可观测。
355+
4. 不修改 dns_fix 的 `DoTCP.Close()`/`newDnsForwarder` 路径,避免误修复。
356+
5. 将 CLOSE-WAIT 指标纳入 broken-pipe 分支验收门禁。
322357

323358
**测试方法**:
324359
1. 部署 T1-T4 后运行 `dae_triage_unified_v5.sh --service dae --enable-tcpdump --enable-strace --peer-ip 163.177.58.13`
@@ -327,6 +362,7 @@ ue.deadlineTimer = time.AfterFunc(createOption.NatTimeout, func() {
327362
- CLOSE-WAIT max 从 111 降至 ≤10
328363
- CLOSE-WAIT remote 仍仅为 IEPL 节点地址(验证归属不漂移)
329364
- Scenario C 维持 0(不回退 dns_fix 已修复项)
365+
- 可从日志/代码路径确认 T1 fast+slow path 与 T4 cleanup+log 已全部生效
330366
4. 测试记录写入 `.plan/test-log.md`(标题建议:`code_audit_trace-back-4th_broken-pipe: T5 — F6/S5 迁移验收`
331367

332368
## M1: 本地验证
@@ -340,7 +376,7 @@ go test -race ./control/ ./component/outbound/...
340376
## 任务依赖图
341377

342378
```
343-
T1 (dialer 健康反馈) ─┐
379+
T1 (两阶段健康反馈) ─┐
344380
T2 (日志节流) ├→ T5 (F6/S5 验收)
345381
T3 (sendPkt fallback) ┤
346382
T4 (endpoint 清理) ─┘
@@ -354,13 +390,13 @@ T1-T4 互不依赖,可并行开发但需串行测试。建议按 `T1→T2→T3
354390

355391
| 文件 | 改动 | 任务 |
356392
|---|---|---|
357-
| `control/udp.go:285-303` | WriteTo 失败后调用 ReportUnavailable | T1 |
393+
| `control/udp.go:68-95,285-303` | fast/slow path 写失败先清理重建,重复失败再 ReportUnavailable | T1 |
358394
| `control/control_plane.go:~51` | 新增 `handlePktLogEvery` 常量 | T2 |
359395
| `control/control_plane.go:~104` | 新增 `handlePktErrTotal` 字段 | T2 |
360396
| `control/control_plane.go:994` | handlePkt 非 DNS 日志节流 | T2 |
361-
| `control/udp.go:54-62` | sendPkt fallback 到 lConn | T3 |
362-
| `control/udp_endpoint_pool.go:40-58` | start() 退出后立即触发清理 | T4 |
363-
| `.plan/test-log.md` | 新增 F6/S5 迁移验收记录 | T5 |
397+
| `control/udp.go:54-62` | sendPkt 仅在 EADDRINUSE 且自身监听地址时 fallback 到 lConn | T3 |
398+
| `control/udp_endpoint_pool.go:40-58` | start() 退出日志 + 立即触发清理 | T4 |
399+
| `.plan/test-log.md` | 新增 F6/S5 迁移验收记录(门禁) | T5 |
364400

365401
## CI 要求
366402

0 commit comments

Comments
 (0)