Skip to content

Commit 0797048

Browse files
authored
feat: query iterator (#244)
* QueryIter returns go iterator for query results * Iterator checks context - Check context for error on each iteration. - Test that canceling context stops iteration. * ensure context is canceled
1 parent e34b22f commit 0797048

File tree

2 files changed

+100
-1
lines changed

2 files changed

+100
-1
lines changed

datastore.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"io"
7+
"iter"
78

89
query "github.com/ipfs/go-datastore/query"
910
)
@@ -94,6 +95,50 @@ type Read interface {
9495
Query(ctx context.Context, q query.Query) (query.Results, error)
9596
}
9697

98+
// QueryIter returns a go iterator that allows ranging over query results.
99+
// The range yields two values, a result Entry and an error. If an error is
100+
// returned then iteration stops.
101+
//
102+
// Example:
103+
//
104+
// qry := query.Query{
105+
// Prefix: keyPrefix,
106+
// }
107+
// var foundVal []byte
108+
// for ent, err := range QueryIter(dstore, qry) {
109+
// if err != nil {
110+
// return err
111+
// }
112+
// if ent.Key == lookingFor {
113+
// foundVal = ent.Val
114+
// break
115+
// }
116+
// }
117+
func QueryIter(ctx context.Context, ds Read, q query.Query) iter.Seq2[query.Entry, error] {
118+
return func(yield func(query.Entry, error) bool) {
119+
results, err := ds.Query(ctx, q)
120+
if err != nil {
121+
yield(query.Entry{}, err)
122+
return
123+
}
124+
defer results.Close()
125+
126+
for result := range results.Next() {
127+
if ctx.Err() != nil {
128+
yield(query.Entry{}, ctx.Err())
129+
return
130+
}
131+
if result.Error != nil {
132+
yield(query.Entry{}, result.Error)
133+
return
134+
}
135+
if !yield(result.Entry, nil) {
136+
return
137+
}
138+
}
139+
}
140+
}
141+
97142
// Batching datastores support deferred, grouped updates to the database.
98143
// `Batch`es do NOT have transactional semantics: updates to the underlying
99144
// datastore are not guaranteed to occur in the same iota of time. Similarly,

test/basic_tests.go

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"crypto/rand"
7+
"errors"
78
"fmt"
89
"reflect"
910
"strings"
@@ -403,7 +404,8 @@ func randValue() []byte {
403404
}
404405

405406
func subtestQuery(t *testing.T, ds dstore.Datastore, q dsq.Query, count int) {
406-
ctx := context.Background()
407+
ctx, cancel := context.WithCancel(context.Background())
408+
defer cancel()
407409

408410
var input []dsq.Entry
409411
for i := 0; i < count; i++ {
@@ -511,6 +513,58 @@ func subtestQuery(t *testing.T, ds dstore.Datastore, q dsq.Query, count int) {
511513
}
512514
}
513515

516+
// Test QueryIter for same results.
517+
actual = actual[:0]
518+
for ent, err := range dstore.QueryIter(ctx, ds, q) {
519+
if err != nil {
520+
t.Fatal("query result error: ", err)
521+
}
522+
actual = append(actual, ent)
523+
}
524+
if len(actual) != len(expected) {
525+
t.Fatalf("expected %d results from QueryIter, got %d", len(expected), len(actual))
526+
}
527+
if len(q.Orders) == 0 {
528+
dsq.Sort([]dsq.Order{dsq.OrderByKey{}}, actual)
529+
}
530+
for i := range actual {
531+
if actual[i].Key != expected[i].Key {
532+
t.Errorf("for result %d, expected key %q, got %q", i, expected[i].Key, actual[i].Key)
533+
continue
534+
}
535+
if !q.KeysOnly && !bytes.Equal(actual[i].Value, expected[i].Value) {
536+
t.Errorf("value mismatch for result %d (key=%q)", i, expected[i].Key)
537+
}
538+
if q.ReturnsSizes && actual[i].Size <= 0 {
539+
t.Errorf("for result %d, expected size > 0 with ReturnsSizes", i)
540+
}
541+
}
542+
543+
const cancelAt = 1
544+
if len(actual) > cancelAt {
545+
// Test that query iterator stops when context is canceled.
546+
var i int
547+
for ent, err := range dstore.QueryIter(ctx, ds, q) {
548+
if err != nil {
549+
if !errors.Is(err, context.Canceled) {
550+
t.Fatal("query result error: ", err)
551+
}
552+
t.Log("err at:", i, err)
553+
continue
554+
}
555+
if ent.Key == "" {
556+
t.Fatal("entry has empty key")
557+
}
558+
i++
559+
if i == cancelAt {
560+
cancel()
561+
}
562+
}
563+
if i != cancelAt {
564+
t.Fatal("expected iteration to be canceled at", cancelAt, "canceled at", i)
565+
}
566+
}
567+
514568
t.Log("deleting all keys")
515569
for _, e := range input {
516570
if err := ds.Delete(ctx, dstore.RawKey(e.Key)); err != nil {

0 commit comments

Comments
 (0)