diff --git a/mock/reads_resultset.go b/mock/reads_resultset.go index 0e32a9eaafc..aa8c4708c55 100644 --- a/mock/reads_resultset.go +++ b/mock/reads_resultset.go @@ -1,6 +1,8 @@ package mock import ( + "fmt" + "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/pkg/data/gen" "github.com/influxdata/influxdb/v2/storage/reads" @@ -61,7 +63,7 @@ func (g *GeneratorResultSet) Next() bool { return g.sg.Next() && (g.max == 0 || remain > 0) } -func (g *GeneratorResultSet) Cursor() cursors.Cursor { +func (g *GeneratorResultSet) Cursor() (cursors.Cursor, error) { switch g.sg.FieldType() { case models.Float: g.f.tv = g.sg.TimeValuesGenerator() @@ -79,10 +81,10 @@ func (g *GeneratorResultSet) Cursor() cursors.Cursor { g.b.tv = g.sg.TimeValuesGenerator() g.cur = &g.b default: - panic("unreachable") + return nil, fmt.Errorf("unsupported field type: %v", g.sg.FieldType()) } - return g.cur + return g.cur, nil } func copyTags(dst, src models.Tags) models.Tags { diff --git a/storage/flux/reader.go b/storage/flux/reader.go index e7eeaec6b5c..d0fd2437bb7 100644 --- a/storage/flux/reader.go +++ b/storage/flux/reader.go @@ -194,7 +194,10 @@ func (fi *filterIterator) handleRead(f func(flux.Table) error, rs storage.Result READ: for rs.Next() { - cur = rs.Cursor() + cur, err := rs.Cursor() + if err != nil { + return err + } if cur == nil { // no data for series key + field combination continue @@ -220,7 +223,7 @@ READ: cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString) table = newStringTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.cache, fi.alloc) default: - panic(fmt.Sprintf("unreachable: %T", typedCur)) + return fmt.Errorf("unexpected cursor type: %T", typedCur) } cur = nil @@ -306,6 +309,8 @@ func (gi *groupIterator) Do(f func(flux.Table) error) error { } func (gi *groupIterator) handleRead(f func(flux.Table) error, rs storage.GroupResultSet) error { + // error declaration to be used with our cursor iterations below + var err error // these resources must be closed if not nil on return var ( gc storage.GroupCursor @@ -331,7 +336,10 @@ func (gi *groupIterator) handleRead(f func(flux.Table) error, rs storage.GroupRe READ: for gc != nil { for gc.Next() { - cur = gc.Cursor() + cur, err = gc.Cursor() + if err != nil { + return err + } if cur != nil { break } @@ -363,7 +371,7 @@ READ: cols, defs := determineTableColsForGroup(gc.Keys(), flux.TString, gc.Aggregate(), key) table = newStringGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.cache, gi.alloc) default: - panic(fmt.Sprintf("unreachable: %T", typedCur)) + return fmt.Errorf("unexpected cursor type: %T", typedCur) } // table owns these resources and is responsible for closing them @@ -740,7 +748,10 @@ func (wai *windowAggregateIterator) handleRead(f func(flux.Table) error, rs stor READ: for rs.Next() { - cur = rs.Cursor() + cur, err = rs.Cursor() + if err != nil { + return err + } if cur == nil { // no data for series key + field combination continue @@ -826,7 +837,7 @@ READ: table = newStringWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc) } default: - panic(fmt.Sprintf("unreachable: %T", typedCur)) + return fmt.Errorf("unexpected cursor type: %T", typedCur) } cur = nil diff --git a/storage/flux/table.gen.go b/storage/flux/table.gen.go index 7e2ab6deb46..c7cfd16f443 100644 --- a/storage/flux/table.gen.go +++ b/storage/flux/table.gen.go @@ -970,7 +970,19 @@ func (t *floatGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { - cur := t.gc.Cursor() + cur, err := t.gc.Cursor() + if err != nil { + cur.Close() + t.err = &errors.Error{ + Code: errors.EInvalid, + Err: &GroupCursorError{ + typ: "float", + cursor: cur, + }, + } + return false + } + if cur == nil { continue } @@ -1954,7 +1966,19 @@ func (t *integerGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { - cur := t.gc.Cursor() + cur, err := t.gc.Cursor() + if err != nil { + cur.Close() + t.err = &errors.Error{ + Code: errors.EInvalid, + Err: &GroupCursorError{ + typ: "integer", + cursor: cur, + }, + } + return false + } + if cur == nil { continue } @@ -2935,7 +2959,19 @@ func (t *unsignedGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { - cur := t.gc.Cursor() + cur, err := t.gc.Cursor() + if err != nil { + cur.Close() + t.err = &errors.Error{ + Code: errors.EInvalid, + Err: &GroupCursorError{ + typ: "unsigned", + cursor: cur, + }, + } + return false + } + if cur == nil { continue } @@ -3860,7 +3896,19 @@ func (t *stringGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { - cur := t.gc.Cursor() + cur, err := t.gc.Cursor() + if err != nil { + cur.Close() + t.err = &errors.Error{ + Code: errors.EInvalid, + Err: &GroupCursorError{ + typ: "string", + cursor: cur, + }, + } + return false + } + if cur == nil { continue } @@ -4785,7 +4833,19 @@ func (t *booleanGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { - cur := t.gc.Cursor() + cur, err := t.gc.Cursor() + if err != nil { + cur.Close() + t.err = &errors.Error{ + Code: errors.EInvalid, + Err: &GroupCursorError{ + typ: "boolean", + cursor: cur, + }, + } + return false + } + if cur == nil { continue } diff --git a/storage/flux/table.gen.go.tmpl b/storage/flux/table.gen.go.tmpl index a55116fb84c..6ee23f48e93 100644 --- a/storage/flux/table.gen.go.tmpl +++ b/storage/flux/table.gen.go.tmpl @@ -1000,7 +1000,19 @@ func (t *{{.name}}GroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { - cur := t.gc.Cursor() + cur, err := t.gc.Cursor() + if err != nil { + cur.Close() + t.err = &errors.Error { + Code: errors.EInvalid, + Err: &GroupCursorError { + typ: "{{.name}}", + cursor: cur, + }, + } + return false + } + if cur == nil { continue } diff --git a/storage/reads/aggregate_resultset.go b/storage/reads/aggregate_resultset.go index 1d41539457d..ea565150287 100644 --- a/storage/reads/aggregate_resultset.go +++ b/storage/reads/aggregate_resultset.go @@ -97,7 +97,12 @@ func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cu agg := r.req.Aggregate[0] every := r.req.WindowEvery offset := r.req.Offset - cursor := r.arrayCursors.createCursor(seriesRow) + cursor, err := r.arrayCursors.createCursor(seriesRow) + // If the createCursor interface method fails, it will + // always return a nil cursor. + if err != nil { + return nil, err + } var everyDur values.Duration var offsetDur values.Duration @@ -132,8 +137,8 @@ func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cu } } -func (r *windowAggregateResultSet) Cursor() cursors.Cursor { - return r.cursor +func (r *windowAggregateResultSet) Cursor() (cursors.Cursor, error) { + return r.cursor, r.err } func (r *windowAggregateResultSet) Close() { diff --git a/storage/reads/aggregate_resultset_test.go b/storage/reads/aggregate_resultset_test.go index ac86137eef4..59a33bdc0dc 100644 --- a/storage/reads/aggregate_resultset_test.go +++ b/storage/reads/aggregate_resultset_test.go @@ -2,6 +2,7 @@ package reads_test import ( "context" + "github.com/stretchr/testify/require" "reflect" "testing" "time" @@ -196,7 +197,8 @@ func TestNewWindowAggregateResultSet_Mean(t *testing.T) { if !resultSet.Next() { t.Fatalf("unexpected: resultSet could not advance") } - cursor := resultSet.Cursor() + cursor, err := resultSet.Cursor() + require.NoError(t, err, "create cursor failed") if cursor == nil { t.Fatalf("unexpected: cursor was nil") } @@ -238,7 +240,8 @@ func TestNewWindowAggregateResultSet_Months(t *testing.T) { if !resultSet.Next() { t.Fatalf("unexpected: resultSet could not advance") } - cursor := resultSet.Cursor() + cursor, err := resultSet.Cursor() + require.NoError(t, err, "create cursor failed") if cursor == nil { t.Fatalf("unexpected: cursor was nil") } diff --git a/storage/reads/array_cursor.gen.go b/storage/reads/array_cursor.gen.go index 50bfe8d3ab5..58c02486430 100644 --- a/storage/reads/array_cursor.gen.go +++ b/storage/reads/array_cursor.gen.go @@ -24,101 +24,113 @@ const ( MaxPointsPerBlock = 1000 ) -func newLimitArrayCursor(cur cursors.Cursor) cursors.Cursor { +func newLimitArrayCursor(cur cursors.Cursor) (cursors.Cursor, error) { switch cur := cur.(type) { case cursors.FloatArrayCursor: - return newFloatLimitArrayCursor(cur) + return newFloatLimitArrayCursor(cur), nil case cursors.IntegerArrayCursor: - return newIntegerLimitArrayCursor(cur) + return newIntegerLimitArrayCursor(cur), nil case cursors.UnsignedArrayCursor: - return newUnsignedLimitArrayCursor(cur) + return newUnsignedLimitArrayCursor(cur), nil case cursors.StringArrayCursor: - return newStringLimitArrayCursor(cur) + return newStringLimitArrayCursor(cur), nil case cursors.BooleanArrayCursor: - return newBooleanLimitArrayCursor(cur) + return newBooleanLimitArrayCursor(cur), nil default: - panic(fmt.Sprintf("unreachable: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported limit array cursor type: %s", arrayCursorType(cur)), + } } } -func newWindowFirstArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowFirstArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { if window.IsZero() { return newLimitArrayCursor(cur) } switch cur := cur.(type) { case cursors.FloatArrayCursor: - return newFloatWindowFirstArrayCursor(cur, window) + return newFloatWindowFirstArrayCursor(cur, window), nil case cursors.IntegerArrayCursor: - return newIntegerWindowFirstArrayCursor(cur, window) + return newIntegerWindowFirstArrayCursor(cur, window), nil case cursors.UnsignedArrayCursor: - return newUnsignedWindowFirstArrayCursor(cur, window) + return newUnsignedWindowFirstArrayCursor(cur, window), nil case cursors.StringArrayCursor: - return newStringWindowFirstArrayCursor(cur, window) + return newStringWindowFirstArrayCursor(cur, window), nil case cursors.BooleanArrayCursor: - return newBooleanWindowFirstArrayCursor(cur, window) + return newBooleanWindowFirstArrayCursor(cur, window), nil default: - panic(fmt.Sprintf("unreachable: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported window first cursor type: %s", arrayCursorType(cur)), + } } } -func newWindowLastArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowLastArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { if window.IsZero() { return newLimitArrayCursor(cur) } switch cur := cur.(type) { case cursors.FloatArrayCursor: - return newFloatWindowLastArrayCursor(cur, window) + return newFloatWindowLastArrayCursor(cur, window), nil case cursors.IntegerArrayCursor: - return newIntegerWindowLastArrayCursor(cur, window) + return newIntegerWindowLastArrayCursor(cur, window), nil case cursors.UnsignedArrayCursor: - return newUnsignedWindowLastArrayCursor(cur, window) + return newUnsignedWindowLastArrayCursor(cur, window), nil case cursors.StringArrayCursor: - return newStringWindowLastArrayCursor(cur, window) + return newStringWindowLastArrayCursor(cur, window), nil case cursors.BooleanArrayCursor: - return newBooleanWindowLastArrayCursor(cur, window) + return newBooleanWindowLastArrayCursor(cur, window), nil default: - panic(fmt.Sprintf("unreachable: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported window last cursor type: %s", arrayCursorType(cur)), + } } } -func newWindowCountArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowCountArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { switch cur := cur.(type) { case cursors.FloatArrayCursor: - return newFloatWindowCountArrayCursor(cur, window) + return newFloatWindowCountArrayCursor(cur, window), nil case cursors.IntegerArrayCursor: - return newIntegerWindowCountArrayCursor(cur, window) + return newIntegerWindowCountArrayCursor(cur, window), nil case cursors.UnsignedArrayCursor: - return newUnsignedWindowCountArrayCursor(cur, window) + return newUnsignedWindowCountArrayCursor(cur, window), nil case cursors.StringArrayCursor: - return newStringWindowCountArrayCursor(cur, window) + return newStringWindowCountArrayCursor(cur, window), nil case cursors.BooleanArrayCursor: - return newBooleanWindowCountArrayCursor(cur, window) + return newBooleanWindowCountArrayCursor(cur, window), nil default: - panic(fmt.Sprintf("unreachable: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported window count cursor type: %s", arrayCursorType(cur)), + } } } @@ -142,37 +154,43 @@ func newWindowSumArrayCursor(cur cursors.Cursor, window interval.Window) (cursor } } -func newWindowMinArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowMinArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { switch cur := cur.(type) { case cursors.FloatArrayCursor: - return newFloatWindowMinArrayCursor(cur, window) + return newFloatWindowMinArrayCursor(cur, window), nil case cursors.IntegerArrayCursor: - return newIntegerWindowMinArrayCursor(cur, window) + return newIntegerWindowMinArrayCursor(cur, window), nil case cursors.UnsignedArrayCursor: - return newUnsignedWindowMinArrayCursor(cur, window) + return newUnsignedWindowMinArrayCursor(cur, window), nil default: - panic(fmt.Sprintf("unsupported for aggregate min: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported for aggregate min: %s", arrayCursorType(cur)), + } } } -func newWindowMaxArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowMaxArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { switch cur := cur.(type) { case cursors.FloatArrayCursor: - return newFloatWindowMaxArrayCursor(cur, window) + return newFloatWindowMaxArrayCursor(cur, window), nil case cursors.IntegerArrayCursor: - return newIntegerWindowMaxArrayCursor(cur, window) + return newIntegerWindowMaxArrayCursor(cur, window), nil case cursors.UnsignedArrayCursor: - return newUnsignedWindowMaxArrayCursor(cur, window) + return newUnsignedWindowMaxArrayCursor(cur, window), nil default: - panic(fmt.Sprintf("unsupported for aggregate max: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported for aggregate max: %s", arrayCursorType(cur)), + } } } diff --git a/storage/reads/array_cursor.gen.go.tmpl b/storage/reads/array_cursor.gen.go.tmpl index 57cee9c6cb3..4d343013703 100644 --- a/storage/reads/array_cursor.gen.go.tmpl +++ b/storage/reads/array_cursor.gen.go.tmpl @@ -18,53 +18,65 @@ const ( MaxPointsPerBlock = 1000 ) -func newLimitArrayCursor(cur cursors.Cursor) cursors.Cursor { +func newLimitArrayCursor(cur cursors.Cursor) (cursors.Cursor, error) { switch cur := cur.(type) { {{range .}}{{/* every type supports limit */}} case cursors.{{.Name}}ArrayCursor: - return new{{.Name}}LimitArrayCursor(cur) + return new{{.Name}}LimitArrayCursor(cur), nil {{end}} default: - panic(fmt.Sprintf("unreachable: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported limit array cursor type: %s", arrayCursorType(cur)), + } } } -func newWindowFirstArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowFirstArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { if window.IsZero() { return newLimitArrayCursor(cur) } switch cur := cur.(type) { {{range .}}{{/* every type supports first */}} case cursors.{{.Name}}ArrayCursor: - return new{{.Name}}WindowFirstArrayCursor(cur, window) + return new{{.Name}}WindowFirstArrayCursor(cur, window), nil {{end}} default: - panic(fmt.Sprintf("unreachable: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported window first cursor type: %s", arrayCursorType(cur)), + } } } -func newWindowLastArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowLastArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { if window.IsZero() { return newLimitArrayCursor(cur) } switch cur := cur.(type) { {{range .}}{{/* every type supports last */}} case cursors.{{.Name}}ArrayCursor: - return new{{.Name}}WindowLastArrayCursor(cur, window) + return new{{.Name}}WindowLastArrayCursor(cur, window), nil {{end}} default: - panic(fmt.Sprintf("unreachable: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported window last cursor type: %s", arrayCursorType(cur)), + } } } -func newWindowCountArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowCountArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { switch cur := cur.(type) { {{range .}}{{/* every type supports count */}} case cursors.{{.Name}}ArrayCursor: - return new{{.Name}}WindowCountArrayCursor(cur, window) + return new{{.Name}}WindowCountArrayCursor(cur, window), nil {{end}} default: - panic(fmt.Sprintf("unreachable: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported window count cursor type: %s", arrayCursorType(cur)), + } } } @@ -87,35 +99,41 @@ func newWindowSumArrayCursor(cur cursors.Cursor, window interval.Window) (cursor } } -func newWindowMinArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowMinArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { switch cur := cur.(type) { {{range .}} {{$Type := .Name}} {{range .Aggs}} {{if eq .Name "Min"}} case cursors.{{$Type}}ArrayCursor: - return new{{$Type}}WindowMinArrayCursor(cur, window) + return new{{$Type}}WindowMinArrayCursor(cur, window), nil {{end}} {{end}}{{/* for each supported agg fn */}} {{end}}{{/* for each field type */}} default: - panic(fmt.Sprintf("unsupported for aggregate min: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported for aggregate min: %s", arrayCursorType(cur)), + } } } -func newWindowMaxArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor { +func newWindowMaxArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) { switch cur := cur.(type) { {{range .}} {{$Type := .Name}} {{range .Aggs}} {{if eq .Name "Max"}} case cursors.{{$Type}}ArrayCursor: - return new{{$Type}}WindowMaxArrayCursor(cur, window) + return new{{$Type}}WindowMaxArrayCursor(cur, window), nil {{end}} {{end}}{{/* for each supported agg fn */}} {{end}}{{/* for each field type */}} default: - panic(fmt.Sprintf("unsupported for aggregate max: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported for aggregate max: %s", arrayCursorType(cur)), + } } } diff --git a/storage/reads/array_cursor.go b/storage/reads/array_cursor.go index 215aab54dba..47d6975a173 100644 --- a/storage/reads/array_cursor.go +++ b/storage/reads/array_cursor.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + errors2 "github.com/influxdata/influxdb/v2/kit/platform/errors" + "github.com/influxdata/flux/interval" "github.com/influxdata/influxdb/v2/storage/reads/datatypes" "github.com/influxdata/influxdb/v2/tsdb/cursors" @@ -20,7 +22,7 @@ func (v *singleValue) Value(key string) (interface{}, bool) { func newAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) (cursors.Cursor, error) { switch agg.Type { case datatypes.Aggregate_AggregateTypeFirst, datatypes.Aggregate_AggregateTypeLast: - return newLimitArrayCursor(cursor), nil + return newLimitArrayCursor(cursor) } return newWindowAggregateArrayCursor(ctx, agg, interval.Window{}, cursor) } @@ -32,22 +34,24 @@ func newWindowAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate switch agg.Type { case datatypes.Aggregate_AggregateTypeCount: - return newWindowCountArrayCursor(cursor, window), nil + return newWindowCountArrayCursor(cursor, window) case datatypes.Aggregate_AggregateTypeSum: return newWindowSumArrayCursor(cursor, window) case datatypes.Aggregate_AggregateTypeFirst: - return newWindowFirstArrayCursor(cursor, window), nil + return newWindowFirstArrayCursor(cursor, window) case datatypes.Aggregate_AggregateTypeLast: - return newWindowLastArrayCursor(cursor, window), nil + return newWindowLastArrayCursor(cursor, window) case datatypes.Aggregate_AggregateTypeMin: - return newWindowMinArrayCursor(cursor, window), nil + return newWindowMinArrayCursor(cursor, window) case datatypes.Aggregate_AggregateTypeMax: - return newWindowMaxArrayCursor(cursor, window), nil + return newWindowMaxArrayCursor(cursor, window) case datatypes.Aggregate_AggregateTypeMean: return newWindowMeanArrayCursor(cursor, window) default: - // TODO(sgc): should be validated higher up - panic("invalid aggregate") + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported window aggregate cursor: %s", agg.Type), + } } } @@ -101,7 +105,7 @@ func newMultiShardArrayCursors(ctx context.Context, start, end int64, asc bool) return m } -func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor { +func (m *multiShardArrayCursors) createCursor(row SeriesRow) (cursors.Cursor, error) { m.req.Name = row.Name m.req.Tags = row.SeriesTags m.req.Field = row.Field @@ -120,26 +124,29 @@ func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor { } if cur == nil || err != nil { - return nil + return nil, err } switch c := cur.(type) { case cursors.IntegerArrayCursor: m.cursors.i.reset(c, row.Query, cond) - return &m.cursors.i + return &m.cursors.i, nil case cursors.FloatArrayCursor: m.cursors.f.reset(c, row.Query, cond) - return &m.cursors.f + return &m.cursors.f, nil case cursors.UnsignedArrayCursor: m.cursors.u.reset(c, row.Query, cond) - return &m.cursors.u + return &m.cursors.u, nil case cursors.StringArrayCursor: m.cursors.s.reset(c, row.Query, cond) - return &m.cursors.s + return &m.cursors.s, nil case cursors.BooleanArrayCursor: m.cursors.b.reset(c, row.Query, cond) - return &m.cursors.b + return &m.cursors.b, nil default: - panic(fmt.Sprintf("unreachable: %T", cur)) + return nil, &errors2.Error{ + Code: errors2.EInvalid, + Msg: fmt.Sprintf("unsupported cursor type: %s", arrayCursorType(cur)), + } } } diff --git a/storage/reads/array_cursor_test.go b/storage/reads/array_cursor_test.go index 3f4389f94e9..ceea5e43bb7 100644 --- a/storage/reads/array_cursor_test.go +++ b/storage/reads/array_cursor_test.go @@ -2227,7 +2227,9 @@ func TestMultiShardArrayCursor(t *testing.T) { row := SeriesRow{Query: iter} ctx := context.Background() msac := newMultiShardArrayCursors(ctx, models.MinNanoTime, models.MaxNanoTime, true) - cur, ok := msac.createCursor(row).(cursors.IntegerArrayCursor) + cursor, err := msac.createCursor(row) + require.NoError(t, err, "create cursor failed") + cur, ok := cursor.(cursors.IntegerArrayCursor) require.Truef(t, ok, "Expected IntegerArrayCursor") ia := cur.Next() diff --git a/storage/reads/group_resultset.go b/storage/reads/group_resultset.go index 04cbfa3c7fe..ed5ca7e8d48 100644 --- a/storage/reads/group_resultset.go +++ b/storage/reads/group_resultset.go @@ -47,7 +47,7 @@ func IsLastDescendingGroupOptimization(req *datatypes.ReadGroupRequest) bool { return req.Aggregate != nil && req.Aggregate.Type == datatypes.Aggregate_AggregateTypeLast } -func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, newSeriesCursorFn func() (SeriesCursor, error), opts ...GroupOption) GroupResultSet { +func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, newSeriesCursorFn func() (SeriesCursor, error), opts ...GroupOption) (GroupResultSet, error) { g := &groupResultSet{ ctx: ctx, req: req, @@ -79,21 +79,21 @@ func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, new } if n, err := g.groupBySort(); n == 0 || err != nil { - return nil + return nil, err } case datatypes.ReadGroupRequest_GroupNone: g.nextGroupFn = groupNoneNextGroup if n, err := g.groupNoneSort(); n == 0 || err != nil { - return nil + return nil, err } default: - panic("not implemented") + return nil, fmt.Errorf("unknown group type: %s", req.Group) } - return g + return g, nil } // NilSort values determine the lexicographical order of nil values in the @@ -119,9 +119,12 @@ func (g *groupResultSet) Next() GroupCursor { // seriesHasPoints reads the first block of TSM data to verify the series has points for // the time range of the query. -func (g *groupResultSet) seriesHasPoints(row *SeriesRow) bool { +func (g *groupResultSet) seriesHasPoints(row *SeriesRow) (bool, error) { // TODO(sgc): this is expensive. Storage engine must provide efficient time range queries of series keys. - cur := g.arrayCursors.createCursor(*row) + cur, err := g.arrayCursors.createCursor(*row) + if err != nil { + return false, err + } var ts []int64 switch c := cur.(type) { case cursors.IntegerArrayCursor: @@ -140,12 +143,12 @@ func (g *groupResultSet) seriesHasPoints(row *SeriesRow) bool { a := c.Next() ts = a.Timestamps case nil: - return false + return false, nil default: - panic(fmt.Sprintf("unreachable: %T", c)) + return false, fmt.Errorf("unexpected cursor type: %s", arrayCursorType(c)) } cur.Close() - return len(ts) > 0 + return len(ts) > 0, nil } func groupNoneNextGroup(g *groupResultSet) GroupCursor { @@ -180,7 +183,11 @@ func (g *groupResultSet) groupNoneSort() (int, error) { n := 0 seriesRow := seriesCursor.Next() for seriesRow != nil { - if allTime || g.seriesHasPoints(seriesRow) { + hasPoints, err := g.seriesHasPoints(seriesRow) + if err != nil { + return 0, err + } + if allTime || hasPoints { n++ g.km.MergeTagKeys(seriesRow.Tags) } @@ -231,7 +238,11 @@ func (g *groupResultSet) groupBySort() (int, error) { seriesRow := seriesCursor.Next() for seriesRow != nil { - if allTime || g.seriesHasPoints(seriesRow) { + hasPoints, err := g.seriesHasPoints(seriesRow) + if err != nil { + return 0, err + } + if allTime || hasPoints { nr := *seriesRow nr.SeriesTags = tagsBuf.copyTags(nr.SeriesTags) nr.Tags = tagsBuf.copyTags(nr.Tags) @@ -302,15 +313,18 @@ func (c *groupNoneCursor) Next() bool { } func (c *groupNoneCursor) createCursor(seriesRow SeriesRow) (cur cursors.Cursor, err error) { - cur = c.arrayCursors.createCursor(c.row) + cur, err = c.arrayCursors.createCursor(c.row) + if err != nil { + return nil, err + } if c.agg != nil { cur, err = newAggregateArrayCursor(c.ctx, c.agg, cur) } return cur, err } -func (c *groupNoneCursor) Cursor() cursors.Cursor { - return c.cursor +func (c *groupNoneCursor) Cursor() (cursors.Cursor, error) { + return c.cursor, c.err } type groupByCursor struct { @@ -350,15 +364,18 @@ func (c *groupByCursor) Next() bool { } func (c *groupByCursor) createCursor(seriesRow SeriesRow) (cur cursors.Cursor, err error) { - cur = c.arrayCursors.createCursor(seriesRow) + cur, err = c.arrayCursors.createCursor(seriesRow) + if err != nil { + return nil, err + } if c.agg != nil { cur, err = newAggregateArrayCursor(c.ctx, c.agg, cur) } return cur, err } -func (c *groupByCursor) Cursor() cursors.Cursor { - return c.cursor +func (c *groupByCursor) Cursor() (cursors.Cursor, error) { + return c.cursor, c.err } func (c *groupByCursor) Stats() cursors.CursorStats { diff --git a/storage/reads/group_resultset_test.go b/storage/reads/group_resultset_test.go index 52b78f647c8..81581e572c4 100644 --- a/storage/reads/group_resultset_test.go +++ b/storage/reads/group_resultset_test.go @@ -2,6 +2,7 @@ package reads_test import ( "context" + "github.com/stretchr/testify/require" "strings" "testing" @@ -259,7 +260,7 @@ group: var hints datatypes.HintFlags hints.SetHintSchemaAllTime() - rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{ + rs, err := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{ Group: tt.group, GroupKeys: tt.keys, // TODO(jlapacik): @@ -267,6 +268,7 @@ group: // Eventually this field should be removed entirely. Hints: uint32(hints), }, newCursor) + require.NoError(t, err, "group result set creation error") sb := new(strings.Builder) GroupResultSetToString(sb, rs, SkipNilCursor()) @@ -278,6 +280,38 @@ group: } } +func TestNewGroupResultSet_ShouldFail_BadGroupType(t *testing.T) { + cur := &sliceSeriesCursor{ + rows: newSeriesRows( + "aaa,tag0=val00", + "aaa,tag0=val01", + "cpu,tag0=val00,tag1=val10", + "cpu,tag0=val00,tag1=val11", + "cpu,tag0=val00,tag1=val12", + "mem,tag1=val10,tag2=val20", + "mem,tag1=val11,tag2=val20", + "mem,tag1=val11,tag2=val21", + )} + + newCursor := func() (reads.SeriesCursor, error) { + return cur, nil + } + + var hints datatypes.HintFlags + hints.SetHintSchemaAllTime() + _, err := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{ + // Group is of an int type but really is a proto enum. + // This should never be anything other than ReadGroupRequest_GroupNone + // or ReadGroupRequest_GroupBy. This test is here to only ensure that if + // the off chance something other than these two enums gets passed the server + // does not panic. + Group: 3, + GroupKeys: []string{"tag0", "tag2"}, + Hints: uint32(hints), + }, newCursor) + require.Error(t, err, "group result set creation should error") +} + func TestNewGroupResultSet_GroupNone_NoDataReturnsNil(t *testing.T) { newCursor := func() (reads.SeriesCursor, error) { return &sliceSeriesCursor{ @@ -287,7 +321,8 @@ func TestNewGroupResultSet_GroupNone_NoDataReturnsNil(t *testing.T) { )}, nil } - rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.ReadGroupRequest_GroupNone}, newCursor) + rs, err := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.ReadGroupRequest_GroupNone}, newCursor) + require.NoError(t, err, "group result set creation error") if rs != nil { t.Errorf("expected nil cursor") } @@ -302,7 +337,8 @@ func TestNewGroupResultSet_GroupBy_NoDataReturnsNil(t *testing.T) { )}, nil } - rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.ReadGroupRequest_GroupBy, GroupKeys: []string{"tag0"}}, newCursor) + rs, err := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.ReadGroupRequest_GroupBy, GroupKeys: []string{"tag0"}}, newCursor) + require.NoError(t, err, "group result set creation error") if rs != nil { t.Errorf("expected nil cursor") } @@ -385,7 +421,7 @@ group: var hints datatypes.HintFlags hints.SetHintSchemaAllTime() - rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{ + rs, err := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{ Group: datatypes.ReadGroupRequest_GroupBy, GroupKeys: tt.keys, // TODO(jlapacik): @@ -393,6 +429,7 @@ group: // Eventually this field should be removed entirely. Hints: uint32(hints), }, newCursor, tt.opts...) + require.NoError(t, err, "group result set creation error") sb := new(strings.Builder) GroupResultSetToString(sb, rs, SkipNilCursor()) @@ -459,7 +496,8 @@ func BenchmarkNewGroupResultSet_GroupBy(b *testing.B) { hints.SetHintSchemaAllTime() for i := 0; i < b.N; i++ { - rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.ReadGroupRequest_GroupBy, GroupKeys: []string{"tag2"}, Hints: uint32(hints)}, newCursor) + rs, err := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.ReadGroupRequest_GroupBy, GroupKeys: []string{"tag2"}, Hints: uint32(hints)}, newCursor) + require.NoError(b, err, "group result set creation error") rs.Close() } } @@ -490,9 +528,11 @@ func TestNewGroupResultSet_TimeRange(t *testing.T) { }, } - resultSet := reads.NewGroupResultSet(ctx, &req, func() (reads.SeriesCursor, error) { + resultSet, err := reads.NewGroupResultSet(ctx, &req, func() (reads.SeriesCursor, error) { return &newCursor, nil }) + require.NoError(t, err, "group result set creation error") + groupByCursor := resultSet.Next() if groupByCursor == nil { t.Fatal("unexpected: groupByCursor was nil") diff --git a/storage/reads/resultset.go b/storage/reads/resultset.go index 775d56417e2..39282223991 100644 --- a/storage/reads/resultset.go +++ b/storage/reads/resultset.go @@ -8,7 +8,7 @@ import ( ) type multiShardCursors interface { - createCursor(row SeriesRow) cursors.Cursor + createCursor(row SeriesRow) (cursors.Cursor, error) } type resultSet struct { @@ -57,7 +57,7 @@ func (r *resultSet) Next() bool { return true } -func (r *resultSet) Cursor() cursors.Cursor { +func (r *resultSet) Cursor() (cursors.Cursor, error) { return r.arrayCursors.createCursor(r.seriesRow) } diff --git a/storage/reads/resultset_lineprotocol.go b/storage/reads/resultset_lineprotocol.go index 3de073b522c..d0de8d7235b 100644 --- a/storage/reads/resultset_lineprotocol.go +++ b/storage/reads/resultset_lineprotocol.go @@ -32,7 +32,11 @@ func ResultSetToLineProtocol(wr io.Writer, rs ResultSet) (err error) { line = append(line, ' ') line = append(line, field...) line = append(line, '=') - err = cursorToLineProtocol(wr, line, rs.Cursor()) + cursor, err := rs.Cursor() + if err != nil { + return err + } + err = cursorToLineProtocol(wr, line, cursor) if err != nil { return err } diff --git a/storage/reads/store.go b/storage/reads/store.go index 72db72d130b..2e3fe2e7b90 100644 --- a/storage/reads/store.go +++ b/storage/reads/store.go @@ -15,7 +15,7 @@ type ResultSet interface { Next() bool // Cursor returns the most recent cursor after a call to Next. - Cursor() cursors.Cursor + Cursor() (cursors.Cursor, error) // Tags returns the tags for the most recent cursor after a call to Next. Tags() models.Tags @@ -47,7 +47,7 @@ type GroupCursor interface { Next() bool // Cursor returns the most recent cursor after a call to Next. - Cursor() cursors.Cursor + Cursor() (cursors.Cursor, error) // Tags returns the tags for the most recent cursor after a call to Next. Tags() models.Tags diff --git a/storage/reads/store_test.go b/storage/reads/store_test.go index 304557a93a4..190f3e026ba 100644 --- a/storage/reads/store_test.go +++ b/storage/reads/store_test.go @@ -119,8 +119,9 @@ func resultSetToString(wr io.Writer, rs reads.ResultSet, opts ...optionFn) { for rs.Next() { fmt.Fprint(wr, "series: ") tagsToString(wr, rs.Tags()) - cur := rs.Cursor() - + // Tests withint group_resultset_test that call `resultSetToString` always have a nil cursor + // the error here will not be printed within the tests. + cur, _ := rs.Cursor() if po.SkipNilCursor && cur == nil { continue } diff --git a/v1/services/storage/store.go b/v1/services/storage/store.go index 703f1028689..dccc0d36333 100644 --- a/v1/services/storage/store.go +++ b/v1/services/storage/store.go @@ -229,9 +229,9 @@ func (s *Store) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest) return cur, nil } - rs := reads.NewGroupResultSet(ctx, req, newCursor) - if rs == nil { - return nil, nil + rs, err := reads.NewGroupResultSet(ctx, req, newCursor) + if err != nil { + return nil, err } return rs, nil @@ -256,11 +256,15 @@ func (s *Store) tagKeysWithFieldPredicate(ctx context.Context, mqAttrs *metaquer m := make(map[string]struct{}) rs := reads.NewFilteredResultSet(ctx, mqAttrs.start, mqAttrs.end, cur) for rs.Next() { - func() { - c := rs.Cursor() + if err := func() error { + c, err := rs.Cursor() + if err != nil { + s.Logger.Error("failed to get next cursor during iteration", zap.Error(err)) + return err + } if c == nil { // no data for series key + field combination - return + return nil } defer c.Close() if cursorHasData(c) { @@ -269,7 +273,10 @@ func (s *Store) tagKeysWithFieldPredicate(ctx context.Context, mqAttrs *metaquer m[string(tags[i].Key)] = struct{}{} } } - }() + return nil + }(); err != nil { + return nil, err + } } arr := make([]string, 0, len(m)) @@ -622,15 +629,19 @@ func (s *Store) tagValuesSlow(ctx context.Context, mqAttrs *metaqueryAttributes, rs := reads.NewFilteredResultSet(ctx, mqAttrs.start, mqAttrs.end, cur) for rs.Next() { - func() { - c := rs.Cursor() + if err := func() error { + c, err := rs.Cursor() + if err != nil { + s.Logger.Error("failed to get next cursor during iteration", zap.Error(err)) + return err + } if c == nil { // no data for series key + field combination? // It seems that even when there is no data for this series key + field // combo that the cursor may be not nil. We need to // request invoke an array cursor to be sure. // This is the reason for the call to cursorHasData below. - return + return nil } defer c.Close() @@ -638,7 +649,10 @@ func (s *Store) tagValuesSlow(ctx context.Context, mqAttrs *metaqueryAttributes, f := rs.Tags().Get([]byte(tagKey)) m[string(f)] = struct{}{} } - }() + return nil + }(); err != nil { + return nil, err + } } names := make([]string, 0, len(m)) @@ -763,11 +777,15 @@ func (s *Store) seriesCardinalityWithPredicateAndTime(ctx context.Context, shard buf := make([]byte, 1024) rs := reads.NewFilteredResultSet(ctx, start, end, cur) for rs.Next() { - func() { - c := rs.Cursor() + if err := func() error { + c, err := rs.Cursor() + if err != nil { + s.Logger.Error("failed to get next cursor during iteration", zap.Error(err)) + return err + } if c == nil { // no data for series key + field combination - return + return nil } defer c.Close() @@ -776,7 +794,10 @@ func (s *Store) seriesCardinalityWithPredicateAndTime(ctx context.Context, shard skey := sfile.SeriesID(r.Name, r.SeriesTags, buf) ss.Add(skey) } - }() + return nil + }(); err != nil { + return nil, err + } } return ss, nil