Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"io"
"iter"

query "github.com/ipfs/go-datastore/query"
)
Expand Down Expand Up @@ -94,6 +95,50 @@ type Read interface {
Query(ctx context.Context, q query.Query) (query.Results, error)
}

// QueryIter returns a go iterator that allows ranging over query results.
// The range yields two values, a result Entry and an error. If an error is
// returned then iteration stops.
//
// Example:
//
// qry := query.Query{
// Prefix: keyPrefix,
// }
// var foundVal []byte
// for ent, err := range QueryIter(dstore, qry) {
// if err != nil {
// return err
// }
// if ent.Key == lookingFor {
// foundVal = ent.Val
// break
// }
// }
func QueryIter(ctx context.Context, ds Read, q query.Query) iter.Seq2[query.Entry, error] {
return func(yield func(query.Entry, error) bool) {
results, err := ds.Query(ctx, q)
if err != nil {
yield(query.Entry{}, err)
return
}
defer results.Close()

for result := range results.Next() {
if ctx.Err() != nil {
yield(query.Entry{}, ctx.Err())
return
}
if result.Error != nil {
yield(query.Entry{}, result.Error)
return
}
if !yield(result.Entry, nil) {
return
}
}
}
}

// Batching datastores support deferred, grouped updates to the database.
// `Batch`es do NOT have transactional semantics: updates to the underlying
// datastore are not guaranteed to occur in the same iota of time. Similarly,
Expand Down
56 changes: 55 additions & 1 deletion test/basic_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/rand"
"errors"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -403,7 +404,8 @@ func randValue() []byte {
}

func subtestQuery(t *testing.T, ds dstore.Datastore, q dsq.Query, count int) {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var input []dsq.Entry
for i := 0; i < count; i++ {
Expand Down Expand Up @@ -511,6 +513,58 @@ func subtestQuery(t *testing.T, ds dstore.Datastore, q dsq.Query, count int) {
}
}

// Test QueryIter for same results.
actual = actual[:0]
for ent, err := range dstore.QueryIter(ctx, ds, q) {
if err != nil {
t.Fatal("query result error: ", err)
}
actual = append(actual, ent)
}
if len(actual) != len(expected) {
t.Fatalf("expected %d results from QueryIter, got %d", len(expected), len(actual))
}
if len(q.Orders) == 0 {
dsq.Sort([]dsq.Order{dsq.OrderByKey{}}, actual)
}
for i := range actual {
if actual[i].Key != expected[i].Key {
t.Errorf("for result %d, expected key %q, got %q", i, expected[i].Key, actual[i].Key)
continue
}
if !q.KeysOnly && !bytes.Equal(actual[i].Value, expected[i].Value) {
t.Errorf("value mismatch for result %d (key=%q)", i, expected[i].Key)
}
if q.ReturnsSizes && actual[i].Size <= 0 {
t.Errorf("for result %d, expected size > 0 with ReturnsSizes", i)
}
}

const cancelAt = 1
if len(actual) > cancelAt {
// Test that query iterator stops when context is canceled.
var i int
for ent, err := range dstore.QueryIter(ctx, ds, q) {
if err != nil {
if !errors.Is(err, context.Canceled) {
t.Fatal("query result error: ", err)
}
t.Log("err at:", i, err)
continue
}
if ent.Key == "" {
t.Fatal("entry has empty key")
}
i++
if i == cancelAt {
cancel()
}
}
if i != cancelAt {
t.Fatal("expected iteration to be canceled at", cancelAt, "canceled at", i)
}
}

t.Log("deleting all keys")
for _, e := range input {
if err := ds.Delete(ctx, dstore.RawKey(e.Key)); err != nil {
Expand Down
Loading