Skip to content

Commit e47879d

Browse files
committed
conflict: create conflict workload for testing LDR
This creates a `conflict` workload and a roachtest for stress testing logical data replication (LDR) with random schemas. The `conflict` workload accepts connections to two independent clusters. For each randomly generated row, mutated versions of the row are inserted into both peer clusters at the same time. The test validates that the two clusters eventually converge and that there are no entries in the dead letter queue (DLQ). Release note: none Part of: #148386
1 parent 8f413be commit e47879d

File tree

17 files changed

+905
-129
lines changed

17 files changed

+905
-129
lines changed

pkg/BUILD.bazel

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ ALL_TESTS = [
177177
"//pkg/config/zonepb:zonepb_test",
178178
"//pkg/config:config_disallowed_imports_test",
179179
"//pkg/config:config_test",
180+
"//pkg/crosscluster/ldrrandgen:ldrrandgen_test",
180181
"//pkg/crosscluster/logical:logical_disallowed_imports_test",
181182
"//pkg/crosscluster/logical:logical_test",
182183
"//pkg/crosscluster/physical:physical_test",
@@ -848,6 +849,7 @@ ALL_TESTS = [
848849
"//pkg/util:util_test",
849850
"//pkg/workload/bank:bank_test",
850851
"//pkg/workload/cli:cli_test",
852+
"//pkg/workload/conflict:conflict_test",
851853
"//pkg/workload/faker:faker_test",
852854
"//pkg/workload/histogram/exporter:exporter_test",
853855
"//pkg/workload/histogram:histogram_test",
@@ -1354,6 +1356,8 @@ GO_TARGETS = [
13541356
"//pkg/config/zonepb:zonepb_test",
13551357
"//pkg/config:config",
13561358
"//pkg/config:config_test",
1359+
"//pkg/crosscluster/ldrrandgen:ldrrandgen",
1360+
"//pkg/crosscluster/ldrrandgen:ldrrandgen_test",
13571361
"//pkg/crosscluster/logical:logical",
13581362
"//pkg/crosscluster/logical:logical_test",
13591363
"//pkg/crosscluster/physical:physical",
@@ -2863,6 +2867,8 @@ GO_TARGETS = [
28632867
"//pkg/workload/bulkingest:bulkingest",
28642868
"//pkg/workload/cli:cli",
28652869
"//pkg/workload/cli:cli_test",
2870+
"//pkg/workload/conflict:conflict",
2871+
"//pkg/workload/conflict:conflict_test",
28662872
"//pkg/workload/connectionlatency:connectionlatency",
28672873
"//pkg/workload/debug:debug",
28682874
"//pkg/workload/examples:examples",

pkg/ccl/workloadccl/allccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ go_library(
99
"//pkg/ccl/workloadccl/roachmartccl",
1010
"//pkg/workload/bank",
1111
"//pkg/workload/bulkingest",
12+
"//pkg/workload/conflict",
1213
"//pkg/workload/connectionlatency",
1314
"//pkg/workload/debug",
1415
"//pkg/workload/examples",

pkg/ccl/workloadccl/allccl/all.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
_ "github.com/cockroachdb/cockroach/pkg/ccl/workloadccl/roachmartccl"
1414
_ "github.com/cockroachdb/cockroach/pkg/workload/bank"
1515
_ "github.com/cockroachdb/cockroach/pkg/workload/bulkingest"
16+
_ "github.com/cockroachdb/cockroach/pkg/workload/conflict"
1617
_ "github.com/cockroachdb/cockroach/pkg/workload/connectionlatency"
1718
_ "github.com/cockroachdb/cockroach/pkg/workload/debug"
1819
_ "github.com/cockroachdb/cockroach/pkg/workload/examples"

pkg/cmd/roachtest/tests/logical_data_replication.go

Lines changed: 94 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -216,18 +216,35 @@ func registerLogicalDataReplicationTests(r registry.Registry) {
216216
},
217217
run: TestLDRCreateTablesTPCC,
218218
},
219+
{
220+
name: "ldr/conflict",
221+
clusterSpec: multiClusterSpec{
222+
leftNodes: 3,
223+
rightNodes: 3,
224+
clusterOpts: []spec.Option{
225+
spec.CPU(4),
226+
spec.WorkloadNode(),
227+
spec.WorkloadNodeCPU(4),
228+
spec.VolumeSize(100),
229+
},
230+
},
231+
ldrConfig: ldrConfig{
232+
createTables: true,
233+
},
234+
run: TestLDRConflict,
235+
},
219236
}
220237

221238
for _, sp := range specs {
222-
223239
r.Add(registry.TestSpec{
224-
Name: sp.name,
225-
Owner: registry.OwnerDisasterRecovery,
226-
Timeout: 60 * time.Minute,
227-
CompatibleClouds: registry.OnlyGCE,
228-
Suites: registry.Suites(registry.Nightly),
229-
Cluster: sp.clusterSpec.ToSpec(r),
230-
Leases: registry.MetamorphicLeases,
240+
Name: sp.name,
241+
Owner: registry.OwnerDisasterRecovery,
242+
Timeout: 60 * time.Minute,
243+
CompatibleClouds: registry.OnlyGCE,
244+
Suites: registry.Suites(registry.Nightly),
245+
Cluster: sp.clusterSpec.ToSpec(r),
246+
Leases: registry.MetamorphicLeases,
247+
RequiresDeprecatedWorkload: true, // TODO(jeffswenson): require this only for conflict test.
231248
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
232249
rng, seed := randutil.NewPseudoRand()
233250
t.L().Printf("random seed is %d", seed)
@@ -388,6 +405,73 @@ func TestLDRTPCC(
388405
VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 2*time.Minute, workload)
389406
}
390407

408+
func TestLDRConflict(
409+
ctx context.Context, t test.Test, c cluster.Cluster, setup multiClusterSetup, ldrConfig ldrConfig,
410+
) {
411+
setup.left.sysSQL.Exec(t, "CREATE DATABASE conflict")
412+
setup.right.sysSQL.Exec(t, "CREATE DATABASE conflict")
413+
414+
var leftJobID, rightJobID int
415+
setup.right.sysSQL.Exec(t, fmt.Sprintf("CREATE EXTERNAL CONNECTION IF NOT EXISTS left AS '%s'", setup.left.PgURLForDatabase("conflict")))
416+
setup.left.sysSQL.Exec(t, fmt.Sprintf("CREATE EXTERNAL CONNECTION IF NOT EXISTS right AS '%s'", setup.right.PgURLForDatabase("conflict")))
417+
418+
leftURLs, err := c.InternalPGUrl(ctx, t.L(), setup.left.gatewayNodes, roachprod.PGURLOptions{
419+
Database: "conflict",
420+
})
421+
require.NoError(t, err)
422+
rightURLs, err := c.InternalPGUrl(ctx, t.L(), setup.right.gatewayNodes, roachprod.PGURLOptions{
423+
Database: "conflict",
424+
})
425+
require.NoError(t, err)
426+
427+
leftURL := fmt.Sprintf("\"%s\"", leftURLs[0])
428+
rightURL := fmt.Sprintf("\"%s\"", rightURLs[0])
429+
430+
c.Run(ctx, option.WithNodes(setup.workloadNode), "./workload", "init", "conflict", leftURL)
431+
432+
t.Status("creating bidirectional replication job")
433+
setup.right.sysSQL.QueryRow(t, `
434+
CREATE LOGICALLY REPLICATED TABLE conflict.conflict FROM TABLE conflict.conflict
435+
ON 'external://left'
436+
WITH BIDIRECTIONAL ON 'external://right'
437+
`).Scan(&rightJobID)
438+
439+
t.Status("waiting for right job to start up")
440+
waitForReplicatedTime(t, rightJobID, setup.right.db, getLogicalDataReplicationJobInfo, 2*time.Minute)
441+
442+
t.Status("waiting for left job to be created")
443+
testutils.SucceedsWithin(t, func() error {
444+
return setup.left.db.QueryRow("SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'LOGICAL REPLICATION'").Scan(&leftJobID)
445+
}, 2*time.Minute)
446+
447+
t.Status("waiting for left job to start up")
448+
waitForReplicatedTime(t, leftJobID, setup.left.db, getLogicalDataReplicationJobInfo, 2*time.Minute)
449+
450+
// TODO(jeffswenson): mix in random schema changes. The high level plan is:
451+
// 1. Pause the workload.
452+
// 2. Wait for LDR replication to catch up.
453+
// 3. Stop the LDR jobs.
454+
// 4. Make random schema changes.
455+
// 5. Start the LDR jobs again using now() as the cursor.
456+
// 6. Resume the workload.
457+
t.Status("running workload")
458+
c.Run(ctx, option.WithNodes(setup.workloadNode),
459+
"./workload", "run", "conflict", "--duration=15m",
460+
// Tolerate errors because there are some we can't easily avoid in random schemas that
461+
// contain computed columns. For example, the computed column a+b may cause an insert error
462+
// if a+b overflows the type.
463+
"--tolerate-errors",
464+
"--peer_url", rightURL,
465+
leftURL)
466+
467+
t.Status("verifying results")
468+
VerifyCorrectness(ctx, c, t, setup, leftJobID, rightJobID, 2*time.Minute, LDRWorkload{
469+
dbName: "conflict",
470+
tableNames: []string{"conflict"},
471+
manualSchemaSetup: true,
472+
})
473+
}
474+
391475
// TestLDRCreateTablesTPCC inits the left cluster with 1000 warehouse tpcc,
392476
// begins unidirectional fast initial scan LDR, starts a tpcc 1000 wh workload
393477
// on the left, and observes initial scan, catchup scan, and steady state
@@ -990,15 +1074,15 @@ func VerifyCorrectness(
9901074
ldrWorkload LDRWorkload,
9911075
) {
9921076
now := timeutil.Now()
993-
t.L().Printf("Waiting for replicated times to catchup before verifying left and right clusters")
1077+
t.Status("waiting for replicated times to catchup before verifying left and right clusters")
9941078
if leftJobID != 0 {
9951079
waitForReplicatedTimeToReachTimestamp(t, leftJobID, setup.left.db, getLogicalDataReplicationJobInfo, waitTime, now)
9961080
require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, setup.left.db, ldrWorkload.dbName))
9971081
}
9981082
waitForReplicatedTimeToReachTimestamp(t, rightJobID, setup.right.db, getLogicalDataReplicationJobInfo, waitTime, now)
9991083
require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, setup.right.db, ldrWorkload.dbName))
10001084

1001-
t.L().Printf("Verifying equality of left and right clusters")
1085+
t.Status("verifying equality of left and right clusters")
10021086

10031087
type fingerprint struct {
10041088
table string
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "ldrrandgen",
5+
srcs = ["logical.go"],
6+
importpath = "github.com/cockroachdb/cockroach/pkg/crosscluster/ldrrandgen",
7+
visibility = ["//visibility:public"],
8+
deps = [
9+
"//pkg/sql/catalog/colinfo",
10+
"//pkg/sql/randgen",
11+
"//pkg/sql/sem/tree",
12+
"//pkg/sql/types",
13+
],
14+
)
15+
16+
go_test(
17+
name = "ldrrandgen_test",
18+
srcs = [
19+
"logical_test.go",
20+
"main_test.go",
21+
],
22+
embed = [":ldrrandgen"],
23+
deps = [
24+
"//pkg/base",
25+
"//pkg/ccl",
26+
"//pkg/ccl/storageccl",
27+
"//pkg/security/securityassets",
28+
"//pkg/security/securitytest",
29+
"//pkg/server",
30+
"//pkg/sql/sem/tree",
31+
"//pkg/testutils/serverutils",
32+
"//pkg/testutils/sqlutils",
33+
"//pkg/testutils/testcluster",
34+
"//pkg/util/leaktest",
35+
"//pkg/util/log",
36+
"//pkg/util/randutil",
37+
"@com_github_stretchr_testify//require",
38+
],
39+
)
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package ldrrandgen
7+
8+
import (
9+
"context"
10+
"math/rand"
11+
12+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
13+
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
14+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
15+
"github.com/cockroachdb/cockroach/pkg/sql/types"
16+
)
17+
18+
func GenerateLDRTable(
19+
ctx context.Context, rng *rand.Rand, tableName string, supportKVWriter bool,
20+
) *tree.CreateTable {
21+
columnByName := func(name tree.Name, columnDefs []*tree.ColumnTableDef) *tree.ColumnTableDef {
22+
for _, col := range columnDefs {
23+
if col.Name == name {
24+
return col
25+
}
26+
}
27+
return nil
28+
}
29+
30+
tableDef := randgen.RandCreateTableWithName(ctx, rng, tableName, 0, []randgen.TableOption{
31+
randgen.WithPrimaryIndexRequired(),
32+
randgen.WithSkipColumnFamilyMutations(),
33+
randgen.WithColumnFilter(func(columnDef *tree.ColumnTableDef) bool {
34+
// We don't allow Arrays of bits because rand.LoadTable doesn't correctly identify their bit width.
35+
if columnDef.Type.(*types.T).Family() == types.ArrayFamily && columnDef.Type.(*types.T).ArrayContents().Family() == types.BitFamily {
36+
return false
37+
}
38+
// We don't allow the special '"char"' column because pgwire truncates the value to 1 byte.
39+
// TODO(jeffswenson): remove this once #149427 is fixed.
40+
if columnDef.Type.(*types.T).Family() == types.StringFamily && columnDef.Type.(*types.T).Width() == 1 {
41+
return false
42+
}
43+
return true
44+
}),
45+
randgen.WithPrimaryIndexFilter(func(indexDef *tree.IndexTableDef, columnDefs []*tree.ColumnTableDef) bool {
46+
for _, col := range indexDef.Columns {
47+
columnDef := columnByName(col.Column, columnDefs)
48+
// TODO(127315): types with composite encoding are not supported in the
49+
// primary key by LDR.
50+
if colinfo.CanHaveCompositeKeyEncoding(columnDef.Type.(*types.T)) {
51+
return false
52+
}
53+
// Do not allow computed columns in the primary key. Non-virtual computed columns in the primary key is
54+
// allowed by LDR in general, but its not compatible with the conflict workload.
55+
// TODO(jeffswenson): support computed columns in the primary key in the conflict workload.
56+
if columnDef.IsComputed() {
57+
return false
58+
}
59+
}
60+
if supportKVWriter && indexDef.Sharded != nil {
61+
// The KV writer does not support hash sharded indexes.
62+
return false
63+
}
64+
return true
65+
}),
66+
randgen.WithIndexFilter(func(indexDef tree.TableDef, columnDefs []*tree.ColumnTableDef) bool {
67+
switch indexDef := indexDef.(type) {
68+
case *tree.UniqueConstraintTableDef:
69+
// Do not allow unique indexes. The random data may cause
70+
// spurious unique constraint violations.
71+
// TODO(jeffswenson): extend the conflict workload to support unique indexes on fields
72+
// that can randomly generate exclusively unique values for each row. E.g. UUIDs could be unique, but
73+
// BOOLs are too limiting.
74+
return false
75+
case *tree.IndexTableDef:
76+
for _, col := range indexDef.Columns {
77+
if supportKVWriter && col.Expr != nil {
78+
// Do not allow expression indexes. These cause SQL to generate a hidden computed column, which is not
79+
// supported by the kv writer.
80+
if col.Expr != nil {
81+
return false
82+
}
83+
}
84+
columnDef := columnByName(col.Column, columnDefs)
85+
if columnDef.IsVirtual() {
86+
// Virtual computed columns are not supported in indexes by the classic sql writer or the kv writer.
87+
// TODO(jeffswenson): remove this restriction once the crud writer is the only writer.
88+
return false
89+
}
90+
}
91+
if supportKVWriter && indexDef.Sharded != nil {
92+
// The KV writer does not support hash sharded indexes.
93+
return false
94+
}
95+
}
96+
return true
97+
}),
98+
})
99+
return tableDef
100+
}

pkg/crosscluster/replicationtestutils/logical_test.go renamed to pkg/crosscluster/ldrrandgen/logical_test.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@
33
// Use of this software is governed by the CockroachDB Software License
44
// included in the /LICENSE file.
55

6-
package replicationtestutils
6+
package ldrrandgen
77

88
import (
99
"context"
10+
"fmt"
1011
"testing"
1112
"time"
1213

1314
"github.com/cockroachdb/cockroach/pkg/base"
15+
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
1416
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
1517
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
1618
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -43,13 +45,14 @@ func TestGenerateLDRTable(t *testing.T) {
4345

4446
stmt := GenerateLDRTable(ctx, rndSrc, "test_writer", true)
4547
t.Logf("creating table: %s", stmt)
46-
dbA.Exec(t, stmt)
47-
48-
dbAURL := GetExternalConnectionURI(t, server, server, serverutils.DBName("a"))
49-
dbBURL := GetExternalConnectionURI(t, server, server, serverutils.DBName("b"))
50-
dbB.Exec(t,
51-
"CREATE LOGICALLY REPLICATED TABLE b.test_writer FROM TABLE a.test_writer ON $1 WITH BIDIRECTIONAL ON $2",
52-
dbAURL.String(),
53-
dbBURL.String(),
54-
)
48+
dbA.Exec(t, tree.AsStringWithFlags(stmt, tree.FmtParsable))
49+
50+
urlA, cleanupA := server.PGUrl(t, serverutils.DBName("a"))
51+
defer cleanupA()
52+
urlB, cleanupB := server.PGUrl(t, serverutils.DBName("b"))
53+
defer cleanupB()
54+
sqlDB.Exec(t, fmt.Sprintf("CREATE EXTERNAL CONNECTION a AS '%s'", urlA.String()))
55+
sqlDB.Exec(t, fmt.Sprintf("CREATE EXTERNAL CONNECTION b AS '%s'", urlB.String()))
56+
57+
dbB.Exec(t, "CREATE LOGICALLY REPLICATED TABLE b.test_writer FROM TABLE a.test_writer ON 'external://a' WITH BIDIRECTIONAL ON 'external://b'")
5558
}

0 commit comments

Comments
 (0)