Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1607,6 +1607,13 @@ message InspectProgress {
message InspectProcessorProgress {
int64 checks_completed = 1; // Number of checks completed since last update.
bool finished = 2; // Processor finished all checks.

// SpanStarted indicates a span has begun processing. When set, the
// coordinator should set up protected timestamp protection for this span.
roachpb.Span span_started = 3 [(gogoproto.nullable) = false];

// StartedAt timestamp is the "now" timestamp chosen for this span.
util.hlc.Timestamp started_at = 4 [(gogoproto.nullable) = false];
}

message ImportRollbackProgress {}
Expand Down
1 change: 0 additions & 1 deletion pkg/jobs/jobsprotectedts/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ go_library(
"//pkg/kv/kvserver/protectedts/ptreconcile",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/isql",
"//pkg/util/ctxgroup",
Expand Down
7 changes: 3 additions & 4 deletions pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
Expand Down Expand Up @@ -107,7 +106,7 @@ func NewManager(
// timestamp. Note, the function assumes the in-memory job is up to date with
// the persisted job record.
func (p *Manager) TryToProtectBeforeGC(
ctx context.Context, job *jobs.Job, tableDesc catalog.TableDescriptor, readAsOf hlc.Timestamp,
ctx context.Context, job *jobs.Job, tableID descpb.ID, readAsOf hlc.Timestamp,
) Cleaner {
waitGrp := ctxgroup.WithContext(ctx)
protectedTSInstallCancel := make(chan struct{})
Expand All @@ -123,7 +122,7 @@ func (p *Manager) TryToProtectBeforeGC(
// figure out when to apply a protected timestamp as a percentage of the
// time until GC can occur.
zoneCfg, err := systemConfig.GetZoneConfigForObject(p.codec,
config.ObjectID(tableDesc.GetID()))
config.ObjectID(tableID))
if err != nil {
return err
}
Expand All @@ -138,7 +137,7 @@ func (p *Manager) TryToProtectBeforeGC(

select {
case <-time.After(waitBeforeProtectedTS):
target := ptpb.MakeSchemaObjectsTarget(descpb.IDs{tableDesc.GetID()})
target := ptpb.MakeSchemaObjectsTarget(descpb.IDs{tableID})
unprotectCallback, err = p.Protect(ctx, job, target, readAsOf)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -1716,7 +1716,7 @@ func ValidateInvertedIndexes(

// Removes the protected timestamp, if one was added when this
// function returns.
protectedTSCleaner := protectedTSManager.TryToProtectBeforeGC(ctx, job, tableDesc, runHistoricalTxn.ReadAsOf())
protectedTSCleaner := protectedTSManager.TryToProtectBeforeGC(ctx, job, tableDesc.GetID(), runHistoricalTxn.ReadAsOf())
defer func() {
if unprotectErr := protectedTSCleaner(ctx); unprotectErr != nil {
err = errors.CombineErrors(err, unprotectErr)
Expand Down Expand Up @@ -1931,7 +1931,7 @@ func ValidateForwardIndexes(

// Removes the protected timestamp, if one was added when this
// function returns.
protectedTSCleaner := protectedTSManager.TryToProtectBeforeGC(ctx, job, tableDesc, runHistoricalTxn.ReadAsOf())
protectedTSCleaner := protectedTSManager.TryToProtectBeforeGC(ctx, job, tableDesc.GetID(), runHistoricalTxn.ReadAsOf())
defer func() {
if unprotectErr := protectedTSCleaner(ctx); unprotectErr != nil {
err = errors.CombineErrors(err, unprotectErr)
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/inspect/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//pkg/jobs",
"//pkg/jobs/jobfrontier",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb",
Expand All @@ -46,6 +47,7 @@ go_library(
"//pkg/sql/physicalplan",
"//pkg/sql/privilege",
"//pkg/sql/rowexec",
"//pkg/sql/schemachanger/scexec",
"//pkg/sql/sem/idxtype",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
Expand Down Expand Up @@ -92,6 +94,7 @@ go_test(
"//pkg/jobs",
"//pkg/jobs/jobfrontier",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver/protectedts",
Expand Down
39 changes: 39 additions & 0 deletions pkg/sql/inspect/inspect_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,14 @@ func (p *inspectProcessor) processSpan(

asOfToUse := p.getTimestampForSpan()

// If using "now" AOST, notify the coordinator so it can set up protected
// timestamp protection for this span.
if p.spec.InspectDetails.AsOf.IsEmpty() {
if err := p.sendSpanStartedProgress(ctx, output, span, asOfToUse); err != nil {
return err
}
}

// Only create checks that apply to this span.
var checks []inspectCheck
for _, factory := range p.checkFactories {
Expand Down Expand Up @@ -359,6 +367,37 @@ func (p *inspectProcessor) sendInspectProgress(
return nil
}

// sendSpanStartedProgress sends a progress message indicating a span has started
// processing with a specific timestamp. The coordinator uses this to set up
// protected timestamp protection for the span.
func (p *inspectProcessor) sendSpanStartedProgress(
ctx context.Context, output execinfra.RowReceiver, span roachpb.Span, asOf hlc.Timestamp,
) error {
log.VEventf(ctx, 2, "INSPECT: processor sending span started for %s at timestamp %s", span, asOf)

progressMsg := &jobspb.InspectProcessorProgress{
SpanStarted: span,
StartedAt: asOf,
}

progressAny, err := pbtypes.MarshalAny(progressMsg)
if err != nil {
return errors.Wrapf(err, "unable to marshal inspect processor progress")
}

meta := &execinfrapb.ProducerMetadata{
BulkProcessorProgress: &execinfrapb.RemoteProducerMetadata_BulkProcessorProgress{
ProgressDetails: *progressAny,
NodeID: p.flowCtx.NodeID.SQLInstanceID(),
FlowID: p.flowCtx.ID,
ProcessorID: p.processorID,
},
}

p.pushProgressMeta(output, meta)
return nil
}

// getTimestampForSpan returns the timestamp to use for processing a span.
// If AsOf is empty, it returns the current timestamp from the processor's clock.
// Otherwise, it returns the specified AsOf timestamp.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/inspect/inspect_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func makeProcessor(
concurrency: concurrency,
clock: clock,
}

return proc, logger
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/inspect/inspect_resumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,14 +260,16 @@ func (c *inspectResumer) setupProgressTracking(
c.job,
&execCfg.Settings.SV,
execCfg.InternalDB,
execCfg.Codec,
execCfg.ProtectedTimestampManager,
)
completedSpans, err := progressTracker.initTracker(ctx)
if err != nil {
return nil, nil, nil, err
}

cleanup := func() {
progressTracker.terminateTracker()
progressTracker.terminateTracker(ctx)
}

return progressTracker, completedSpans, cleanup, nil
Expand Down
Loading