Skip to content

Commit 682b39f

Browse files
committed
REesultsBuilder waits for results goroutine to finish before closing output channel
1 parent 65c4f51 commit 682b39f

File tree

1 file changed

+23
-6
lines changed

1 file changed

+23
-6
lines changed

query/query.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package query
33
import (
44
"context"
55
"fmt"
6+
"sync"
67
"time"
78
)
89

@@ -149,6 +150,7 @@ type Results interface {
149150
NextSync() (Result, bool) // blocks and waits to return the next result, second parameter returns false when results are exhausted
150151
Rest() ([]Entry, error) // waits till processing finishes, returns all entries at once.
151152
Close() // client may call Close to signal early exit
153+
Done() <-chan struct{} // signals that Results is closed
152154
}
153155

154156
// results implements Results
@@ -189,6 +191,10 @@ func (r *results) Query() Query {
189191
return r.query
190192
}
191193

194+
func (r *results) Done() <-chan struct{} {
195+
return r.ctx.Done()
196+
}
197+
192198
// ResultBuilder is what implementors use to construct results
193199
// Implementors of datastores and their clients must respect the
194200
// Process of the Request:
@@ -205,6 +211,7 @@ type ResultBuilder struct {
205211

206212
ctx context.Context
207213
cancel context.CancelFunc
214+
wg sync.WaitGroup
208215
}
209216

210217
// Results returns a Results to to this builder.
@@ -232,6 +239,7 @@ func NewResultBuilder(q Query) *ResultBuilder {
232239
}
233240
b.ctx, b.cancel = context.WithCancel(context.Background())
234241
context.AfterFunc(b.ctx, func() {
242+
b.wg.Wait()
235243
close(b.Output)
236244
})
237245
return b
@@ -240,8 +248,11 @@ func NewResultBuilder(q Query) *ResultBuilder {
240248
// ResultsWithChan returns a Results object from a channel
241249
// of Result entries.
242250
//
243-
// DEPRECATED: This iterator is impossible to cancel correctly. Canceling it
244-
// will leave anything trying to write to the result channel hanging.
251+
// DEPRECATED: This iterator takes sepcial care to cancel correctly. Canceling
252+
// it will leave anything trying to write to the result channel hanging, unless
253+
// that write can select the result channel and Results.Done(). This requires
254+
// creating the result channel, calline ResultsWithChan, and then writing to
255+
// the results channel.
245256
func ResultsWithChan(q Query, res <-chan Result) Results {
246257
return ResultsWithContext(q, func(ctx context.Context, out chan<- Result) {
247258
for {
@@ -263,14 +274,15 @@ func ResultsWithChan(q Query, res <-chan Result) Results {
263274
})
264275
}
265276

266-
// ResultsWithCtxs returns a Results object with the results generated by the
267-
// passed proc function called in a separate goroutine.
277+
// ResultsWithContext returns a Results object with the results generated by
278+
// the passed proc function called in a separate goroutine.
268279
func ResultsWithContext(q Query, proc func(context.Context, chan<- Result)) Results {
269280
b := NewResultBuilder(q)
270-
281+
b.wg.Add(1)
271282
go func() {
272-
defer b.cancel()
273283
proc(b.ctx, b.Output)
284+
b.cancel()
285+
b.wg.Done()
274286
}()
275287

276288
return b.Results()
@@ -381,6 +393,11 @@ func (r *resultsIter) Query() Query {
381393
return r.query
382394
}
383395

396+
func (r *resultsIter) Done() <-chan struct{} {
397+
r.useLegacyResults()
398+
return r.legacyResults.Done()
399+
}
400+
384401
func (r *resultsIter) useLegacyResults() {
385402
if r.legacyResults != nil {
386403
return

0 commit comments

Comments
 (0)