Skip to content

Commit f27c12b

Browse files
committed
all: use core/async where approp
1 parent f01a8ac commit f27c12b

File tree

26 files changed

+79
-42
lines changed

26 files changed

+79
-42
lines changed

intra/bootstrap.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"sync"
1717

1818
x "github.com/celzero/firestack/intra/backend"
19+
"github.com/celzero/firestack/intra/core"
1920
"github.com/celzero/firestack/intra/dns53"
2021
"github.com/celzero/firestack/intra/dnsx"
2122
"github.com/celzero/firestack/intra/doh"
@@ -233,7 +234,7 @@ func (b *bootstrap) kickstartLocked(px ipn.ProxyProvider) error {
233234
}
234235

235236
if prev := b.tr; prev != nil {
236-
go stopTransport(prev) // stop after new transport is ready
237+
core.Gx1("dns.bootstrap.stop", stopTransport, prev) // stop after new transport is ready
237238
log.I("dns: default: removing %s %s[%s]; using %s %s",
238239
b.typ, b.hostname, b.IPPorts(), typstr(tr), ippstr(tr))
239240
}

intra/common.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"math/rand"
1313
"net"
1414
"net/netip"
15+
"runtime/debug"
1516
"strconv"
1617
"strings"
1718
"sync"
@@ -439,6 +440,7 @@ func (h *baseHandler) End() {
439440

440441
// TODO: Propagate TCP RST using local.Abort(), on appropriate errors.
441442
func upload(id string, local, remote net.Conn, ioch chan<- ioinfo) {
443+
debug.SetPanicOnFault(true)
442444
defer core.Recover(core.Exit11, "c.upload."+id)
443445
defer core.CloseOp(local, core.CopR)
444446
defer core.CloseOp(remote, core.CopW)

intra/core/async.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@ package core
99
import (
1010
"context"
1111
"errors"
12+
"runtime/debug"
1213
"strconv"
1314
"time"
1415
)
1516

1617
// Go runs f in a goroutine and recovers from any panics.
1718
func Go(who string, f func()) {
1819
go func() {
20+
debug.SetPanicOnFault(true)
1921
defer Recover(DontExit, who)
2022

2123
f()
@@ -25,6 +27,7 @@ func Go(who string, f func()) {
2527
// Go1 runs f(arg) in a goroutine and recovers from any panics.
2628
func Go1[T any](who string, f func(T), arg T) {
2729
go func() {
30+
debug.SetPanicOnFault(true)
2831
defer Recover(DontExit, who)
2932

3033
f(arg)
@@ -34,6 +37,7 @@ func Go1[T any](who string, f func(T), arg T) {
3437
// Go2 runs f(arg0,arg1) in a goroutine and recovers from any panics.
3538
func Go2[T0 any, T1 any](who string, f func(T0, T1), a0 T0, a1 T1) {
3639
go func() {
40+
debug.SetPanicOnFault(true)
3741
defer Recover(DontExit, who)
3842

3943
f(a0, a1)
@@ -44,6 +48,7 @@ func Go2[T0 any, T1 any](who string, f func(T0, T1), a0 T0, a1 T1) {
4448
// then calls cb in a separate goroutine, and recovers from any panics.
4549
func Gg(who string, f func(), cb func()) {
4650
go func() {
51+
debug.SetPanicOnFault(true)
4752
defer RecoverFn(who, cb)
4853

4954
f()
@@ -53,12 +58,23 @@ func Gg(who string, f func(), cb func()) {
5358
// Gx runs f in a goroutine and exits the process if f panics.
5459
func Gx(who string, f func()) {
5560
go func() {
61+
debug.SetPanicOnFault(true)
5662
defer Recover(Exit11, who)
5763

5864
f()
5965
}()
6066
}
6167

68+
// Gx1 runs f in a goroutine and exits the process if f panics.
69+
func Gx1[T any](who string, f func(T), arg T) {
70+
go func() {
71+
debug.SetPanicOnFault(true)
72+
defer Recover(Exit11, who)
73+
74+
f(arg)
75+
}()
76+
}
77+
6278
// Gif runs f in a goroutine if cond is true.
6379
func Gif(cond bool, who string, f func()) {
6480
if cond {
@@ -75,6 +91,7 @@ func Grx[T any](who string, f WorkCtx[T], d time.Duration) (zz T, completed bool
7591

7692
// go.dev/play/p/VtWYJrxhXz6
7793
go func() {
94+
debug.SetPanicOnFault(true)
7895
defer Recover(Exit11, who)
7996
defer close(ch)
8097

@@ -90,6 +107,16 @@ func Grx[T any](who string, f WorkCtx[T], d time.Duration) (zz T, completed bool
90107
return zz, false
91108
}
92109

110+
// Gxe runs f in a goroutine, ignores returned error, and exits on panics.
111+
func Gxe(who string, f func() error) {
112+
go func() {
113+
debug.SetPanicOnFault(true)
114+
defer Recover(Exit11, who)
115+
116+
_ = f()
117+
}()
118+
}
119+
93120
// errPanic returns an error indicating that the function at index i panicked.
94121
func errPanic(who string) error {
95122
return errors.New(who + " fn panicked")
@@ -154,7 +181,7 @@ loop:
154181
func First[T any](who string, overallTimeout time.Duration, fs ...WorkCtx[T]) (zz T, idx int) {
155182
timeoutPerFn := overallTimeout / time.Duration(len(fs))
156183
for i, f := range fs {
157-
i, f := i, f
184+
// unneeded in go1.23+ i, f := i, f
158185
fid := who + ".all." + strconv.Itoa(i)
159186
if x, ok := Grx(fid, f, timeoutPerFn); ok {
160187
return x, i
@@ -176,7 +203,7 @@ func All[T any](who string, timeout time.Duration, fs ...WorkCtx[T]) ([]T, []err
176203
defer cancel()
177204

178205
for i, f := range fs {
179-
i, f := i, f
206+
//unneeded in go1.23+ i, f := i, f
180207
fid := who + ".all." + strconv.Itoa(i)
181208
Gg(fid, func() {
182209
out, err := f(ctx)

intra/core/expiringmap.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func NewExpiringMapLifetime[P comparable, Q any](ctx context.Context, min time.D
4848
lastreap: time.Now(),
4949
minlife: min,
5050
}
51-
go m.reaper(ctx)
51+
Gx1("expm.reaper", m.reaper, ctx)
5252
// test: go.dev/play/p/EYq_STKvugb
5353
return m
5454
}

intra/core/p2est.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func NewP2QuantileEstimator(ctx context.Context, samples int64, probability floa
6666
count: 0,
6767
addc: make(chan float64, samples),
6868
}
69-
go p.run()
69+
Gx("p2.est", p.run)
7070
return p
7171
}
7272

intra/core/volatileflow.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func NewFlowFor[T any](ctx context.Context, v *Volatile[T]) *Flow[T] {
6767
c: make(chan T),
6868
ctx: ctx,
6969
}
70-
go f.stream()
70+
Gx("core.flow", f.stream)
7171
return f
7272
}
7373

@@ -78,25 +78,25 @@ func (f *Flow[T]) stream() {
7878
case <-f.ctx.Done():
7979
return
8080
case v := <-f.c:
81-
go func() {
81+
Gx("flow.stream", func() {
8282
notflowing := make(map[FlowOn[T]]struct{}, 0)
8383
for _, o := range f.observers() {
8484
if ok := o.flow(v); !ok {
8585
notflowing[o] = struct{}{}
8686
}
8787
}
8888
f.removeFinallys(notflowing)
89-
}()
89+
})
9090
case <-time.Tick(3 * time.Hour):
91-
go func() {
91+
Gx("flow.stream.tick", func() {
9292
notflowing := make(map[FlowOn[T]]struct{})
9393
for _, o := range f.observers() {
9494
if o.obsolete() {
9595
notflowing[o] = struct{}{}
9696
}
9797
}
9898
f.removeFinallys(notflowing)
99-
}()
99+
})
100100
}
101101
}
102102
}
@@ -151,7 +151,7 @@ func (f *Flow[T]) On(until context.Context, o FlowFunc[T]) {
151151
defer f.fmu.Unlock()
152152
on := FlowOn[T]{until, &o}
153153
f.o = append(f.o, on)
154-
go on.flow(f.v.Load())
154+
Gx("flow.on", func() { on.flow(f.v.Load()) })
155155
}
156156

157157
func (f *Flow[T]) Store(v T) {

intra/dns53/mdns.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"net"
2121
"net/netip"
22+
"runtime/debug"
2223
"strings"
2324
"sync"
2425
"sync/atomic"
@@ -548,6 +549,7 @@ func (c *client) recv(conn net.PacketConn) {
548549
return
549550
}
550551

552+
debug.SetPanicOnFault(true)
551553
defer core.Recover(core.DontExit, "mdns.recv")
552554

553555
bptr := core.Alloc()

intra/dnscrypt/multiserver.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"fmt"
2323
"net"
2424
"net/netip"
25+
"runtime/debug"
2526
"strings"
2627
"sync"
2728
"time"
@@ -481,6 +482,7 @@ func (proxy *DcMulti) Stop() error {
481482
// refreshRoutes re-adds relay routes to all live/tracked servers.
482483
// Must be called from a goroutine.
483484
func (proxy *DcMulti) refreshRoutes() {
485+
debug.SetPanicOnFault(true)
484486
defer core.Recover(core.Exit11, "dcmulti.refreshRoutes")
485487

486488
udp, tcp := route(proxy)

intra/dnsx/alg.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1020,7 +1020,7 @@ func (t *dnsgateway) qs(t2 Transport, uid, network string, msg *dns.Msg, t1res <
10201020
t2res := make(chan secans, 1)
10211021
msg = msg.Copy() // to avoid racing against changes made by caller
10221022

1023-
go func() {
1023+
core.Gx("alg.qs."+xdns.QName(msg), func() {
10241024
defer close(t2res)
10251025

10261026
qname := xdns.QName(msg)
@@ -1036,7 +1036,7 @@ func (t *dnsgateway) qs(t2 Transport, uid, network string, msg *dns.Msg, t1res <
10361036
r.initIfNeeded() // r may be nil value on Grx:timeout
10371037

10381038
t2res <- r // may be zero secans
1039-
}()
1039+
})
10401040
return t2res
10411041
}
10421042

intra/dnsx/cacher.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,6 @@ func mkcachekey(q *dns.Msg) (string, uint8, bool) {
204204
// scrubCache deletes expired entries from the cache.
205205
// Must be called from a goroutine.
206206
func (cb *cache) scrubCache() {
207-
defer core.Recover(core.Exit11, "c.scrubCache")
208207
// must unlock from deferred since panics are recovered above
209208
cb.mu.Lock()
210209
defer cb.mu.Unlock()
@@ -281,7 +280,7 @@ func (cb *cache) put(key string, cc *cres) (ok bool) {
281280
defer cb.mu.Unlock()
282281

283282
if rand33pc() { // 33% of the time
284-
go cb.scrubCache()
283+
core.Gx("c.scrubCache", cb.scrubCache)
285284
}
286285

287286
if len(cb.c) >= cb.size {

0 commit comments

Comments
 (0)