Skip to content

Commit 7bab4f7

Browse files
committed
crosscluster/physical: persist standby poller progress
This patch sets the standby poller job's resolved time to the system time that standby descriptors have been updated to. This allows a reader tenant user to easily check that the poller job is running smoothly via SHOW JOB. Epic: none Release note: none
1 parent 8fefbf9 commit 7bab4f7

File tree

3 files changed

+30
-1
lines changed

3 files changed

+30
-1
lines changed

pkg/crosscluster/physical/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ go_test(
196196
"//pkg/util/span",
197197
"//pkg/util/syncutil",
198198
"//pkg/util/timeutil",
199+
"@com_github_cockroachdb_apd_v3//:apd",
199200
"@com_github_cockroachdb_datadriven//:datadriven",
200201
"@com_github_cockroachdb_errors//:errors",
201202
"@com_github_stretchr_testify//require",

pkg/crosscluster/physical/standby_read_ts_poller_job.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package physical
77

88
import (
99
"context"
10+
"math"
1011
"time"
1112

1213
"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -15,6 +16,7 @@ import (
1516
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1617
"github.com/cockroachdb/cockroach/pkg/sql"
1718
"github.com/cockroachdb/cockroach/pkg/sql/catalog/replication"
19+
"github.com/cockroachdb/cockroach/pkg/sql/isql"
1820
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1921
"github.com/cockroachdb/cockroach/pkg/util/log"
2022
"github.com/cockroachdb/errors"
@@ -145,14 +147,20 @@ func (r *standbyReadTSPollerResumer) poll(ctx context.Context, execCfg *sql.Exec
145147
replicatedTime)
146148
}
147149
previousReplicatedTimestamp = replicatedTime
148-
if err = replication.SetupOrAdvanceStandbyReaderCatalog(
150+
if err := replication.SetupOrAdvanceStandbyReaderCatalog(
149151
ctx,
150152
tenantID,
151153
replicatedTime,
152154
execCfg.InternalDB,
153155
execCfg.Settings,
154156
); err != nil {
155157
log.Warningf(ctx, "failed to advance replicated timestamp for reader tenant {%d}: %v", tenantID, err)
158+
} else {
159+
if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
160+
return r.job.ProgressStorage().Set(ctx, txn, math.NaN(), replicatedTime)
161+
}); err != nil {
162+
log.Warningf(ctx, "failed to set standby poller job read time %v", err)
163+
}
156164
}
157165
}
158166
}

pkg/crosscluster/physical/standby_read_ts_poller_job_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"testing"
1313
"time"
1414

15+
apd "github.com/cockroachdb/apd/v3"
1516
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationtestutils"
1617
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1718
"github.com/cockroachdb/cockroach/pkg/security/username"
@@ -66,7 +67,10 @@ INSERT INTO a VALUES (1);
6667

6768
c.SrcTenantSQL.Exec(t, defaultDBQuery)
6869
waitForPollerJobToStartDest(t, c, ingestionJobID)
70+
pollerResolvedTime := waitForPollerTimeToAdvance(t, c.ReaderTenantSQL, apd.New(0, 0))
71+
6972
observeValueInReaderTenant(t, c.ReaderTenantSQL)
73+
waitForPollerTimeToAdvance(t, c.ReaderTenantSQL, pollerResolvedTime)
7074

7175
// Failback and setup stanby reader tenant on the og source.
7276
{
@@ -164,6 +168,22 @@ WHERE job_type = 'STANDBY READ TS POLLER'
164168
jobutils.WaitForJobToRun(t, readerSQL, jobID)
165169
}
166170

171+
func waitForPollerTimeToAdvance(
172+
t *testing.T, readerSQL *sqlutils.SQLRunner, prevTime *apd.Decimal,
173+
) *apd.Decimal {
174+
var resolvedTime apd.Decimal
175+
testutils.SucceedsSoon(t, func() error {
176+
readerSQL.QueryRow(t, `SELECT COALESCE(high_water_timestamp, '0')
177+
FROM crdb_internal.jobs
178+
WHERE job_type = 'STANDBY READ TS POLLER'`).Scan(&resolvedTime)
179+
if resolvedTime.Cmp(prevTime) <= 0 {
180+
return errors.Errorf("resolved time has not advanced past %d", prevTime)
181+
}
182+
return nil
183+
})
184+
return &resolvedTime
185+
}
186+
167187
func TestReaderTenantCutover(t *testing.T) {
168188
defer leaktest.AfterTest(t)()
169189
defer log.Scope(t).Close(t)

0 commit comments

Comments
 (0)