diff --git a/examples/fs.go b/examples/fs.go index bb62aa3b..a0ee579b 100644 --- a/examples/fs.go +++ b/examples/fs.go @@ -134,7 +134,30 @@ func (d *Datastore) Query(ctx context.Context, q query.Query) (query.Results, er filepath.Walk(d.path, walkFn) close(results) }() - r := query.ResultsWithChan(q, results) + + r := query.ResultsWithContext(q, func(ctx context.Context, out chan<- query.Result) { + loop: + for { + select { + case <-ctx.Done(): // client told us to close early + break loop + case e, more := <-results: + if !more { + return + } + + select { + case out <- e: + case <-ctx.Done(): // client told us to close early + break loop + } + } + } + // Drain results on cancel so writer is unblocked. + for range results { + } + }) + r = query.NaiveQueryApply(q, r) return r, nil } diff --git a/go.mod b/go.mod index 7f5994cc..aba0ffa9 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/ipfs/go-datastore require ( github.com/google/uuid v1.6.0 github.com/ipfs/go-ipfs-delay v0.0.1 - github.com/jbenet/goprocess v0.1.4 go.opentelemetry.io/otel v1.16.0 go.opentelemetry.io/otel/trace v1.16.0 go.uber.org/multierr v1.11.0 diff --git a/go.sum b/go.sum index d24796be..4d8c31b7 100644 --- a/go.sum +++ b/go.sum @@ -11,9 +11,6 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ= github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= -github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= -github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= -github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= diff --git a/keytransform/keytransform.go b/keytransform/keytransform.go index c1c74149..3e4b8ed6 100644 --- a/keytransform/keytransform.go +++ b/keytransform/keytransform.go @@ -94,8 +94,8 @@ func (d *Datastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) } return r, true }, - Close: func() error { - return cqr.Close() + Close: func() { + cqr.Close() }, }) return dsq.NaiveQueryApply(nq, qr), nil diff --git a/mount/mount.go b/mount/mount.go index 027bdca4..32737f4b 100644 --- a/mount/mount.go +++ b/mount/mount.go @@ -101,13 +101,8 @@ func (qr *queryResults) advance() bool { qr.next = query.Result{} r, more := qr.results.NextSync() if !more { - err := qr.results.Close() + qr.results.Close() qr.results = nil - if err != nil { - // One more result, the error. - qr.next = query.Result{Error: err} - return true - } return false } @@ -145,19 +140,11 @@ func (h *querySet) Pop() interface{} { return last } -func (h *querySet) close() error { - var errs []error +func (h *querySet) close() { for _, qr := range h.heads { - err := qr.results.Close() - if err != nil { - errs = append(errs, err) - } + qr.results.Close() } h.heads = nil - if len(errs) > 0 { - return errs[0] - } - return nil } func (h *querySet) addResults(mount ds.Key, results query.Results) { @@ -339,7 +326,7 @@ func (d *Datastore) Query(ctx context.Context, master query.Query) (query.Result results, err := dstore.Query(ctx, qi) if err != nil { - _ = queries.close() + queries.close() return nil, err } queries.addResults(mount, results) diff --git a/mount/mount_test.go b/mount/mount_test.go index 4c12c584..fb765ea8 100644 --- a/mount/mount_test.go +++ b/mount/mount_test.go @@ -263,10 +263,7 @@ func TestQuerySimple(t *testing.T) { t.Errorf("did not see wanted key %q in %+v", myKey, entries) } - err = res.Close() - if err != nil { - t.Errorf("result.Close failed %d", err) - } + res.Close() } func TestQueryAcrossMounts(t *testing.T) { @@ -307,10 +304,7 @@ func TestQueryAcrossMounts(t *testing.T) { } entries, err := res.Rest() if err != nil { - err = res.Close() - if err != nil { - t.Errorf("result.Close failed %d", err) - } + res.Close() t.Fatalf("Query Results.Rest fail: %v\n", err) } if len(entries) != len(values) { @@ -407,10 +401,7 @@ func TestQueryAcrossMountsWithSort(t *testing.T) { } } - err = res.Close() - if err != nil { - t.Errorf("result.Close failed %d", err) - } + res.Close() } func TestQueryLimitAcrossMountsWithSort(t *testing.T) { @@ -473,10 +464,7 @@ func TestQueryLimitAcrossMountsWithSort(t *testing.T) { } } - err = res.Close() - if err != nil { - t.Errorf("result.Close failed %d", err) - } + res.Close() } func TestQueryLimitAndOffsetAcrossMountsWithSort(t *testing.T) { @@ -540,10 +528,7 @@ func TestQueryLimitAndOffsetAcrossMountsWithSort(t *testing.T) { } } - err = res.Close() - if err != nil { - t.Errorf("result.Close failed %d", err) - } + res.Close() } func TestQueryFilterAcrossMountsWithSort(t *testing.T) { @@ -606,10 +591,7 @@ func TestQueryFilterAcrossMountsWithSort(t *testing.T) { } } - err = res.Close() - if err != nil { - t.Errorf("result.Close failed %d", err) - } + res.Close() } func TestQueryLimitAndOffsetWithNoData(t *testing.T) { @@ -639,10 +621,7 @@ func TestQueryLimitAndOffsetWithNoData(t *testing.T) { t.Fatalf("expected %d entries, but got %d", len(expect), len(entries)) } - err = res.Close() - if err != nil { - t.Errorf("result.Close failed %d", err) - } + res.Close() } func TestQueryLimitWithNotEnoughData(t *testing.T) { @@ -682,10 +661,7 @@ func TestQueryLimitWithNotEnoughData(t *testing.T) { t.Fatalf("expected %d entries, but got %d", len(expect), len(entries)) } - err = res.Close() - if err != nil { - t.Errorf("result.Close failed %d", err) - } + res.Close() } func TestQueryOffsetWithNotEnoughData(t *testing.T) { @@ -722,10 +698,7 @@ func TestQueryOffsetWithNotEnoughData(t *testing.T) { t.Fatalf("expected %d entries, but got %d", len(expect), len(entries)) } - err = res.Close() - if err != nil { - t.Errorf("result.Close failed %d", err) - } + res.Close() } func TestLookupPrio(t *testing.T) { diff --git a/namespace/namespace_test.go b/namespace/namespace_test.go index fac29e00..b873ddf3 100644 --- a/namespace/namespace_test.go +++ b/namespace/namespace_test.go @@ -119,8 +119,7 @@ func (ks *DSSuite) TestQuery(c *C) { c.Check(string(ent.Value), Equals, string(expect[i].Value)) } - err = qres.Close() - c.Check(err, Equals, nil) + qres.Close() qres, err = nsds.Query(ctx, dsq.Query{Prefix: "bar"}) c.Check(err, Equals, nil) diff --git a/query/query.go b/query/query.go index 0a8898ea..d7693771 100644 --- a/query/query.go +++ b/query/query.go @@ -1,10 +1,14 @@ package query import ( + "context" "fmt" "time" +) - goprocess "github.com/jbenet/goprocess" +const ( + NormalBufSize = 1 + KeysOnlyBufSize = 128 ) /* @@ -149,20 +153,17 @@ type Results interface { Next() <-chan Result // returns a channel to wait for the next result NextSync() (Result, bool) // blocks and waits to return the next result, second parameter returns false when results are exhausted Rest() ([]Entry, error) // waits till processing finishes, returns all entries at once. - Close() error // client may call Close to signal early exit - - // Process returns a goprocess.Process associated with these results. - // most users will not need this function (Close is all they want), - // but it's here in case you want to connect the results to other - // goprocess-friendly things. - Process() goprocess.Process + Close() // client may call Close to signal early exit + Done() <-chan struct{} // signals that Results is closed } // results implements Results type results struct { query Query - proc goprocess.Process res <-chan Result + + cancel context.CancelFunc + closed chan struct{} } func (r *results) Next() <-chan Result { @@ -182,104 +183,46 @@ func (r *results) Rest() ([]Entry, error) { } es = append(es, e.Entry) } - <-r.proc.Closed() // wait till the processing finishes. + <-r.Done() // wait till the processing finishes. return es, nil } -func (r *results) Process() goprocess.Process { - return r.proc -} - -func (r *results) Close() error { - return r.proc.Close() +func (r *results) Close() { + r.cancel() + <-r.closed } func (r *results) Query() Query { return r.query } -// ResultBuilder is what implementors use to construct results -// Implementors of datastores and their clients must respect the -// Process of the Request: -// -// - clients must call r.Process().Close() on an early exit, so -// implementations can reclaim resources. -// - if the Entries are read to completion (channel closed), Process -// should be closed automatically. -// - datastores must respect <-Process.Closing(), which intermediates -// an early close signal from the client. -type ResultBuilder struct { - Query Query - Process goprocess.Process - Output chan Result +func (r *results) Done() <-chan struct{} { + return r.closed } -// Results returns a Results to to this builder. -func (rb *ResultBuilder) Results() Results { - return &results{ - query: rb.Query, - proc: rb.Process, - res: rb.Output, - } -} - -const NormalBufSize = 1 -const KeysOnlyBufSize = 128 - -func NewResultBuilder(q Query) *ResultBuilder { +// ResultsWithContext returns a Results object with the results generated by +// the passed proc function called in a separate goroutine. +func ResultsWithContext(q Query, proc func(context.Context, chan<- Result)) Results { bufSize := NormalBufSize if q.KeysOnly { bufSize = KeysOnlyBufSize } - b := &ResultBuilder{ - Query: q, - Output: make(chan Result, bufSize), - } - b.Process = goprocess.WithTeardown(func() error { - close(b.Output) - return nil - }) - return b -} - -// ResultsWithChan returns a Results object from a channel -// of Result entries. -// -// DEPRECATED: This iterator is impossible to cancel correctly. Canceling it -// will leave anything trying to write to the result channel hanging. -func ResultsWithChan(q Query, res <-chan Result) Results { - return ResultsWithProcess(q, func(worker goprocess.Process, out chan<- Result) { - for { - select { - case <-worker.Closing(): // client told us to close early - return - case e, more := <-res: - if !more { - return - } - - select { - case out <- e: - case <-worker.Closing(): // client told us to close early - return - } - } - } - }) -} - -// ResultsWithProcess returns a Results object with the results generated by the -// passed subprocess. -func ResultsWithProcess(q Query, proc func(goprocess.Process, chan<- Result)) Results { - b := NewResultBuilder(q) + output := make(chan Result, bufSize) + closed := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) - // go consume all the entries and add them to the results. - b.Process.Go(func(worker goprocess.Process) { - proc(worker, b.Output) - }) + go func() { + proc(ctx, output) + close(output) + close(closed) + }() - go b.Process.CloseAfterChildren() //nolint - return b.Results() + return &results{ + query: q, + res: output, + cancel: cancel, + closed: closed, + } } // ResultsWithEntries returns a Results object from a list of entries @@ -301,14 +244,14 @@ func ResultsReplaceQuery(r Results, q Query) Results { switch r := r.(type) { case *results: // note: not using field names to make sure all fields are copied - return &results{q, r.proc, r.res} + return &results{q, r.res, r.cancel, r.closed} case *resultsIter: // note: not using field names to make sure all fields are copied - lr := r.legacyResults - if lr != nil { - lr = &results{q, lr.proc, lr.res} + oldr := r.results + if oldr != nil { + oldr = &results{q, oldr.res, oldr.cancel, oldr.closed} } - return &resultsIter{q, r.next, r.close, lr} + return &resultsIter{q, r.next, r.close, oldr} default: panic("unknown results type") } @@ -330,37 +273,34 @@ func ResultsFromIterator(q Query, iter Iterator) Results { } } -func noopClose() error { - return nil -} +func noopClose() {} type Iterator struct { Next func() (Result, bool) - Close func() error // note: might be called more than once + Close func() // note: might be called more than once } type resultsIter struct { - query Query - next func() (Result, bool) - close func() error - legacyResults *results + query Query + next func() (Result, bool) + close func() + results *results } func (r *resultsIter) Next() <-chan Result { - r.useLegacyResults() - return r.legacyResults.Next() + r.collectResults() + return r.results.Next() } func (r *resultsIter) NextSync() (Result, bool) { - if r.legacyResults != nil { - return r.legacyResults.NextSync() - } else { - res, ok := r.next() - if !ok { - r.close() - } - return res, ok + if r.results != nil { + return r.results.NextSync() } + res, ok := r.next() + if !ok { + r.close() + } + return res, ok } func (r *resultsIter) Rest() ([]Entry, error) { @@ -378,16 +318,13 @@ func (r *resultsIter) Rest() ([]Entry, error) { return es, nil } -func (r *resultsIter) Process() goprocess.Process { - r.useLegacyResults() - return r.legacyResults.Process() -} - -func (r *resultsIter) Close() error { - if r.legacyResults != nil { - return r.legacyResults.Close() +func (r *resultsIter) Close() { + if r.results != nil { + // Close results collector. It will call r.close(). + r.results.Close() } else { - return r.close() + // Call r.close() since there is no collector to call it when closed. + r.close() } } @@ -395,30 +332,29 @@ func (r *resultsIter) Query() Query { return r.query } -func (r *resultsIter) useLegacyResults() { - if r.legacyResults != nil { +func (r *resultsIter) Done() <-chan struct{} { + r.collectResults() + return r.results.Done() +} + +func (r *resultsIter) collectResults() { + if r.results != nil { return } - b := NewResultBuilder(r.query) - // go consume all the entries and add them to the results. - b.Process.Go(func(worker goprocess.Process) { + r.results = ResultsWithContext(r.query, func(ctx context.Context, out chan<- Result) { defer r.close() for { e, ok := r.next() if !ok { - break + return } select { - case b.Output <- e: - case <-worker.Closing(): // client told us to close early + case out <- e: + case <-ctx.Done(): // client told us to close early return } } - }) - - go b.Process.CloseAfterChildren() //nolint - - r.legacyResults = b.Results().(*results) + }).(*results) } diff --git a/query/query_impl.go b/query/query_impl.go index 4a4e79eb..403dddbd 100644 --- a/query/query_impl.go +++ b/query/query_impl.go @@ -18,8 +18,8 @@ func NaiveFilter(qr Results, filter Filter) Results { } } }, - Close: func() error { - return qr.Close() + Close: func() { + qr.Close() }, }) } @@ -36,22 +36,19 @@ func NaiveLimit(qr Results, limit int) Results { if limit == 0 { if !closed { closed = true - err := qr.Close() - if err != nil { - return Result{Error: err}, true - } + qr.Close() } return Result{}, false } limit-- return qr.NextSync() }, - Close: func() error { + Close: func() { if closed { - return nil + return } closed = true - return qr.Close() + qr.Close() }, }) } @@ -68,8 +65,8 @@ func NaiveOffset(qr Results, offset int) Results { } return qr.NextSync() }, - Close: func() error { - return qr.Close() + Close: func() { + qr.Close() }, }) } diff --git a/query/query_test.go b/query/query_test.go index 3347f09b..0b3195a5 100644 --- a/query/query_test.go +++ b/query/query_test.go @@ -200,7 +200,7 @@ func TestResultsFromIteratorNoClose(t *testing.T) { testResultsFromIterator(t, getKeysViaChan, nil) } -func testResultsFromIterator(t *testing.T, getKeys func(rs Results) []string, close func() error) { +func testResultsFromIterator(t *testing.T, getKeys func(rs Results) []string, close func()) { i := 0 results := ResultsFromIterator(Query{}, Iterator{ Next: func() (Result, bool) { @@ -221,9 +221,8 @@ func testResultsFromIterator(t *testing.T, getKeys func(rs Results) []string, cl func testResultsFromIteratorWClose(t *testing.T, getKeys func(rs Results) []string) { closeCalled := 0 - testResultsFromIterator(t, getKeys, func() error { + testResultsFromIterator(t, getKeys, func() { closeCalled++ - return nil }) if closeCalled != 1 { t.Errorf("close called %d times, expect it to be called just once", closeCalled) diff --git a/sync/sync.go b/sync/sync.go index 2cefa3fd..762482a1 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -90,13 +90,10 @@ func (d *MutexDatastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, e return nil, err } - entries, err1 := results.Rest() - err2 := results.Close() - switch { - case err1 != nil: - return nil, err1 - case err2 != nil: - return nil, err2 + entries, err := results.Rest() + results.Close() + if err != nil { + return nil, err } return dsq.ResultsWithEntries(q, entries), nil }