diff --git a/datastore.go b/datastore.go index 36ed4bd..ff92778 100644 --- a/datastore.go +++ b/datastore.go @@ -4,6 +4,7 @@ import ( "context" "errors" "io" + "iter" query "github.com/ipfs/go-datastore/query" ) @@ -94,6 +95,50 @@ type Read interface { Query(ctx context.Context, q query.Query) (query.Results, error) } +// QueryIter returns a go iterator that allows ranging over query results. +// The range yields two values, a result Entry and an error. If an error is +// returned then iteration stops. +// +// Example: +// +// qry := query.Query{ +// Prefix: keyPrefix, +// } +// var foundVal []byte +// for ent, err := range QueryIter(dstore, qry) { +// if err != nil { +// return err +// } +// if ent.Key == lookingFor { +// foundVal = ent.Val +// break +// } +// } +func QueryIter(ctx context.Context, ds Read, q query.Query) iter.Seq2[query.Entry, error] { + return func(yield func(query.Entry, error) bool) { + results, err := ds.Query(ctx, q) + if err != nil { + yield(query.Entry{}, err) + return + } + defer results.Close() + + for result := range results.Next() { + if ctx.Err() != nil { + yield(query.Entry{}, ctx.Err()) + return + } + if result.Error != nil { + yield(query.Entry{}, result.Error) + return + } + if !yield(result.Entry, nil) { + return + } + } + } +} + // Batching datastores support deferred, grouped updates to the database. // `Batch`es do NOT have transactional semantics: updates to the underlying // datastore are not guaranteed to occur in the same iota of time. Similarly, diff --git a/test/basic_tests.go b/test/basic_tests.go index 76193c6..aa0e6f3 100644 --- a/test/basic_tests.go +++ b/test/basic_tests.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/rand" + "errors" "fmt" "reflect" "strings" @@ -403,7 +404,8 @@ func randValue() []byte { } func subtestQuery(t *testing.T, ds dstore.Datastore, q dsq.Query, count int) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() var input []dsq.Entry for i := 0; i < count; i++ { @@ -511,6 +513,58 @@ func subtestQuery(t *testing.T, ds dstore.Datastore, q dsq.Query, count int) { } } + // Test QueryIter for same results. + actual = actual[:0] + for ent, err := range dstore.QueryIter(ctx, ds, q) { + if err != nil { + t.Fatal("query result error: ", err) + } + actual = append(actual, ent) + } + if len(actual) != len(expected) { + t.Fatalf("expected %d results from QueryIter, got %d", len(expected), len(actual)) + } + if len(q.Orders) == 0 { + dsq.Sort([]dsq.Order{dsq.OrderByKey{}}, actual) + } + for i := range actual { + if actual[i].Key != expected[i].Key { + t.Errorf("for result %d, expected key %q, got %q", i, expected[i].Key, actual[i].Key) + continue + } + if !q.KeysOnly && !bytes.Equal(actual[i].Value, expected[i].Value) { + t.Errorf("value mismatch for result %d (key=%q)", i, expected[i].Key) + } + if q.ReturnsSizes && actual[i].Size <= 0 { + t.Errorf("for result %d, expected size > 0 with ReturnsSizes", i) + } + } + + const cancelAt = 1 + if len(actual) > cancelAt { + // Test that query iterator stops when context is canceled. + var i int + for ent, err := range dstore.QueryIter(ctx, ds, q) { + if err != nil { + if !errors.Is(err, context.Canceled) { + t.Fatal("query result error: ", err) + } + t.Log("err at:", i, err) + continue + } + if ent.Key == "" { + t.Fatal("entry has empty key") + } + i++ + if i == cancelAt { + cancel() + } + } + if i != cancelAt { + t.Fatal("expected iteration to be canceled at", cancelAt, "canceled at", i) + } + } + t.Log("deleting all keys") for _, e := range input { if err := ds.Delete(ctx, dstore.RawKey(e.Key)); err != nil {