Skip to content

Commit 87b5f83

Browse files
craig[bot] and dt committed
Merge #143311
143311: crosscluster/logical: fix crash when using UDTs in LDR on multiple nodes r=dt a=dt Previously running LDR into a table that included user-defined types could crash the nodes due to an error citing missing type hydration. This occurs during decoding of an incoming KV into a logical row, and is due to the fact that the decode used in the LDR writer processor is configured with table descriptors received from the source cluster during LDR initialization, but the LDR writer processor's setup was _not_ hydrating said descriptors before using them. This hydration step is modeled as an in-place mutation of said metadata that does not change its type signature, so even though downstream code requires its inputs be hydrated (and crashes if they're not), it is up to the specific execution path to ensure this step occurs as this is not checked during compilation. Furthermore, when this metadata is serialized and transmitted to another node as part of a distSQL flow spec, the table descriptor seen by the other node does _not_ contain hydrated types, while if the same distSQL processor is passed the same spec on a local node, it sees the types in that descriptor already hydrated. Such cases are not unheard of and just mean we rely on tests rather than the compiler to ensure proper usage. And indeed, when UDT functionality was added, the addition included tests to exercise it. However this is where a subtle complication emerges, which meant that these tests, which ran LDR on a table including UDTs from one node to one node, did not catch this bug. Apparently when a TableDescriptor that has been hydrated is placed in a DistSQL spec, the TableDescriptor read back out of that spec by a DistSQL processor to which that spec is supplied may also be hydrated or may not be, depending on whether the processor is on the same node as the coordinator where the spec was created or not. 
In the tests of UDT behavior, since these tests were configured to use a single source and destination node, this was always the case, so despite missing an explicit hydration in the processor, the table descriptor just so happened to already be hydrated when it was used in that processor. However in a multi-node cluster, the processors on remote nodes would encounter a non-hydrated descriptor when reading the same spec. Our bias towards using single-node test configurations for exercising features that do not appear to have anything to do with how work is distributed across many nodes in a larger cluster is generally sensible: it speeds up tests and thus allows us to add more test cases without making the test suite unwieldy. However in this case there was a very non-obvious interaction between the routines for decoding of some bytes and executing those routines on different nodes. This diff changes the UDT tests to now explicitly run on multiple nodes — and scatters many ranges to ensure those nodes are used. This caused these tests to reliably fail until the missing hydration steps were added to the processor to fix the underlying bug. Release note (bug fix): Fix a crash due to 'use of enum metadata before hydration' when using LDR with user-defined types. Epic: none. Co-authored-by: David Taylor <[email protected]>
2 parents e233359 + 8c48ebb commit 87b5f83

File tree

5 files changed

+20
-5
lines changed

5 files changed

+20
-5
lines changed

pkg/crosscluster/logical/logical_replication_dist.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
1616
"github.com/cockroachdb/cockroach/pkg/roachpb"
1717
"github.com/cockroachdb/cockroach/pkg/sql"
18+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
1819
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
1920
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2021
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -29,6 +30,7 @@ func constructLogicalReplicationWriterSpecs(
2930
previousReplicatedTimestamp hlc.Timestamp,
3031
checkpoint jobspb.StreamIngestionCheckpoint,
3132
tableMetadataByDestID map[int32]execinfrapb.TableReplicationMetadata,
33+
srcTypes []*descpb.TypeDescriptor,
3234
jobID jobspb.JobID,
3335
streamID streampb.StreamID,
3436
discard jobspb.LogicalReplicationDetails_Discard,
@@ -47,6 +49,7 @@ func constructLogicalReplicationWriterSpecs(
4749
Discard: discard,
4850
Mode: mode,
4951
MetricsLabel: metricsLabel,
52+
TypeDescriptors: srcTypes,
5053
}
5154

5255
writerSpecs := make(map[base.SQLInstanceID][]execinfrapb.LogicalReplicationWriterSpec, len(destSQLInstances))

pkg/crosscluster/logical/logical_replication_job.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,7 @@ func (p *logicalReplicationPlanner) generatePlanImpl(
583583
progress.ReplicatedTime,
584584
progress.Checkpoint,
585585
tableMetadataByDestID,
586+
plan.SourceTypes,
586587
p.job.ID(),
587588
streampb.StreamID(payload.StreamID),
588589
payload.Discard,

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2176,6 +2176,7 @@ func TestLogicalReplicationSchemaChanges(t *testing.T) {
21762176
func TestUserDefinedTypes(t *testing.T) {
21772177
defer leaktest.AfterTest(t)()
21782178
defer log.Scope(t).Close(t)
2179+
skip.UnderDuress(t, "this needs to be multi-node but that tends to be too slow for duressed builds")
21792180

21802181
ctx := context.Background()
21812182
clusterArgs := base.TestClusterArgs{
@@ -2187,7 +2188,7 @@ func TestUserDefinedTypes(t *testing.T) {
21872188
},
21882189
}
21892190

2190-
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, clusterArgs, 1)
2191+
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, clusterArgs, 3)
21912192
defer server.Stopper().Stop(ctx)
21922193

21932194
dbBURL := replicationtestutils.GetExternalConnectionURI(t, s, s, serverutils.DBName("b"))
@@ -2206,6 +2207,8 @@ func TestUserDefinedTypes(t *testing.T) {
22062207
dbB.Exec(t, "CREATE TABLE data2 (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")
22072208

22082209
dbB.Exec(t, "INSERT INTO data VALUES (1, 'one', (3, 'cat'))")
2210+
dbB.Exec(t, "ALTER TABLE data SPLIT AT VALUES (1), (2), (3)")
2211+
dbB.Exec(t, "ALTER TABLE data SCATTER")
22092212
// Force default expression evaluation.
22102213
dbB.Exec(t, "INSERT INTO data (pk, val2) VALUES (2, (4, 'dog'))")
22112214

@@ -2216,8 +2219,8 @@ func TestUserDefinedTypes(t *testing.T) {
22162219
).Scan(&jobAID)
22172220
WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, jobAID)
22182221
require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, dbA.DB, "A"))
2219-
dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
2220-
dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
2222+
dbB.CheckQueryResults(t, "SELECT * FROM data ORDER BY pk", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
2223+
dbA.CheckQueryResults(t, "SELECT * FROM data ORDER BY pk", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
22212224

22222225
var jobBID jobspb.JobID
22232226
dbB.QueryRow(t,

pkg/crosscluster/logical/logical_replication_writer_processor.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/cockroachdb/cockroach/pkg/sql"
2727
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
2828
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
29+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
2930
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
3031
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
3132
"github.com/cockroachdb/cockroach/pkg/sql/isql"
@@ -180,11 +181,17 @@ func newLogicalReplicationWriterProcessor(
180181

181182
procConfigByDestTableID := make(map[descpb.ID]sqlProcessorTableConfig)
182183
destTableBySrcID := make(map[descpb.ID]dstTableMetadata)
184+
crossClusterResolver := crosscluster.MakeCrossClusterTypeResolver(spec.TypeDescriptors)
183185
for dstTableID, md := range spec.TableMetadataByDestID {
186+
srcDesc := tabledesc.NewBuilder(&md.SourceDescriptor).BuildImmutableTable()
187+
if err := typedesc.HydrateTypesInDescriptor(ctx, srcDesc, crossClusterResolver); err != nil {
188+
return nil, err
189+
}
184190
procConfigByDestTableID[descpb.ID(dstTableID)] = sqlProcessorTableConfig{
185-
srcDesc: tabledesc.NewBuilder(&md.SourceDescriptor).BuildImmutableTable(),
191+
srcDesc: srcDesc,
186192
dstOID: md.DestinationFunctionOID,
187193
}
194+
188195
destTableBySrcID[md.SourceDescriptor.GetID()] = dstTableMetadata{
189196
database: md.DestinationParentDatabaseName,
190197
schema: md.DestinationParentSchemaName,

pkg/sql/execinfrapb/processors_bulk_io.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,8 @@ message LogicalReplicationWriterSpec {
535535

536536
optional string metrics_label = 11 [(gogoproto.nullable) = false];
537537

538-
// Next ID: 13.
538+
repeated cockroach.sql.sqlbase.TypeDescriptor type_descriptors = 13;
539+
// Next ID: 14.
539540
}
540541

541542
message LogicalReplicationOfflineScanSpec {

0 commit comments

Comments
 (0)