From 336f7289fde64758b52825840a4cf948b53cba5e Mon Sep 17 00:00:00 2001 From: devanbenz Date: Wed, 19 Mar 2025 09:00:56 -0500 Subject: [PATCH 01/14] fix: Do not panic when using an unexpected cursor type This PR is used to alleviate the erroneous panic we are seeing corresponding with https://github.com/influxdata/influxdb/issues/26142. There should not be a panic and instead we should be throwing an error. --- storage/reads/array_cursor.gen.go | 94 +++++++++++++++----------- storage/reads/array_cursor.gen.go.tmpl | 54 ++++++++++----- storage/reads/array_cursor.go | 12 ++-- 3 files changed, 98 insertions(+), 62 deletions(-) diff --git a/storage/reads/array_cursor.gen.go b/storage/reads/array_cursor.gen.go index 50bfe8d3ab5..62b1dcaab88 100644 --- a/storage/reads/array_cursor.gen.go +++ b/storage/reads/array_cursor.gen.go @@ -24,101 +24,113 @@ const ( MaxPointsPerBlock = 1000 ) -func newLimitArrayCursor(cur cursors.Cursor) cursors.Cursor { +func newLimitArrayCursor(cur cursors.Cursor) (cursors.Cursor, error) { switch cur := cur.(type) { case cursors.FloatArrayCursor: - return newFloatLimitArrayCursor(cur) + return newFloatLimitArrayCursor(cur), nil case cursors.IntegerArrayCursor: - return newIntegerLimitArrayCursor(cur) + return newIntegerLimitArrayCursor(cur), nil case cursors.UnsignedArrayCursor: - return newUnsignedLimitArrayCursor(cur) + return newUnsignedLimitArrayCursor(cur), nil case cursors.StringArrayCursor: - return newStringLimitArrayCursor(cur) + return newStringLimitArrayCursor(cur), nil case cursors.BooleanArrayCursor: - return newBooleanLimitArrayCursor(cur) + return newBooleanLimitArrayCursor(cur), nil default: - panic(fmt.Sprintf("unreachable: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)), + } } } -func newWindowFirstArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowFirstArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { if window.IsZero() { return newLimitArrayCursor(cur) } switch cur := cur.(type) { case cursors.FloatArrayCursor: - return newFloatWindowFirstArrayCursor(cur, window) + return newFloatWindowFirstArrayCursor(cur, window), nil case cursors.IntegerArrayCursor: - return newIntegerWindowFirstArrayCursor(cur, window) + return newIntegerWindowFirstArrayCursor(cur, window), nil case cursors.UnsignedArrayCursor: - return newUnsignedWindowFirstArrayCursor(cur, window) + return newUnsignedWindowFirstArrayCursor(cur, window), nil case cursors.StringArrayCursor: - return newStringWindowFirstArrayCursor(cur, window) + return newStringWindowFirstArrayCursor(cur, window), nil case cursors.BooleanArrayCursor: - return newBooleanWindowFirstArrayCursor(cur, window) + return newBooleanWindowFirstArrayCursor(cur, window), nil default: - panic(fmt.Sprintf("unreachable: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)), + } } } -func newWindowLastArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowLastArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { if window.IsZero() { return newLimitArrayCursor(cur) } switch cur := cur.(type) { case cursors.FloatArrayCursor: - return newFloatWindowLastArrayCursor(cur, window) + return newFloatWindowLastArrayCursor(cur, window), nil case cursors.IntegerArrayCursor: - return newIntegerWindowLastArrayCursor(cur, window) + return newIntegerWindowLastArrayCursor(cur, window), nil case cursors.UnsignedArrayCursor: - return newUnsignedWindowLastArrayCursor(cur, window) + return newUnsignedWindowLastArrayCursor(cur, window), nil case cursors.StringArrayCursor: - return newStringWindowLastArrayCursor(cur, window) + return newStringWindowLastArrayCursor(cur, window), nil case cursors.BooleanArrayCursor: - return newBooleanWindowLastArrayCursor(cur, window) + return newBooleanWindowLastArrayCursor(cur, window), nil default: - panic(fmt.Sprintf("unreachable: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)), + } } } -func newWindowCountArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowCountArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { switch cur := cur.(type) { case cursors.FloatArrayCursor: - return newFloatWindowCountArrayCursor(cur, window) + return newFloatWindowCountArrayCursor(cur, window), nil case cursors.IntegerArrayCursor: - return newIntegerWindowCountArrayCursor(cur, window) + return newIntegerWindowCountArrayCursor(cur, window), nil case cursors.UnsignedArrayCursor: - return newUnsignedWindowCountArrayCursor(cur, window) + return newUnsignedWindowCountArrayCursor(cur, window), nil case cursors.StringArrayCursor: - return newStringWindowCountArrayCursor(cur, window) + return newStringWindowCountArrayCursor(cur, window), nil case cursors.BooleanArrayCursor: - return newBooleanWindowCountArrayCursor(cur, window) + return newBooleanWindowCountArrayCursor(cur, window), nil default: - panic(fmt.Sprintf("unreachable: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)), + } } } @@ -142,37 +154,43 @@ func newWindowSumArrayCursor(cur cursors.Cursor, window interval.Window) (cursor } } -func newWindowMinArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowMinArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { switch cur := cur.(type) { case cursors.FloatArrayCursor: - return newFloatWindowMinArrayCursor(cur, window) + return newFloatWindowMinArrayCursor(cur, window), nil case cursors.IntegerArrayCursor: - return newIntegerWindowMinArrayCursor(cur, window) + return newIntegerWindowMinArrayCursor(cur, window), nil case cursors.UnsignedArrayCursor: - return newUnsignedWindowMinArrayCursor(cur, window) + return newUnsignedWindowMinArrayCursor(cur, window), nil default: - panic(fmt.Sprintf("unsupported for aggregate min: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported for aggregate min: %s", arrayCursorType(cur)), + } } } -func newWindowMaxArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowMaxArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { switch cur := cur.(type) { case cursors.FloatArrayCursor: - return newFloatWindowMaxArrayCursor(cur, window) + return newFloatWindowMaxArrayCursor(cur, window), nil case cursors.IntegerArrayCursor: - return newIntegerWindowMaxArrayCursor(cur, window) + return newIntegerWindowMaxArrayCursor(cur, window), nil case cursors.UnsignedArrayCursor: - return newUnsignedWindowMaxArrayCursor(cur, window) + return newUnsignedWindowMaxArrayCursor(cur, window), nil default: - panic(fmt.Sprintf("unsupported for aggregate max: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported for aggregate max: %s", arrayCursorType(cur)), + } } } diff --git a/storage/reads/array_cursor.gen.go.tmpl b/storage/reads/array_cursor.gen.go.tmpl index 57cee9c6cb3..63786606e98 100644 --- a/storage/reads/array_cursor.gen.go.tmpl +++ b/storage/reads/array_cursor.gen.go.tmpl @@ -18,53 +18,65 @@ const ( MaxPointsPerBlock = 1000 ) -func newLimitArrayCursor(cur cursors.Cursor) cursors.Cursor { +func newLimitArrayCursor(cur cursors.Cursor) (cursors.Cursor, error) { switch cur := cur.(type) { {{range .}}{{/* every type supports limit */}} case cursors.{{.Name}}ArrayCursor: - return new{{.Name}}LimitArrayCursor(cur) + return new{{.Name}}LimitArrayCursor(cur), nil {{end}} default: - panic(fmt.Sprintf("unreachable: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)), + } } } -func newWindowFirstArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowFirstArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { if window.IsZero() { return newLimitArrayCursor(cur) } switch cur := cur.(type) { {{range .}}{{/* every type supports first */}} case cursors.{{.Name}}ArrayCursor: - return new{{.Name}}WindowFirstArrayCursor(cur, window) + return new{{.Name}}WindowFirstArrayCursor(cur, window), nil {{end}} default: - panic(fmt.Sprintf("unreachable: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)), + } } } -func newWindowLastArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowLastArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { if window.IsZero() { return newLimitArrayCursor(cur) } switch cur := cur.(type) { {{range .}}{{/* every type supports last */}} case cursors.{{.Name}}ArrayCursor: - return new{{.Name}}WindowLastArrayCursor(cur, window) + return new{{.Name}}WindowLastArrayCursor(cur, window), nil {{end}} default: - panic(fmt.Sprintf("unreachable: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)), + } } } -func newWindowCountArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowCountArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { switch cur := cur.(type) { {{range .}}{{/* every type supports count */}} case cursors.{{.Name}}ArrayCursor: - return new{{.Name}}WindowCountArrayCursor(cur, window) + return new{{.Name}}WindowCountArrayCursor(cur, window), nil {{end}} default: - panic(fmt.Sprintf("unreachable: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)), + } } } @@ -87,35 +99,41 @@ func newWindowSumArrayCursor(cur cursors.Cursor, window interval.Window) (cursor } } -func newWindowMinArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowMinArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { switch cur := cur.(type) { {{range .}} {{$Type := .Name}} {{range .Aggs}} {{if eq .Name "Min"}} case cursors.{{$Type}}ArrayCursor: - return new{{$Type}}WindowMinArrayCursor(cur, window) + return new{{$Type}}WindowMinArrayCursor(cur, window), nil {{end}} {{end}}{{/* for each supported agg fn */}} {{end}}{{/* for each field type */}} default: - panic(fmt.Sprintf("unsupported for aggregate min: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported for aggregate min: %s", arrayCursorType(cur)), + } } } -func newWindowMaxArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowMaxArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { switch cur := cur.(type) { {{range .}} {{$Type := .Name}} {{range .Aggs}} {{if eq .Name "Max"}} case cursors.{{$Type}}ArrayCursor: - return new{{$Type}}WindowMaxArrayCursor(cur, window) + return new{{$Type}}WindowMaxArrayCursor(cur, window), nil {{end}} {{end}}{{/* for each supported agg fn */}} {{end}}{{/* for each field type */}} default: - panic(fmt.Sprintf("unsupported for aggregate max: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported for aggregate max: %s", arrayCursorType(cur)), + } } } diff --git a/storage/reads/array_cursor.go b/storage/reads/array_cursor.go index 215aab54dba..cd31a490f7b 100644 --- a/storage/reads/array_cursor.go +++ b/storage/reads/array_cursor.go @@ -20,7 +20,7 @@ func (v *singleValue) Value(key string) (interface{}, bool) { func newAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) (cursors.Cursor, error) { switch agg.Type { case datatypes.Aggregate_AggregateTypeFirst, datatypes.Aggregate_AggregateTypeLast: - return newLimitArrayCursor(cursor), nil + return newLimitArrayCursor(cursor) } return newWindowAggregateArrayCursor(ctx, agg, interval.Window{}, cursor) } @@ -32,17 +32,17 @@ func newWindowAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate switch agg.Type { case datatypes.Aggregate_AggregateTypeCount: - return newWindowCountArrayCursor(cursor, window), nil + return newWindowCountArrayCursor(cursor, window) case datatypes.Aggregate_AggregateTypeSum: return newWindowSumArrayCursor(cursor, window) case datatypes.Aggregate_AggregateTypeFirst: - return newWindowFirstArrayCursor(cursor, window), nil + return newWindowFirstArrayCursor(cursor, window) case datatypes.Aggregate_AggregateTypeLast: - return newWindowLastArrayCursor(cursor, window), nil + return newWindowLastArrayCursor(cursor, window) case datatypes.Aggregate_AggregateTypeMin: - return newWindowMinArrayCursor(cursor, window), nil + return newWindowMinArrayCursor(cursor, window) case datatypes.Aggregate_AggregateTypeMax: - return newWindowMaxArrayCursor(cursor, window), nil + return newWindowMaxArrayCursor(cursor, window) case datatypes.Aggregate_AggregateTypeMean: return newWindowMeanArrayCursor(cursor, window) default: From 91f532d68d2419e8907fa57e87f2ba1b1ea2c2e1 Mon Sep 17 00:00:00 2001 From: devanbenz Date: Wed, 19 Mar 2025 12:33:42 -0500 Subject: [PATCH 02/14] fix: Remove various panic's --- storage/reads/array_cursor.gen.go.tmpl | 8 ++++---- storage/reads/array_cursor.go | 26 ++++++++++++++++---------- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/storage/reads/array_cursor.gen.go.tmpl b/storage/reads/array_cursor.gen.go.tmpl index 63786606e98..4d343013703 100644 --- a/storage/reads/array_cursor.gen.go.tmpl +++ b/storage/reads/array_cursor.gen.go.tmpl @@ -27,7 +27,7 @@ func newLimitArrayCursor(cur cursors.Cursor) (cursors.Cursor, error) { default: return nil, &errors2.Error{ Code: errors2.EInvalid, - Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)), + Msg: fmt.Sprintf("unsupported limit array cursor type: %s", arrayCursorType(cur)), } } } @@ -44,7 +44,7 @@ func newWindowFirstArrayCursor(cur cursors.Cursor, window interval.Window) (curs default: return nil, &errors2.Error{ Code: errors2.EInvalid, - Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)), + Msg: fmt.Sprintf("unsupported window first cursor type: %s", arrayCursorType(cur)), } } } @@ -61,7 +61,7 @@ func newWindowLastArrayCursor(cur cursors.Cursor, window interval.Window) (curso default: return nil, &errors2.Error{ Code: errors2.EInvalid, - Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)), + Msg: fmt.Sprintf("unsupported window last cursor type: %s", arrayCursorType(cur)), } } } @@ -75,7 +75,7 @@ func newWindowCountArrayCursor(cur cursors.Cursor, window interval.Window) (curs default: return nil, &errors2.Error{ Code: errors2.EInvalid, - Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)), + Msg: fmt.Sprintf("unsupported window count cursor type: %s", arrayCursorType(cur)), } } } diff --git a/storage/reads/array_cursor.go b/storage/reads/array_cursor.go index cd31a490f7b..b7e47419d3e 100644 --- a/storage/reads/array_cursor.go +++ b/storage/reads/array_cursor.go @@ -3,6 +3,7 @@ package reads import ( "context" "fmt" + errors2 "github.com/influxdata/influxdb/v2/kit/platform/errors" "github.com/influxdata/flux/interval" "github.com/influxdata/influxdb/v2/storage/reads/datatypes" @@ -46,8 +47,10 @@ func newWindowAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate case datatypes.Aggregate_AggregateTypeMean: return newWindowMeanArrayCursor(cursor, window) default: - // TODO(sgc): should be validated higher up - panic("invalid aggregate") + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported window aggregate cursor: %s", agg.Type), + } } } @@ -101,7 +104,7 @@ func newMultiShardArrayCursors(ctx context.Context, start, end int64, asc bool) return m } -func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor { +func (m *multiShardArrayCursors) createCursor(row SeriesRow) (cursors.Cursor, error) { m.req.Name = row.Name m.req.Tags = row.SeriesTags m.req.Field = row.Field @@ -120,26 +123,29 @@ func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor { } if cur == nil || err != nil { - return nil + return nil, nil } switch c := cur.(type) { case cursors.IntegerArrayCursor: m.cursors.i.reset(c, row.Query, cond) - return &m.cursors.i + return &m.cursors.i, nil case cursors.FloatArrayCursor: m.cursors.f.reset(c, row.Query, cond) - return &m.cursors.f + return &m.cursors.f, nil case cursors.UnsignedArrayCursor: m.cursors.u.reset(c, row.Query, cond) - return &m.cursors.u + return &m.cursors.u, nil case cursors.StringArrayCursor: m.cursors.s.reset(c, row.Query, cond) - return &m.cursors.s + return &m.cursors.s, nil case cursors.BooleanArrayCursor: m.cursors.b.reset(c, row.Query, cond) - return &m.cursors.b + return &m.cursors.b, nil default: - panic(fmt.Sprintf("unreachable: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported cursor type while creating cursor: %s", arrayCursorType(cur)), + } } } From 6a1cc9fcf70c143dc2a23a7e42e0e7197e837de5 Mon Sep 17 00:00:00 2001 From: devanbenz Date: Wed, 19 Mar 2025 13:51:30 -0500 Subject: [PATCH 03/14] fix: remove panics --- storage/reads/array_cursor.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/storage/reads/array_cursor.go b/storage/reads/array_cursor.go index b7e47419d3e..6692e00dc79 100644 --- a/storage/reads/array_cursor.go +++ b/storage/reads/array_cursor.go @@ -104,7 +104,7 @@ func newMultiShardArrayCursors(ctx context.Context, start, end int64, asc bool) return m } -func (m *multiShardArrayCursors) createCursor(row SeriesRow) (cursors.Cursor, error) { +func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor { m.req.Name = row.Name m.req.Tags = row.SeriesTags m.req.Field = row.Field @@ -123,29 +123,26 @@ func (m *multiShardArrayCursors) createCursor(row SeriesRow) (cursors.Cursor, er } if cur == nil || err != nil { - return nil, nil + return nil } switch c := cur.(type) { case cursors.IntegerArrayCursor: m.cursors.i.reset(c, row.Query, cond) - return &m.cursors.i, nil + return &m.cursors.i case cursors.FloatArrayCursor: m.cursors.f.reset(c, row.Query, cond) - return &m.cursors.f, nil + return &m.cursors.f case cursors.UnsignedArrayCursor: m.cursors.u.reset(c, row.Query, cond) - return &m.cursors.u, nil + return &m.cursors.u case cursors.StringArrayCursor: m.cursors.s.reset(c, row.Query, cond) - return &m.cursors.s, nil + return &m.cursors.s case cursors.BooleanArrayCursor: m.cursors.b.reset(c, row.Query, cond) - return &m.cursors.b, nil + return &m.cursors.b default: - return nil, &errors2.Error{ - Code: errors2.EInvalid, - Msg: fmt.Sprintf("unsupported cursor type while creating cursor: %s", arrayCursorType(cur)), - } + return nil } } From c8ee8127afc28d506c54d5a4889b809987828b52 Mon Sep 17 00:00:00 2001 From: devanbenz Date: Thu, 20 Mar 2025 11:42:06 -0500 Subject: [PATCH 04/14] fix: Bubble up errors for cursors --- mock/reads_resultset.go | 8 +++++--- storage/flux/reader.go | 12 +++++++++--- storage/flux/table.gen.go | 10 +++++----- storage/flux/table.gen.go.tmpl | 2 +- storage/reads/aggregate_resultset.go | 9 ++++++--- storage/reads/aggregate_resultset_test.go | 7 +++++-- storage/reads/array_cursor.gen.go | 8 ++++---- storage/reads/array_cursor.go | 20 ++++++++++++-------- storage/reads/array_cursor_test.go | 4 +++- storage/reads/group_resultset.go | 23 ++++++++++++++++------- storage/reads/resultset.go | 4 ++-- storage/reads/resultset_lineprotocol.go | 6 +++++- storage/reads/store.go | 4 ++-- storage/reads/store_test.go | 2 +- v1/services/storage/store.go | 18 +++++++++++++++--- 15 files changed, 91 insertions(+), 46 deletions(-) diff --git a/mock/reads_resultset.go b/mock/reads_resultset.go index 0e32a9eaafc..aa8c4708c55 100644 --- a/mock/reads_resultset.go +++ b/mock/reads_resultset.go @@ -1,6 +1,8 @@ package mock import ( + "fmt" + "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/pkg/data/gen" "github.com/influxdata/influxdb/v2/storage/reads" @@ -61,7 +63,7 @@ func (g *GeneratorResultSet) Next() bool { return g.sg.Next() && (g.max == 0 || remain > 0) } -func (g *GeneratorResultSet) Cursor() cursors.Cursor { +func (g *GeneratorResultSet) Cursor() (cursors.Cursor, error) { switch g.sg.FieldType() { case models.Float: g.f.tv = g.sg.TimeValuesGenerator() @@ -79,10 +81,10 @@ func (g *GeneratorResultSet) Cursor() cursors.Cursor { g.b.tv = g.sg.TimeValuesGenerator() g.cur = &g.b default: - panic("unreachable") + return nil, fmt.Errorf("unsupported field type: %v", g.sg.FieldType()) } - return g.cur + return g.cur, nil } func copyTags(dst, src models.Tags) models.Tags { diff --git a/storage/flux/reader.go b/storage/flux/reader.go index e7eeaec6b5c..2321e85c917 100644 --- a/storage/flux/reader.go +++ b/storage/flux/reader.go @@ -194,7 +194,10 @@ func (fi *filterIterator) handleRead(f func(flux.Table) error, rs storage.Result READ: for rs.Next() { - cur = rs.Cursor() + cur, err := rs.Cursor() + if err != nil { + return err + } if cur == nil { // no data for series key + field combination continue @@ -331,7 +334,7 @@ func (gi *groupIterator) handleRead(f func(flux.Table) error, rs storage.GroupRe READ: for gc != nil { for gc.Next() { - cur = gc.Cursor() + cur, _ = gc.Cursor() if cur != nil { break } @@ -740,7 +743,10 @@ func (wai *windowAggregateIterator) handleRead(f func(flux.Table) error, rs stor READ: for rs.Next() { - cur = rs.Cursor() + cur, err = rs.Cursor() + if err != nil { + return err + } if cur == nil { // no data for series key + field combination continue diff --git a/storage/flux/table.gen.go b/storage/flux/table.gen.go index 7e2ab6deb46..3a5a4d22667 100644 --- a/storage/flux/table.gen.go +++ b/storage/flux/table.gen.go @@ -970,7 +970,7 @@ func (t *floatGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { - cur := t.gc.Cursor() + cur, _ := t.gc.Cursor() if cur == nil { continue } @@ -1954,7 +1954,7 @@ func (t *integerGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { - cur := t.gc.Cursor() + cur, _ := t.gc.Cursor() if cur == nil { continue } @@ -2935,7 +2935,7 @@ func (t *unsignedGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { - cur := t.gc.Cursor() + cur, _ := t.gc.Cursor() if cur == nil { continue } @@ -3860,7 +3860,7 @@ func (t *stringGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { - cur := t.gc.Cursor() + cur, _ := t.gc.Cursor() if cur == nil { continue } @@ -4785,7 +4785,7 @@ func (t *booleanGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { - cur := t.gc.Cursor() + cur, _ := t.gc.Cursor() if cur == nil { continue } diff --git a/storage/flux/table.gen.go.tmpl b/storage/flux/table.gen.go.tmpl index a55116fb84c..d607954117b 100644 --- a/storage/flux/table.gen.go.tmpl +++ b/storage/flux/table.gen.go.tmpl @@ -1000,7 +1000,7 @@ func (t *{{.name}}GroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { - cur := t.gc.Cursor() + cur, _ := t.gc.Cursor() if cur == nil { continue } diff --git a/storage/reads/aggregate_resultset.go b/storage/reads/aggregate_resultset.go index 1d41539457d..90f558b2627 100644 --- a/storage/reads/aggregate_resultset.go +++ b/storage/reads/aggregate_resultset.go @@ -97,7 +97,10 @@ func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cu agg := r.req.Aggregate[0] every := r.req.WindowEvery offset := r.req.Offset - cursor := r.arrayCursors.createCursor(seriesRow) + cursor, err := r.arrayCursors.createCursor(seriesRow) + if err != nil { + return nil, err + } var everyDur values.Duration var offsetDur values.Duration @@ -132,8 +135,8 @@ func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cu } } -func (r *windowAggregateResultSet) Cursor() cursors.Cursor { - return r.cursor +func (r *windowAggregateResultSet) Cursor() (cursors.Cursor, error) { + return r.cursor, nil } func (r *windowAggregateResultSet) Close() { diff --git a/storage/reads/aggregate_resultset_test.go b/storage/reads/aggregate_resultset_test.go index ac86137eef4..59a33bdc0dc 100644 --- a/storage/reads/aggregate_resultset_test.go +++ b/storage/reads/aggregate_resultset_test.go @@ -2,6 +2,7 @@ package reads_test import ( "context" + "github.com/stretchr/testify/require" "reflect" "testing" "time" @@ -196,7 +197,8 @@ func TestNewWindowAggregateResultSet_Mean(t *testing.T) { if !resultSet.Next() { t.Fatalf("unexpected: resultSet could not advance") } - cursor := resultSet.Cursor() + cursor, err := resultSet.Cursor() + require.NoError(t, err, "create cursor failed") if cursor == nil { t.Fatalf("unexpected: cursor was nil") } @@ -238,7 +240,8 @@ func TestNewWindowAggregateResultSet_Months(t *testing.T) { if !resultSet.Next() { t.Fatalf("unexpected: resultSet could not advance") } - cursor := resultSet.Cursor() + cursor, err := resultSet.Cursor() + require.NoError(t, err, "create cursor failed") if cursor == nil { t.Fatalf("unexpected: cursor was nil") } diff --git a/storage/reads/array_cursor.gen.go b/storage/reads/array_cursor.gen.go index 62b1dcaab88..58c02486430 100644 --- a/storage/reads/array_cursor.gen.go +++ b/storage/reads/array_cursor.gen.go @@ -45,7 +45,7 @@ func newLimitArrayCursor(cur cursors.Cursor) (cursors.Cursor, error) { default: return nil, &errors2.Error{ Code: errors2.EInvalid, - Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)), + Msg: fmt.Sprintf("unsupported limit array cursor type: %s", arrayCursorType(cur)), } } } @@ -74,7 +74,7 @@ func newWindowFirstArrayCursor(cur cursors.Cursor, window interval.Window) (curs default: return nil, &errors2.Error{ Code: errors2.EInvalid, - Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)), + Msg: fmt.Sprintf("unsupported window first cursor type: %s", arrayCursorType(cur)), } } } @@ -103,7 +103,7 @@ func newWindowLastArrayCursor(cur cursors.Cursor, window interval.Window) (curso default: return nil, &errors2.Error{ Code: errors2.EInvalid, - Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)), + Msg: fmt.Sprintf("unsupported window last cursor type: %s", arrayCursorType(cur)), } } } @@ -129,7 +129,7 @@ func newWindowCountArrayCursor(cur cursors.Cursor, window interval.Window) (curs default: return nil, &errors2.Error{ Code: errors2.EInvalid, - Msg: fmt.Sprintf("unreachable: %s", arrayCursorType(cur)), + Msg: fmt.Sprintf("unsupported window count cursor type: %s", arrayCursorType(cur)), } } } diff --git a/storage/reads/array_cursor.go b/storage/reads/array_cursor.go index 6692e00dc79..47d6975a173 100644 --- a/storage/reads/array_cursor.go +++ b/storage/reads/array_cursor.go @@ -3,6 +3,7 @@ package reads import ( "context" "fmt" + errors2 "github.com/influxdata/influxdb/v2/kit/platform/errors" "github.com/influxdata/flux/interval" @@ -104,7 +105,7 @@ func newMultiShardArrayCursors(ctx context.Context, start, end int64, asc bool) return m } -func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor { +func (m *multiShardArrayCursors) createCursor(row SeriesRow) (cursors.Cursor, error) { m.req.Name = row.Name m.req.Tags = row.SeriesTags m.req.Field = row.Field @@ -123,26 +124,29 @@ func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor { } if cur == nil || err != nil { - return nil + return nil, err } switch c := cur.(type) { case cursors.IntegerArrayCursor: m.cursors.i.reset(c, row.Query, cond) - return &m.cursors.i + return &m.cursors.i, nil case cursors.FloatArrayCursor: m.cursors.f.reset(c, row.Query, cond) - return &m.cursors.f + return &m.cursors.f, nil case cursors.UnsignedArrayCursor: m.cursors.u.reset(c, row.Query, cond) - return &m.cursors.u + return &m.cursors.u, nil case cursors.StringArrayCursor: m.cursors.s.reset(c, row.Query, cond) - return &m.cursors.s + return &m.cursors.s, nil case cursors.BooleanArrayCursor: m.cursors.b.reset(c, row.Query, cond) - return &m.cursors.b + return &m.cursors.b, nil default: - return nil + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported cursor type: %s", arrayCursorType(cur)), + } } } diff --git a/storage/reads/array_cursor_test.go b/storage/reads/array_cursor_test.go index 3f4389f94e9..ceea5e43bb7 100644 --- a/storage/reads/array_cursor_test.go +++ b/storage/reads/array_cursor_test.go @@ -2227,7 +2227,9 @@ func TestMultiShardArrayCursor(t *testing.T) { row := SeriesRow{Query: iter} ctx := context.Background() msac := newMultiShardArrayCursors(ctx, models.MinNanoTime, models.MaxNanoTime, true) - cur, ok := msac.createCursor(row).(cursors.IntegerArrayCursor) + cursor, err := msac.createCursor(row) + require.NoError(t, err, "create cursor failed") + cur, ok := cursor.(cursors.IntegerArrayCursor) require.Truef(t, ok, "Expected IntegerArrayCursor") ia := cur.Next() diff --git a/storage/reads/group_resultset.go b/storage/reads/group_resultset.go index 04cbfa3c7fe..bc4d545e25b 100644 --- a/storage/reads/group_resultset.go +++ b/storage/reads/group_resultset.go @@ -121,7 +121,10 @@ func (g *groupResultSet) Next() GroupCursor { // the time range of the query. func (g *groupResultSet) seriesHasPoints(row *SeriesRow) bool { // TODO(sgc): this is expensive. Storage engine must provide efficient time range queries of series keys. - cur := g.arrayCursors.createCursor(*row) + cur, err := g.arrayCursors.createCursor(*row) + if err != nil { + return false + } var ts []int64 switch c := cur.(type) { case cursors.IntegerArrayCursor: @@ -302,15 +305,18 @@ func (c *groupNoneCursor) Next() bool { } func (c *groupNoneCursor) createCursor(seriesRow SeriesRow) (cur cursors.Cursor, err error) { - cur = c.arrayCursors.createCursor(c.row) + cur, err = c.arrayCursors.createCursor(c.row) + if err != nil { + return nil, err + } if c.agg != nil { cur, err = newAggregateArrayCursor(c.ctx, c.agg, cur) } return cur, err } -func (c *groupNoneCursor) Cursor() cursors.Cursor { - return c.cursor +func (c *groupNoneCursor) Cursor() (cursors.Cursor, error) { + return c.cursor, nil } type groupByCursor struct { @@ -350,15 +356,18 @@ func (c *groupByCursor) Next() bool { } func (c *groupByCursor) createCursor(seriesRow SeriesRow) (cur cursors.Cursor, err error) { - cur = c.arrayCursors.createCursor(seriesRow) + cur, err = c.arrayCursors.createCursor(seriesRow) + if err != nil { + return nil, err + } if c.agg != nil { cur, err = newAggregateArrayCursor(c.ctx, c.agg, cur) } return cur, err } -func (c *groupByCursor) Cursor() cursors.Cursor { - return c.cursor +func (c *groupByCursor) Cursor() (cursors.Cursor, error) { + return c.cursor, nil } func (c *groupByCursor) Stats() cursors.CursorStats { diff --git a/storage/reads/resultset.go b/storage/reads/resultset.go index 775d56417e2..39282223991 100644 --- a/storage/reads/resultset.go +++ b/storage/reads/resultset.go @@ -8,7 +8,7 @@ import ( ) type multiShardCursors interface { - createCursor(row SeriesRow) cursors.Cursor + createCursor(row SeriesRow) (cursors.Cursor, error) } type resultSet struct { @@ -57,7 +57,7 @@ func (r *resultSet) Next() bool { return true } -func (r *resultSet) Cursor() cursors.Cursor { +func (r *resultSet) Cursor() (cursors.Cursor, error) { return r.arrayCursors.createCursor(r.seriesRow) } diff --git a/storage/reads/resultset_lineprotocol.go b/storage/reads/resultset_lineprotocol.go index 3de073b522c..d0de8d7235b 100644 --- a/storage/reads/resultset_lineprotocol.go +++ b/storage/reads/resultset_lineprotocol.go @@ -32,7 +32,11 @@ func ResultSetToLineProtocol(wr io.Writer, rs ResultSet) (err error) { line = append(line, ' ') line = append(line, field...) line = append(line, '=') - err = cursorToLineProtocol(wr, line, rs.Cursor()) + cursor, err := rs.Cursor() + if err != nil { + return err + } + err = cursorToLineProtocol(wr, line, cursor) if err != nil { return err } diff --git a/storage/reads/store.go b/storage/reads/store.go index 72db72d130b..2e3fe2e7b90 100644 --- a/storage/reads/store.go +++ b/storage/reads/store.go @@ -15,7 +15,7 @@ type ResultSet interface { Next() bool // Cursor returns the most recent cursor after a call to Next. - Cursor() cursors.Cursor + Cursor() (cursors.Cursor, error) // Tags returns the tags for the most recent cursor after a call to Next. Tags() models.Tags @@ -47,7 +47,7 @@ type GroupCursor interface { Next() bool // Cursor returns the most recent cursor after a call to Next. - Cursor() cursors.Cursor + Cursor() (cursors.Cursor, error) // Tags returns the tags for the most recent cursor after a call to Next. Tags() models.Tags diff --git a/storage/reads/store_test.go b/storage/reads/store_test.go index 304557a93a4..b99911af223 100644 --- a/storage/reads/store_test.go +++ b/storage/reads/store_test.go @@ -119,7 +119,7 @@ func resultSetToString(wr io.Writer, rs reads.ResultSet, opts ...optionFn) { for rs.Next() { fmt.Fprint(wr, "series: ") tagsToString(wr, rs.Tags()) - cur := rs.Cursor() + cur, _ := rs.Cursor() if po.SkipNilCursor && cur == nil { continue diff --git a/v1/services/storage/store.go b/v1/services/storage/store.go index 703f1028689..e766b06ccd2 100644 --- a/v1/services/storage/store.go +++ b/v1/services/storage/store.go @@ -257,7 +257,11 @@ func (s *Store) tagKeysWithFieldPredicate(ctx context.Context, mqAttrs *metaquer rs := reads.NewFilteredResultSet(ctx, mqAttrs.start, mqAttrs.end, cur) for rs.Next() { func() { - c := rs.Cursor() + c, err := rs.Cursor() + if err != nil { + s.Logger.Error("failed to get next cursor during iteration", zap.Error(err)) + return + } if c == nil { // no data for series key + field combination return @@ -623,7 +627,11 @@ func (s *Store) tagValuesSlow(ctx context.Context, mqAttrs *metaqueryAttributes, rs := reads.NewFilteredResultSet(ctx, mqAttrs.start, mqAttrs.end, cur) for rs.Next() { func() { - c := rs.Cursor() + c, err := rs.Cursor() + if err != nil { + s.Logger.Error("failed to get next cursor during iteration", zap.Error(err)) + return + } if c == nil { // no data for series key + field combination? // It seems that even when there is no data for this series key + field @@ -764,7 +772,11 @@ func (s *Store) seriesCardinalityWithPredicateAndTime(ctx context.Context, shard rs := reads.NewFilteredResultSet(ctx, start, end, cur) for rs.Next() { func() { - c := rs.Cursor() + c, err := rs.Cursor() + if err != nil { + s.Logger.Error("failed to get next cursor during iteration", zap.Error(err)) + return + } if c == nil { // no data for series key + field combination return From 1f8fa62ae618c1c1e5666949d9635f50c8cd3c2d Mon Sep 17 00:00:00 2001 From: devanbenz Date: Thu, 20 Mar 2025 12:01:19 -0500 Subject: [PATCH 05/14] feat: Add error handling to flux table advanceCursor --- storage/flux/table.gen.go | 70 +++++++++++++++++++++++++++++++--- storage/flux/table.gen.go.tmpl | 14 ++++++- 2 files changed, 78 insertions(+), 6 deletions(-) diff --git a/storage/flux/table.gen.go b/storage/flux/table.gen.go index 3a5a4d22667..c7cfd16f443 100644 --- a/storage/flux/table.gen.go +++ b/storage/flux/table.gen.go @@ -970,7 +970,19 @@ func (t *floatGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { - cur, _ := t.gc.Cursor() + cur, err := t.gc.Cursor() + if err != nil { + cur.Close() + t.err = &errors.Error{ + Code: errors.EInvalid, + Err: &GroupCursorError{ + typ: "float", + cursor: cur, + }, + } + return false + } + if cur == nil { continue } @@ -1954,7 +1966,19 @@ func (t *integerGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { - cur, _ := t.gc.Cursor() + cur, err := t.gc.Cursor() + if err != nil { + cur.Close() + t.err = &errors.Error{ + Code: errors.EInvalid, + Err: &GroupCursorError{ + typ: "integer", + cursor: cur, + }, + } + return false + } + if cur == nil { continue } @@ -2935,7 +2959,19 @@ func (t *unsignedGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { - cur, _ := t.gc.Cursor() + cur, err := t.gc.Cursor() + if err != nil { + cur.Close() + t.err = &errors.Error{ + Code: errors.EInvalid, + Err: &GroupCursorError{ + typ: "unsigned", + cursor: cur, + }, + } + return false + } + if cur == nil { continue } @@ -3860,7 +3896,19 @@ func (t *stringGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { - cur, _ := t.gc.Cursor() + cur, err := t.gc.Cursor() + if err != nil { + cur.Close() + t.err = &errors.Error{ + Code: errors.EInvalid, + Err: &GroupCursorError{ + typ: "string", + cursor: cur, + }, + } + return false + } + if cur == nil { continue } @@ -4785,7 +4833,19 @@ func (t *booleanGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { - cur, _ := t.gc.Cursor() + cur, err := t.gc.Cursor() + if err != nil { + cur.Close() + t.err = &errors.Error{ + Code: errors.EInvalid, + Err: &GroupCursorError{ + typ: "boolean", + cursor: cur, + }, + } + return false + } + if cur == nil { continue } diff --git a/storage/flux/table.gen.go.tmpl b/storage/flux/table.gen.go.tmpl index d607954117b..6ee23f48e93 100644 --- a/storage/flux/table.gen.go.tmpl +++ b/storage/flux/table.gen.go.tmpl @@ -1000,7 +1000,19 @@ func (t *{{.name}}GroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { - cur, _ := t.gc.Cursor() + cur, err := t.gc.Cursor() + if err != nil { + cur.Close() + t.err = &errors.Error { + Code: errors.EInvalid, + Err: &GroupCursorError { + typ: "{{.name}}", + cursor: cur, + }, + } + return false + } + if cur == nil { continue } From c050d9d91cee0707cdcdd1576caf7f5bb8f8e18b Mon Sep 17 00:00:00 2001 From: devanbenz Date: Thu, 20 Mar 2025 12:06:07 -0500 Subject: [PATCH 06/14] feat: bubble up error for handleRead --- storage/flux/reader.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/storage/flux/reader.go b/storage/flux/reader.go index 2321e85c917..b8fc3264742 100644 --- a/storage/flux/reader.go +++ b/storage/flux/reader.go @@ -334,7 +334,10 @@ func (gi *groupIterator) handleRead(f func(flux.Table) error, rs storage.GroupRe READ: for gc != nil { for gc.Next() { - cur, _ = gc.Cursor() + cur, err := gc.Cursor() + if err != nil { + return err + } if cur != nil { break } From ced6938c41b5eefe566d2e6eda85763c89241b88 Mon Sep 17 00:00:00 2001 From: devanbenz Date: Thu, 20 Mar 2025 12:20:50 -0500 Subject: [PATCH 07/14] feat: Modify return values so we return r.err for cursors --- storage/reads/group_resultset.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/reads/group_resultset.go b/storage/reads/group_resultset.go index bc4d545e25b..72c4382c162 100644 --- a/storage/reads/group_resultset.go +++ b/storage/reads/group_resultset.go @@ -316,7 +316,7 @@ func (c *groupNoneCursor) createCursor(seriesRow SeriesRow) (cur cursors.Cursor, } func (c *groupNoneCursor) Cursor() (cursors.Cursor, error) { - return c.cursor, nil + return c.cursor, c.err } type groupByCursor struct { @@ -367,7 +367,7 @@ func (c *groupByCursor) createCursor(seriesRow SeriesRow) (cur cursors.Cursor, e } func (c *groupByCursor) Cursor() (cursors.Cursor, error) { - return c.cursor, nil + return c.cursor, c.err } func (c *groupByCursor) Stats() cursors.CursorStats { From fe208e5552b8cbba31d2c71532ebf8b08291286a Mon Sep 17 00:00:00 2001 From: devanbenz Date: Thu, 20 Mar 2025 14:28:17 -0500 Subject: [PATCH 08/14] feat: do not shadow cur! --- storage/flux/reader.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/storage/flux/reader.go b/storage/flux/reader.go index b8fc3264742..d93d9bb1bd3 100644 --- a/storage/flux/reader.go +++ b/storage/flux/reader.go @@ -314,6 +314,7 @@ func (gi *groupIterator) handleRead(f func(flux.Table) error, rs storage.GroupRe gc storage.GroupCursor cur cursors.Cursor table storageTable + err error ) defer func() { @@ -334,7 +335,7 @@ func (gi *groupIterator) handleRead(f func(flux.Table) error, rs storage.GroupRe READ: for gc != nil { for gc.Next() { - cur, err := gc.Cursor() + cur, err = gc.Cursor() if err != nil { return err } From a6ccc8b2a6ddc88ce9842949e67de144507902e1 Mon Sep 17 00:00:00 2001 From: devanbenz Date: Fri, 21 Mar 2025 09:16:43 -0500 Subject: [PATCH 09/14] feat: modify windowAggregateResultSet to use c.err for Cursor return --- storage/reads/aggregate_resultset.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/reads/aggregate_resultset.go b/storage/reads/aggregate_resultset.go index 90f558b2627..bbadf8abd21 100644 --- a/storage/reads/aggregate_resultset.go +++ b/storage/reads/aggregate_resultset.go @@ -136,7 +136,7 @@ func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cu } func (r *windowAggregateResultSet) Cursor() (cursors.Cursor, error) { - return r.cursor, nil + return r.cursor, r.err } func (r *windowAggregateResultSet) Close() { From e627dec2e50c85ed64de1323418c3db80c385a79 Mon Sep 17 00:00:00 2001 From: devanbenz Date: Mon, 24 Mar 2025 14:05:39 -0500 Subject: [PATCH 10/14] feat: Modify seriesHasPoints to include error returns for groupResultSet move error var declaration outside of closed vars in flux reader --- storage/flux/reader.go | 3 ++- storage/reads/group_resultset.go | 32 +++++++++++++++++---------- storage/reads/group_resultset_test.go | 20 ++++++++++++----- v1/services/storage/store.go | 6 ++--- 4 files changed, 39 insertions(+), 22 deletions(-) diff --git a/storage/flux/reader.go b/storage/flux/reader.go index d93d9bb1bd3..64da6b8f37e 100644 --- a/storage/flux/reader.go +++ b/storage/flux/reader.go @@ -309,12 +309,13 @@ func (gi *groupIterator) Do(f func(flux.Table) error) error { } func (gi *groupIterator) handleRead(f func(flux.Table) error, rs storage.GroupResultSet) error { + // error declaration to be used with our cursor iterations below + var err error // these resources must be closed if not nil on return var ( gc storage.GroupCursor cur cursors.Cursor table storageTable - err error ) defer func() { diff --git a/storage/reads/group_resultset.go b/storage/reads/group_resultset.go index 72c4382c162..ed5ca7e8d48 100644 --- a/storage/reads/group_resultset.go +++ b/storage/reads/group_resultset.go @@ -47,7 +47,7 @@ func IsLastDescendingGroupOptimization(req *datatypes.ReadGroupRequest) bool { return req.Aggregate != nil && req.Aggregate.Type == datatypes.Aggregate_AggregateTypeLast } -func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, newSeriesCursorFn func() (SeriesCursor, error), opts ...GroupOption) GroupResultSet { +func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, newSeriesCursorFn func() (SeriesCursor, error), opts ...GroupOption) (GroupResultSet, error) { g := &groupResultSet{ ctx: ctx, req: req, @@ -79,21 +79,21 @@ func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, new } if n, err := g.groupBySort(); n == 0 || err != nil { - return nil + return nil, err } case datatypes.ReadGroupRequest_GroupNone: g.nextGroupFn = groupNoneNextGroup if n, err := g.groupNoneSort(); n == 0 || err != nil { - return nil + return nil, err } default: - panic("not implemented") + return nil, fmt.Errorf("unknown group type: %s", req.Group) } - return g + return g, nil } // NilSort values determine the lexicographical order of nil values in the @@ -119,11 +119,11 @@ func (g *groupResultSet) Next() GroupCursor { // seriesHasPoints reads the first block of TSM data to verify the series has points for // the time range of the query. -func (g *groupResultSet) seriesHasPoints(row *SeriesRow) bool { +func (g *groupResultSet) seriesHasPoints(row *SeriesRow) (bool, error) { // TODO(sgc): this is expensive. Storage engine must provide efficient time range queries of series keys. cur, err := g.arrayCursors.createCursor(*row) if err != nil { - return false + return false, err } var ts []int64 switch c := cur.(type) { @@ -143,12 +143,12 @@ func (g *groupResultSet) seriesHasPoints(row *SeriesRow) bool { a := c.Next() ts = a.Timestamps case nil: - return false + return false, nil default: - panic(fmt.Sprintf("unreachable: %T", c)) + return false, fmt.Errorf("unexpected cursor type: %s", arrayCursorType(c)) } cur.Close() - return len(ts) > 0 + return len(ts) > 0, nil } func groupNoneNextGroup(g *groupResultSet) GroupCursor { @@ -183,7 +183,11 @@ func (g *groupResultSet) groupNoneSort() (int, error) { n := 0 seriesRow := seriesCursor.Next() for seriesRow != nil { - if allTime || g.seriesHasPoints(seriesRow) { + hasPoints, err := g.seriesHasPoints(seriesRow) + if err != nil { + return 0, err + } + if allTime || hasPoints { n++ g.km.MergeTagKeys(seriesRow.Tags) } @@ -234,7 +238,11 @@ func (g *groupResultSet) groupBySort() (int, error) { seriesRow := seriesCursor.Next() for seriesRow != nil { - if allTime || g.seriesHasPoints(seriesRow) { + hasPoints, err := g.seriesHasPoints(seriesRow) + if err != nil { + return 0, err + } + if allTime || hasPoints { nr := *seriesRow nr.SeriesTags = tagsBuf.copyTags(nr.SeriesTags) nr.Tags = tagsBuf.copyTags(nr.Tags) diff --git a/storage/reads/group_resultset_test.go b/storage/reads/group_resultset_test.go index 52b78f647c8..e3e8b037134 100644 --- a/storage/reads/group_resultset_test.go +++ b/storage/reads/group_resultset_test.go @@ -2,6 +2,7 @@ package reads_test import ( "context" + "github.com/stretchr/testify/require" "strings" "testing" @@ -259,7 +260,7 @@ group: var hints datatypes.HintFlags hints.SetHintSchemaAllTime() - rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{ + rs, err := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{ Group: tt.group, GroupKeys: tt.keys, // TODO(jlapacik): @@ -267,6 +268,7 @@ group: // Eventually this field should be removed entirely. Hints: uint32(hints), }, newCursor) + require.NoError(t, err, "group result set creation error") sb := new(strings.Builder) GroupResultSetToString(sb, rs, SkipNilCursor()) @@ -287,7 +289,8 @@ func TestNewGroupResultSet_GroupNone_NoDataReturnsNil(t *testing.T) { )}, nil } - rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.ReadGroupRequest_GroupNone}, newCursor) + rs, err := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.ReadGroupRequest_GroupNone}, newCursor) + require.NoError(t, err, "group result set creation error") if rs != nil { t.Errorf("expected nil cursor") } @@ -302,7 +305,8 @@ func TestNewGroupResultSet_GroupBy_NoDataReturnsNil(t *testing.T) { )}, nil } - rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.ReadGroupRequest_GroupBy, GroupKeys: []string{"tag0"}}, newCursor) + rs, err := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.ReadGroupRequest_GroupBy, GroupKeys: []string{"tag0"}}, newCursor) + require.NoError(t, err, "group result set creation error") if rs != nil { t.Errorf("expected nil cursor") } @@ -385,7 +389,7 @@ group: var hints datatypes.HintFlags hints.SetHintSchemaAllTime() - rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{ + rs, err := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{ Group: datatypes.ReadGroupRequest_GroupBy, GroupKeys: tt.keys, // TODO(jlapacik): @@ -393,6 +397,7 @@ group: // Eventually this field should be removed entirely. Hints: uint32(hints), }, newCursor, tt.opts...) + require.NoError(t, err, "group result set creation error") sb := new(strings.Builder) GroupResultSetToString(sb, rs, SkipNilCursor()) @@ -459,7 +464,8 @@ func BenchmarkNewGroupResultSet_GroupBy(b *testing.B) { hints.SetHintSchemaAllTime() for i := 0; i < b.N; i++ { - rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.ReadGroupRequest_GroupBy, GroupKeys: []string{"tag2"}, Hints: uint32(hints)}, newCursor) + rs, err := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.ReadGroupRequest_GroupBy, GroupKeys: []string{"tag2"}, Hints: uint32(hints)}, newCursor) + require.NoError(b, err, "group result set creation error") rs.Close() } } @@ -490,9 +496,11 @@ func TestNewGroupResultSet_TimeRange(t *testing.T) { }, } - resultSet := reads.NewGroupResultSet(ctx, &req, func() (reads.SeriesCursor, error) { + resultSet, err := reads.NewGroupResultSet(ctx, &req, func() (reads.SeriesCursor, error) { return &newCursor, nil }) + require.NoError(t, err, "group result set creation error") + groupByCursor := resultSet.Next() if groupByCursor == nil { t.Fatal("unexpected: groupByCursor was nil") diff --git a/v1/services/storage/store.go b/v1/services/storage/store.go index e766b06ccd2..9da8220aff9 100644 --- a/v1/services/storage/store.go +++ b/v1/services/storage/store.go @@ -229,9 +229,9 @@ func (s *Store) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest) return cur, nil } - rs := reads.NewGroupResultSet(ctx, req, newCursor) - if rs == nil { - return nil, nil + rs, err := reads.NewGroupResultSet(ctx, req, newCursor) + if err != nil { + return nil, err } return rs, nil From a42e61190897a0bd9bb9b43faefc5c8e08e0714f Mon Sep 17 00:00:00 2001 From: devanbenz Date: Mon, 24 Mar 2025 15:42:27 -0500 Subject: [PATCH 11/14] feat: Modify lambdas to return err values --- storage/reads/aggregate_resultset.go | 2 ++ v1/services/storage/store.go | 33 ++++++++++++++++++---------- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/storage/reads/aggregate_resultset.go b/storage/reads/aggregate_resultset.go index bbadf8abd21..ea565150287 100644 --- a/storage/reads/aggregate_resultset.go +++ b/storage/reads/aggregate_resultset.go @@ -98,6 +98,8 @@ func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cu every := r.req.WindowEvery offset := r.req.Offset cursor, err := r.arrayCursors.createCursor(seriesRow) + // If the createCursor interface method fails, it will + // always return a nil cursor. if err != nil { return nil, err } diff --git a/v1/services/storage/store.go b/v1/services/storage/store.go index 9da8220aff9..dccc0d36333 100644 --- a/v1/services/storage/store.go +++ b/v1/services/storage/store.go @@ -256,15 +256,15 @@ func (s *Store) tagKeysWithFieldPredicate(ctx context.Context, mqAttrs *metaquer m := make(map[string]struct{}) rs := reads.NewFilteredResultSet(ctx, mqAttrs.start, mqAttrs.end, cur) for rs.Next() { - func() { + if err := func() error { c, err := rs.Cursor() if err != nil { s.Logger.Error("failed to get next cursor during iteration", zap.Error(err)) - return + return err } if c == nil { // no data for series key + field combination - return + return nil } defer c.Close() if cursorHasData(c) { @@ -273,7 +273,10 @@ func (s *Store) tagKeysWithFieldPredicate(ctx context.Context, mqAttrs *metaquer m[string(tags[i].Key)] = struct{}{} } } - }() + return nil + }(); err != nil { + return nil, err + } } arr := make([]string, 0, len(m)) @@ -626,11 +629,11 @@ func (s *Store) tagValuesSlow(ctx context.Context, mqAttrs *metaqueryAttributes, rs := reads.NewFilteredResultSet(ctx, mqAttrs.start, mqAttrs.end, cur) for rs.Next() { - func() { + if err := func() error { c, err := rs.Cursor() if err != nil { s.Logger.Error("failed to get next cursor during iteration", zap.Error(err)) - return + return err } if c == nil { // no data for series key + field combination? @@ -638,7 +641,7 @@ func (s *Store) tagValuesSlow(ctx context.Context, mqAttrs *metaqueryAttributes, // combo that the cursor may be not nil. We need to // request invoke an array cursor to be sure. // This is the reason for the call to cursorHasData below. - return + return nil } defer c.Close() @@ -646,7 +649,10 @@ func (s *Store) tagValuesSlow(ctx context.Context, mqAttrs *metaqueryAttributes, f := rs.Tags().Get([]byte(tagKey)) m[string(f)] = struct{}{} } - }() + return nil + }(); err != nil { + return nil, err + } } names := make([]string, 0, len(m)) @@ -771,15 +777,15 @@ func (s *Store) seriesCardinalityWithPredicateAndTime(ctx context.Context, shard buf := make([]byte, 1024) rs := reads.NewFilteredResultSet(ctx, start, end, cur) for rs.Next() { - func() { + if err := func() error { c, err := rs.Cursor() if err != nil { s.Logger.Error("failed to get next cursor during iteration", zap.Error(err)) - return + return err } if c == nil { // no data for series key + field combination - return + return nil } defer c.Close() @@ -788,7 +794,10 @@ func (s *Store) seriesCardinalityWithPredicateAndTime(ctx context.Context, shard skey := sfile.SeriesID(r.Name, r.SeriesTags, buf) ss.Add(skey) } - }() + return nil + }(); err != nil { + return nil, err + } } return ss, nil From 6366b019f1a88b460544cfe1ddb7cff2a9b6261d Mon Sep 17 00:00:00 2001 From: devanbenz Date: Tue, 25 Mar 2025 14:04:32 -0500 Subject: [PATCH 12/14] feat: Working on adding tests --- storage/reads/group_resultset_test.go | 32 +++++++++++++++++++++++++++ storage/reads/store_test.go | 1 + 2 files changed, 33 insertions(+) diff --git a/storage/reads/group_resultset_test.go b/storage/reads/group_resultset_test.go index e3e8b037134..81581e572c4 100644 --- a/storage/reads/group_resultset_test.go +++ b/storage/reads/group_resultset_test.go @@ -280,6 +280,38 @@ group: } } +func TestNewGroupResultSet_ShouldFail_BadGroupType(t *testing.T) { + cur := &sliceSeriesCursor{ + rows: newSeriesRows( + "aaa,tag0=val00", + "aaa,tag0=val01", + "cpu,tag0=val00,tag1=val10", + "cpu,tag0=val00,tag1=val11", + "cpu,tag0=val00,tag1=val12", + "mem,tag1=val10,tag2=val20", + "mem,tag1=val11,tag2=val20", + "mem,tag1=val11,tag2=val21", + )} + + newCursor := func() (reads.SeriesCursor, error) { + return cur, nil + } + + var hints datatypes.HintFlags + hints.SetHintSchemaAllTime() + _, err := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{ + // Group is of an int type but really is a proto enum. + // This should never be anything other than ReadGroupRequest_GroupNone + // or ReadGroupRequest_GroupBy. This test is here to only ensure that if + // the off chance something other than these two enums gets passed the server + // does not panic. + Group: 3, + GroupKeys: []string{"tag0", "tag2"}, + Hints: uint32(hints), + }, newCursor) + require.Error(t, err, "group result set creation should error") +} + func TestNewGroupResultSet_GroupNone_NoDataReturnsNil(t *testing.T) { newCursor := func() (reads.SeriesCursor, error) { return &sliceSeriesCursor{ diff --git a/storage/reads/store_test.go b/storage/reads/store_test.go index b99911af223..a05953980a7 100644 --- a/storage/reads/store_test.go +++ b/storage/reads/store_test.go @@ -119,6 +119,7 @@ func resultSetToString(wr io.Writer, rs reads.ResultSet, opts ...optionFn) { for rs.Next() { fmt.Fprint(wr, "series: ") tagsToString(wr, rs.Tags()) + // Cursor error is not used within these tests cur, _ := rs.Cursor() if po.SkipNilCursor && cur == nil { From 6a701e8351b765a7878c70815c2095da1d2849ee Mon Sep 17 00:00:00 2001 From: devanbenz Date: Wed, 26 Mar 2025 15:25:25 -0500 Subject: [PATCH 13/14] feat: cursor is always nil, don't use error --- storage/reads/store_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/reads/store_test.go b/storage/reads/store_test.go index a05953980a7..190f3e026ba 100644 --- a/storage/reads/store_test.go +++ b/storage/reads/store_test.go @@ -119,9 +119,9 @@ func resultSetToString(wr io.Writer, rs reads.ResultSet, opts ...optionFn) { for rs.Next() { fmt.Fprint(wr, "series: ") tagsToString(wr, rs.Tags()) - // Cursor error is not used within these tests + // Tests withint group_resultset_test that call `resultSetToString` always have a nil cursor + // the error here will not be printed within the tests. cur, _ := rs.Cursor() - if po.SkipNilCursor && cur == nil { continue } From dc372952bc272625f4d317ab001d114526598e89 Mon Sep 17 00:00:00 2001 From: devanbenz Date: Wed, 26 Mar 2025 15:45:36 -0500 Subject: [PATCH 14/14] feat: removing more panics from cursor switches --- storage/flux/reader.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/storage/flux/reader.go b/storage/flux/reader.go index 64da6b8f37e..d0fd2437bb7 100644 --- a/storage/flux/reader.go +++ b/storage/flux/reader.go @@ -223,7 +223,7 @@ READ: cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString) table = newStringTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.cache, fi.alloc) default: - panic(fmt.Sprintf("unreachable: %T", typedCur)) + return fmt.Errorf("unexpected cursor type: %T", typedCur) } cur = nil @@ -371,7 +371,7 @@ READ: cols, defs := determineTableColsForGroup(gc.Keys(), flux.TString, gc.Aggregate(), key) table = newStringGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.cache, gi.alloc) default: - panic(fmt.Sprintf("unreachable: %T", typedCur)) + return fmt.Errorf("unexpected cursor type: %T", typedCur) } // table owns these resources and is responsible for closing them @@ -837,7 +837,7 @@ READ: table = newStringWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc) } default: - panic(fmt.Sprintf("unreachable: %T", typedCur)) + return fmt.Errorf("unexpected cursor type: %T", typedCur) } cur = nil