Skip to content

Commit 8495287

Browse files
committed
sql/import: fix progress summary loss during concurrent updates
Previously, `accumulatedBulkSummary` was reset on each `updateJobProgress` call, on the assumption that the job progress update would always succeed. However, if the update failed because of a concurrent job state change and was retried, the retry saw the already-reset buffer, so the accumulated progress was lost and progress reporting showed incorrect row counts. Fix this by removing the reset logic and assigning `accumulatedBulkSummary` directly to the progress summary instead of adding it to the existing summary.

Release note (bug fix): Fixed import progress reporting to show correct row counts when concurrent job state changes occur.
1 parent 3693763 commit 8495287

File tree

3 files changed

+25
-6
lines changed

3 files changed

+25
-6
lines changed

pkg/sql/importer/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ go_test(
187187
"//pkg/testutils",
188188
"//pkg/testutils/datapathutils",
189189
"//pkg/testutils/jobutils",
190+
"//pkg/testutils/kvclientutils",
190191
"//pkg/testutils/pgurlutils",
191192
"//pkg/testutils/serverutils",
192193
"//pkg/testutils/skip",

pkg/sql/importer/import_processor_planning.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ var replanFrequency = settings.RegisterDurationSetting(
4747
settings.PositiveDuration,
4848
)
4949

50+
// importProgressDebugName is used to mark the transaction for updating
51+
// import job progress.
52+
const importProgressDebugName = `import_progress`
53+
5054
// distImport is used by IMPORT to run a DistSQL flow to ingest data by starting
5155
// reader processes on many nodes that each read and ingest their assigned files
5256
// and then send back a summary of what they ingested. The combined summary is
@@ -121,8 +125,7 @@ func distImport(
121125
}
122126

123127
// accumulatedBulkSummary accumulates the BulkOpSummary returned from each
124-
// processor in their progress updates. It stores stats about the amount of
125-
// data written since the last time we update the job progress.
128+
// processor in their progress updates. It is used to update the job progress.
126129
accumulatedBulkSummary := struct {
127130
syncutil.Mutex
128131
kvpb.BulkOpSummary
@@ -158,7 +161,7 @@ func distImport(
158161
fractionProgress := make([]uint32, len(from))
159162

160163
updateJobProgress := func() error {
161-
return job.NoTxn().FractionProgressed(ctx, func(
164+
return job.DebugNameNoTxn(importProgressDebugName).FractionProgressed(ctx, func(
162165
ctx context.Context, details jobspb.ProgressDetails,
163166
) float32 {
164167
var overall float32
@@ -173,8 +176,7 @@ func distImport(
173176
}
174177

175178
accumulatedBulkSummary.Lock()
176-
prog.Summary.Add(accumulatedBulkSummary.BulkOpSummary)
177-
accumulatedBulkSummary.Reset()
179+
prog.Summary = accumulatedBulkSummary.BulkOpSummary
178180
accumulatedBulkSummary.Unlock()
179181
return overall / float32(len(from))
180182
},

pkg/sql/importer/import_stmt_test.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
3333
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
3434
"github.com/cockroachdb/cockroach/pkg/keys"
35+
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
3536
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
3637
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
3738
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
@@ -54,6 +55,7 @@ import (
5455
"github.com/cockroachdb/cockroach/pkg/testutils"
5556
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
5657
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
58+
"github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils"
5759
"github.com/cockroachdb/cockroach/pkg/testutils/pgurlutils"
5860
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
5961
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
@@ -3836,6 +3838,8 @@ func TestImportDefaultWithResume(t *testing.T) {
38363838
defer TestingSetParallelImporterReaderBatchSize(batchSize)()
38373839
defer row.TestingSetDatumRowConverterBatchSize(2 * batchSize)()
38383840

3841+
ctx := context.Background()
3842+
filterFunc, verifyFunc := kvclientutils.PrefixTransactionRetryFilter(t, importProgressDebugName, 1)
38393843
s, db, _ := serverutils.StartServer(t,
38403844
base.TestServerArgs{
38413845
// Test hangs when run within a test tenant. More investigation
@@ -3846,11 +3850,13 @@ func TestImportDefaultWithResume(t *testing.T) {
38463850
DistSQL: &execinfra.TestingKnobs{
38473851
BulkAdderFlushesEveryBatch: true,
38483852
},
3853+
KVClient: &kvcoord.ClientTestingKnobs{
3854+
TransactionRetryFilter: filterFunc,
3855+
},
38493856
},
38503857
SQLMemoryPoolSize: 1 << 30, // 1 GiB
38513858
})
38523859
registry := s.JobRegistry().(*jobs.Registry)
3853-
ctx := context.Background()
38543860
defer s.Stopper().Stop(ctx)
38553861

38563862
sqlDB := sqlutils.MakeSQLRunner(db)
@@ -3971,6 +3977,16 @@ func TestImportDefaultWithResume(t *testing.T) {
39713977
sqlDB.QueryRow(t, fmt.Sprintf(`SELECT last_value FROM %s`,
39723978
test.sequence)).Scan(&seqValOnSuccess)
39733979
require.Equal(t, seqValOnPause, seqValOnSuccess)
3980+
3981+
verifyFunc()
3982+
3983+
// Verify final summary counts - should contain exactly the expected number of rows
3984+
// with no duplication despite multiple pause/resume cycles
3985+
require.Equal(t, 1, len(js.prog.Summary.EntryCounts))
3986+
3987+
for _, entry := range js.prog.Summary.EntryCounts {
3988+
require.Equal(t, int64(expectedNumRows), entry)
3989+
}
39743990
})
39753991
}
39763992
}

0 commit comments

Comments
 (0)