Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion examples/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,30 @@ func (d *Datastore) Query(ctx context.Context, q query.Query) (query.Results, er
filepath.Walk(d.path, walkFn)
close(results)
}()
r := query.ResultsWithChan(q, results)

r := query.ResultsWithContext(q, func(ctx context.Context, out chan<- query.Result) {
loop:
for {
select {
case <-ctx.Done(): // client told us to close early
break loop
case e, more := <-results:
if !more {
return
}

select {
case out <- e:
case <-ctx.Done(): // client told us to close early
break loop
}
}
}
// Drain results on cancel so writer is unblocked.
for range results {
}
})

r = query.NaiveQueryApply(q, r)
return r, nil
}
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/ipfs/go-datastore
require (
github.com/google/uuid v1.6.0
github.com/ipfs/go-ipfs-delay v0.0.1
github.com/jbenet/goprocess v0.1.4
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/trace v1.16.0
go.uber.org/multierr v1.11.0
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ=
github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand Down
4 changes: 2 additions & 2 deletions keytransform/keytransform.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func (d *Datastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error)
}
return r, true
},
Close: func() error {
return cqr.Close()
Close: func() {
cqr.Close()
},
})
return dsq.NaiveQueryApply(nq, qr), nil
Expand Down
21 changes: 4 additions & 17 deletions mount/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,8 @@ func (qr *queryResults) advance() bool {
qr.next = query.Result{}
r, more := qr.results.NextSync()
if !more {
err := qr.results.Close()
qr.results.Close()
qr.results = nil
if err != nil {
// One more result, the error.
qr.next = query.Result{Error: err}
return true
}
return false
}

Expand Down Expand Up @@ -145,19 +140,11 @@ func (h *querySet) Pop() interface{} {
return last
}

func (h *querySet) close() error {
var errs []error
func (h *querySet) close() {
for _, qr := range h.heads {
err := qr.results.Close()
if err != nil {
errs = append(errs, err)
}
qr.results.Close()
}
h.heads = nil
if len(errs) > 0 {
return errs[0]
}
return nil
}

func (h *querySet) addResults(mount ds.Key, results query.Results) {
Expand Down Expand Up @@ -339,7 +326,7 @@ func (d *Datastore) Query(ctx context.Context, master query.Query) (query.Result
results, err := dstore.Query(ctx, qi)

if err != nil {
_ = queries.close()
queries.close()
return nil, err
}
queries.addResults(mount, results)
Expand Down
45 changes: 9 additions & 36 deletions mount/mount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,7 @@ func TestQuerySimple(t *testing.T) {
t.Errorf("did not see wanted key %q in %+v", myKey, entries)
}

err = res.Close()
if err != nil {
t.Errorf("result.Close failed %d", err)
}
res.Close()
}

func TestQueryAcrossMounts(t *testing.T) {
Expand Down Expand Up @@ -307,10 +304,7 @@ func TestQueryAcrossMounts(t *testing.T) {
}
entries, err := res.Rest()
if err != nil {
err = res.Close()
if err != nil {
t.Errorf("result.Close failed %d", err)
}
res.Close()
t.Fatalf("Query Results.Rest fail: %v\n", err)
}
if len(entries) != len(values) {
Expand Down Expand Up @@ -407,10 +401,7 @@ func TestQueryAcrossMountsWithSort(t *testing.T) {
}
}

err = res.Close()
if err != nil {
t.Errorf("result.Close failed %d", err)
}
res.Close()
}

func TestQueryLimitAcrossMountsWithSort(t *testing.T) {
Expand Down Expand Up @@ -473,10 +464,7 @@ func TestQueryLimitAcrossMountsWithSort(t *testing.T) {
}
}

err = res.Close()
if err != nil {
t.Errorf("result.Close failed %d", err)
}
res.Close()
}

func TestQueryLimitAndOffsetAcrossMountsWithSort(t *testing.T) {
Expand Down Expand Up @@ -540,10 +528,7 @@ func TestQueryLimitAndOffsetAcrossMountsWithSort(t *testing.T) {
}
}

err = res.Close()
if err != nil {
t.Errorf("result.Close failed %d", err)
}
res.Close()
}

func TestQueryFilterAcrossMountsWithSort(t *testing.T) {
Expand Down Expand Up @@ -606,10 +591,7 @@ func TestQueryFilterAcrossMountsWithSort(t *testing.T) {
}
}

err = res.Close()
if err != nil {
t.Errorf("result.Close failed %d", err)
}
res.Close()
}

func TestQueryLimitAndOffsetWithNoData(t *testing.T) {
Expand Down Expand Up @@ -639,10 +621,7 @@ func TestQueryLimitAndOffsetWithNoData(t *testing.T) {
t.Fatalf("expected %d entries, but got %d", len(expect), len(entries))
}

err = res.Close()
if err != nil {
t.Errorf("result.Close failed %d", err)
}
res.Close()
}

func TestQueryLimitWithNotEnoughData(t *testing.T) {
Expand Down Expand Up @@ -682,10 +661,7 @@ func TestQueryLimitWithNotEnoughData(t *testing.T) {
t.Fatalf("expected %d entries, but got %d", len(expect), len(entries))
}

err = res.Close()
if err != nil {
t.Errorf("result.Close failed %d", err)
}
res.Close()
}

func TestQueryOffsetWithNotEnoughData(t *testing.T) {
Expand Down Expand Up @@ -722,10 +698,7 @@ func TestQueryOffsetWithNotEnoughData(t *testing.T) {
t.Fatalf("expected %d entries, but got %d", len(expect), len(entries))
}

err = res.Close()
if err != nil {
t.Errorf("result.Close failed %d", err)
}
res.Close()
}

func TestLookupPrio(t *testing.T) {
Expand Down
3 changes: 1 addition & 2 deletions namespace/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ func (ks *DSSuite) TestQuery(c *C) {
c.Check(string(ent.Value), Equals, string(expect[i].Value))
}

err = qres.Close()
c.Check(err, Equals, nil)
qres.Close()

qres, err = nsds.Query(ctx, dsq.Query{Prefix: "bar"})
c.Check(err, Equals, nil)
Expand Down
Loading