29 changes: 26 additions & 3 deletions pkg/sql/importer/import_job.go
@@ -1479,13 +1479,19 @@ func (r *importResumer) OnFailOrCancel(
ctx context.Context, execCtx interface{}, jobErr error,
) error {
p := execCtx.(sql.JobExecContext)
details := r.job.Details().(jobspb.ImportDetails)

if details.TablesPublished {
// If the table was published, there is nothing for us to clean up; the
// descriptor is already online.
return nil
}

// Emit to the event log that the job has started reverting.
emitImportJobEvent(ctx, p, jobs.StatusReverting, r.job)

// TODO(sql-exp): increase telemetry count for import.total.failed and
// import.duration-sec.failed.
details := r.job.Details().(jobspb.ImportDetails)
logutil.LogJobCompletion(ctx, importJobRecoveryEventType, r.job.ID(), false, jobErr, r.res.Rows)

addToFileFormatTelemetry(details.Format.Format.String(), "failed")
@@ -1500,7 +1506,6 @@ func (r *importResumer) OnFailOrCancel(
} else {
log.Infof(ctx, "verified no nodes still ingesting on behalf of job %d", r.job.ID())
}

}

cfg := execCtx.(sql.JobExecContext).ExecCfg()
@@ -1630,7 +1635,25 @@ func (r *importResumer) dropTables(
if err := descsCol.WriteDescToBatch(ctx, kvTrace, intoDesc, b); err != nil {
return err
}
return errors.Wrap(txn.KV().Run(ctx, b), "putting IMPORT INTO table back online")
err = txn.KV().Run(ctx, b)
if err != nil {
return errors.Wrap(err, "bringing IMPORT INTO table back online")
}

err = r.job.WithTxn(txn).Update(ctx, func(
txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
) error {
// Mark the table as published to avoid running cleanup again.
details := md.Payload.GetImport()
details.TablesPublished = true
ju.UpdatePayload(md.Payload)
return nil
})
if err != nil {
return errors.Wrap(err, "updating job to mark table as published during cleanup")
}

return nil
}

// dropNewTables drops the tables that were created as part of an IMPORT and
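The import_job.go changes above implement one pattern: record a "tables published" milestone in the job payload inside the same transaction that writes the descriptor back online, and have OnFailOrCancel return early once that milestone is set, so a late cancellation can no longer revert rows that are already live. A minimal, self-contained Go sketch of that guard, assuming hypothetical types (importDetails, job) that merely stand in for the persisted payload and job record:

```go
package main

import "fmt"

// importDetails stands in for the persisted job payload
// (jobspb.ImportDetails in the real change).
type importDetails struct {
	tablesPublished bool
}

// job stands in for a persisted job record.
type job struct {
	details importDetails
}

// publishTables brings the table back online and records the milestone. In
// the real change the descriptor write and the payload update share one
// transaction, so the flag can never claim more than what actually happened.
func (j *job) publishTables() {
	j.details.tablesPublished = true
}

// onFailOrCancel is the cleanup hook. Once the tables are published there is
// nothing to revert: deleting data at this point would destroy live rows.
func (j *job) onFailOrCancel() {
	if j.details.tablesPublished {
		fmt.Println("tables already published; skipping revert")
		return
	}
	fmt.Println("reverting ingested rows")
}

func main() {
	// Cancellation observed before publish: cleanup reverts as usual.
	early := &job{}
	early.onFailOrCancel()

	// Cancellation observed after publish: cleanup is a no-op.
	late := &job{}
	late.publishTables()
	late.onFailOrCancel()
}
```

The design choice that makes the early return safe is the atomicity: because the flag is persisted in the same transaction that publishes the descriptor, cleanup can trust it no matter where the job is interrupted afterwards.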
51 changes: 51 additions & 0 deletions pkg/sql/importer/import_stmt_test.go
@@ -2145,6 +2145,7 @@ func TestImportIntoCSVCancel(t *testing.T) {
conn := tc.ServerConn(0)

setupDoneCh := make(chan struct{})
finishedCancelCh := make(chan struct{})
for i := 0; i < tc.NumServers(); i++ {
tc.Server(i).JobRegistry().(*jobs.Registry).TestingWrapResumerConstructor(
jobspb.TypeImport,
@@ -2153,6 +2154,12 @@
r.testingKnobs.onSetupFinish = func(flowinfra.Flow) {
close(setupDoneCh)
}
// We need this to ensure the cancel happens before the descriptor is
// marked online, and thus the rows ingested will be reverted.
r.testingKnobs.afterImport = func(_ roachpb.RowCount) error {
<-finishedCancelCh
return nil
}
return r
})
}
@@ -2167,8 +2174,52 @@
row.Scan(&jobID)
<-setupDoneCh
sqlDB.Exec(t, fmt.Sprintf("CANCEL JOB %d", jobID))
close(finishedCancelCh)
sqlDB.Exec(t, fmt.Sprintf("SHOW JOB WHEN COMPLETE %d", jobID))
sqlDB.CheckQueryResults(t, "SELECT count(*) FROM t", [][]string{{"0"}})

// Verify TablesPublished is set after cleanup.
job, err := tc.ApplicationLayer(0).JobRegistry().(*jobs.Registry).LoadJob(ctx, jobspb.JobID(jobID))
require.NoError(t, err)
payload := job.Payload()
importDetails := payload.GetImport()
require.True(t, importDetails.TablesPublished, "expected TablesPublished to be true after cleanup")
}
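The afterImport testing knob added above parks the import flow until the test has issued CANCEL JOB, which makes the interleaving deterministic: the cancellation always lands before the descriptor is published, so the ingested rows should be reverted. A standalone sketch of that channel-gating idea, not the actual test harness:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	cancelIssued := make(chan struct{})
	var jobDone sync.WaitGroup

	jobDone.Add(1)
	go func() {
		defer jobDone.Done()
		fmt.Println("job: ingestion finished, parked before publishing")
		// Stand-in for r.testingKnobs.afterImport: hold the job here until
		// the test has requested the cancellation.
		<-cancelIssued
		fmt.Println("job: cancellation observed, reverting ingested rows")
	}()

	// Stand-in for the test body: request the cancel, then release the job.
	fmt.Println("test: CANCEL JOB issued")
	close(cancelIssued)

	jobDone.Wait()
}
```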

func TestImportCancelAfterSuccess(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// This is a regression test for #159603. If the cancel logic ran after the
// descriptor was marked as offline, we could end up deleting live data.

ctx := context.Background()
baseDir := datapathutils.TestDataPath(t, "csv")
tc := serverutils.StartCluster(t, 1, base.TestClusterArgs{ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
ExternalIODir: baseDir,
}})
defer tc.Stopper().Stop(ctx)

sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
testFiles := makeCSVData(t, 1, 100, 1, 100)

sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)

var jobID jobspb.JobID
sqlDB.QueryRow(t, fmt.Sprintf("WITH j AS (IMPORT INTO t (a, b) CSV DATA (%s)) SELECT job_id FROM j", testFiles.files[0])).Scan(&jobID)

var countBefore int
sqlDB.QueryRow(t, "SELECT count(*) FROM t").Scan(&countBefore)

sqlDB.Exec(t, "UPDATE system.jobs SET status = 'cancel-requested' WHERE id = $1", jobID)
jobutils.WaitForJobToCancel(t, sqlDB, jobID)

var countAfter int
sqlDB.QueryRow(t, "SELECT count(*) FROM t").Scan(&countAfter)
require.Equal(t, countBefore, countAfter, "data should not be rolled back")
}

// Verify that a failed import will clean up after itself. This means:
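TestImportCancelAfterSuccess above flips the job status to cancel-requested by hand and then waits for the job to settle via jobutils.WaitForJobToCancel, asserting that the already-published rows survive. For readers unfamiliar with such helpers, a rough, purely hypothetical sketch of a polling wait (waitForJobStatus, its query, and its interval are illustrative and not the real jobutils implementation):

```go
package testutil

import (
	"database/sql"
	"fmt"
	"time"
)

// waitForJobStatus polls the jobs table until the job reaches the wanted
// status or the timeout expires. The query mirrors the system.jobs columns
// used by the test above (id, status) but is only an illustration.
func waitForJobStatus(db *sql.DB, jobID int64, want string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		var got string
		if err := db.QueryRow(
			"SELECT status FROM system.jobs WHERE id = $1", jobID,
		).Scan(&got); err != nil {
			return err
		}
		if got == want {
			return nil
		}
		time.Sleep(100 * time.Millisecond)
	}
	return fmt.Errorf("job %d did not reach status %q within %s", jobID, want, timeout)
}
```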