Skip to content

Commit 8aaf90c

Browse files
committed
colcontainer: harden closing of files in diskQueue
The AI review of 31c3419 pointed out that `io.Closer` interface is such that calling `Close` multiple times can result in an undefined behavior. Previously, we had multiple places where if we got an error on the first `Close` call of read or write file of the disk queue, we could try to close the same file again - this was the case since we nil-ed out the file pointer after the error short-circuiting logic. This commit hardens the disk queue around this error to guarantee that we always close the file exactly once. This also allows us to remove some of explicit nilling out logic which now became redundant. It's not immediately clear to me what actual "undefined behavior" would be in case of double-`Close` (or if it's even possible for implementations that we use to return an error), so I didn't include a release note. Still, it seems prudent to follow the interface's contract. Release note: None
1 parent 8b1e1a9 commit 8aaf90c

File tree

1 file changed

+31
-21
lines changed

1 file changed

+31
-21
lines changed

pkg/sql/colcontainer/diskqueue.go

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -438,14 +438,32 @@ func newDiskQueue(
438438
return d, d.rotateFile(ctx)
439439
}
440440

441+
// CloseRead closes the read file descriptor.
442+
//
443+
// No-op if the read file is not open. readFile is guaranteed to be nil-ed out
444+
// on return.
441445
func (d *diskQueue) CloseRead() error {
442-
if d.readFile != nil {
443-
if err := d.readFile.Close(); err != nil {
444-
return err
445-
}
446+
if d.readFile == nil {
447+
return nil
448+
}
449+
defer func() {
446450
d.readFile = nil
451+
}()
452+
return d.readFile.Close()
453+
}
454+
455+
// closeWriteFile closes the write file descriptor.
456+
//
457+
// No-op if the write file is not open. writeFile is guaranteed to be nil-ed out
458+
// on return.
459+
func (d *diskQueue) closeWriteFile() error {
460+
if d.writeFile == nil {
461+
return nil
447462
}
448-
return nil
463+
defer func() {
464+
d.writeFile = nil
465+
}()
466+
return d.writeFile.Close()
449467
}
450468

451469
func (d *diskQueue) closeFileDeserializer(ctx context.Context) error {
@@ -460,12 +478,10 @@ func (d *diskQueue) closeFileDeserializer(ctx context.Context) error {
460478

461479
func (d *diskQueue) Close(ctx context.Context) (retErr error) {
462480
defer func() {
463-
if d.writeFile != nil {
464-
// Ensure that we always attempt to close the file in case we
465-
// short-circuit the method due to an error.
466-
if err := d.writeFile.Close(); err != nil {
467-
retErr = errors.CombineErrors(retErr, err)
468-
}
481+
// Ensure that we always attempt to close the file in case we
482+
// short-circuit the method due to an error.
483+
if err := d.closeWriteFile(); err != nil {
484+
retErr = errors.CombineErrors(retErr, err)
469485
}
470486
d.writer.memAcc.Shrink(ctx, d.writer.accountedFor.buffer+d.writer.accountedFor.compressedBuf)
471487
d.memAcc.Shrink(ctx, d.scratchAccountedFor)
@@ -485,11 +501,8 @@ func (d *diskQueue) Close(ctx context.Context) (retErr error) {
485501
if err := d.closeFileDeserializer(ctx); err != nil {
486502
return err
487503
}
488-
if d.writeFile != nil {
489-
if err := d.writeFile.Close(); err != nil {
490-
return err
491-
}
492-
d.writeFile = nil
504+
if err := d.closeWriteFile(); err != nil {
505+
return err
493506
}
494507
// The readFile will be removed below in DeleteDirAndFiles.
495508
if err := d.CloseRead(); err != nil {
@@ -559,7 +572,7 @@ func (d *diskQueue) rotateFile(ctx context.Context) (retErr error) {
559572

560573
if d.writeFile != nil {
561574
d.files[d.writeFileIdx].finishedWriting = true
562-
if err := d.writeFile.Close(); err != nil {
575+
if err = d.closeWriteFile(); err != nil {
563576
return err
564577
}
565578
}
@@ -638,11 +651,10 @@ func (d *diskQueue) Enqueue(ctx context.Context, b coldata.Batch) error {
638651
if err := d.writeFooterAndFlush(ctx); err != nil {
639652
return err
640653
}
641-
if err := d.writeFile.Close(); err != nil {
654+
if err := d.closeWriteFile(); err != nil {
642655
return err
643656
}
644657
d.files[d.writeFileIdx].finishedWriting = true
645-
d.writeFile = nil
646658
// Done with the serializer - close it and set it to nil. Not setting
647659
// this will cause us to attempt to flush the serializer on Close.
648660
d.serializer.Close(ctx)
@@ -713,7 +725,6 @@ func (d *diskQueue) maybeInitDeserializer(ctx context.Context) (bool, error) {
713725
}
714726
d.diskAcc.Shrink(ctx, fileSize)
715727
}
716-
d.readFile = nil
717728
// Read next file.
718729
d.readFileIdx++
719730
return d.maybeInitDeserializer(ctx)
@@ -876,7 +887,6 @@ func (d *diskQueue) Rewind(ctx context.Context) error {
876887
return err
877888
}
878889
d.deserializerState.curBatch = 0
879-
d.readFile = nil
880890
d.readFileIdx = 0
881891
for i := range d.files {
882892
d.files[i].curOffsetIdx = 0

0 commit comments

Comments
 (0)