1
1
package query
2
2
3
3
import (
4
+ "context"
4
5
"fmt"
5
6
"time"
6
-
7
- goprocess "github.com/jbenet/goprocess"
8
7
)
9
8
10
9
/*
@@ -149,14 +148,16 @@ type Results interface {
149
148
Next () <- chan Result // returns a channel to wait for the next result
150
149
NextSync () (Result , bool ) // blocks and waits to return the next result, second parameter returns false when results are exhausted
151
150
Rest () ([]Entry , error ) // waits till processing finishes, returns all entries at once.
152
- Close () error // client may call Close to signal early exit
151
+ Close () // client may call Close to signal early exit
153
152
}
154
153
155
154
// results implements Results
156
155
type results struct {
157
156
query Query
158
- proc goprocess.Process
159
157
res <- chan Result
158
+
159
+ ctx context.Context
160
+ cancel context.CancelFunc
160
161
}
161
162
162
163
func (r * results ) Next () <- chan Result {
@@ -176,12 +177,12 @@ func (r *results) Rest() ([]Entry, error) {
176
177
}
177
178
es = append (es , e .Entry )
178
179
}
179
- <- r .proc . Closed () // wait till the processing finishes.
180
+ <- r .ctx . Done () // wait till the processing finishes.
180
181
return es , nil
181
182
}
182
183
183
- func (r * results ) Close () error {
184
- return r . proc . Close ()
184
+ func (r * results ) Close () {
185
+ r . cancel ()
185
186
}
186
187
187
188
func (r * results ) Query () Query {
@@ -199,17 +200,21 @@ func (r *results) Query() Query {
199
200
// - datastores must respect <-Process.Closing(), which intermediates
200
201
// an early close signal from the client.
201
202
type ResultBuilder struct {
202
- Query Query
203
- process goprocess.Process
204
- Output chan Result
203
+ Query Query
204
+ Output chan Result
205
+
206
+ ctx context.Context
207
+ cancel context.CancelFunc
205
208
}
206
209
207
210
// Results returns a Results to to this builder.
208
211
func (rb * ResultBuilder ) Results () Results {
209
212
return & results {
210
213
query : rb .Query ,
211
- proc : rb .process ,
212
214
res : rb .Output ,
215
+
216
+ ctx : rb .ctx ,
217
+ cancel : rb .cancel ,
213
218
}
214
219
}
215
220
@@ -225,9 +230,9 @@ func NewResultBuilder(q Query) *ResultBuilder {
225
230
Query : q ,
226
231
Output : make (chan Result , bufSize ),
227
232
}
228
- b .process = goprocess .WithTeardown (func () error {
233
+ b .ctx , b .cancel = context .WithCancel (context .Background ())
234
+ context .AfterFunc (b .ctx , func () {
229
235
close (b .Output )
230
- return nil
231
236
})
232
237
return b
233
238
}
@@ -238,10 +243,11 @@ func NewResultBuilder(q Query) *ResultBuilder {
238
243
// DEPRECATED: This iterator is impossible to cancel correctly. Canceling it
239
244
// will leave anything trying to write to the result channel hanging.
240
245
func ResultsWithChan (q Query , res <- chan Result ) Results {
241
- proc := func (worker goprocess.Process , out chan <- Result ) {
246
+ proc := func (ctx context.Context , cancel context.CancelFunc , out chan <- Result ) {
247
+ defer cancel ()
242
248
for {
243
249
select {
244
- case <- worker . Closing (): // client told us to close early
250
+ case <- ctx . Done (): // client told us to close early
245
251
return
246
252
case e , more := <- res :
247
253
if ! more {
@@ -250,7 +256,7 @@ func ResultsWithChan(q Query, res <-chan Result) Results {
250
256
251
257
select {
252
258
case out <- e :
253
- case <- worker . Closing (): // client told us to close early
259
+ case <- ctx . Done (): // client told us to close early
254
260
return
255
261
}
256
262
}
@@ -260,11 +266,8 @@ func ResultsWithChan(q Query, res <-chan Result) Results {
260
266
b := NewResultBuilder (q )
261
267
262
268
// go consume all the entries and add them to the results.
263
- b .process .Go (func (worker goprocess.Process ) {
264
- proc (worker , b .Output )
265
- })
269
+ go proc (b .ctx , b .cancel , b .Output )
266
270
267
- go b .process .CloseAfterChildren () //nolint
268
271
return b .Results ()
269
272
}
270
273
@@ -287,12 +290,12 @@ func ResultsReplaceQuery(r Results, q Query) Results {
287
290
switch r := r .(type ) {
288
291
case * results :
289
292
// note: not using field names to make sure all fields are copied
290
- return & results {q , r .proc , r .res }
293
+ return & results {q , r .res , r .ctx , r . cancel }
291
294
case * resultsIter :
292
295
// note: not using field names to make sure all fields are copied
293
296
lr := r .legacyResults
294
297
if lr != nil {
295
- lr = & results {q , lr .proc , lr .res }
298
+ lr = & results {q , lr .res , lr .ctx , lr . cancel }
296
299
}
297
300
return & resultsIter {q , r .next , r .close , lr }
298
301
default :
@@ -316,19 +319,17 @@ func ResultsFromIterator(q Query, iter Iterator) Results {
316
319
}
317
320
}
318
321
319
- func noopClose () error {
320
- return nil
321
- }
322
+ func noopClose () {}
322
323
323
324
type Iterator struct {
324
325
Next func () (Result , bool )
325
- Close func () error // note: might be called more than once
326
+ Close func () // note: might be called more than once
326
327
}
327
328
328
329
type resultsIter struct {
329
330
query Query
330
331
next func () (Result , bool )
331
- close func () error
332
+ close func ()
332
333
legacyResults * results
333
334
}
334
335
@@ -340,13 +341,12 @@ func (r *resultsIter) Next() <-chan Result {
340
341
func (r * resultsIter ) NextSync () (Result , bool ) {
341
342
if r .legacyResults != nil {
342
343
return r .legacyResults .NextSync ()
343
- } else {
344
- res , ok := r .next ()
345
- if ! ok {
346
- r .close ()
347
- }
348
- return res , ok
349
344
}
345
+ res , ok := r .next ()
346
+ if ! ok {
347
+ r .close ()
348
+ }
349
+ return res , ok
350
350
}
351
351
352
352
func (r * resultsIter ) Rest () ([]Entry , error ) {
@@ -364,11 +364,11 @@ func (r *resultsIter) Rest() ([]Entry, error) {
364
364
return es , nil
365
365
}
366
366
367
- func (r * resultsIter ) Close () error {
367
+ func (r * resultsIter ) Close () {
368
368
if r .legacyResults != nil {
369
- return r .legacyResults .Close ()
369
+ r .legacyResults .Close ()
370
370
} else {
371
- return r .close ()
371
+ r .close ()
372
372
}
373
373
}
374
374
@@ -384,22 +384,21 @@ func (r *resultsIter) useLegacyResults() {
384
384
b := NewResultBuilder (r .query )
385
385
386
386
// go consume all the entries and add them to the results.
387
- b .process .Go (func (worker goprocess.Process ) {
387
+ go func (ctx context.Context , cancel context.CancelFunc , out chan <- Result ) {
388
+ defer cancel ()
388
389
defer r .close ()
389
390
for {
390
391
e , ok := r .next ()
391
392
if ! ok {
392
- break
393
+ return
393
394
}
394
395
select {
395
- case b . Output <- e :
396
- case <- worker . Closing (): // client told us to close early
396
+ case out <- e :
397
+ case <- ctx . Done (): // client told us to close early
397
398
return
398
399
}
399
400
}
400
- })
401
-
402
- go b .process .CloseAfterChildren () //nolint
401
+ }(b .ctx , b .cancel , b .Output )
403
402
404
403
r .legacyResults = b .Results ().(* results )
405
404
}
0 commit comments