Skip to content

Commit 45c224a

Browse files
craig[bot] and ZhouXing19 committed
Merge #153111
153111: import: fix bulk summary override bug when importing with multiple files r=ZhouXing19 a=ZhouXing19 When the import source is more than 1 csv file, there is a risk that the bulk summary is incorrectly reset after the first csv file's progress is processed, leading to the actual progress being overridden with a nil value. This commit fixes it. More internal discussion can be found [here](https://cockroachlabs.slack.com/archives/C02DSDS9TM1/p1756991625692929). Epic: None Release note (bug fix): fix bulk summary override bug with IMPORT with multiple files. Co-authored-by: ZhouXing19 <[email protected]>
2 parents 60a9984 + 25c34ed commit 45c224a

File tree

2 files changed

+21
-11
lines changed

2 files changed

+21
-11
lines changed

pkg/sql/importer/import_processor.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -431,10 +431,10 @@ func ingestKvs(
431431
pkIndexAdder.SetOnFlush(func(summary kvpb.BulkOpSummary) {
432432
for i, emitted := range writtenRow {
433433
atomic.StoreInt64(&pkFlushedRow[i], emitted)
434-
bulkSummaryMu.Lock()
435-
bulkSummaryMu.summary.Add(summary)
436-
bulkSummaryMu.Unlock()
437434
}
435+
bulkSummaryMu.Lock()
436+
bulkSummaryMu.summary.Add(summary)
437+
bulkSummaryMu.Unlock()
438438
if indexAdder.IsEmpty() {
439439
for i, emitted := range writtenRow {
440440
atomic.StoreInt64(&idxFlushedRow[i], emitted)
@@ -444,10 +444,10 @@ func ingestKvs(
444444
indexAdder.SetOnFlush(func(summary kvpb.BulkOpSummary) {
445445
for i, emitted := range writtenRow {
446446
atomic.StoreInt64(&idxFlushedRow[i], emitted)
447-
bulkSummaryMu.Lock()
448-
bulkSummaryMu.summary.Add(summary)
449-
bulkSummaryMu.Unlock()
450447
}
448+
bulkSummaryMu.Lock()
449+
bulkSummaryMu.summary.Add(summary)
450+
bulkSummaryMu.Unlock()
451451
})
452452

453453
// offsets maps input file ID to a slot in our progress tracking slices.
@@ -473,12 +473,12 @@ func ingestKvs(
473473
prog.ResumePos[file] = idx
474474
}
475475
prog.CompletedFraction[file] = math.Float32frombits(atomic.LoadUint32(&writtenFraction[offset]))
476-
// Write down the summary of how much we've ingested since the last update.
477-
bulkSummaryMu.Lock()
478-
prog.BulkSummary = bulkSummaryMu.summary
479-
bulkSummaryMu.summary.Reset()
480-
bulkSummaryMu.Unlock()
481476
}
477+
// Write down the summary of how much we've ingested since the last update.
478+
bulkSummaryMu.Lock()
479+
prog.BulkSummary = bulkSummaryMu.summary
480+
bulkSummaryMu.summary.Reset()
481+
bulkSummaryMu.Unlock()
482482
select {
483483
case progCh <- prog:
484484
case <-ctx.Done():

pkg/sql/importer/import_processor_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,11 @@ func TestCSVImportCanBeResumed(t *testing.T) {
774774
sqlDB.CheckQueryResults(t, `SELECT id FROM t ORDER BY id`,
775775
sqlDB.QueryStr(t, `SELECT generate_series(0, $1)`, csv1.numRows-1),
776776
)
777+
778+
js = queryJobUntil(t, sqlDB.DB, jobID, func(js jobState) bool { return js.prog.Summary.EntryCounts != nil })
779+
for _, e := range js.prog.Summary.EntryCounts {
780+
require.Equal(t, int64(csv1.numRows), e)
781+
}
777782
}
778783

779784
func TestCSVImportMarksFilesFullyProcessed(t *testing.T) {
@@ -875,6 +880,11 @@ func TestCSVImportMarksFilesFullyProcessed(t *testing.T) {
875880

876881
// Verify that after resume we have not processed any additional rows.
877882
assert.Zero(t, importSummary.Rows)
883+
884+
js = queryJobUntil(t, sqlDB.DB, jobID, func(js jobState) bool { return js.prog.Summary.EntryCounts != nil })
885+
for _, e := range js.prog.Summary.EntryCounts {
886+
require.Equal(t, int64(csv1.numRows+csv2.numRows+csv3.numRows), e)
887+
}
878888
}
879889

880890
func (ses *generatedStorage) externalStorageFactory() cloud.ExternalStorageFactory {

0 commit comments

Comments
 (0)