Skip to content

Commit ddbfa27

Browse files
committed
execution: consistently recover in goroutines
Signed-off-by: Michael Hoffmann <mhoffmann@cloudflare.com>
1 parent eae5060 commit ddbfa27

File tree

5 files changed

+34
-16
lines changed

5 files changed

+34
-16
lines changed

engine/engine_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5623,7 +5623,7 @@ func TestEngineRecoversFromPanic(t *testing.T) {
56235623
testutil.Ok(t, err)
56245624

56255625
r := q.Exec(ctx)
5626-
testutil.Assert(t, r.Err.Error() == "unexpected error: panic!")
5626+
testutil.Assert(t, r.Err.Error() == "unexpected panic: panic!")
56275627
})
56285628

56295629
t.Run("range", func(t *testing.T) {
@@ -5633,7 +5633,7 @@ func TestEngineRecoversFromPanic(t *testing.T) {
56335633
testutil.Ok(t, err)
56345634

56355635
r := q.Exec(ctx)
5636-
testutil.Assert(t, r.Err.Error() == "unexpected error: panic!")
5636+
testutil.Assert(t, r.Err.Error() == "unexpected panic: panic!")
56375637
})
56385638
}
56395639

execution/binary/scalar.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,17 @@ func (o *scalarOperator) Next(ctx context.Context, buf []model.StepVector) (int,
9191
var lhsN int
9292
var lerrChan = make(chan error, 1)
9393
go func() {
94+
defer func() {
95+
if r := recover(); r != nil {
96+
lerrChan <- fmt.Errorf("unexpected panic: %v", r)
97+
}
98+
close(lerrChan)
99+
}()
94100
var err error
95101
lhsN, err = o.lhs.Next(ctx, o.lhsBuf)
96102
if err != nil {
97103
lerrChan <- err
98104
}
99-
close(lerrChan)
100105
}()
101106

102107
rhsN, rerr := o.rhs.Next(ctx, o.rhsBuf)

execution/binary/vector.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,17 @@ func (o *vectorOperator) Next(ctx context.Context, buf []model.StepVector) (int,
106106
var lhsN int
107107
var lerrChan = make(chan error, 1)
108108
go func() {
109+
defer func() {
110+
if r := recover(); r != nil {
111+
lerrChan <- fmt.Errorf("unexpected panic: %v", r)
112+
}
113+
close(lerrChan)
114+
}()
109115
var err error
110116
lhsN, err = o.lhs.Next(ctx, o.lhsBuf)
111117
if err != nil {
112118
lerrChan <- err
113119
}
114-
close(lerrChan)
115120
}()
116121

117122
rhsN, rerr := o.rhs.Next(ctx, o.rhsBuf)
@@ -153,12 +158,17 @@ func (o *vectorOperator) init(ctx context.Context) error {
153158
var highCardSide []labels.Labels
154159
var errChan = make(chan error, 1)
155160
go func() {
161+
defer func() {
162+
if r := recover(); r != nil {
163+
errChan <- fmt.Errorf("unexpected panic: %v", r)
164+
}
165+
close(errChan)
166+
}()
156167
var err error
157168
highCardSide, err = o.lhs.Series(ctx)
158169
if err != nil {
159170
errChan <- err
160171
}
161-
close(errChan)
162172
}()
163173

164174
lowCardSide, err := o.rhs.Series(ctx)

execution/exchange/coalesce.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package exchange
55

66
import (
77
"context"
8+
"fmt"
89
"math"
910
"sync"
1011
"sync/atomic"
@@ -13,7 +14,6 @@ import (
1314
"github.com/thanos-io/promql-engine/execution/telemetry"
1415
"github.com/thanos-io/promql-engine/query"
1516

16-
"github.com/efficientgo/core/errors"
1717
"github.com/prometheus/prometheus/model/histogram"
1818
"github.com/prometheus/prometheus/model/labels"
1919
)
@@ -124,6 +124,11 @@ func (c *coalesce) Next(ctx context.Context, buf []model.StepVector) (int, error
124124
c.wg.Add(1)
125125
go func(opIdx int, o model.VectorOperator) {
126126
defer c.wg.Done()
127+
defer func() {
128+
if r := recover(); r != nil {
129+
errChan <- fmt.Errorf("unexpected panic: %v", r)
130+
}
131+
}()
127132

128133
n, err := o.Next(ctx, c.tempBufs[opIdx])
129134
if err != nil {
@@ -221,16 +226,9 @@ func (c *coalesce) loadSeries(ctx context.Context) error {
221226
go func(i int) {
222227
defer wg.Done()
223228
defer func() {
224-
e := recover()
225-
if e == nil {
226-
return
229+
if r := recover(); r != nil {
230+
errChan <- fmt.Errorf("unexpected panic: %v", r)
227231
}
228-
229-
switch err := e.(type) {
230-
case error:
231-
errChan <- errors.Wrapf(err, "unexpected error")
232-
}
233-
234232
}()
235233
series, err := c.operators[i].Series(ctx)
236234
if err != nil {

execution/exchange/concurrent.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,12 @@ func (c *concurrencyOperator) Next(ctx context.Context, buf []model.StepVector)
118118
}
119119

120120
func (c *concurrencyOperator) pull(ctx context.Context) {
121-
defer close(c.buffer)
121+
defer func() {
122+
if r := recover(); r != nil {
123+
c.buffer <- maybeStepVector{err: fmt.Errorf("unexpected panic: %v", r)}
124+
}
125+
close(c.buffer)
126+
}()
122127

123128
for {
124129
select {

0 commit comments

Comments
 (0)