Skip to content

Commit c6603d3

Browse files
committed
cspann: support context cancelation in Process
Previously, the Process method did not take a context parameter and so couldn't be canceled. This was not a problem when it was only used in testing, but now it can be called as a built-in function. This commit adds a context parameter to the method and supports cancelation.

Epic: CRDB-42943

Release note: None
1 parent 43b1a3a commit c6603d3

File tree

10 files changed

+252
-84
lines changed

10 files changed

+252
-84
lines changed

pkg/cmd/vecbench/mem_provider.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,9 @@ func (m *MemProvider) Save(ctx context.Context) error {
199199
}
200200

201201
// Wait for any remaining background fixups to be processed.
202-
m.index.ProcessFixups()
202+
if err := m.index.ProcessFixups(ctx); err != nil {
203+
return err
204+
}
203205

204206
startTime := timeutil.Now()
205207

pkg/sql/logictest/testdata/logic_test/vector_index

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ INSERT INTO exec_test (a, b, vec1) VALUES
548548
(8, NULL, NULL),
549549
(9, 3, NULL);
550550

551-
# Define
551+
# Define helper to force fixup processing for the given index.
552552
statement ok
553553
CREATE PROCEDURE process_vector_index_fixups_for(desc_name STRING, idx_name STRING)
554554
LANGUAGE SQL

pkg/sql/planner.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,6 +1086,5 @@ func (p *planner) ProcessVectorIndexFixups(
10861086
if err != nil {
10871087
return err
10881088
}
1089-
vi.ProcessFixups()
1090-
return nil
1089+
return vi.ProcessFixups(ctx)
10911090
}

pkg/sql/vecindex/cspann/fixup_processor.go

Lines changed: 119 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/util/stop"
1818
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
1919
"github.com/cockroachdb/crlib/crtime"
20-
"github.com/cockroachdb/errors"
2120
)
2221

2322
// fixupType enumerates the different kinds of fixups.
@@ -179,9 +178,10 @@ type FixupProcessor struct {
179178
// fixups. Only once the channel is closed will they begin processing.
180179
// This is used for testing.
181180
suspended chan struct{}
182-
// discardFixups, if true, causes the processor to discard any queued
183-
// fixups rather than processing them. This is used for testing.
184-
discardFixups bool
181+
// fixupsDone is created when a caller of Process() is waiting for
182+
// pending fixups to be processed. It is closed when all fixups have been
183+
// processed.
184+
fixupsDone chan struct{}
185185
// waitForFixups broadcasts to any waiters when all fixups are processed.
186186
// This is used for testing.
187187
waitForFixups sync.Cond
@@ -220,7 +220,7 @@ func (fp *FixupProcessor) Init(
220220
if index.options.IsDeterministic {
221221
// A deterministic index should be suspended until Process is explicitly
222222
// called.
223-
fp.Suspend()
223+
fp.mu.suspended = make(chan struct{})
224224
}
225225
}
226226

@@ -341,48 +341,67 @@ func (fp *FixupProcessor) AddMerge(
341341
})
342342
}
343343

344-
// Suspend blocks all background workers from processing fixups until Process is
345-
// called to let them run. This is useful for testing.
346-
func (fp *FixupProcessor) Suspend() {
347-
fp.mu.Lock()
348-
defer fp.mu.Unlock()
349-
350-
if fp.mu.suspended != nil {
351-
panic(errors.AssertionFailedf("fixup processor was already suspended"))
352-
}
353-
354-
fp.mu.suspended = make(chan struct{})
355-
}
356-
357344
// Process waits until all pending fixups have been processed by background
358345
// workers. If background workers have been suspended, they are temporarily
359346
// allowed to run until all fixups have been processed. This is useful for
360347
// testing.
361-
func (fp *FixupProcessor) Process(discard bool) {
348+
//
349+
// nolint:deferunlockcheck
350+
func (fp *FixupProcessor) Process(ctx context.Context) error {
362351
fp.mu.Lock()
363352
defer fp.mu.Unlock()
364353

365-
fp.mu.discardFixups = discard
366-
367-
suspended := fp.mu.suspended
368-
if suspended != nil {
369-
// Signal any waiting background workers that they can process fixups.
370-
close(fp.mu.suspended)
371-
fp.mu.suspended = nil
354+
if len(fp.mu.pendingFixups) == 0 {
355+
// No pending fixups to process.
356+
return nil
372357
}
373358

374-
// Wait for all fixups to be processed. Note that this uses a sync.Cond, which
375-
// will unlock the mutex while waiting.
376-
for len(fp.mu.pendingFixups) > 0 {
377-
fp.mu.waitForFixups.Wait()
359+
// Check whether another goroutine has already called Process.
360+
fixupsDone := fp.mu.fixupsDone
361+
if fixupsDone == nil {
362+
// Create the channel that will be closed once all fixups have been
363+
// processed.
364+
fixupsDone = make(chan struct{})
365+
fp.mu.fixupsDone = fixupsDone
366+
367+
// Signal any waiting background workers that they can process fixups and
368+
// create a new suspended channel to replace the closed one.
369+
suspended := fp.mu.suspended
370+
if suspended != nil {
371+
close(fp.mu.suspended)
372+
fp.mu.suspended = make(chan struct{})
373+
}
378374
}
379375

380-
fp.mu.discardFixups = false
376+
func() {
377+
// Unlock while waiting for fixups to be processed.
378+
fp.mu.Unlock()
379+
defer fp.mu.Lock()
381380

382-
// Re-suspend the fixup processor if it was suspended.
383-
if suspended != nil {
384-
fp.mu.suspended = make(chan struct{})
385-
}
381+
select {
382+
case <-fixupsDone:
383+
// All fixups processed.
384+
break
385+
386+
case <-ctx.Done():
387+
// Context canceled.
388+
break
389+
390+
case <-fp.initCtx.Done():
391+
// Processor has been quiesced.
392+
break
393+
}
394+
}()
395+
396+
return nil
397+
}
398+
399+
// Discard pops all unprocessed fixups from the queue and discards them. This is
400+
// useful for testing.
401+
func (fp *FixupProcessor) Discard(ctx context.Context) {
402+
fp.mu.Lock()
403+
defer fp.mu.Unlock()
404+
fp.discardFixupsLocked()
386405
}
387406

388407
// addFixup enqueues the given fixup for later processing, assuming there is not
@@ -454,54 +473,62 @@ func (fp *FixupProcessor) addFixup(ctx context.Context, fixup fixup) {
454473

455474
// nextFixup fetches the next fixup in the queue so that it can be processed by
456475
// a background worker. It blocks until there is a fixup available (ok=true) or
457-
// until the processor shuts down (ok=false).
476+
// until the context is canceled or the processor shuts down (ok=false).
458477
func (fp *FixupProcessor) nextFixup(ctx context.Context) (next fixup, ok bool) {
478+
var suspended, fixupsDone chan struct{}
459479
for {
480+
// Within the scope of the mutex, check whether processor is suspended.
481+
suspended, fixupsDone = func() (chan struct{}, chan struct{}) {
482+
fp.mu.Lock()
483+
defer fp.mu.Unlock()
484+
return fp.mu.suspended, fp.mu.fixupsDone
485+
}()
486+
487+
if suspended != nil && fixupsDone == nil {
488+
// Processor is suspended, so wait until it is resumed.
489+
select {
490+
case <-suspended:
491+
// Jump back to top of loop to get the fixupsDone channel.
492+
continue
493+
494+
case <-ctx.Done():
495+
return fixup{}, false
496+
497+
case <-fp.initCtx.Done():
498+
// Processor is shutting down.
499+
return fixup{}, false
500+
}
501+
}
502+
503+
// Get the next fixup.
460504
select {
461505
case next = <-fp.fixups:
462-
// Within the scope of the mutex, increment running workers and check
463-
// whether processor is suspended.
464-
discard, suspended := func() (bool, chan struct{}) {
506+
func() {
507+
// Increment running workers.
465508
fp.mu.Lock()
466509
defer fp.mu.Unlock()
467510
fp.mu.runningWorkers++
468-
return fp.mu.discardFixups, fp.mu.suspended
469511
}()
470-
if suspended != nil {
471-
// Can't process the fixup until the processor is resumed, so wait
472-
// until that happens.
473-
select {
474-
case <-suspended:
475-
break
476-
477-
case <-ctx.Done():
478-
return fixup{}, false
479-
}
480-
481-
// Re-check the discard flag, in case it was set.
482-
discard = func() bool {
483-
fp.mu.Lock()
484-
defer fp.mu.Unlock()
485-
return fp.mu.discardFixups
486-
}()
487-
}
488-
489-
// Always process fixup if it's single-stepping.
490-
if discard && !next.SingleStep {
491-
fp.removeFixup(next)
492-
continue
493-
}
494512
return next, true
495513

514+
case <-fixupsDone:
515+
// Other goroutines have processed the fixups, so jump back to top of
516+
// loop to suspend again.
517+
continue
518+
496519
case <-ctx.Done():
497520
// Context was canceled, abort.
498521
return fixup{}, false
522+
523+
case <-fp.initCtx.Done():
524+
// Processor is shutting down.
525+
return fixup{}, false
499526
}
500527
}
501528
}
502529

503530
// removeFixup removes a pending fixup that has been processed by a worker.
504-
func (fp *FixupProcessor) removeFixup(toRemove fixup) {
531+
func (fp *FixupProcessor) removeFixup(ctx context.Context, toRemove fixup) {
505532
fp.mu.Lock()
506533
defer fp.mu.Unlock()
507534

@@ -520,8 +547,33 @@ func (fp *FixupProcessor) removeFixup(toRemove fixup) {
520547
}
521548
}
522549

550+
// If single-stepping, discard all fixups that might have been triggered
551+
// while processing this fixup.
552+
if toRemove.SingleStep {
553+
fp.discardFixupsLocked()
554+
}
555+
523556
// If there are no more pending fixups, notify any waiters.
524-
if len(fp.mu.pendingFixups) == 0 {
525-
fp.mu.waitForFixups.Broadcast()
557+
if fp.mu.fixupsDone != nil && len(fp.mu.pendingFixups) == 0 {
558+
close(fp.mu.fixupsDone)
559+
fp.mu.fixupsDone = nil
560+
}
561+
}
562+
563+
// discardFixupsLocked pops all unprocessed fixups from the queue and discards
564+
// them. Note that any in-progress fixups are not affected.
565+
//
566+
// NOTE: Caller must have acquired the fp.mu lock.
567+
func (fp *FixupProcessor) discardFixupsLocked() {
568+
for {
569+
select {
570+
case next := <-fp.fixups:
571+
// Remove from the map.
572+
delete(fp.mu.pendingFixups, next.CachedKey)
573+
574+
default:
575+
// No more unprocessed fixups.
576+
return
577+
}
526578
}
527579
}

0 commit comments

Comments
 (0)