Skip to content

Commit ce27952

Browse files
author
Lidor Carmel
committed
streamingest: unskip TestTenantStreamingUnavailableStreamAddress
Changing a few things to get this test to pass under stress: - use 50 ranges instead of 10, because there are already 50-ish system ranges, so if we write only 10 more ranges those might not get distributed on all servers. - avoid reading from the source cluster after stopping a node, it's flaky, see cockroachdb#107499 for more info. Epic: none Fixes: cockroachdb#107023 Fixes: cockroachdb#106865 Release note: None
1 parent 167da65 commit ce27952

File tree

2 files changed

+20
-35
lines changed

2 files changed

+20
-35
lines changed

pkg/ccl/streamingccl/replicationtestutils/testutils.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -420,8 +420,11 @@ func requireReplicatedTime(targetTime hlc.Timestamp, progress *jobspb.Progress)
420420
}
421421

422422
func CreateScatteredTable(t *testing.T, c *TenantStreamingClusters, numNodes int) {
423-
// Create a source table with multiple ranges spread across multiple nodes
424-
numRanges := 10
423+
// Create a source table with multiple ranges spread across multiple nodes. We
424+
// need around 50 or more ranges because there are already over 50 system
425+
// ranges, so if we write just a few ranges those might all be on a single
426+
// server, which will cause the test to flake.
427+
numRanges := 50
425428
rowsPerRange := 20
426429
c.SrcTenantSQL.Exec(t, "CREATE TABLE d.scattered (key INT PRIMARY KEY)")
427430
c.SrcTenantSQL.Exec(t, "INSERT INTO d.scattered (key) SELECT * FROM generate_series(1, $1)",

pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go

Lines changed: 15 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"testing"
1616
"time"
1717

18-
"github.com/cockroachdb/cockroach/pkg/base"
1918
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils"
2019
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
2120
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl"
@@ -460,11 +459,12 @@ func TestTenantStreamingDropTenantCancelsStream(t *testing.T) {
460459
})
461460
}
462461

462+
// TestTenantStreamingUnavailableStreamAddress verifies that after a
463+
// pause/resume (replan) we will not use a dead server as a source.
463464
func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
464465
defer leaktest.AfterTest(t)()
465466
defer log.Scope(t).Close(t)
466467

467-
skip.WithIssue(t, 106865)
468468
skip.UnderDeadlock(t, "multi-node may time out under deadlock")
469469
skip.UnderRace(t, "takes too long with multiple nodes")
470470

@@ -495,38 +495,23 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
495495
streamAddresses := progress.GetStreamIngest().StreamAddresses
496496
require.Greater(t, len(streamAddresses), 1)
497497

498-
destroyedAddress := c.SrcURL.String()
498+
// Write something to the source cluster, note that the job is paused - and
499+
// therefore not replicated for now.
500+
c.SrcTenantSQL.Exec(t, "CREATE TABLE d.x (id INT PRIMARY KEY, n INT)")
501+
c.SrcTenantSQL.Exec(t, `INSERT INTO d.x VALUES (3);`)
499502

503+
// Stop a server on the source cluster. Note that in this test we are trying
504+
// to avoid using the source cluster after this point because if we do the
505+
// test flakes, see #107499 for more info.
506+
destroyedAddress := c.SrcURL.String()
500507
require.NoError(t, c.SrcTenantConn.Close())
501508
c.SrcTenantServer.Stopper().Stop(ctx)
502509
c.SrcCluster.StopServer(0)
503510

504-
// Once SrcCluster.Server(0) is shut down queries must be ran against a different server
505-
alternateSrcSysSQL := sqlutils.MakeSQLRunner(c.SrcCluster.ServerConn(1))
506-
_, alternateSrcTenantConn := serverutils.StartTenant(t, c.SrcCluster.Server(1),
507-
base.TestTenantArgs{
508-
TenantID: c.Args.SrcTenantID,
509-
TenantName: c.Args.SrcTenantName,
510-
DisableCreateTenant: true,
511-
})
512-
defer alternateSrcTenantConn.Close()
513-
alternateSrcTenantSQL := sqlutils.MakeSQLRunner(alternateSrcTenantConn)
514-
515-
alternateCompareResult := func(query string) {
516-
sourceData := alternateSrcTenantSQL.QueryStr(c.T, query)
517-
destData := c.DestTenantSQL.QueryStr(c.T, query)
518-
require.Equal(c.T, sourceData, destData)
519-
}
520-
521511
c.DestSysSQL.Exec(t, `RESUME JOB $1`, ingestionJobID)
522512
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
523513

524-
alternateSrcTenantSQL.Exec(t, "CREATE TABLE d.x (id INT PRIMARY KEY, n INT)")
525-
alternateSrcTenantSQL.Exec(t, `INSERT INTO d.x VALUES (3);`)
526-
527-
var cutoverTime time.Time
528-
alternateSrcSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)
529-
514+
cutoverTime := c.SrcCluster.Server(1).Clock().Now().GoTime()
530515
var cutoverStr string
531516
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
532517
c.Args.DestTenantName, cutoverTime).Scan(&cutoverStr)
@@ -539,18 +524,15 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
539524
require.NoError(t, cleanUpTenant())
540525
}()
541526

542-
// The destroyed address should have been removed from the topology
527+
// The destroyed address should have been removed from the topology.
543528
progress = jobutils.GetJobProgress(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
544529
newStreamAddresses := progress.GetStreamIngest().StreamAddresses
545530
require.Contains(t, streamAddresses, destroyedAddress)
546531
require.NotContains(t, newStreamAddresses, destroyedAddress)
547532

548-
alternateCompareResult("SELECT * FROM d.t1")
549-
alternateCompareResult("SELECT * FROM d.t2")
550-
alternateCompareResult("SELECT * FROM d.x")
551-
552-
// We can't use alternateCompareResult because it'll try to contact the deceased
553-
// n1 even if the lease holders for d.scattered have all moved to other nodes
533+
// Verify the destination tenant is fully replicated.
534+
destData := c.DestTenantSQL.QueryStr(c.T, "SELECT * FROM d.x")
535+
require.Equal(c.T, [][]string{{"3", "NULL"}}, destData)
554536
dstScatteredData := c.DestTenantSQL.QueryStr(c.T, "SELECT * FROM d.scattered ORDER BY key")
555537
require.Equal(t, srcScatteredData, dstScatteredData)
556538
}

0 commit comments

Comments
 (0)