Skip to content

Commit c1c1e80

Browse files
authored
Revert "handle out-of-order columns in partitioned table (#4035)" (#4123)
This reverts commit fa5064f.
1 parent e3124aa commit c1c1e80

File tree

2 files changed

+13
-141
lines changed

2 files changed

+13
-141
lines changed

flow/connectors/postgres/cdc.go

Lines changed: 13 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import (
3838
"github.com/PeerDB-io/peerdb/flow/shared/types"
3939
)
4040

41-
//nolint:govet // fieldalignment: fields grouped by purpose for readability
4241
type PostgresCDCSource struct {
4342
*PostgresConnector
4443
srcTableIDNameMapping map[uint32]string
@@ -52,8 +51,6 @@ type PostgresCDCSource struct {
5251

5352
// for partitioned tables, maps child relid to parent relid
5453
childToParentRelIDMapping map[uint32]uint32
55-
idToRelKindMap map[uint32]byte
56-
publishViaPartitionRoot bool
5754

5855
// for storing schema delta audit logs to catalog
5956
catalogPool shared.CatalogPool
@@ -85,22 +82,13 @@ type PostgresCDCConfig struct {
8582

8683
// Create a new PostgresCDCSource
8784
func (c *PostgresConnector) NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, error) {
88-
childToParentRelIDMap, idToRelKindMap, err := getChildToParentRelIDMap(ctx,
85+
childToParentRelIDMap, err := getChildToParentRelIDMap(ctx,
8986
c.conn, slices.Collect(maps.Keys(cdcConfig.SrcTableIDNameMapping)),
9087
cdcConfig.HandleInheritanceForNonPartitionedTables)
9188
if err != nil {
9289
return nil, fmt.Errorf("error getting child to parent relid map: %w", err)
9390
}
9491

95-
var publishViaPartitionRoot bool
96-
if err := c.conn.QueryRow(ctx,
97-
"SELECT COALESCE(pubviaroot, false) FROM pg_publication WHERE pubname=$1",
98-
cdcConfig.Publication,
99-
).Scan(&publishViaPartitionRoot); err != nil {
100-
return nil, fmt.Errorf("error checking publish_via_partition_root for publication %s: %w",
101-
cdcConfig.Publication, err)
102-
}
103-
10492
var schemaNameForRelID map[uint32]string
10593
if cdcConfig.SourceSchemaAsDestinationColumn {
10694
schemaNameForRelID = make(map[uint32]string, len(cdcConfig.TableNameSchemaMapping))
@@ -119,8 +107,6 @@ func (c *PostgresConnector) NewPostgresCDCSource(ctx context.Context, cdcConfig
119107
publication: cdcConfig.Publication,
120108
commitLock: nil,
121109
childToParentRelIDMapping: childToParentRelIDMap,
122-
idToRelKindMap: idToRelKindMap,
123-
publishViaPartitionRoot: publishViaPartitionRoot,
124110
catalogPool: cdcConfig.CatalogPool,
125111
otelManager: cdcConfig.OtelManager,
126112
hushWarnUnhandledMessageType: make(map[pglogrepl.MessageType]struct{}),
@@ -150,14 +136,14 @@ func (p *PostgresCDCSource) getSourceSchemaForDestinationColumn(relID uint32, ta
150136

151137
func getChildToParentRelIDMap(ctx context.Context,
152138
conn *pgx.Conn, parentTableOIDs []uint32, handleInheritanceForNonPartitionedTables bool,
153-
) (map[uint32]uint32, map[uint32]byte, error) {
139+
) (map[uint32]uint32, error) {
154140
relkinds := "'p'"
155141
if handleInheritanceForNonPartitionedTables {
156142
relkinds = "'p', 'r'"
157143
}
158144

159145
query := fmt.Sprintf(`
160-
SELECT parent.oid AS parentrelid, child.oid AS childrelid, parent.relkind
146+
SELECT parent.oid AS parentrelid, child.oid AS childrelid
161147
FROM pg_inherits
162148
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
163149
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
@@ -166,22 +152,19 @@ func getChildToParentRelIDMap(ctx context.Context,
166152

167153
rows, err := conn.Query(ctx, query, parentTableOIDs)
168154
if err != nil {
169-
return nil, nil, fmt.Errorf("error querying for child to parent relid map: %w", err)
155+
return nil, fmt.Errorf("error querying for child to parent relid map: %w", err)
170156
}
171157

172158
childToParentRelIDMap := make(map[uint32]uint32)
173-
idToRelKindMap := make(map[uint32]byte)
174159
var parentRelID, childRelID pgtype.Uint32
175-
var relkind byte
176-
if _, err := pgx.ForEachRow(rows, []any{&parentRelID, &childRelID, &relkind}, func() error {
160+
if _, err := pgx.ForEachRow(rows, []any{&parentRelID, &childRelID}, func() error {
177161
childToParentRelIDMap[childRelID.Uint32] = parentRelID.Uint32
178-
idToRelKindMap[parentRelID.Uint32] = relkind
179162
return nil
180163
}); err != nil {
181-
return nil, nil, fmt.Errorf("error iterating over child to parent relid map: %w", err)
164+
return nil, fmt.Errorf("error iterating over child to parent relid map: %w", err)
182165
}
183166

184-
return childToParentRelIDMap, idToRelKindMap, nil
167+
return childToParentRelIDMap, nil
185168
}
186169

187170
// replProcessor implements ingesting PostgreSQL logical replication tuples into items.
@@ -934,10 +917,8 @@ func processMessage[Items model.Items](
934917
p.otelManager.Metrics.CommitLagGauge.Record(ctx, time.Now().UTC().Sub(msg.CommitTime).Microseconds())
935918
p.commitLock = nil
936919
case *pglogrepl.RelationMessage:
937-
originalRelID := msg.RelationID
938-
var parentRelKind byte
939920
// treat all relation messages as corresponding to parent if partitioned.
940-
msg.RelationID, parentRelKind, err = p.checkIfUnknownTableInherits(ctx, msg.RelationID)
921+
msg.RelationID, err = p.checkIfUnknownTableInherits(ctx, msg.RelationID)
941922
if err != nil {
942923
return nil, err
943924
}
@@ -946,14 +927,6 @@ func processMessage[Items model.Items](
946927
return nil, nil
947928
}
948929

949-
// With publish_via_partition_root = true, PG emits a parent RelationMessage
950-
// followed by a child RelationMessage for each partition. The parent's
951-
// column list matches the tuple data wire format, so skip the child's
952-
// to avoid overwriting with a potentially reordered column definition.
953-
if originalRelID != msg.RelationID && parentRelKind == 'p' && p.publishViaPartitionRoot {
954-
return nil, nil
955-
}
956-
957930
logger.Info("processing RelationMessage",
958931
slog.Any("LSN", currentClientXlogPos),
959932
slog.Uint64("RelationID", uint64(msg.RelationID)),
@@ -1326,7 +1299,7 @@ func (p *PostgresCDCSource) getParentRelIDIfPartitioned(relID uint32) uint32 {
13261299
// filtered by relkind; parent needs to be a partitioned table by default
13271300
func (p *PostgresCDCSource) checkIfUnknownTableInherits(ctx context.Context,
13281301
relID uint32,
1329-
) (uint32, byte, error) {
1302+
) (uint32, error) {
13301303
relID = p.getParentRelIDIfPartitioned(relID)
13311304
relkinds := "'p'"
13321305
if p.handleInheritanceForNonPartitionedTables {
@@ -1343,18 +1316,18 @@ func (p *PostgresCDCSource) checkIfUnknownTableInherits(ctx context.Context,
13431316
relID,
13441317
).Scan(&parentRelID); err != nil {
13451318
if errors.Is(err, pgx.ErrNoRows) {
1346-
return relID, 0, nil
1319+
return relID, nil
13471320
}
1348-
return 0, 0, fmt.Errorf("failed to query pg_inherits: %w", err)
1321+
return 0, fmt.Errorf("failed to query pg_inherits: %w", err)
13491322
}
13501323
p.childToParentRelIDMapping[relID] = parentRelID
13511324
p.hushWarnUnknownTableDetected[relID] = struct{}{}
13521325
p.logger.Info("Detected new child table in CDC stream, remapping to parent table",
13531326
slog.Uint64("childRelID", uint64(relID)),
13541327
slog.Uint64("parentRelID", uint64(parentRelID)),
13551328
slog.String("parentTableName", p.srcTableIDNameMapping[parentRelID]))
1356-
return parentRelID, p.idToRelKindMap[parentRelID], nil
1329+
return parentRelID, nil
13571330
}
13581331

1359-
return relID, p.idToRelKindMap[relID], nil
1332+
return relID, nil
13601333
}

flow/e2e/generic_test.go

Lines changed: 0 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -451,107 +451,6 @@ func (s Generic) Test_Partitioned_Table() {
451451
RequireEnvCanceled(t, env)
452452
}
453453

454-
func (s Generic) Test_Partitioned_Table_With_Different_Column_Ordering() {
455-
t := s.T()
456-
457-
pgSource, ok := s.Source().(*PostgresSource)
458-
if !ok {
459-
t.Skip("test only applies to postgres")
460-
}
461-
conn := pgSource.PostgresConnector
462-
463-
srcTable := "test_partition_reorder"
464-
dstTable := "test_partition_reorder_dst"
465-
srcSchemaTable := AttachSchema(s, srcTable)
466-
467-
// Parent: column ordering: id, key, value, created_at
468-
// p1: uses parent column ordering
469-
// p2: attaches partitioned table with different column ordering from parent
470-
// p3: attaches partitioned table with same column ordering as parent
471-
_, err := conn.Conn().Exec(t.Context(), fmt.Sprintf(`
472-
CREATE TABLE %[1]s(
473-
id INT NOT NULL,
474-
key TEXT,
475-
value INT,
476-
created_at TIMESTAMP DEFAULT now(),
477-
PRIMARY KEY (created_at, id)
478-
) PARTITION BY RANGE(created_at);
479-
CREATE TABLE %[1]s_p1
480-
PARTITION OF %[1]s
481-
FOR VALUES FROM ('2024-01-01') TO ('2024-05-01');
482-
CREATE TABLE %[1]s_p2(
483-
value INT,
484-
created_at TIMESTAMP DEFAULT now(),
485-
key TEXT,
486-
id INT NOT NULL,
487-
PRIMARY KEY (created_at, id)
488-
);
489-
ALTER TABLE %[1]s ATTACH PARTITION %[1]s_p2
490-
FOR VALUES FROM ('2024-05-01') TO ('2024-09-01');
491-
CREATE TABLE %[1]s_p3(
492-
id INT NOT NULL,
493-
key TEXT,
494-
value INT,
495-
created_at TIMESTAMP DEFAULT now(),
496-
PRIMARY KEY (created_at, id)
497-
);
498-
ALTER TABLE %[1]s ATTACH PARTITION %[1]s_p3
499-
FOR VALUES FROM ('2024-09-01') TO ('2025-01-01');
500-
`, srcSchemaTable))
501-
require.NoError(t, err)
502-
503-
connectionGen := FlowConnectionGenerationConfig{
504-
FlowJobName: AddSuffix(s, srcTable),
505-
TableMappings: TableMappings(s, srcTable, dstTable),
506-
Destination: s.Peer().Name,
507-
}
508-
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
509-
510-
tc := NewTemporalClient(t)
511-
env := ExecutePeerflow(t, tc, flowConnConfig)
512-
SetupCDCFlowStatusQuery(t, env, flowConnConfig)
513-
514-
// Batch 1: insert into all 3 partitions, verify correctness
515-
_, err = conn.Conn().Exec(t.Context(), fmt.Sprintf(
516-
`INSERT INTO %s(id, key, value, created_at) VALUES
517-
(1, 'a', 10, '2024-02-15'),
518-
(2, 'b', 20, '2024-06-15'),
519-
(3, 'c', 30, '2024-10-15')`, srcSchemaTable))
520-
EnvNoError(t, env, err)
521-
522-
EnvWaitForEqualTablesWithNames(env, s, "first batch 3 rows",
523-
srcTable, dstTable, `id,key,value,created_at`)
524-
525-
// Batch 2: insert into all 3 partitions, verify correctness across batches
526-
_, err = conn.Conn().Exec(t.Context(), fmt.Sprintf(
527-
`INSERT INTO %s(id, key, value, created_at) VALUES
528-
(4, 'd', 40, '2024-11-15'),
529-
(5, 'e', 50, '2024-03-15'),
530-
(6, 'f', 60, '2024-07-15')`, srcSchemaTable))
531-
EnvNoError(t, env, err)
532-
533-
EnvWaitForEqualTablesWithNames(env, s, "all 6 rows",
534-
srcTable, dstTable, `id,key,value,created_at`)
535-
536-
// DDL: add a new column to the parent, should propagate to all children
537-
_, err = conn.Conn().Exec(t.Context(), fmt.Sprintf(
538-
`ALTER TABLE %s ADD COLUMN extra TEXT`, srcSchemaTable))
539-
EnvNoError(t, env, err)
540-
541-
// Batch 3: insert into all 3 partitions, verify correctness with new column addition
542-
_, err = conn.Conn().Exec(t.Context(), fmt.Sprintf(
543-
`INSERT INTO %s(id, key, value, created_at, extra) VALUES
544-
(7, 'g', 70, '2024-08-15', 'x1'),
545-
(8, 'h', 80, '2024-12-15', 'x2'),
546-
(9, 'i', 90, '2024-04-15', 'x3')`, srcSchemaTable))
547-
EnvNoError(t, env, err)
548-
549-
EnvWaitForEqualTablesWithNames(env, s, "all 9 rows with new column",
550-
srcTable, dstTable, `id,key,value,created_at,extra`)
551-
env.Cancel(t.Context())
552-
RequireEnvCanceled(t, env)
553-
}
554-
555454
func (s Generic) Test_Schema_Changes_Cutoff_Bug() {
556455
t := s.T()
557456

0 commit comments

Comments
 (0)