Commit a352f16

craig[bot] and kev-cao committed
Merge #154253
154253: physical: ignore status message from aggregate stats collector r=msbutler a=kev-cao

#153893 taught PCR to update its status message based on checkpoint events. However, since it also updates the status based on range updates, this can cause race conditions that result in unexpected statuses. For now, ignore the status messages from the aggregate stats collector and keep the original behavior.

Epic: None

Fixes: #154230

Co-authored-by: Kevin Cao <[email protected]>
2 parents 458e0e8 + 83c591c commit a352f16
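The race described in the commit message is a last-writer-wins problem between two independent status sources. Below is a minimal, self-contained Go sketch of that failure mode; it is illustrative only (not CockroachDB code), and the type, function names, and status strings are made up:

package main

import (
	"fmt"
	"sync"
)

// jobProgress stands in for the job's progress record; in the frontier
// processor the corresponding field is md.Progress.StatusMessage.
type jobProgress struct {
	mu     sync.Mutex
	status string
}

func (p *jobProgress) setStatus(s string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.status = s
}

func main() {
	p := &jobProgress{}
	var wg sync.WaitGroup
	wg.Add(2)

	// Writer 1: a status derived from range updates (fresh information).
	go func() {
		defer wg.Done()
		p.setStatus("physical replication running")
	}()

	// Writer 2: a status derived from aggregate stats, which only refresh on
	// checkpoint events and can therefore be stale.
	go func() {
		defer wg.Done()
		p.setStatus("initial scan in progress on 500 ranges")
	}()

	wg.Wait()
	// Last writer wins: the final status depends on goroutine scheduling, so a
	// stale stats-derived message can clobber the fresher one.
	fmt.Println(p.status)
}

The diff below resolves this by removing the stats-derived writer entirely, leaving only the original range-update path to set the status message.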

File tree

1 file changed: +4, -23 lines


pkg/crosscluster/physical/stream_ingestion_frontier_processor.go

Lines changed: 4 additions & 23 deletions
@@ -86,12 +86,6 @@ type streamIngestionFrontier struct {
 	replicatedTimeAtLastPositiveLagNodeCheck hlc.Timestamp
 
 	rangeStats replicationutils.AggregateRangeStatsCollector
-
-	// This stores the last aggregate stats we computed. Because stats are only
-	// updated on a checkpoint event, the stats will be stale until the next
-	// checkpoint and should not be used to update job statuses. Only on a fresh
-	// checkpoint should we update job statuses.
-	lastAggStats streampb.StreamEvent_RangeStats
 }
 
 var _ execinfra.Processor = &streamIngestionFrontier{}
@@ -345,7 +339,7 @@ func (sf *streamIngestionFrontier) maybeUpdateProgress() error {
 	sf.lastPartitionUpdate = timeutil.Now()
 	log.Dev.VInfof(ctx, 2, "persisting replicated time of %s", replicatedTime)
 
-	statusByStats := sf.aggregateAndUpdateRangeMetrics()
+	sf.aggregateAndUpdateRangeMetrics()
 
 	if err := registry.UpdateJobWithTxn(ctx, jobID, nil /* txn */, func(
 		txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
@@ -361,8 +355,6 @@ func (sf *streamIngestionFrontier) maybeUpdateProgress() error {
 		if replicatedTime.IsSet() && streamProgress.ReplicationStatus == jobspb.InitialScan {
 			streamProgress.ReplicationStatus = jobspb.Replicating
 			md.Progress.StatusMessage = streamProgress.ReplicationStatus.String()
-		} else if statusByStats != "" {
-			md.Progress.StatusMessage = statusByStats
 		}
 
 		// Keep the recorded replicatedTime empty until some advancement has been made
@@ -447,24 +439,13 @@ func (sf *streamIngestionFrontier) maybeCollectRangeStats(
 }
 
 // aggregateAndUpdateRangeMetrics aggregates the range stats collected from each
-// of the ingestion processors and updates the corresponding metrics. If the
-// stats have changed since the last aggregation, it returns a status message
-// to update the job status with. We do this to avoid overwriting job statuses
-// with stale stats as the stats will be the same until the next checkpoint
-// event.
-func (sf *streamIngestionFrontier) aggregateAndUpdateRangeMetrics() string {
-	aggRangeStats, _, statusMsg := sf.rangeStats.RollupStats()
+// of the ingestion processors and updates the corresponding metrics.
+func (sf *streamIngestionFrontier) aggregateAndUpdateRangeMetrics() {
+	aggRangeStats, _, _ := sf.rangeStats.RollupStats()
 	if aggRangeStats.RangeCount != 0 {
 		sf.metrics.ScanningRanges.Update(aggRangeStats.ScanningRangeCount)
 		sf.metrics.CatchupRanges.Update(aggRangeStats.LaggingRangeCount)
 	}
-	if sf.lastAggStats == aggRangeStats {
-		// This is the same stats as last time, so we don't need to update the job
-		// status.
-		return ""
-	}
-	sf.lastAggStats = aggRangeStats
-	return statusMsg
 }
 
 // maybePersistFrontierEntries periodically persists the current state of the
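For readability, here is how aggregateAndUpdateRangeMetrics reads after this commit, assembled directly from the hunks above (whitespace approximate):

// aggregateAndUpdateRangeMetrics aggregates the range stats collected from each
// of the ingestion processors and updates the corresponding metrics.
func (sf *streamIngestionFrontier) aggregateAndUpdateRangeMetrics() {
	// The returned status message is deliberately discarded; only metrics are
	// updated here now.
	aggRangeStats, _, _ := sf.rangeStats.RollupStats()
	if aggRangeStats.RangeCount != 0 {
		sf.metrics.ScanningRanges.Update(aggRangeStats.ScanningRangeCount)
		sf.metrics.CatchupRanges.Update(aggRangeStats.LaggingRangeCount)
	}
}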
