Skip to content

Commit a077113

Browse files
committed
colexecdisk: propagate DiskFull errors as expected
We just saw a Sentry report that was issued due to an InternalError raised after dequeuing from a disk queue. It's not clear what the error was (since it was redacted), but it might have been a DiskFull error. We already have special handling for it on the Enqueue path, but the Dequeue path can also trigger this error (on the first call to Dequeue after some Enqueue calls, in order to flush the buffered batches), so this commit audits all disk queue methods to use the helper for error propagation.

The only place where we do disk usage accounting is `diskQueue.writeFooterAndFlush`, so I traced which methods could end up calling it (both Enqueue and Dequeue, but also Close) and their call sites - this is how the affected places were chosen. Additionally, I didn't want to introduce error propagation via panics where it wasn't there already, so one spot (`SpillingQueue.Dequeue`, which returns its errors explicitly) wasn't modified and only received a TODO.

Release note: None
1 parent 930e2ef commit a077113

File tree

5 files changed

+15
-12
lines changed

5 files changed

+15
-12
lines changed

pkg/sql/colexec/colexecdisk/external_sort.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -462,10 +462,10 @@ func (s *externalSorter) Next() coldata.Batch {
462462
// resources to be properly released in CloseInactiveReadPartitions
463463
// call below.
464464
if err := s.partitioner.CloseAllOpenReadFileDescriptors(); err != nil {
465-
colexecerror.InternalError(err)
465+
colexecutils.HandleErrorFromDiskQueue(err)
466466
}
467467
if err := s.partitioner.CloseInactiveReadPartitions(s.Ctx); err != nil {
468-
colexecerror.InternalError(err)
468+
colexecutils.HandleErrorFromDiskQueue(err)
469469
}
470470
s.state = externalSorterNewPartition
471471

pkg/sql/colexec/colexecdisk/hash_based_partitioner.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,7 @@ StateChanged:
442442
// FDs to the semaphore.
443443
for i := range op.inputs {
444444
if err := op.partitioners[i].CloseAllOpenWriteFileDescriptors(op.Ctx); err != nil {
445-
colexecerror.InternalError(err)
445+
colexecutils.HandleErrorFromDiskQueue(err)
446446
}
447447
}
448448
op.inMemMainOp.Init(op.Ctx)
@@ -487,7 +487,7 @@ StateChanged:
487487
for {
488488
op.unlimitedAllocator.PerformOperation(batch.ColVecs(), func() {
489489
if err := partitioner.Dequeue(op.Ctx, parentPartitionIdx, batch); err != nil {
490-
colexecerror.InternalError(err)
490+
colexecutils.HandleErrorFromDiskQueue(err)
491491
}
492492
})
493493
if batch.Length() == 0 {
@@ -498,7 +498,7 @@ StateChanged:
498498
// We're done reading from this partition, and it will never
499499
// be read from again, so we can close it.
500500
if err := partitioner.CloseInactiveReadPartitions(op.Ctx); err != nil {
501-
colexecerror.InternalError(err)
501+
colexecutils.HandleErrorFromDiskQueue(err)
502502
}
503503
// We're done writing to the newly created partitions.
504504
// TODO(yuzefovich): we should not release the descriptors
@@ -512,7 +512,7 @@ StateChanged:
512512
// want. This will allow us to remove the call to
513513
// CloseAllOpen... in the first state as well.
514514
if err := partitioner.CloseAllOpenWriteFileDescriptors(op.Ctx); err != nil {
515-
colexecerror.InternalError(err)
515+
colexecutils.HandleErrorFromDiskQueue(err)
516516
}
517517
}
518518
for idx := 0; idx < op.numBuckets; idx++ {
@@ -594,7 +594,7 @@ StateChanged:
594594
// transition to processing new ones.
595595
for i := range op.inputs {
596596
if err := op.partitioners[i].CloseInactiveReadPartitions(op.Ctx); err != nil {
597-
colexecerror.InternalError(err)
597+
colexecutils.HandleErrorFromDiskQueue(err)
598598
}
599599
}
600600
op.state = hbpProcessNewPartitionUsingMain
@@ -625,7 +625,7 @@ StateChanged:
625625
// transition to processing new ones.
626626
for i := range op.inputs {
627627
if err := op.partitioners[i].CloseInactiveReadPartitions(op.Ctx); err != nil {
628-
colexecerror.InternalError(err)
628+
colexecutils.HandleErrorFromDiskQueue(err)
629629
}
630630
}
631631
op.state = hbpProcessNewPartitionUsingFallback

pkg/sql/colexec/colexecdisk/utils.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010

1111
"github.com/cockroachdb/cockroach/pkg/col/coldata"
1212
"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
13-
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
13+
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
1414
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
1515
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
1616
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
@@ -71,7 +71,7 @@ func (p *partitionerToOperator) Next() coldata.Batch {
7171
err = p.partitioner.Dequeue(p.Ctx, p.partitionIdx, p.batch)
7272
})
7373
if err != nil {
74-
colexecerror.InternalError(err)
74+
colexecutils.HandleErrorFromDiskQueue(err)
7575
}
7676
return p.batch
7777
}

pkg/sql/colexec/colexecutils/spilling_buffer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ func (b *SpillingBuffer) GetVecWithTuple(
286286
var ok bool
287287
b.numDequeued += b.dequeueScratch.Length()
288288
if ok, err = b.diskQueue.Dequeue(ctx, b.dequeueScratch); err != nil {
289-
colexecerror.InternalError(err)
289+
HandleErrorFromDiskQueue(err)
290290
}
291291
if !ok || b.dequeueScratch.Length() == 0 {
292292
colexecerror.InternalError(
@@ -303,7 +303,7 @@ func (b *SpillingBuffer) Length() int {
303303
func (b *SpillingBuffer) closeSpillingQueue(ctx context.Context) {
304304
if b.diskQueue != nil {
305305
if err := b.diskQueue.Close(ctx); err != nil {
306-
colexecerror.InternalError(err)
306+
HandleErrorFromDiskQueue(err)
307307
}
308308
if b.fdSemaphore != nil {
309309
b.fdSemaphore.Release(numSpillingBufferFDs)

pkg/sql/colexec/colexecutils/spilling_queue.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,9 @@ func (q *SpillingQueue) Dequeue(ctx context.Context) (coldata.Batch, error) {
367367
}
368368
ok, err := q.diskQueue.Dequeue(ctx, q.dequeueScratch)
369369
if err != nil {
370+
// TODO(yuzefovich): err here could be DiskFull, but we don't want
371+
// to use HandleErrorFromDiskQueue helper since it panics and we do
372+
// explicit error propagation from this method.
370373
return nil, err
371374
}
372375
if !ok {

0 commit comments

Comments
 (0)