Skip to content

Commit bc43df2

Browse files
authored
optimize: mapreduce panic stacktrace (#5168)
1 parent 351b8cb commit bc43df2

File tree

2 files changed

+46
-18
lines changed

2 files changed

+46
-18
lines changed

core/mr/mapreduce.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ package mr
33
import (
44
"context"
55
"errors"
6+
"fmt"
7+
"runtime/debug"
8+
"strings"
69
"sync"
710
"sync/atomic"
811

@@ -183,12 +186,16 @@ func buildOptions(opts ...Option) *mapReduceOptions {
183186
return options
184187
}
185188

189+
func buildPanicInfo(r any, stack []byte) string {
190+
return fmt.Sprintf("%+v\n\n%s", r, strings.TrimSpace(string(stack)))
191+
}
192+
186193
func buildSource[T any](generate GenerateFunc[T], panicChan *onceChan) chan T {
187194
source := make(chan T)
188195
go func() {
189196
defer func() {
190197
if r := recover(); r != nil {
191-
panicChan.write(r)
198+
panicChan.write(buildPanicInfo(r, debug.Stack()))
192199
}
193200
close(source)
194201
}()
@@ -235,7 +242,7 @@ func executeMappers[T, U any](mCtx mapperContext[T, U]) {
235242
defer func() {
236243
if r := recover(); r != nil {
237244
atomic.AddInt32(&failed, 1)
238-
mCtx.panicChan.write(r)
245+
mCtx.panicChan.write(buildPanicInfo(r, debug.Stack()))
239246
}
240247
wg.Done()
241248
<-pool
@@ -289,7 +296,7 @@ func mapReduceWithPanicChan[T, U, V any](source <-chan T, panicChan *onceChan, m
289296
defer func() {
290297
drain(collector)
291298
if r := recover(); r != nil {
292-
panicChan.write(r)
299+
panicChan.write(buildPanicInfo(r, debug.Stack()))
293300
}
294301
finish()
295302
}()

core/mr/mapreduce_test.go

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package mr
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"io"
78
"log"
89
"runtime"
@@ -148,11 +149,28 @@ func TestForEach(t *testing.T) {
148149

149150
assert.Equal(t, tasks/2, int(count))
150151
})
152+
}
151153

152-
t.Run("all", func(t *testing.T) {
153-
defer goleak.VerifyNone(t)
154+
func TestPanics(t *testing.T) {
155+
defer goleak.VerifyNone(t)
156+
157+
const tasks = 1000
158+
verify := func(t *testing.T, r any) {
159+
panicStr := fmt.Sprintf("%v", r)
160+
assert.Contains(t, panicStr, "foo")
161+
assert.Contains(t, panicStr, "goroutine")
162+
assert.Contains(t, panicStr, "runtime/debug.Stack")
163+
panic(r)
164+
}
165+
166+
t.Run("ForEach run panics", func(t *testing.T) {
167+
assert.Panics(t, func() {
168+
defer func() {
169+
if r := recover(); r != nil {
170+
verify(t, r)
171+
}
172+
}()
154173

155-
assert.PanicsWithValue(t, "foo", func() {
156174
ForEach(func(source chan<- int) {
157175
for i := 0; i < tasks; i++ {
158176
source <- i
@@ -162,28 +180,31 @@ func TestForEach(t *testing.T) {
162180
})
163181
})
164182
})
165-
}
166183

167-
func TestGeneratePanic(t *testing.T) {
168-
defer goleak.VerifyNone(t)
184+
t.Run("ForEach generate panics", func(t *testing.T) {
185+
assert.Panics(t, func() {
186+
defer func() {
187+
if r := recover(); r != nil {
188+
verify(t, r)
189+
}
190+
}()
169191

170-
t.Run("all", func(t *testing.T) {
171-
assert.PanicsWithValue(t, "foo", func() {
172192
ForEach(func(source chan<- int) {
173193
panic("foo")
174194
}, func(item int) {
175195
})
176196
})
177197
})
178-
}
179-
180-
func TestMapperPanic(t *testing.T) {
181-
defer goleak.VerifyNone(t)
182198

183-
const tasks = 1000
184199
var run int32
185-
t.Run("all", func(t *testing.T) {
186-
assert.PanicsWithValue(t, "foo", func() {
200+
t.Run("Mapper panics", func(t *testing.T) {
201+
assert.Panics(t, func() {
202+
defer func() {
203+
if r := recover(); r != nil {
204+
verify(t, r)
205+
}
206+
}()
207+
187208
_, _ = MapReduce(func(source chan<- int) {
188209
for i := 0; i < tasks; i++ {
189210
source <- i

0 commit comments

Comments
 (0)