diff --git a/internal/datanode/compactor/sort_compaction.go b/internal/datanode/compactor/sort_compaction.go index 8957f5dc8861f..a6d254ba6cf49 100644 --- a/internal/datanode/compactor/sort_compaction.go +++ b/internal/datanode/compactor/sort_compaction.go @@ -204,6 +204,7 @@ func (t *sortCompactionTask) sortSegment(ctx context.Context) (*datapb.Compactio storage.WithStorageConfig(t.compactionParams.StorageConfig)) if err != nil { log.Warn("load deletePKs failed", zap.Error(err)) + srw.Close() return nil, err } loadDeltaCost := time.Since(phaseStart) @@ -262,6 +263,7 @@ func (t *sortCompactionTask) sortSegment(ctx context.Context) (*datapb.Compactio } if err != nil { log.Warn("error creating insert binlog reader", zap.Error(err)) + srw.Close() return nil, err } defer rr.Close() @@ -271,6 +273,7 @@ func (t *sortCompactionTask) sortSegment(ctx context.Context) (*datapb.Compactio numValidRows, sortTimings, err := storage.Sort(t.compactionParams.BinLogMaxSize, t.plan.GetSchema(), rrs, srw, predicate, t.sortByFieldIDs) if err != nil { log.Warn("sort failed", zap.Error(err)) + srw.Close() return nil, err } if sortTimings == nil {