From fd12b84daeb175e9ce62e70e813a74d13fffc4e2 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 19 Feb 2025 07:32:09 -1000 Subject: [PATCH 1/9] remove goprocess from api --- query/query.go | 37 +++++++++---------------------------- 1 file changed, 9 insertions(+), 28 deletions(-) diff --git a/query/query.go b/query/query.go index 0a8898e..652efcc 100644 --- a/query/query.go +++ b/query/query.go @@ -150,12 +150,6 @@ type Results interface { NextSync() (Result, bool) // blocks and waits to return the next result, second parameter returns false when results are exhausted Rest() ([]Entry, error) // waits till processing finishes, returns all entries at once. Close() error // client may call Close to signal early exit - - // Process returns a goprocess.Process associated with these results. - // most users will not need this function (Close is all they want), - // but it's here in case you want to connect the results to other - // goprocess-friendly things. - Process() goprocess.Process } // results implements Results @@ -186,10 +180,6 @@ func (r *results) Rest() ([]Entry, error) { return es, nil } -func (r *results) Process() goprocess.Process { - return r.proc -} - func (r *results) Close() error { return r.proc.Close() } @@ -210,7 +200,7 @@ func (r *results) Query() Query { // an early close signal from the client. type ResultBuilder struct { Query Query - Process goprocess.Process + process goprocess.Process Output chan Result } @@ -218,7 +208,7 @@ type ResultBuilder struct { func (rb *ResultBuilder) Results() Results { return &results{ query: rb.Query, - proc: rb.Process, + proc: rb.process, res: rb.Output, } } @@ -235,7 +225,7 @@ func NewResultBuilder(q Query) *ResultBuilder { Query: q, Output: make(chan Result, bufSize), } - b.Process = goprocess.WithTeardown(func() error { + b.process = goprocess.WithTeardown(func() error { close(b.Output) return nil }) @@ -248,7 +238,7 @@ func NewResultBuilder(q Query) *ResultBuilder { // DEPRECATED: This iterator is impossible to cancel correctly. Canceling it // will leave anything trying to write to the result channel hanging. func ResultsWithChan(q Query, res <-chan Result) Results { - return ResultsWithProcess(q, func(worker goprocess.Process, out chan<- Result) { + proc := func(worker goprocess.Process, out chan<- Result) { for { select { case <-worker.Closing(): // client told us to close early @@ -265,20 +255,16 @@ func ResultsWithChan(q Query, res <-chan Result) Results { } } } - }) -} + } -// ResultsWithProcess returns a Results object with the results generated by the -// passed subprocess. -func ResultsWithProcess(q Query, proc func(goprocess.Process, chan<- Result)) Results { b := NewResultBuilder(q) // go consume all the entries and add them to the results. - b.Process.Go(func(worker goprocess.Process) { + b.process.Go(func(worker goprocess.Process) { proc(worker, b.Output) }) - go b.Process.CloseAfterChildren() //nolint + go b.process.CloseAfterChildren() //nolint return b.Results() } @@ -378,11 +364,6 @@ func (r *resultsIter) Rest() ([]Entry, error) { return es, nil } -func (r *resultsIter) Process() goprocess.Process { - r.useLegacyResults() - return r.legacyResults.Process() -} - func (r *resultsIter) Close() error { if r.legacyResults != nil { return r.legacyResults.Close() @@ -403,7 +384,7 @@ func (r *resultsIter) useLegacyResults() { b := NewResultBuilder(r.query) // go consume all the entries and add them to the results. - b.Process.Go(func(worker goprocess.Process) { + b.process.Go(func(worker goprocess.Process) { defer r.close() for { e, ok := r.next() @@ -418,7 +399,7 @@ func (r *resultsIter) useLegacyResults() { } }) - go b.Process.CloseAfterChildren() //nolint + go b.process.CloseAfterChildren() //nolint r.legacyResults = b.Results().(*results) } From 34ef082168325c7732b78b928262f7c49eebf963 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 19 Feb 2025 08:28:08 -1000 Subject: [PATCH 2/9] Replace goprocess with context - context.Context and context.AfterFunc provide all functionality previously provided by goprocess - Close functions do not need to return error --- keytransform/keytransform.go | 4 +- mount/mount.go | 21 ++------- mount/mount_test.go | 45 ++++--------------- namespace/namespace_test.go | 3 +- query/query.go | 85 ++++++++++++++++++------------------ query/query_impl.go | 19 ++++---- query/query_test.go | 5 +-- sync/sync.go | 11 ++--- 8 files changed, 72 insertions(+), 121 deletions(-) diff --git a/keytransform/keytransform.go b/keytransform/keytransform.go index c1c7414..3e4b8ed 100644 --- a/keytransform/keytransform.go +++ b/keytransform/keytransform.go @@ -94,8 +94,8 @@ func (d *Datastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) } return r, true }, - Close: func() error { - return cqr.Close() + Close: func() { + cqr.Close() }, }) return dsq.NaiveQueryApply(nq, qr), nil diff --git a/mount/mount.go b/mount/mount.go index 027bdca..32737f4 100644 --- a/mount/mount.go +++ b/mount/mount.go @@ -101,13 +101,8 @@ func (qr *queryResults) advance() bool { qr.next = query.Result{} r, more := qr.results.NextSync() if !more { - err := qr.results.Close() + qr.results.Close() qr.results = nil - if err != nil { - // One more result, the error. - qr.next = query.Result{Error: err} - return true - } return false } @@ -145,19 +140,11 @@ func (h *querySet) Pop() interface{} { return last } -func (h *querySet) close() error { - var errs []error +func (h *querySet) close() { for _, qr := range h.heads { - err := qr.results.Close() - if err != nil { - errs = append(errs, err) - } + qr.results.Close() } h.heads = nil - if len(errs) > 0 { - return errs[0] - } - return nil } func (h *querySet) addResults(mount ds.Key, results query.Results) { @@ -339,7 +326,7 @@ func (d *Datastore) Query(ctx context.Context, master query.Query) (query.Result results, err := dstore.Query(ctx, qi) if err != nil { - _ = queries.close() + queries.close() return nil, err } queries.addResults(mount, results) diff --git a/mount/mount_test.go b/mount/mount_test.go index 4c12c58..fb765ea 100644 --- a/mount/mount_test.go +++ b/mount/mount_test.go @@ -263,10 +263,7 @@ func TestQuerySimple(t *testing.T) { t.Errorf("did not see wanted key %q in %+v", myKey, entries) } - err = res.Close() - if err != nil { - t.Errorf("result.Close failed %d", err) - } + res.Close() } func TestQueryAcrossMounts(t *testing.T) { @@ -307,10 +304,7 @@ func TestQueryAcrossMounts(t *testing.T) { } entries, err := res.Rest() if err != nil { - err = res.Close() - if err != nil { - t.Errorf("result.Close failed %d", err) - } + res.Close() t.Fatalf("Query Results.Rest fail: %v\n", err) } if len(entries) != len(values) { @@ -407,10 +401,7 @@ func TestQueryAcrossMountsWithSort(t *testing.T) { } } - err = res.Close() - if err != nil { - t.Errorf("result.Close failed %d", err) - } + res.Close() } func TestQueryLimitAcrossMountsWithSort(t *testing.T) { @@ -473,10 +464,7 @@ func TestQueryLimitAcrossMountsWithSort(t *testing.T) { } } - err = res.Close() - if err != nil { - t.Errorf("result.Close failed %d", err) - } + res.Close() } func TestQueryLimitAndOffsetAcrossMountsWithSort(t *testing.T) { @@ -540,10 +528,7 @@ func TestQueryLimitAndOffsetAcrossMountsWithSort(t *testing.T) { } } - err = res.Close() - if err != nil { - t.Errorf("result.Close failed %d", err) - } + res.Close() } func TestQueryFilterAcrossMountsWithSort(t *testing.T) { @@ -606,10 +591,7 @@ func TestQueryFilterAcrossMountsWithSort(t *testing.T) { } } - err = res.Close() - if err != nil { - t.Errorf("result.Close failed %d", err) - } + res.Close() } func TestQueryLimitAndOffsetWithNoData(t *testing.T) { @@ -639,10 +621,7 @@ func TestQueryLimitAndOffsetWithNoData(t *testing.T) { t.Fatalf("expected %d entries, but got %d", len(expect), len(entries)) } - err = res.Close() - if err != nil { - t.Errorf("result.Close failed %d", err) - } + res.Close() } func TestQueryLimitWithNotEnoughData(t *testing.T) { @@ -682,10 +661,7 @@ func TestQueryLimitWithNotEnoughData(t *testing.T) { t.Fatalf("expected %d entries, but got %d", len(expect), len(entries)) } - err = res.Close() - if err != nil { - t.Errorf("result.Close failed %d", err) - } + res.Close() } func TestQueryOffsetWithNotEnoughData(t *testing.T) { @@ -722,10 +698,7 @@ func TestQueryOffsetWithNotEnoughData(t *testing.T) { t.Fatalf("expected %d entries, but got %d", len(expect), len(entries)) } - err = res.Close() - if err != nil { - t.Errorf("result.Close failed %d", err) - } + res.Close() } func TestLookupPrio(t *testing.T) { diff --git a/namespace/namespace_test.go b/namespace/namespace_test.go index fac29e0..b873ddf 100644 --- a/namespace/namespace_test.go +++ b/namespace/namespace_test.go @@ -119,8 +119,7 @@ func (ks *DSSuite) TestQuery(c *C) { c.Check(string(ent.Value), Equals, string(expect[i].Value)) } - err = qres.Close() - c.Check(err, Equals, nil) + qres.Close() qres, err = nsds.Query(ctx, dsq.Query{Prefix: "bar"}) c.Check(err, Equals, nil) diff --git a/query/query.go b/query/query.go index 652efcc..61dd920 100644 --- a/query/query.go +++ b/query/query.go @@ -1,10 +1,9 @@ package query import ( + "context" "fmt" "time" - - goprocess "github.com/jbenet/goprocess" ) /* @@ -149,14 +148,16 @@ type Results interface { Next() <-chan Result // returns a channel to wait for the next result NextSync() (Result, bool) // blocks and waits to return the next result, second parameter returns false when results are exhausted Rest() ([]Entry, error) // waits till processing finishes, returns all entries at once. - Close() error // client may call Close to signal early exit + Close() // client may call Close to signal early exit } // results implements Results type results struct { query Query - proc goprocess.Process res <-chan Result + + ctx context.Context + cancel context.CancelFunc } func (r *results) Next() <-chan Result { @@ -176,12 +177,12 @@ func (r *results) Rest() ([]Entry, error) { } es = append(es, e.Entry) } - <-r.proc.Closed() // wait till the processing finishes. + <-r.ctx.Done() // wait till the processing finishes. return es, nil } -func (r *results) Close() error { - return r.proc.Close() +func (r *results) Close() { + r.cancel() } func (r *results) Query() Query { @@ -199,17 +200,21 @@ func (r *results) Query() Query { // - datastores must respect <-Process.Closing(), which intermediates // an early close signal from the client. type ResultBuilder struct { - Query Query - process goprocess.Process - Output chan Result + Query Query + Output chan Result + + ctx context.Context + cancel context.CancelFunc } // Results returns a Results to to this builder. func (rb *ResultBuilder) Results() Results { return &results{ query: rb.Query, - proc: rb.process, res: rb.Output, + + ctx: rb.ctx, + cancel: rb.cancel, } } @@ -225,9 +230,9 @@ func NewResultBuilder(q Query) *ResultBuilder { Query: q, Output: make(chan Result, bufSize), } - b.process = goprocess.WithTeardown(func() error { + b.ctx, b.cancel = context.WithCancel(context.Background()) + context.AfterFunc(b.ctx, func() { close(b.Output) - return nil }) return b } @@ -238,10 +243,11 @@ func NewResultBuilder(q Query) *ResultBuilder { // DEPRECATED: This iterator is impossible to cancel correctly. Canceling it // will leave anything trying to write to the result channel hanging. func ResultsWithChan(q Query, res <-chan Result) Results { - proc := func(worker goprocess.Process, out chan<- Result) { + proc := func(ctx context.Context, cancel context.CancelFunc, out chan<- Result) { + defer cancel() for { select { - case <-worker.Closing(): // client told us to close early + case <-ctx.Done(): // client told us to close early return case e, more := <-res: if !more { @@ -250,7 +256,7 @@ func ResultsWithChan(q Query, res <-chan Result) Results { select { case out <- e: - case <-worker.Closing(): // client told us to close early + case <-ctx.Done(): // client told us to close early return } } @@ -260,11 +266,8 @@ func ResultsWithChan(q Query, res <-chan Result) Results { b := NewResultBuilder(q) // go consume all the entries and add them to the results. - b.process.Go(func(worker goprocess.Process) { - proc(worker, b.Output) - }) + go proc(b.ctx, b.cancel, b.Output) - go b.process.CloseAfterChildren() //nolint return b.Results() } @@ -287,12 +290,12 @@ func ResultsReplaceQuery(r Results, q Query) Results { switch r := r.(type) { case *results: // note: not using field names to make sure all fields are copied - return &results{q, r.proc, r.res} + return &results{q, r.res, r.ctx, r.cancel} case *resultsIter: // note: not using field names to make sure all fields are copied lr := r.legacyResults if lr != nil { - lr = &results{q, lr.proc, lr.res} + lr = &results{q, lr.res, lr.ctx, lr.cancel} } return &resultsIter{q, r.next, r.close, lr} default: @@ -316,19 +319,17 @@ func ResultsFromIterator(q Query, iter Iterator) Results { } } -func noopClose() error { - return nil -} +func noopClose() {} type Iterator struct { Next func() (Result, bool) - Close func() error // note: might be called more than once + Close func() // note: might be called more than once } type resultsIter struct { query Query next func() (Result, bool) - close func() error + close func() legacyResults *results } @@ -340,13 +341,12 @@ func (r *resultsIter) Next() <-chan Result { func (r *resultsIter) NextSync() (Result, bool) { if r.legacyResults != nil { return r.legacyResults.NextSync() - } else { - res, ok := r.next() - if !ok { - r.close() - } - return res, ok } + res, ok := r.next() + if !ok { + r.close() + } + return res, ok } func (r *resultsIter) Rest() ([]Entry, error) { @@ -364,11 +364,11 @@ func (r *resultsIter) Rest() ([]Entry, error) { return es, nil } -func (r *resultsIter) Close() error { +func (r *resultsIter) Close() { if r.legacyResults != nil { - return r.legacyResults.Close() + r.legacyResults.Close() } else { - return r.close() + r.close() } } @@ -384,22 +384,21 @@ func (r *resultsIter) useLegacyResults() { b := NewResultBuilder(r.query) // go consume all the entries and add them to the results. - b.process.Go(func(worker goprocess.Process) { + go func(ctx context.Context, cancel context.CancelFunc, out chan<- Result) { + defer cancel() defer r.close() for { e, ok := r.next() if !ok { - break + return } select { - case b.Output <- e: - case <-worker.Closing(): // client told us to close early + case out <- e: + case <-ctx.Done(): // client told us to close early return } } - }) - - go b.process.CloseAfterChildren() //nolint + }(b.ctx, b.cancel, b.Output) r.legacyResults = b.Results().(*results) } diff --git a/query/query_impl.go b/query/query_impl.go index 4a4e79e..403dddb 100644 --- a/query/query_impl.go +++ b/query/query_impl.go @@ -18,8 +18,8 @@ func NaiveFilter(qr Results, filter Filter) Results { } } }, - Close: func() error { - return qr.Close() + Close: func() { + qr.Close() }, }) } @@ -36,22 +36,19 @@ func NaiveLimit(qr Results, limit int) Results { if limit == 0 { if !closed { closed = true - err := qr.Close() - if err != nil { - return Result{Error: err}, true - } + qr.Close() } return Result{}, false } limit-- return qr.NextSync() }, - Close: func() error { + Close: func() { if closed { - return nil + return } closed = true - return qr.Close() + qr.Close() }, }) } @@ -68,8 +65,8 @@ func NaiveOffset(qr Results, offset int) Results { } return qr.NextSync() }, - Close: func() error { - return qr.Close() + Close: func() { + qr.Close() }, }) } diff --git a/query/query_test.go b/query/query_test.go index 3347f09..0b3195a 100644 --- a/query/query_test.go +++ b/query/query_test.go @@ -200,7 +200,7 @@ func TestResultsFromIteratorNoClose(t *testing.T) { testResultsFromIterator(t, getKeysViaChan, nil) } -func testResultsFromIterator(t *testing.T, getKeys func(rs Results) []string, close func() error) { +func testResultsFromIterator(t *testing.T, getKeys func(rs Results) []string, close func()) { i := 0 results := ResultsFromIterator(Query{}, Iterator{ Next: func() (Result, bool) { @@ -221,9 +221,8 @@ func testResultsFromIterator(t *testing.T, getKeys func(rs Results) []string, cl func testResultsFromIteratorWClose(t *testing.T, getKeys func(rs Results) []string) { closeCalled := 0 - testResultsFromIterator(t, getKeys, func() error { + testResultsFromIterator(t, getKeys, func() { closeCalled++ - return nil }) if closeCalled != 1 { t.Errorf("close called %d times, expect it to be called just once", closeCalled) diff --git a/sync/sync.go b/sync/sync.go index 2cefa3f..762482a 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -90,13 +90,10 @@ func (d *MutexDatastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, e return nil, err } - entries, err1 := results.Rest() - err2 := results.Close() - switch { - case err1 != nil: - return nil, err1 - case err2 != nil: - return nil, err2 + entries, err := results.Rest() + results.Close() + if err != nil { + return nil, err } return dsq.ResultsWithEntries(q, entries), nil } From e36c4b9c7e666e31ee7348401d4dc14d2d6dc193 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 19 Feb 2025 08:47:49 -1000 Subject: [PATCH 3/9] Replace ResultsWithProcess with ResultsWithContext --- query/query.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/query/query.go b/query/query.go index 61dd920..919c23a 100644 --- a/query/query.go +++ b/query/query.go @@ -243,8 +243,7 @@ func NewResultBuilder(q Query) *ResultBuilder { // DEPRECATED: This iterator is impossible to cancel correctly. Canceling it // will leave anything trying to write to the result channel hanging. func ResultsWithChan(q Query, res <-chan Result) Results { - proc := func(ctx context.Context, cancel context.CancelFunc, out chan<- Result) { - defer cancel() + return ResultsWithContext(q, func(ctx context.Context, out chan<- Result) { for { select { case <-ctx.Done(): // client told us to close early @@ -261,12 +260,18 @@ func ResultsWithChan(q Query, res <-chan Result) Results { } } } - } + }) +} +// ResultsWithCtxs returns a Results object with the results generated by the +// passed proc function called in a separate goroutine. +func ResultsWithContext(q Query, proc func(context.Context, chan<- Result)) Results { b := NewResultBuilder(q) - // go consume all the entries and add them to the results. - go proc(b.ctx, b.cancel, b.Output) + go func() { + defer b.cancel() + proc(b.ctx, b.Output) + }() return b.Results() } From 4733bce78f593d685d426d7b2479908255f6c36b Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 19 Feb 2025 08:59:03 -1000 Subject: [PATCH 4/9] mod tidy --- go.mod | 1 - go.sum | 3 --- 2 files changed, 4 deletions(-) diff --git a/go.mod b/go.mod index 7f5994c..aba0ffa 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/ipfs/go-datastore require ( github.com/google/uuid v1.6.0 github.com/ipfs/go-ipfs-delay v0.0.1 - github.com/jbenet/goprocess v0.1.4 go.opentelemetry.io/otel v1.16.0 go.opentelemetry.io/otel/trace v1.16.0 go.uber.org/multierr v1.11.0 diff --git a/go.sum b/go.sum index d24796b..4d8c31b 100644 --- a/go.sum +++ b/go.sum @@ -11,9 +11,6 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ= github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= -github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= -github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= -github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= From 1d3418bc546f0fbac85a65dfa82c75c5fa109140 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 19 Feb 2025 10:40:14 -1000 Subject: [PATCH 5/9] REesultsBuilder waits for results goroutine to finish before closing output channel --- query/query.go | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/query/query.go b/query/query.go index 919c23a..c564a66 100644 --- a/query/query.go +++ b/query/query.go @@ -3,6 +3,7 @@ package query import ( "context" "fmt" + "sync" "time" ) @@ -149,6 +150,7 @@ type Results interface { NextSync() (Result, bool) // blocks and waits to return the next result, second parameter returns false when results are exhausted Rest() ([]Entry, error) // waits till processing finishes, returns all entries at once. Close() // client may call Close to signal early exit + Done() <-chan struct{} // signals that Results is closed } // results implements Results @@ -189,6 +191,10 @@ func (r *results) Query() Query { return r.query } +func (r *results) Done() <-chan struct{} { + return r.ctx.Done() +} + // ResultBuilder is what implementors use to construct results // Implementors of datastores and their clients must respect the // Process of the Request: @@ -205,6 +211,7 @@ type ResultBuilder struct { ctx context.Context cancel context.CancelFunc + wg sync.WaitGroup } // Results returns a Results to to this builder. @@ -232,6 +239,7 @@ func NewResultBuilder(q Query) *ResultBuilder { } b.ctx, b.cancel = context.WithCancel(context.Background()) context.AfterFunc(b.ctx, func() { + b.wg.Wait() close(b.Output) }) return b @@ -240,8 +248,11 @@ func NewResultBuilder(q Query) *ResultBuilder { // ResultsWithChan returns a Results object from a channel // of Result entries. // -// DEPRECATED: This iterator is impossible to cancel correctly. Canceling it -// will leave anything trying to write to the result channel hanging. +// DEPRECATED: This iterator takes sepcial care to cancel correctly. Canceling +// it will leave anything trying to write to the result channel hanging, unless +// that write can select the result channel and Results.Done(). This requires +// creating the result channel, calline ResultsWithChan, and then writing to +// the results channel. func ResultsWithChan(q Query, res <-chan Result) Results { return ResultsWithContext(q, func(ctx context.Context, out chan<- Result) { for { @@ -263,14 +274,15 @@ func ResultsWithChan(q Query, res <-chan Result) Results { }) } -// ResultsWithCtxs returns a Results object with the results generated by the -// passed proc function called in a separate goroutine. +// ResultsWithContext returns a Results object with the results generated by +// the passed proc function called in a separate goroutine. func ResultsWithContext(q Query, proc func(context.Context, chan<- Result)) Results { b := NewResultBuilder(q) - + b.wg.Add(1) go func() { - defer b.cancel() proc(b.ctx, b.Output) + b.cancel() + b.wg.Done() }() return b.Results() @@ -381,6 +393,11 @@ func (r *resultsIter) Query() Query { return r.query } +func (r *resultsIter) Done() <-chan struct{} { + r.useLegacyResults() + return r.legacyResults.Done() +} + func (r *resultsIter) useLegacyResults() { if r.legacyResults != nil { return From 1fb456819d6c440481fa3a793c3e365b7afd3d47 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 19 Feb 2025 14:40:09 -1000 Subject: [PATCH 6/9] separate query closing and query closed signals --- query/query.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/query/query.go b/query/query.go index c564a66..90c30a6 100644 --- a/query/query.go +++ b/query/query.go @@ -158,8 +158,8 @@ type results struct { query Query res <-chan Result - ctx context.Context cancel context.CancelFunc + closed chan struct{} } func (r *results) Next() <-chan Result { @@ -179,7 +179,7 @@ func (r *results) Rest() ([]Entry, error) { } es = append(es, e.Entry) } - <-r.ctx.Done() // wait till the processing finishes. + <-r.Done() // wait till the processing finishes. return es, nil } @@ -192,7 +192,7 @@ func (r *results) Query() Query { } func (r *results) Done() <-chan struct{} { - return r.ctx.Done() + return r.closed } // ResultBuilder is what implementors use to construct results @@ -211,6 +211,7 @@ type ResultBuilder struct { ctx context.Context cancel context.CancelFunc + closed chan struct{} wg sync.WaitGroup } @@ -220,8 +221,8 @@ func (rb *ResultBuilder) Results() Results { query: rb.Query, res: rb.Output, - ctx: rb.ctx, cancel: rb.cancel, + closed: rb.closed, } } @@ -236,11 +237,14 @@ func NewResultBuilder(q Query) *ResultBuilder { b := &ResultBuilder{ Query: q, Output: make(chan Result, bufSize), + closed: make(chan struct{}), } + b.ctx, b.cancel = context.WithCancel(context.Background()) context.AfterFunc(b.ctx, func() { b.wg.Wait() close(b.Output) + close(b.closed) }) return b } @@ -307,12 +311,12 @@ func ResultsReplaceQuery(r Results, q Query) Results { switch r := r.(type) { case *results: // note: not using field names to make sure all fields are copied - return &results{q, r.res, r.ctx, r.cancel} + return &results{q, r.res, r.cancel, r.closed} case *resultsIter: // note: not using field names to make sure all fields are copied lr := r.legacyResults if lr != nil { - lr = &results{q, lr.res, lr.ctx, lr.cancel} + lr = &results{q, lr.res, lr.cancel, lr.closed} } return &resultsIter{q, r.next, r.close, lr} default: From b6d542d0afb8443cfd9a71c84a3aafd9fee04b18 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 19 Feb 2025 14:58:05 -1000 Subject: [PATCH 7/9] results.Close needs to wait for closed signal --- query/query.go | 1 + 1 file changed, 1 insertion(+) diff --git a/query/query.go b/query/query.go index 90c30a6..51ec50c 100644 --- a/query/query.go +++ b/query/query.go @@ -185,6 +185,7 @@ func (r *results) Rest() ([]Entry, error) { func (r *results) Close() { r.cancel() + <-r.closed } func (r *results) Query() Query { From 1676d105e58840f27a21b8cd75f6ae8a57365b85 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Tue, 25 Feb 2025 10:46:39 -1000 Subject: [PATCH 8/9] Remove ResultBuilder to simplify query API The ResultBuilder is unnecessary as all of its functionality is available through ResultsWithContext. Also remove depricated ResultsWithChan function. --- examples/fs.go | 25 +++++++++++- query/query.go | 107 ++++++++++++++++++++----------------------------- 2 files changed, 68 insertions(+), 64 deletions(-) diff --git a/examples/fs.go b/examples/fs.go index bb62aa3..a0ee579 100644 --- a/examples/fs.go +++ b/examples/fs.go @@ -134,7 +134,30 @@ func (d *Datastore) Query(ctx context.Context, q query.Query) (query.Results, er filepath.Walk(d.path, walkFn) close(results) }() - r := query.ResultsWithChan(q, results) + + r := query.ResultsWithContext(q, func(ctx context.Context, out chan<- query.Result) { + loop: + for { + select { + case <-ctx.Done(): // client told us to close early + break loop + case e, more := <-results: + if !more { + return + } + + select { + case out <- e: + case <-ctx.Done(): // client told us to close early + break loop + } + } + } + // Drain results on cancel so writer is unblocked. + for range results { + } + }) + r = query.NaiveQueryApply(q, r) return r, nil } diff --git a/query/query.go b/query/query.go index 51ec50c..b775ae6 100644 --- a/query/query.go +++ b/query/query.go @@ -7,6 +7,11 @@ import ( "time" ) +const ( + NormalBufSize = 1 + KeysOnlyBufSize = 128 +) + /* Query represents storage for any key-value pair. @@ -227,9 +232,6 @@ func (rb *ResultBuilder) Results() Results { } } -const NormalBufSize = 1 -const KeysOnlyBufSize = 128 - func NewResultBuilder(q Query) *ResultBuilder { bufSize := NormalBufSize if q.KeysOnly { @@ -250,47 +252,29 @@ func NewResultBuilder(q Query) *ResultBuilder { return b } -// ResultsWithChan returns a Results object from a channel -// of Result entries. -// -// DEPRECATED: This iterator takes sepcial care to cancel correctly. Canceling -// it will leave anything trying to write to the result channel hanging, unless -// that write can select the result channel and Results.Done(). This requires -// creating the result channel, calline ResultsWithChan, and then writing to -// the results channel. -func ResultsWithChan(q Query, res <-chan Result) Results { - return ResultsWithContext(q, func(ctx context.Context, out chan<- Result) { - for { - select { - case <-ctx.Done(): // client told us to close early - return - case e, more := <-res: - if !more { - return - } - - select { - case out <- e: - case <-ctx.Done(): // client told us to close early - return - } - } - } - }) -} - // ResultsWithContext returns a Results object with the results generated by // the passed proc function called in a separate goroutine. func ResultsWithContext(q Query, proc func(context.Context, chan<- Result)) Results { - b := NewResultBuilder(q) - b.wg.Add(1) + bufSize := NormalBufSize + if q.KeysOnly { + bufSize = KeysOnlyBufSize + } + output := make(chan Result, bufSize) + closed := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + go func() { - proc(b.ctx, b.Output) - b.cancel() - b.wg.Done() + proc(ctx, output) + close(output) + close(closed) }() - return b.Results() + return &results{ + query: q, + res: output, + cancel: cancel, + closed: closed, + } } // ResultsWithEntries returns a Results object from a list of entries @@ -315,11 +299,11 @@ func ResultsReplaceQuery(r Results, q Query) Results { return &results{q, r.res, r.cancel, r.closed} case *resultsIter: // note: not using field names to make sure all fields are copied - lr := r.legacyResults - if lr != nil { - lr = &results{q, lr.res, lr.cancel, lr.closed} + oldr := r.results + if oldr != nil { + oldr = &results{q, oldr.res, oldr.cancel, oldr.closed} } - return &resultsIter{q, r.next, r.close, lr} + return &resultsIter{q, r.next, r.close, oldr} default: panic("unknown results type") } @@ -349,20 +333,20 @@ type Iterator struct { } type resultsIter struct { - query Query - next func() (Result, bool) - close func() - legacyResults *results + query Query + next func() (Result, bool) + close func() + results *results } func (r *resultsIter) Next() <-chan Result { - r.useLegacyResults() - return r.legacyResults.Next() + r.collectResults() + return r.results.Next() } func (r *resultsIter) NextSync() (Result, bool) { - if r.legacyResults != nil { - return r.legacyResults.NextSync() + if r.results != nil { + return r.results.NextSync() } res, ok := r.next() if !ok { @@ -387,9 +371,11 @@ func (r *resultsIter) Rest() ([]Entry, error) { } func (r *resultsIter) Close() { - if r.legacyResults != nil { - r.legacyResults.Close() + if r.results != nil { + // Close results collector. It will call r.close(). + r.results.Close() } else { + // Call r.close() since there is no collector to call it when closed. r.close() } } @@ -399,20 +385,17 @@ func (r *resultsIter) Query() Query { } func (r *resultsIter) Done() <-chan struct{} { - r.useLegacyResults() - return r.legacyResults.Done() + r.collectResults() + return r.results.Done() } -func (r *resultsIter) useLegacyResults() { - if r.legacyResults != nil { +func (r *resultsIter) collectResults() { + if r.results != nil { return } - b := NewResultBuilder(r.query) - // go consume all the entries and add them to the results. - go func(ctx context.Context, cancel context.CancelFunc, out chan<- Result) { - defer cancel() + r.results = ResultsWithContext(r.query, func(ctx context.Context, out chan<- Result) { defer r.close() for { e, ok := r.next() @@ -425,7 +408,5 @@ func (r *resultsIter) useLegacyResults() { return } } - }(b.ctx, b.cancel, b.Output) - - r.legacyResults = b.Results().(*results) + }).(*results) } From 0dbca94435fd67a857f3533b328871dc7518ce17 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Tue, 25 Feb 2025 14:00:51 -1000 Subject: [PATCH 9/9] remove ResultBuilder --- query/query.go | 52 -------------------------------------------------- 1 file changed, 52 deletions(-) diff --git a/query/query.go b/query/query.go index b775ae6..d769377 100644 --- a/query/query.go +++ b/query/query.go @@ -3,7 +3,6 @@ package query import ( "context" "fmt" - "sync" "time" ) @@ -201,57 +200,6 @@ func (r *results) Done() <-chan struct{} { return r.closed } -// ResultBuilder is what implementors use to construct results -// Implementors of datastores and their clients must respect the -// Process of the Request: -// -// - clients must call r.Process().Close() on an early exit, so -// implementations can reclaim resources. -// - if the Entries are read to completion (channel closed), Process -// should be closed automatically. -// - datastores must respect <-Process.Closing(), which intermediates -// an early close signal from the client. -type ResultBuilder struct { - Query Query - Output chan Result - - ctx context.Context - cancel context.CancelFunc - closed chan struct{} - wg sync.WaitGroup -} - -// Results returns a Results to to this builder. -func (rb *ResultBuilder) Results() Results { - return &results{ - query: rb.Query, - res: rb.Output, - - cancel: rb.cancel, - closed: rb.closed, - } -} - -func NewResultBuilder(q Query) *ResultBuilder { - bufSize := NormalBufSize - if q.KeysOnly { - bufSize = KeysOnlyBufSize - } - b := &ResultBuilder{ - Query: q, - Output: make(chan Result, bufSize), - closed: make(chan struct{}), - } - - b.ctx, b.cancel = context.WithCancel(context.Background()) - context.AfterFunc(b.ctx, func() { - b.wg.Wait() - close(b.Output) - close(b.closed) - }) - return b -} - // ResultsWithContext returns a Results object with the results generated by // the passed proc function called in a separate goroutine. func ResultsWithContext(q Query, proc func(context.Context, chan<- Result)) Results {