Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions mock/reads_resultset.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package mock

import (
"fmt"

"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/data/gen"
"github.com/influxdata/influxdb/v2/storage/reads"
Expand Down Expand Up @@ -61,7 +63,7 @@ func (g *GeneratorResultSet) Next() bool {
return g.sg.Next() && (g.max == 0 || remain > 0)
}

func (g *GeneratorResultSet) Cursor() cursors.Cursor {
func (g *GeneratorResultSet) Cursor() (cursors.Cursor, error) {
switch g.sg.FieldType() {
case models.Float:
g.f.tv = g.sg.TimeValuesGenerator()
Expand All @@ -79,10 +81,10 @@ func (g *GeneratorResultSet) Cursor() cursors.Cursor {
g.b.tv = g.sg.TimeValuesGenerator()
g.cur = &g.b
default:
panic("unreachable")
return nil, fmt.Errorf("unsupported field type: %v", g.sg.FieldType())
}

return g.cur
return g.cur, nil
}

func copyTags(dst, src models.Tags) models.Tags {
Expand Down
16 changes: 13 additions & 3 deletions storage/flux/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,10 @@ func (fi *filterIterator) handleRead(f func(flux.Table) error, rs storage.Result

READ:
for rs.Next() {
cur = rs.Cursor()
cur, err := rs.Cursor()
if err != nil {
return err
}
if cur == nil {
// no data for series key + field combination
continue
Expand Down Expand Up @@ -311,6 +314,7 @@ func (gi *groupIterator) handleRead(f func(flux.Table) error, rs storage.GroupRe
gc storage.GroupCursor
cur cursors.Cursor
table storageTable
err error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be a separate declaration, because it is not a resource that much be closed if not nil on return.

)

defer func() {
Expand All @@ -331,7 +335,10 @@ func (gi *groupIterator) handleRead(f func(flux.Table) error, rs storage.GroupRe
READ:
for gc != nil {
for gc.Next() {
cur = gc.Cursor()
cur, err = gc.Cursor()
if err != nil {
return err
}
if cur != nil {
break
}
Expand Down Expand Up @@ -740,7 +747,10 @@ func (wai *windowAggregateIterator) handleRead(f func(flux.Table) error, rs stor

READ:
for rs.Next() {
cur = rs.Cursor()
cur, err = rs.Cursor()
if err != nil {
return err
}
if cur == nil {
// no data for series key + field combination
continue
Expand Down
70 changes: 65 additions & 5 deletions storage/flux/table.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,19 @@ func (t *floatGroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil
for t.gc.Next() {
cur := t.gc.Cursor()
cur, err := t.gc.Cursor()
if err != nil {
cur.Close()
t.err = &errors.Error{
Code: errors.EInvalid,
Err: &GroupCursorError{
typ: "float",
cursor: cur,
},
}
return false
}

if cur == nil {
continue
}
Expand Down Expand Up @@ -1954,7 +1966,19 @@ func (t *integerGroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil
for t.gc.Next() {
cur := t.gc.Cursor()
cur, err := t.gc.Cursor()
if err != nil {
cur.Close()
t.err = &errors.Error{
Code: errors.EInvalid,
Err: &GroupCursorError{
typ: "integer",
cursor: cur,
},
}
return false
}

if cur == nil {
continue
}
Expand Down Expand Up @@ -2935,7 +2959,19 @@ func (t *unsignedGroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil
for t.gc.Next() {
cur := t.gc.Cursor()
cur, err := t.gc.Cursor()
if err != nil {
cur.Close()
t.err = &errors.Error{
Code: errors.EInvalid,
Err: &GroupCursorError{
typ: "unsigned",
cursor: cur,
},
}
return false
}

if cur == nil {
continue
}
Expand Down Expand Up @@ -3860,7 +3896,19 @@ func (t *stringGroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil
for t.gc.Next() {
cur := t.gc.Cursor()
cur, err := t.gc.Cursor()
if err != nil {
cur.Close()
t.err = &errors.Error{
Code: errors.EInvalid,
Err: &GroupCursorError{
typ: "string",
cursor: cur,
},
}
return false
}

if cur == nil {
continue
}
Expand Down Expand Up @@ -4785,7 +4833,19 @@ func (t *booleanGroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil
for t.gc.Next() {
cur := t.gc.Cursor()
cur, err := t.gc.Cursor()
if err != nil {
cur.Close()
t.err = &errors.Error{
Code: errors.EInvalid,
Err: &GroupCursorError{
typ: "boolean",
cursor: cur,
},
}
return false
}

if cur == nil {
continue
}
Expand Down
14 changes: 13 additions & 1 deletion storage/flux/table.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,19 @@ func (t *{{.name}}GroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil
for t.gc.Next() {
cur := t.gc.Cursor()
cur, err := t.gc.Cursor()
if err != nil {
cur.Close()
t.err = &errors.Error {
Code: errors.EInvalid,
Err: &GroupCursorError {
typ: "{{.name}}",
cursor: cur,
},
}
return false
}

if cur == nil {
continue
}
Expand Down
9 changes: 6 additions & 3 deletions storage/reads/aggregate_resultset.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cu
agg := r.req.Aggregate[0]
every := r.req.WindowEvery
offset := r.req.Offset
cursor := r.arrayCursors.createCursor(seriesRow)
cursor, err := r.arrayCursors.createCursor(seriesRow)
if err != nil {
return nil, err
}

var everyDur values.Duration
var offsetDur values.Duration
Expand Down Expand Up @@ -132,8 +135,8 @@ func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cu
}
}

func (r *windowAggregateResultSet) Cursor() cursors.Cursor {
return r.cursor
func (r *windowAggregateResultSet) Cursor() (cursors.Cursor, error) {
return r.cursor, r.err
}

func (r *windowAggregateResultSet) Close() {
Expand Down
7 changes: 5 additions & 2 deletions storage/reads/aggregate_resultset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package reads_test

import (
"context"
"github.com/stretchr/testify/require"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -196,7 +197,8 @@ func TestNewWindowAggregateResultSet_Mean(t *testing.T) {
if !resultSet.Next() {
t.Fatalf("unexpected: resultSet could not advance")
}
cursor := resultSet.Cursor()
cursor, err := resultSet.Cursor()
require.NoError(t, err, "create cursor failed")
if cursor == nil {
t.Fatalf("unexpected: cursor was nil")
}
Expand Down Expand Up @@ -238,7 +240,8 @@ func TestNewWindowAggregateResultSet_Months(t *testing.T) {
if !resultSet.Next() {
t.Fatalf("unexpected: resultSet could not advance")
}
cursor := resultSet.Cursor()
cursor, err := resultSet.Cursor()
require.NoError(t, err, "create cursor failed")
if cursor == nil {
t.Fatalf("unexpected: cursor was nil")
}
Expand Down
Loading