
Commit e06b5d1

craig[bot], ZhouXing19, and spilchen committed
152745: sql/import: fix progress summary loss during concurrent updates r=ZhouXing19 a=ZhouXing19

fixes #152543
fixes #152519

Previously, `accumulatedBulkSummary` was reset on each `updateJobProgress` call, on the assumption that the job progress update would always succeed. However, if the update failed due to concurrent job state changes and was retried, the already-reset buffer caused progress data to be lost, resulting in incorrect row counts in progress reporting. Fix this by removing the reset logic and assigning `accumulatedBulkSummary` directly to the progress summary instead of adding to it.

Release note (bug fix): Fixed import progress reporting to show correct row counts when concurrent job state changes occur.

152985: roachtest: use LeaderLeases for schemachange/bulkingest tests r=spilchen a=spilchen

Changed from MetamorphicLeases to LeaderLeases in makeSchemaChangeBulkIngestTest to address potential test stability issues identified in issue #152857.

Closes #152857

Release note: none

Epic: None

Co-authored-by: ZhouXing19 <[email protected]>
Co-authored-by: Matt Spilchen <[email protected]>
3 parents c9ffd9a + 8495287 + 0d8a038 commit e06b5d1
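
The first change (PR 152745) hinges on making the progress-update closure safe to re-run: the jobs machinery may retry the update after a concurrent job-state change, so the closure can no longer drain the accumulation buffer as a side effect. The standalone Go sketch below, using hypothetical `summary`/`progress` types rather than the real `kvpb.BulkOpSummary` and jobs API, contrasts the old add-and-reset callback with the new overwrite-with-running-total callback when the first attempt's commit fails:

package main

import (
	"fmt"
	"sync"
)

// Hypothetical, simplified stand-ins for kvpb.BulkOpSummary and the stored
// job progress; the real CockroachDB types are richer than this.
type summary struct{ Rows int64 }
type progress struct{ Summary summary }

func main() {
	var mu sync.Mutex
	accumulated := summary{Rows: 100} // rows reported by processors so far
	stored := progress{}              // what the jobs table currently holds

	// Old (buggy) callback: add the buffer into the stored summary, then reset
	// the buffer, assuming the surrounding progress update always commits.
	buggyUpdate := func(prog *progress) {
		mu.Lock()
		defer mu.Unlock()
		prog.Summary.Rows += accumulated.Rows
		accumulated.Rows = 0
	}

	// Fixed callback: overwrite the stored summary with the running total.
	// Running it once or several times gives the same result, so a retry
	// after a transaction conflict cannot lose rows.
	fixedUpdate := func(prog *progress) {
		mu.Lock()
		defer mu.Unlock()
		prog.Summary = accumulated
	}

	// Buggy path: the first attempt's write is thrown away after a conflict,
	// but the buffer was already reset, so the retry adds nothing.
	attempt := stored
	buggyUpdate(&attempt) // attempt discarded by the failed commit
	retry := stored
	buggyUpdate(&retry)
	fmt.Println("buggy after retry:", retry.Summary.Rows) // 0: 100 rows lost

	// Fixed path under the same failure: the retry still sees the full total.
	accumulated.Rows = 100
	attempt = stored
	fixedUpdate(&attempt) // attempt discarded by the failed commit
	retry = stored
	fixedUpdate(&retry)
	fmt.Println("fixed after retry:", retry.Summary.Rows) // 100
}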

File tree: 4 files changed, 26 additions and 7 deletions


pkg/cmd/roachtest/tests/schemachange.go

Lines changed: 1 addition & 1 deletion
@@ -359,7 +359,7 @@ func makeSchemaChangeBulkIngestTest(
 		Cluster:          r.MakeClusterSpec(numNodes, spec.WorkloadNode(), spec.SSD(4)),
 		CompatibleClouds: registry.AllExceptAWS,
 		Suites:           registry.Suites(registry.Nightly),
-		Leases:           registry.MetamorphicLeases,
+		Leases:           registry.LeaderLeases,
 		Timeout:          length * 2,
 		PostProcessPerfMetrics: func(test string, histogram *roachtestutil.HistogramMetric) (roachtestutil.AggregatedPerfMetrics, error) {
 			// The histogram tracks the total elapsed time for the CREATE INDEX operation.

pkg/sql/importer/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -187,6 +187,7 @@ go_test(
         "//pkg/testutils",
         "//pkg/testutils/datapathutils",
         "//pkg/testutils/jobutils",
+        "//pkg/testutils/kvclientutils",
         "//pkg/testutils/pgurlutils",
         "//pkg/testutils/serverutils",
         "//pkg/testutils/skip",

pkg/sql/importer/import_processor_planning.go

Lines changed: 7 additions & 5 deletions
@@ -47,6 +47,10 @@ var replanFrequency = settings.RegisterDurationSetting(
 	settings.PositiveDuration,
 )
 
+// importProgressDebugName is used to mark the transaction for updating
+// import job progress.
+const importProgressDebugName = `import_progress`
+
 // distImport is used by IMPORT to run a DistSQL flow to ingest data by starting
 // reader processes on many nodes that each read and ingest their assigned files
 // and then send back a summary of what they ingested. The combined summary is
@@ -121,8 +125,7 @@ func distImport(
 	}
 
 	// accumulatedBulkSummary accumulates the BulkOpSummary returned from each
-	// processor in their progress updates. It stores stats about the amount of
-	// data written since the last time we update the job progress.
+	// processor in their progress updates. It is used to update the job progress.
 	accumulatedBulkSummary := struct {
 		syncutil.Mutex
 		kvpb.BulkOpSummary
@@ -158,7 +161,7 @@ func distImport(
 	fractionProgress := make([]uint32, len(from))
 
 	updateJobProgress := func() error {
-		return job.NoTxn().FractionProgressed(ctx, func(
+		return job.DebugNameNoTxn(importProgressDebugName).FractionProgressed(ctx, func(
 			ctx context.Context, details jobspb.ProgressDetails,
 		) float32 {
 			var overall float32
@@ -173,8 +176,7 @@ func distImport(
 			}
 
 			accumulatedBulkSummary.Lock()
-			prog.Summary.Add(accumulatedBulkSummary.BulkOpSummary)
-			accumulatedBulkSummary.Reset()
+			prog.Summary = accumulatedBulkSummary.BulkOpSummary
 			accumulatedBulkSummary.Unlock()
 			return overall / float32(len(from))
 		},
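
Two things are worth calling out in the hunk above. Switching from `prog.Summary.Add(...)` plus `Reset()` to a plain assignment makes the callback idempotent: the jobs system may re-run it after a conflicting update, and re-running an assignment of the running total yields the same summary, whereas the old version would find an already-drained buffer and silently under-count. The new `DebugNameNoTxn(importProgressDebugName)` call, as far as the accompanying test suggests, tags the progress-update transaction so that the test below can inject a retry against exactly that transaction.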

pkg/sql/importer/import_stmt_test.go

Lines changed: 17 additions & 1 deletion
@@ -32,6 +32,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
 	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
@@ -54,6 +55,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/pgurlutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
@@ -3836,6 +3838,8 @@ func TestImportDefaultWithResume(t *testing.T) {
 	defer TestingSetParallelImporterReaderBatchSize(batchSize)()
 	defer row.TestingSetDatumRowConverterBatchSize(2 * batchSize)()
 
+	ctx := context.Background()
+	filterFunc, verifyFunc := kvclientutils.PrefixTransactionRetryFilter(t, importProgressDebugName, 1)
 	s, db, _ := serverutils.StartServer(t,
 		base.TestServerArgs{
 			// Test hangs when run within a test tenant. More investigation
@@ -3846,11 +3850,13 @@ func TestImportDefaultWithResume(t *testing.T) {
 				DistSQL: &execinfra.TestingKnobs{
 					BulkAdderFlushesEveryBatch: true,
 				},
+				KVClient: &kvcoord.ClientTestingKnobs{
+					TransactionRetryFilter: filterFunc,
+				},
 			},
 			SQLMemoryPoolSize: 1 << 30, // 1 GiB
 		})
 	registry := s.JobRegistry().(*jobs.Registry)
-	ctx := context.Background()
 	defer s.Stopper().Stop(ctx)
 
 	sqlDB := sqlutils.MakeSQLRunner(db)
@@ -3971,6 +3977,16 @@ func TestImportDefaultWithResume(t *testing.T) {
 			sqlDB.QueryRow(t, fmt.Sprintf(`SELECT last_value FROM %s`,
 				test.sequence)).Scan(&seqValOnSuccess)
 			require.Equal(t, seqValOnPause, seqValOnSuccess)
+
+			verifyFunc()
+
+			// Verify final summary counts - should contain exactly the expected number of rows
+			// with no duplication despite multiple pause/resume cycles
+			require.Equal(t, 1, len(js.prog.Summary.EntryCounts))
+
+			for _, entry := range js.prog.Summary.EntryCounts {
+				require.Equal(t, int64(expectedNumRows), entry)
+			}
 		})
 	}
 }
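
The test wires the fault injection through the `KVClient` testing knob: `kvclientutils.PrefixTransactionRetryFilter` builds a filter that forces a retry for a transaction whose debug name matches `importProgressDebugName`, and `verifyFunc` later asserts that the injection actually fired. As a rough mental model only (the real helper lives in `pkg/testutils/kvclientutils` and has a different, richer signature; this standalone Go sketch is not that API), a prefix-based retry filter can be thought of as:

package main

import (
	"fmt"
	"strings"
	"sync/atomic"
)

// prefixRetryFilter is a hypothetical, simplified model of a prefix-based
// transaction-retry filter: force a retry for the first `times` transactions
// whose debug name starts with `prefix`, and report whether that ever fired.
// It is not the real kvclientutils helper, only an illustration of the idea.
func prefixRetryFilter(prefix string, times int64) (filter func(txnName string) bool, verify func() error) {
	var fired int64
	filter = func(txnName string) bool {
		if strings.HasPrefix(txnName, prefix) && atomic.AddInt64(&fired, 1) <= times {
			return true // tell the client to retry this transaction once more
		}
		return false
	}
	verify = func() error {
		if atomic.LoadInt64(&fired) == 0 {
			return fmt.Errorf("no transaction with prefix %q was ever retried", prefix)
		}
		return nil
	}
	return filter, verify
}

func main() {
	filter, verify := prefixRetryFilter("import_progress", 1)
	fmt.Println(filter("import_progress")) // true: first matching txn gets an injected retry
	fmt.Println(filter("import_progress")) // false: the budget of one retry is spent
	fmt.Println(filter("unrelated_txn"))   // false: name does not match the prefix
	fmt.Println(verify())                  // <nil>: the injected retry did happen
}

With the retry forced against the progress-update transaction, the new assertions on `js.prog.Summary.EntryCounts` confirm that the final summary still contains exactly `expectedNumRows` rows, i.e. nothing is lost or double-counted across the retry.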
