Commit b403073

import: do not revert data if cancel called after success
Currently, it is possible that canceling an import after the job has marked the descriptor as online could cause the non-MVCC-compliant rollback code to run on an online range.

Note that this fix is still slightly incomplete: if a zombie import job runs its revert logic, it may revert changes it is not supposed to, but the liveness subsystem makes zombie jobs unlikely.

Release note: Fixes an unlikely bug where import rollback could revert writes in an online descriptor. This is not a silent failure; it would only occur if the job entered a failed or cancelled state after the actual import succeeded and marked the descriptor as online.

Fixes: #159603
1 parent a3467f5 commit b403073
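To make the shape of the fix concrete, the sketch below reduces the guard pattern this commit applies to a minimal, runnable Go program. Every name in it (importJob, tablePublished, revertIngestedData) is a hypothetical stand-in for illustration, not a real CockroachDB API; the actual implementation is importResumer.OnFailOrCancel in the diff below.

package main

import "fmt"

// importJob is a hypothetical stand-in for the import job's state.
type importJob struct {
	// tablePublished mirrors jobspb.ImportDetails.TablePublished: recorded
	// once the table descriptor has been brought online.
	tablePublished bool
}

// onFailOrCancel mirrors the fixed cleanup path: once the table is online,
// the non-MVCC rollback must not run, so cleanup becomes a no-op.
func (j *importJob) onFailOrCancel() error {
	if j.tablePublished {
		// The imported rows are live data now; there is nothing to revert.
		return nil
	}
	return j.revertIngestedData()
}

// revertIngestedData stands in for the rollback that is only safe to run
// while the descriptor is still offline.
func (j *importJob) revertIngestedData() error {
	fmt.Println("reverting ingested data")
	return nil
}

func main() {
	j := &importJob{tablePublished: true}
	// A cancel arriving after a successful publish is now a no-op instead
	// of deleting live rows.
	if err := j.onFailOrCancel(); err != nil {
		fmt.Println("cleanup failed:", err)
	}
}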

File tree

2 files changed: +69 -3 lines


pkg/sql/importer/import_job.go

Lines changed: 26 additions & 3 deletions
@@ -1479,13 +1479,19 @@ func (r *importResumer) OnFailOrCancel(
 	ctx context.Context, execCtx interface{}, jobErr error,
 ) error {
 	p := execCtx.(sql.JobExecContext)
+	details := r.job.Details().(jobspb.ImportDetails)
+
+	if details.TablePublished {
+		// If the table was published, there is nothing for us to clean up, the
+		// descriptor is already online.
+		return nil
+	}
 
 	// Emit to the event log that the job has started reverting.
 	emitImportJobEvent(ctx, p, jobs.StatusReverting, r.job)
 
 	// TODO(sql-exp): increase telemetry count for import.total.failed and
 	// import.duration-sec.failed.
-	details := r.job.Details().(jobspb.ImportDetails)
 	logutil.LogJobCompletion(ctx, importJobRecoveryEventType, r.job.ID(), false, jobErr, r.res.Rows)
 
 	addToFileFormatTelemetry(details.Format.Format.String(), "failed")
@@ -1500,7 +1506,6 @@ func (r *importResumer) OnFailOrCancel(
 		} else {
 			log.Infof(ctx, "verified no nodes still ingesting on behalf of job %d", r.job.ID())
 		}
-
 	}
 
 	cfg := execCtx.(sql.JobExecContext).ExecCfg()
@@ -1630,7 +1635,25 @@ func (r *importResumer) dropTables(
 	if err := descsCol.WriteDescToBatch(ctx, kvTrace, intoDesc, b); err != nil {
 		return err
 	}
-	return errors.Wrap(txn.KV().Run(ctx, b), "putting IMPORT INTO table back online")
+	err = txn.KV().Run(ctx, b)
+	if err != nil {
+		return errors.Wrap(err, "bringing IMPORT INTO table back online")
+	}
+
+	err = r.job.WithTxn(txn).Update(ctx, func(
+		txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
+	) error {
+		// Mark the table as published to avoid running cleanup again.
+		details := md.Payload.GetImport()
+		details.TablePublished = true
+		ju.UpdatePayload(md.Payload)
+		return nil
+	})
+	if err != nil {
+		return errors.Wrap(err, "updating job to mark table as published during cleanup")
+	}
+
+	return nil
 }
 
 // dropNewTables drops the tables that were created as part of an IMPORT and
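A note on the last hunk: the job record is updated via r.job.WithTxn(txn).Update inside the same transaction that writes the descriptor back online, so the descriptor write and the TablePublished flag commit atomically. A retried cleanup therefore cannot observe an online table without the flag set, which is what makes the early return added to OnFailOrCancel safe.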

pkg/sql/importer/import_stmt_test.go

Lines changed: 43 additions & 0 deletions
@@ -2169,6 +2169,49 @@ func TestImportIntoCSVCancel(t *testing.T) {
 	sqlDB.Exec(t, fmt.Sprintf("CANCEL JOB %d", jobID))
 	sqlDB.Exec(t, fmt.Sprintf("SHOW JOB WHEN COMPLETE %d", jobID))
 	sqlDB.CheckQueryResults(t, "SELECT count(*) FROM t", [][]string{{"0"}})
+
+	// Verify TablePublished is set after cleanup.
+	job, err := tc.ApplicationLayer(0).JobRegistry().(*jobs.Registry).LoadJob(ctx, jobspb.JobID(jobID))
+	require.NoError(t, err)
+	payload := job.Payload()
+	importDetails := payload.GetImport()
+	require.True(t, importDetails.TablePublished, "expected TablePublished to be true after cleanup")
+}
+
+func TestImportCancelAfterSuccess(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// This is a regression test for #159603. If the cancel logic ran after the
+	// descriptor was marked as online, we could end up deleting live data.
+
+	ctx := context.Background()
+	baseDir := datapathutils.TestDataPath(t, "csv")
+	tc := serverutils.StartCluster(t, 1, base.TestClusterArgs{ServerArgs: base.TestServerArgs{
+		Knobs: base.TestingKnobs{
+			JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+		},
+		ExternalIODir: baseDir,
+	}})
+	defer tc.Stopper().Stop(ctx)
+
+	sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+	testFiles := makeCSVData(t, 1, 100, 1, 100)
+
+	sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)
+
+	var jobID jobspb.JobID
+	sqlDB.QueryRow(t, fmt.Sprintf("WITH j AS (IMPORT INTO t (a, b) CSV DATA (%s)) SELECT job_id FROM j", testFiles.files[0])).Scan(&jobID)
+
+	var countBefore int
+	sqlDB.QueryRow(t, "SELECT count(*) FROM t").Scan(&countBefore)
+
+	sqlDB.Exec(t, "UPDATE system.jobs SET status = 'cancel-requested' WHERE id = $1", jobID)
+	jobutils.WaitForJobToCancel(t, sqlDB, jobID)
+
+	var countAfter int
+	sqlDB.QueryRow(t, "SELECT count(*) FROM t").Scan(&countAfter)
+	require.Equal(t, countBefore, countAfter, "data should not be rolled back")
 }
 
 // Verify that a failed import will clean up after itself. This means:
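Note the mechanism the new test uses to trigger cancellation: by this point the IMPORT statement has already completed successfully, so a plain CANCEL JOB would presumably be rejected for a job in a terminal state. Flipping the status to 'cancel-requested' directly in system.jobs simulates the late cancellation described in the commit message, after which the test asserts that no rows were rolled back.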
