Skip to content

Commit c0a4b84

Browse files
committed
sql/ttl: centralize row-level TTL progress updates at coordinator
Previously, each processor in a row-level TTL job directly updated job progress, leading to fan-out issues in large clusters and performance hits during restarts when many spans had no expired rows. Gating logic was previously added to mitigate this. This change delegates all progress updates to the coordinator. Each processor now sends its progress to the coordinator, which performs the job progress update. The original gating logic remains but is now enforced by the coordinator. Closes: #135227 Epic: none Release note: none
1 parent 9ddf582 commit c0a4b84

File tree

13 files changed

+1045
-323
lines changed

13 files changed

+1045
-323
lines changed

pkg/jobs/ingeststopped/ingesting_checker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func checkAllNodesForIngestingJob(
9292
)
9393
sql.FinalizePlan(ctx, planCtx, p)
9494

95-
res := sql.NewMetadataOnlyMetadataCallbackWriter()
95+
res := sql.NewMetadataOnlyMetadataCallbackWriter(func(context.Context, *execinfrapb.ProducerMetadata) error { return nil })
9696

9797
recv := sql.MakeDistSQLReceiver(
9898
ctx,

pkg/jobs/jobspb/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ go_library(
2323
"//pkg/util/iterutil",
2424
"//pkg/util/tracing/tracingpb",
2525
"@com_github_cockroachdb_errors//:errors",
26+
"@com_github_cockroachdb_redact//:redact",
2627
"@com_github_gogo_protobuf//jsonpb",
2728
],
2829
)

pkg/jobs/jobspb/jobs.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1717
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
1818
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
19+
"github.com/cockroachdb/redact"
1920
)
2021

2122
// JobID is the ID of a job.
@@ -164,3 +165,10 @@ func (b *BackupEncryptionOptions) IsEncrypted() bool {
164165
// For dumb reasons, there are two ways to represent no encryption.
165166
return !(b == nil || b.Mode == EncryptionMode_None)
166167
}
168+
169+
var _ redact.SafeFormatter = (*RowLevelTTLProcessorProgress)(nil)
170+
171+
// SafeFormat implements the redact.SafeFormatter interface.
172+
func (r *RowLevelTTLProcessorProgress) SafeFormat(p redact.SafePrinter, _ rune) {
173+
p.SafeString(redact.SafeString(r.String()))
174+
}

pkg/jobs/jobspb/jobs.proto

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1336,14 +1336,19 @@ message RowLevelTTLProcessorProgress {
13361336
(gogoproto.nullable) = false
13371337
];
13381338

1339-
// ProcessorRowCount is the row count of the DistSQL processor.
1340-
int64 processor_row_count = 3;
1339+
// DeletedRowCount is the number of rows deleted by this DistSQL processor.
1340+
int64 deleted_row_count = 3;
13411341

1342-
// ProcessorSpanCount is the number of spans of the DistSQL processor;
1343-
int64 processor_span_count = 4;
1342+
// TotalSpanCount is the total number of spans assigned to the DistSQL processor.
1343+
int64 total_span_count = 4;
1344+
1345+
// ProcessedSpanCount is the number of spans already processed.
1346+
int64 processed_span_count = 6;
13441347

13451348
// ProcessorConcurrency is the number parallel tasks the processor will do at once.
13461349
int64 processor_concurrency = 5;
1350+
1351+
// NEXT ID: 7
13471352
}
13481353

13491354
message SchemaTelemetryDetails {

pkg/sql/distsql_running.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"unsafe"
1919

2020
"github.com/cockroachdb/cockroach/pkg/base"
21+
"github.com/cockroachdb/cockroach/pkg/clusterversion"
2122
"github.com/cockroachdb/cockroach/pkg/col/coldata"
2223
"github.com/cockroachdb/cockroach/pkg/kv"
2324
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
@@ -456,9 +457,13 @@ func (dsp *DistSQLPlanner) setupFlows(
456457
if len(statementSQL) > setupFlowRequestStmtMaxLength {
457458
statementSQL = statementSQL[:setupFlowRequestStmtMaxLength]
458459
}
460+
v := execversion.V25_2
461+
if dsp.st.Version.IsActive(ctx, clusterversion.V25_4) {
462+
v = execversion.V25_4
463+
}
459464
setupReq := execinfrapb.SetupFlowRequest{
460465
LeafTxnInputState: leafInputState,
461-
Version: execversion.V25_2,
466+
Version: v,
462467
TraceKV: recv.tracing.KVTracingEnabled(),
463468
CollectStats: planCtx.collectExecStats,
464469
StatementSQL: statementSQL,
@@ -1163,12 +1168,12 @@ func NewMetadataCallbackWriter(
11631168
// NewMetadataOnlyMetadataCallbackWriter creates a new MetadataCallbackWriter
11641169
// that uses errOnlyResultWriter and only supports receiving
11651170
// execinfrapb.ProducerMetadata.
1166-
func NewMetadataOnlyMetadataCallbackWriter() *MetadataCallbackWriter {
1171+
func NewMetadataOnlyMetadataCallbackWriter(
1172+
metaFn func(ctx context.Context, meta *execinfrapb.ProducerMetadata) error,
1173+
) *MetadataCallbackWriter {
11671174
return NewMetadataCallbackWriter(
11681175
&errOnlyResultWriter{},
1169-
func(ctx context.Context, meta *execinfrapb.ProducerMetadata) error {
1170-
return nil
1171-
},
1176+
metaFn,
11721177
)
11731178
}
11741179

pkg/sql/execversion/version.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,16 @@ type V uint32
2121
// only be used by the flows once the cluster has upgraded to 25.2.
2222
const V25_2 = V(73)
2323

24+
// V25_4 is the exec version of all binaries of 25.4 cockroach versions. It can
25+
// only be used by the flows once the cluster has upgraded to 25.4.
26+
const V25_4 = V(74)
27+
2428
// MinAccepted is the oldest version that the server is compatible with. A
2529
// server will not accept flows with older versions.
2630
const MinAccepted = V25_2
2731

2832
// Latest is the latest exec version supported by this binary.
29-
const Latest = V25_2
33+
const Latest = V25_4
3034

3135
var contextVersionKey = ctxutil.RegisterFastValueKey()
3236

pkg/sql/inspect/inspect_job.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ func (c *inspectResumer) runInspectPlan(
164164
) error {
165165
execCfg := jobExecCtx.ExecCfg()
166166

167-
metadataCallbackWriter := sql.NewMetadataOnlyMetadataCallbackWriter()
167+
metadataCallbackWriter := sql.NewMetadataOnlyMetadataCallbackWriter(
168+
func(context.Context, *execinfrapb.ProducerMetadata) error { return nil })
168169

169170
distSQLReceiver := sql.MakeDistSQLReceiver(
170171
ctx,

pkg/sql/ttl/ttljob/BUILD.bazel

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ go_library(
2929
"//pkg/sql/catalog/descs",
3030
"//pkg/sql/execinfra",
3131
"//pkg/sql/execinfrapb",
32+
"//pkg/sql/execversion",
3233
"//pkg/sql/isql",
3334
"//pkg/sql/lexbase",
3435
"//pkg/sql/pgwire/pgcode",
@@ -48,12 +49,14 @@ go_library(
4849
"//pkg/util/log",
4950
"//pkg/util/metric",
5051
"//pkg/util/metric/aggmetric",
52+
"//pkg/util/protoutil",
5153
"//pkg/util/quotapool",
5254
"//pkg/util/retry",
5355
"//pkg/util/syncutil",
5456
"//pkg/util/timeutil",
5557
"@com_github_cockroachdb_errors//:errors",
5658
"@com_github_cockroachdb_redact//:redact",
59+
"@com_github_gogo_protobuf//types",
5760
"@com_github_prometheus_client_model//go",
5861
],
5962
)
@@ -63,6 +66,7 @@ go_test(
6366
size = "large",
6467
srcs = [
6568
"main_test.go",
69+
"ttljob_internal_test.go",
6670
"ttljob_plans_test.go",
6771
"ttljob_processor_internal_test.go",
6872
"ttljob_query_builder_test.go",
@@ -75,6 +79,7 @@ go_test(
7579
deps = [
7680
"//pkg/base",
7781
"//pkg/ccl/kvccl/kvtenantccl",
82+
"//pkg/clusterversion",
7883
"//pkg/jobs",
7984
"//pkg/jobs/jobspb",
8085
"//pkg/jobs/jobstest",
@@ -85,6 +90,7 @@ go_test(
8590
"//pkg/security/securityassets",
8691
"//pkg/security/securitytest",
8792
"//pkg/server",
93+
"//pkg/settings/cluster",
8894
"//pkg/sql",
8995
"//pkg/sql/catalog",
9096
"//pkg/sql/catalog/catenumpb",
@@ -98,7 +104,9 @@ go_test(
98104
"//pkg/sql/isql",
99105
"//pkg/sql/lexbase",
100106
"//pkg/sql/parser",
107+
"//pkg/sql/physicalplan",
101108
"//pkg/sql/randgen",
109+
"//pkg/sql/rowenc",
102110
"//pkg/sql/sem/eval",
103111
"//pkg/sql/sem/tree",
104112
"//pkg/sql/spanutils",
@@ -118,8 +126,10 @@ go_test(
118126
"//pkg/util/quotapool",
119127
"//pkg/util/randutil",
120128
"//pkg/util/timeutil",
129+
"//pkg/util/uuid",
121130
"@com_github_cockroachdb_datadriven//:datadriven",
122131
"@com_github_cockroachdb_errors//:errors",
132+
"@com_github_gogo_protobuf//types",
123133
"@com_github_stretchr_testify//require",
124134
],
125135
)

0 commit comments

Comments
 (0)