Skip to content

Commit 192314f

Browse files
committed
ttl: use a desc.Collection tied to the txn for TTL job metadata
Previously, we would pass in a desc.Collection from the flowCtx while gathering metadata for the TTL job, such as column names and types. This was incorrect, since it meant that the lease on any descriptors fetched in those transactions would never be released until the job completed. Release note: None
1 parent 592efbc commit 192314f

File tree

6 files changed

+69
-36
lines changed

6 files changed

+69
-36
lines changed

pkg/sql/ttl/ttljob/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ go_test(
9191
"//pkg/sql/catalog",
9292
"//pkg/sql/catalog/catenumpb",
9393
"//pkg/sql/catalog/catpb",
94-
"//pkg/sql/catalog/catsessiondata",
9594
"//pkg/sql/catalog/colinfo",
9695
"//pkg/sql/catalog/descpb",
9796
"//pkg/sql/catalog/descs",
@@ -105,7 +104,6 @@ go_test(
105104
"//pkg/sql/rowenc",
106105
"//pkg/sql/sem/eval",
107106
"//pkg/sql/sem/tree",
108-
"//pkg/sql/sessiondata",
109107
"//pkg/sql/ttl/ttlbase",
110108
"//pkg/sql/types",
111109
"//pkg/testutils",

pkg/sql/ttl/ttljob/ttljob.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/cockroachdb/cockroach/pkg/jobs"
1414
"github.com/cockroachdb/cockroach/pkg/jobs/joberror"
1515
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
16-
"github.com/cockroachdb/cockroach/pkg/kv"
1716
"github.com/cockroachdb/cockroach/pkg/roachpb"
1817
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
1918
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -59,8 +58,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) (re
5958

6059
jobExecCtx := execCtx.(sql.JobExecContext)
6160
execCfg := jobExecCtx.ExecCfg()
62-
db := execCfg.DB
63-
descsCol := jobExecCtx.ExtendedEvalContext().Descs
61+
db := execCfg.InternalDB
6462

6563
settingsValues := execCfg.SV()
6664
if err := ttlbase.CheckJobEnabled(settingsValues); err != nil {
@@ -84,8 +82,8 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) (re
8482
var rowLevelTTL *catpb.RowLevelTTL
8583
var relationName string
8684
var entirePKSpan roachpb.Span
87-
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
88-
desc, err := descsCol.ByIDWithLeased(txn).WithoutNonPublic().Get().Table(ctx, details.TableID)
85+
if err := db.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
86+
desc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, details.TableID)
8987
if err != nil {
9088
return err
9189
}
@@ -111,7 +109,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) (re
111109
return pgerror.Newf(pgcode.OperatorIntervention, "ttl jobs on table %s are currently paused", tree.Name(desc.GetName()))
112110
}
113111

114-
tn, err := descs.GetObjectName(ctx, txn, descsCol, desc)
112+
tn, err := descs.GetObjectName(ctx, txn.KV(), txn.Descriptors(), desc)
115113
if err != nil {
116114
return errors.Wrapf(err, "error fetching table relation name for TTL")
117115
}

pkg/sql/ttl/ttljob/ttljob_plans_test.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,8 @@ import (
1616
"github.com/cockroachdb/cockroach/pkg/base"
1717
"github.com/cockroachdb/cockroach/pkg/sql"
1818
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
19-
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catsessiondata"
2019
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
21-
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
2220
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
23-
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
2421
"github.com/cockroachdb/cockroach/pkg/sql/ttl/ttlbase"
2522
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
2623
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -46,12 +43,6 @@ func TestQueryPlansDataDriven(t *testing.T) {
4643
cutoff, err := tree.MakeDTimestamp(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), time.Second)
4744
require.NoError(t, err)
4845

49-
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
50-
sd := sql.NewInternalSessionData(ctx, s.ClusterSettings(), "test" /* opName */)
51-
sds := sessiondata.NewStack(sd)
52-
dsdp := catsessiondata.NewDescriptorSessionDataStackProvider(sds)
53-
descsCol := execCfg.CollectionFactory.NewCollection(ctx, descs.WithDescriptorSessionDataProvider(dsdp))
54-
5546
getExplainPlan := func(query string, overrides string) string {
5647
rows := runner.Query(t, fmt.Sprintf(`SELECT crdb_internal.execute_internally('EXPLAIN %s', '%s');`, query, overrides))
5748
var sb strings.Builder
@@ -135,7 +126,7 @@ func TestQueryPlansDataDriven(t *testing.T) {
135126
row := runner.QueryRow(t, fmt.Sprintf("SELECT '%s'::REGCLASS::OID;", tableName))
136127
row.Scan(&tableID)
137128
relationName, _, pkColNames, _, pkColDirs, _, _, err := getTableInfo(
138-
ctx, db, descsCol, descpb.ID(tableID),
129+
ctx, db, descpb.ID(tableID),
139130
)
140131
require.NoError(t, err)
141132

pkg/sql/ttl/ttljob/ttljob_processor.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (t *ttlProcessor) Start(ctx context.Context) {
7373
}
7474

7575
func getTableInfo(
76-
ctx context.Context, db descs.DB, descsCol *descs.Collection, tableID descpb.ID,
76+
ctx context.Context, db descs.DB, tableID descpb.ID,
7777
) (
7878
relationName string,
7979
pkColIDs catalog.TableColMap,
@@ -84,8 +84,8 @@ func getTableInfo(
8484
labelMetrics bool,
8585
err error,
8686
) {
87-
err = db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
88-
desc, err := descsCol.ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, tableID)
87+
err = db.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
88+
desc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, tableID)
8989
if err != nil {
9090
return err
9191
}
@@ -116,7 +116,7 @@ func getTableInfo(
116116
rowLevelTTL := desc.GetRowLevelTTL()
117117
labelMetrics = rowLevelTTL.LabelMetrics
118118

119-
tn, err := descs.GetObjectName(ctx, txn.KV(), descsCol, desc)
119+
tn, err := descs.GetObjectName(ctx, txn.KV(), txn.Descriptors(), desc)
120120
if err != nil {
121121
return errors.Wrapf(err, "error fetching table relation name for TTL")
122122
}
@@ -133,7 +133,6 @@ func (t *ttlProcessor) work(ctx context.Context) error {
133133
flowCtx := t.FlowCtx
134134
serverCfg := flowCtx.Cfg
135135
db := serverCfg.DB
136-
descsCol := flowCtx.Descriptors
137136
codec := serverCfg.Codec
138137
details := ttlSpec.RowLevelTTLDetails
139138
tableID := details.TableID
@@ -161,7 +160,7 @@ func (t *ttlProcessor) work(ctx context.Context) error {
161160
)
162161

163162
relationName, pkColIDs, pkColNames, pkColTypes, pkColDirs, numFamilies, labelMetrics, err := getTableInfo(
164-
ctx, db, descsCol, tableID,
163+
ctx, db, tableID,
165164
)
166165
if err != nil {
167166
return err
@@ -430,7 +429,7 @@ func (t *ttlProcessor) runTTLOnQueryBounds(
430429
}
431430
deleteBatch := expiredRowsPKs[startRowIdx+processed : until]
432431
var batchRowCount int64
433-
do := func(ctx context.Context, txn isql.Txn) error {
432+
do := func(ctx context.Context, txn descs.Txn) error {
434433
txn.KV().SetDebugName("ttljob-delete-batch")
435434
// We explicitly specify a low retry limit because this operation is
436435
// wrapped with its own retry function that will also take care of
@@ -442,7 +441,7 @@ func (t *ttlProcessor) runTTLOnQueryBounds(
442441
}
443442
// If we detected a schema change here, the DELETE will not succeed
444443
// (the SELECT still will because of the AOST). Early exit here.
445-
desc, err := flowCtx.Descriptors.ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, details.TableID)
444+
desc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, details.TableID)
446445
if err != nil {
447446
return err
448447
}
@@ -458,7 +457,7 @@ func (t *ttlProcessor) runTTLOnQueryBounds(
458457
}
459458
return nil
460459
}
461-
if err := serverCfg.DB.Txn(
460+
if err := serverCfg.DB.DescsTxn(
462461
ctx, do, isql.SteppingEnabled(), isql.WithPriority(admissionpb.BulkLowPri),
463462
); err != nil {
464463
return errors.Wrapf(err, "error during row deletion")

pkg/sql/ttl/ttljob/ttljob_processor_internal_test.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,12 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/kv"
1616
"github.com/cockroachdb/cockroach/pkg/sql"
1717
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
18-
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catsessiondata"
1918
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
2019
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
2120
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
2221
"github.com/cockroachdb/cockroach/pkg/sql/isql"
2322
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
2423
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
25-
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
2624
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
2725
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
2826
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -96,13 +94,7 @@ func TestRetryDeleteBatch(t *testing.T) {
9694

9795
s := srv.ApplicationLayer()
9896

99-
// Create a descriptor collection for inclusion in the flowCtx
10097
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
101-
sd := sql.NewInternalSessionData(ctx, s.ClusterSettings(), "test" /* opName */)
102-
sds := sessiondata.NewStack(sd)
103-
dsdp := catsessiondata.NewDescriptorSessionDataStackProvider(sds)
104-
descsCol := execCfg.CollectionFactory.NewCollection(ctx, descs.WithDescriptorSessionDataProvider(dsdp))
105-
10698
flowCtx := execinfra.FlowCtx{
10799
Cfg: &execinfra.ServerConfig{
108100
DB: s.InternalDB().(descs.DB),
@@ -113,7 +105,6 @@ func TestRetryDeleteBatch(t *testing.T) {
113105
Codec: s.Codec(),
114106
Settings: s.ClusterSettings(),
115107
},
116-
Descriptors: descsCol,
117108
}
118109

119110
// We need to create a dummy table so that we have a table descriptor. The

pkg/sql/ttl/ttljob/ttljob_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,62 @@ INSERT INTO t (id, crdb_internal_expiration) VALUES (1, now() - '1 month'), (2,
358358
}
359359
}
360360

361+
func TestRowLevelTTLAlterType(t *testing.T) {
362+
defer leaktest.AfterTest(t)()
363+
defer log.Scope(t).Close(t)
364+
365+
th, cleanupFunc := newRowLevelTTLTestJobTestHelper(
366+
t,
367+
&sql.TTLTestingKnobs{
368+
AOSTDuration: &zeroDuration,
369+
// Prior to https://github.com/cockroachdb/cockroach/pull/145374, this
370+
// pre-select ALTER TYPE statement would hang forever.
371+
PreSelectStatement: `ALTER TYPE defaultdb.public.typ ADD VALUE 'c'`,
372+
},
373+
false, /* testMultiTenant */
374+
1, /* numNodes */
375+
)
376+
defer cleanupFunc()
377+
th.sqlDB.Exec(t, `CREATE TYPE typ AS ENUM ('foo', 'bar')`)
378+
th.sqlDB.Exec(t, `CREATE TABLE t (
379+
id INT PRIMARY KEY,
380+
v typ
381+
) WITH (ttl_expire_after = '10 minutes')`)
382+
th.sqlDB.Exec(t, `ALTER TABLE t SPLIT AT VALUES (1), (2)`)
383+
th.sqlDB.Exec(t, `INSERT INTO t (id, v, crdb_internal_expiration) VALUES (1, 'foo', now() - '1 month'), (2, 'bar', now() - '1 month')`)
384+
385+
// Force the schedule to execute.
386+
th.waitForScheduledJob(t, jobs.StateSucceeded, "")
387+
}
388+
389+
func TestRowLevelTTLAlterTypeInPrimaryKey(t *testing.T) {
390+
defer leaktest.AfterTest(t)()
391+
defer log.Scope(t).Close(t)
392+
393+
th, cleanupFunc := newRowLevelTTLTestJobTestHelper(
394+
t,
395+
&sql.TTLTestingKnobs{
396+
AOSTDuration: &zeroDuration,
397+
PreSelectStatement: `ALTER TYPE defaultdb.public.typ ADD VALUE 'c'`,
398+
},
399+
false, /* testMultiTenant */
400+
1, /* numNodes */
401+
)
402+
defer cleanupFunc()
403+
th.sqlDB.Exec(t, `CREATE TYPE typ AS ENUM ('foo', 'bar')`)
404+
th.sqlDB.Exec(t, `CREATE TABLE t (
405+
id INT,
406+
v typ,
407+
PRIMARY KEY (id, v)
408+
) WITH (ttl_expire_after = '10 minutes')`)
409+
th.sqlDB.Exec(t, `ALTER TABLE t SPLIT AT VALUES (1), (2)`)
410+
th.sqlDB.Exec(t, `INSERT INTO t (id, v, crdb_internal_expiration) VALUES (1, 'foo', now() - '1 month'), (2, 'bar', now() - '1 month')`)
411+
412+
// Since the enum type is used in the primary key and we just modified it,
413+
// the job will fail.
414+
th.waitForScheduledJob(t, jobs.StateFailed, "comparison of two different versions of enum USER DEFINED ENUM: public.typ oid")
415+
}
416+
361417
func TestRowLevelTTLJobDisabled(t *testing.T) {
362418
defer leaktest.AfterTest(t)()
363419
defer log.Scope(t).Close(t)

0 commit comments

Comments
 (0)