Skip to content

Commit 87aaab4

Browse files
craig[bot] and msbutler committed
103142: c2c: introduce automatic distsql replanning r=stevendanna a=msbutler This patch introduces the ability for a replication job to periodically refresh its physical plan. Here's how replanning works: - The replication consumer's dist sql planner proposes a new physical plan every 10 minutes, as determined by the new `stream_replication.replan_flow_frequency` setting. - It then computes how different the proposed plan is from the current plan. This patch computes difference as the number of _participating_ node changes (addition or removal) in the source and destination clusters as a fraction of the total number of participating nodes in both clusters in the previous plan. - It executes the new plan if this difference metric is greater than 0.1, the default of the new `stream_replication.replan_flow_threshold` setting. In future work, we should consider replanning on data distribution changes, as outlined in cockroachdb#99164. Fixes cockroachdb#102900 Release note: None Co-authored-by: Michael Butler <[email protected]>
2 parents c15e86a + 7cb6b19 commit 87aaab4

File tree

10 files changed

+427
-35
lines changed

10 files changed

+427
-35
lines changed

pkg/ccl/streamingccl/replicationtestutils/testutils.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -222,10 +222,7 @@ func waitForTenantPodsActive(
222222
}, 10*time.Second)
223223
}
224224

225-
func CreateTenantStreamingClusters(
226-
ctx context.Context, t *testing.T, args TenantStreamingClustersArgs,
227-
) (*TenantStreamingClusters, func()) {
228-
225+
func CreateServerArgs(args TenantStreamingClustersArgs) base.TestServerArgs {
229226
if args.TestingKnobs != nil && args.TestingKnobs.DistSQLRetryPolicy == nil {
230227
args.TestingKnobs.DistSQLRetryPolicy = &retry.Options{
231228
InitialBackoff: time.Microsecond,
@@ -234,7 +231,7 @@ func CreateTenantStreamingClusters(
234231
MaxRetries: TestingMaxDistSQLRetries,
235232
}
236233
}
237-
serverArgs := base.TestServerArgs{
234+
return base.TestServerArgs{
238235
// Test fails because it tries to set a cluster setting only accessible
239236
// to system tenants. Tracked with #76378.
240237
DefaultTestTenant: base.TestTenantDisabled,
@@ -252,6 +249,12 @@ func CreateTenantStreamingClusters(
252249
},
253250
},
254251
}
252+
}
253+
254+
func CreateTenantStreamingClusters(
255+
ctx context.Context, t *testing.T, args TenantStreamingClustersArgs,
256+
) (*TenantStreamingClusters, func()) {
257+
serverArgs := CreateServerArgs(args)
255258

256259
startTestCluster := func(
257260
ctx context.Context,

pkg/ccl/streamingccl/streamingest/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ go_test(
8888
"rangekey_batcher_test.go",
8989
"replication_random_client_test.go",
9090
"replication_stream_e2e_test.go",
91+
"stream_ingestion_dist_test.go",
9192
"stream_ingestion_job_test.go",
9293
"stream_ingestion_processor_test.go",
9394
],
@@ -129,6 +130,7 @@ go_test(
129130
"//pkg/sql/execinfra",
130131
"//pkg/sql/execinfrapb",
131132
"//pkg/sql/isql",
133+
"//pkg/sql/physicalplan",
132134
"//pkg/sql/sem/eval",
133135
"//pkg/sql/sem/tree",
134136
"//pkg/storage",

pkg/ccl/streamingccl/streamingest/metrics.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,12 @@ var (
128128
Measurement: "Ranges",
129129
Unit: metric.Unit_COUNT,
130130
}
131+
metaDistSQLReplanCount = metric.Metadata{
132+
Name: "replication.distsql_replan_count",
133+
Help: "Total number of dist sql replanning events",
134+
Measurement: "Events",
135+
Unit: metric.Unit_COUNT,
136+
}
131137
)
132138

133139
// Metrics are for production monitoring of stream ingestion jobs.
@@ -138,6 +144,7 @@ type Metrics struct {
138144
Flushes *metric.Counter
139145
JobProgressUpdates *metric.Counter
140146
ResolvedEvents *metric.Counter
147+
ReplanCount *metric.Counter
141148
FlushHistNanos metric.IHistogram
142149
CommitLatency metric.IHistogram
143150
AdmitLatency metric.IHistogram
@@ -162,6 +169,7 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
162169
Flushes: metric.NewCounter(metaReplicationFlushes),
163170
ResolvedEvents: metric.NewCounter(metaReplicationResolvedEventsIngested),
164171
JobProgressUpdates: metric.NewCounter(metaJobProgressUpdates),
172+
ReplanCount: metric.NewCounter(metaDistSQLReplanCount),
165173
FlushHistNanos: metric.NewHistogram(metric.HistogramOptions{
166174
Metadata: metaReplicationFlushHistNanos,
167175
Duration: histogramWindow,

pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,87 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
708708
require.Greater(t, len(clientAddresses), 1)
709709
}
710710

711+
// TestStreamingAutoReplan asserts that if a new node can participate in the
712+
// replication job, it will trigger distSQL replanning.
713+
func TestStreamingAutoReplan(t *testing.T) {
714+
defer leaktest.AfterTest(t)()
715+
defer log.Scope(t).Close(t)
716+
717+
skip.UnderStressRace(t, "c2c multi node unit tests flake under stress race. see #106194")
718+
719+
ctx := context.Background()
720+
args := replicationtestutils.DefaultTenantStreamingClustersArgs
721+
args.SrcNumNodes = 1
722+
args.DestNumNodes = 1
723+
724+
retryErrorChan := make(chan error)
725+
turnOffReplanning := make(chan struct{})
726+
727+
// Track the number of unique addresses that we're connected to.
728+
clientAddresses := make(map[string]struct{})
729+
var addressesMu syncutil.Mutex
730+
args.TestingKnobs = &sql.StreamingTestingKnobs{
731+
BeforeClientSubscribe: func(addr string, token string, clientStartTime hlc.Timestamp) {
732+
addressesMu.Lock()
733+
defer addressesMu.Unlock()
734+
clientAddresses[addr] = struct{}{}
735+
},
736+
AfterRetryIteration: func(err error) {
737+
if err != nil {
738+
retryErrorChan <- err
739+
<-turnOffReplanning
740+
}
741+
},
742+
}
743+
744+
c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
745+
defer cleanup()
746+
serverutils.SetClusterSetting(t, c.DestCluster, "stream_replication.replan_flow_threshold", 0.1)
747+
serverutils.SetClusterSetting(t, c.DestCluster, "stream_replication.replan_flow_frequency", time.Millisecond*500)
748+
749+
// Begin the job on a single source node.
750+
producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
751+
jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
752+
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
753+
754+
c.WaitUntilStartTimeReached(jobspb.JobID(ingestionJobID))
755+
require.Equal(t, len(clientAddresses), 1)
756+
757+
// Add 2 source nodes to enable full replication.
758+
c.SrcCluster.AddAndStartServer(c.T, replicationtestutils.CreateServerArgs(c.Args))
759+
c.SrcCluster.AddAndStartServer(c.T, replicationtestutils.CreateServerArgs(c.Args))
760+
require.NoError(t, c.SrcCluster.WaitForFullReplication())
761+
762+
replicationtestutils.CreateScatteredTable(t, c, 3)
763+
require.NoError(t, c.SrcCluster.WaitForFullReplication())
764+
765+
// The ingestion job should eventually retry because it detects new nodes to add to the plan.
766+
require.Error(t, <-retryErrorChan, sql.ErrPlanChanged)
767+
768+
// Prevent continuous replanning to reduce test runtime. dsp.PartitionSpans()
769+
// on the src cluster may return a different set of src nodes that can
770+
// participate in the replication job (especially under stress), so if we
771+
// repeatedly replan the job, we will repeatedly restart the job, preventing
772+
// job progress.
773+
serverutils.SetClusterSetting(t, c.DestCluster, "stream_replication.replan_flow_threshold", 0)
774+
serverutils.SetClusterSetting(t, c.DestCluster, "stream_replication.replan_flow_frequency", time.Minute*10)
775+
close(turnOffReplanning)
776+
777+
cutoverTime := c.DestSysServer.Clock().Now()
778+
c.WaitUntilReplicatedTime(cutoverTime, jobspb.JobID(ingestionJobID))
779+
780+
// After the node additions, multiple nodes should've been connected to. When
781+
// this test is run under stress, however, dsp.PartitionSpans() on the src
782+
// cluster does not always return multiple src nodes that can participate in
783+
// the replication job, therefore, under stress, do not require that multiple
784+
// nodes participate from the src cluster. This potentially occurs because cpu
785+
// contention renders a test server "unhealthy". In general, running two
786+
// multinode 2 clusters makes everything messy.
787+
if !skip.Stress() {
788+
require.Greater(t, len(clientAddresses), 1)
789+
}
790+
}
791+
711792
// TestTenantReplicationProtectedTimestampManagement tests the active protected
712793
// timestamps management on the destination tenant's keyspan.
713794
func TestTenantReplicationProtectedTimestampManagement(t *testing.T) {

0 commit comments

Comments
 (0)