@@ -7,6 +7,11 @@ import (
7
7
"time"
8
8
)
9
9
10
+ const (
11
+ NormalBufSize = 1
12
+ KeysOnlyBufSize = 128
13
+ )
14
+
10
15
/*
11
16
Query represents storage for any key-value pair.
12
17
@@ -227,9 +232,6 @@ func (rb *ResultBuilder) Results() Results {
227
232
}
228
233
}
229
234
230
- const NormalBufSize = 1
231
- const KeysOnlyBufSize = 128
232
-
233
235
func NewResultBuilder (q Query ) * ResultBuilder {
234
236
bufSize := NormalBufSize
235
237
if q .KeysOnly {
@@ -250,47 +252,29 @@ func NewResultBuilder(q Query) *ResultBuilder {
250
252
return b
251
253
}
252
254
253
- // ResultsWithChan returns a Results object from a channel
254
- // of Result entries.
255
- //
256
- // DEPRECATED: This iterator takes sepcial care to cancel correctly. Canceling
257
- // it will leave anything trying to write to the result channel hanging, unless
258
- // that write can select the result channel and Results.Done(). This requires
259
- // creating the result channel, calline ResultsWithChan, and then writing to
260
- // the results channel.
261
- func ResultsWithChan (q Query , res <- chan Result ) Results {
262
- return ResultsWithContext (q , func (ctx context.Context , out chan <- Result ) {
263
- for {
264
- select {
265
- case <- ctx .Done (): // client told us to close early
266
- return
267
- case e , more := <- res :
268
- if ! more {
269
- return
270
- }
271
-
272
- select {
273
- case out <- e :
274
- case <- ctx .Done (): // client told us to close early
275
- return
276
- }
277
- }
278
- }
279
- })
280
- }
281
-
282
255
// ResultsWithContext returns a Results object with the results generated by
283
256
// the passed proc function called in a separate goroutine.
284
257
func ResultsWithContext (q Query , proc func (context.Context , chan <- Result )) Results {
285
- b := NewResultBuilder (q )
286
- b .wg .Add (1 )
258
+ bufSize := NormalBufSize
259
+ if q .KeysOnly {
260
+ bufSize = KeysOnlyBufSize
261
+ }
262
+ output := make (chan Result , bufSize )
263
+ closed := make (chan struct {})
264
+ ctx , cancel := context .WithCancel (context .Background ())
265
+
287
266
go func () {
288
- proc (b . ctx , b . Output )
289
- b . cancel ( )
290
- b . wg . Done ( )
267
+ proc (ctx , output )
268
+ close ( output )
269
+ close ( closed )
291
270
}()
292
271
293
- return b .Results ()
272
+ return & results {
273
+ query : q ,
274
+ res : output ,
275
+ cancel : cancel ,
276
+ closed : closed ,
277
+ }
294
278
}
295
279
296
280
// ResultsWithEntries returns a Results object from a list of entries
@@ -315,11 +299,11 @@ func ResultsReplaceQuery(r Results, q Query) Results {
315
299
return & results {q , r .res , r .cancel , r .closed }
316
300
case * resultsIter :
317
301
// note: not using field names to make sure all fields are copied
318
- lr := r .legacyResults
319
- if lr != nil {
320
- lr = & results {q , lr .res , lr .cancel , lr .closed }
302
+ oldr := r .results
303
+ if oldr != nil {
304
+ oldr = & results {q , oldr .res , oldr .cancel , oldr .closed }
321
305
}
322
- return & resultsIter {q , r .next , r .close , lr }
306
+ return & resultsIter {q , r .next , r .close , oldr }
323
307
default :
324
308
panic ("unknown results type" )
325
309
}
@@ -349,20 +333,20 @@ type Iterator struct {
349
333
}
350
334
351
335
type resultsIter struct {
352
- query Query
353
- next func () (Result , bool )
354
- close func ()
355
- legacyResults * results
336
+ query Query
337
+ next func () (Result , bool )
338
+ close func ()
339
+ results * results
356
340
}
357
341
358
342
func (r * resultsIter ) Next () <- chan Result {
359
- r .useLegacyResults ()
360
- return r .legacyResults .Next ()
343
+ r .collectResults ()
344
+ return r .results .Next ()
361
345
}
362
346
363
347
func (r * resultsIter ) NextSync () (Result , bool ) {
364
- if r .legacyResults != nil {
365
- return r .legacyResults .NextSync ()
348
+ if r .results != nil {
349
+ return r .results .NextSync ()
366
350
}
367
351
res , ok := r .next ()
368
352
if ! ok {
@@ -387,9 +371,11 @@ func (r *resultsIter) Rest() ([]Entry, error) {
387
371
}
388
372
389
373
func (r * resultsIter ) Close () {
390
- if r .legacyResults != nil {
391
- r .legacyResults .Close ()
374
+ if r .results != nil {
375
+ // Close results collector. It will call r.close().
376
+ r .results .Close ()
392
377
} else {
378
+ // Call r.close() since there is no collector to call it when closed.
393
379
r .close ()
394
380
}
395
381
}
@@ -399,20 +385,17 @@ func (r *resultsIter) Query() Query {
399
385
}
400
386
401
387
func (r * resultsIter ) Done () <- chan struct {} {
402
- r .useLegacyResults ()
403
- return r .legacyResults .Done ()
388
+ r .collectResults ()
389
+ return r .results .Done ()
404
390
}
405
391
406
- func (r * resultsIter ) useLegacyResults () {
407
- if r .legacyResults != nil {
392
+ func (r * resultsIter ) collectResults () {
393
+ if r .results != nil {
408
394
return
409
395
}
410
396
411
- b := NewResultBuilder (r .query )
412
-
413
397
// go consume all the entries and add them to the results.
414
- go func (ctx context.Context , cancel context.CancelFunc , out chan <- Result ) {
415
- defer cancel ()
398
+ r .results = ResultsWithContext (r .query , func (ctx context.Context , out chan <- Result ) {
416
399
defer r .close ()
417
400
for {
418
401
e , ok := r .next ()
@@ -425,7 +408,5 @@ func (r *resultsIter) useLegacyResults() {
425
408
return
426
409
}
427
410
}
428
- }(b .ctx , b .cancel , b .Output )
429
-
430
- r .legacyResults = b .Results ().(* results )
411
+ }).(* results )
431
412
}
0 commit comments