Skip to content

Commit 7535f5f

Browse files
authored
Shadow deploy applySchemaDeltasV2 with validations (#3770)
This pull request introduces a more robust approach for applying schema deltas to the catalog. It's a follow-up to #3768 (and addresses the race condition described in the PR description), with e2e test coverage, and with the new approach shadow-applied for validation before it's fully enabled, given that this PR introduces some non-trivial changes to db state that are hard to debug/roll back. Changes: - Moved the previous `applySchemaDeltas` logic to `applySchemaDeltasV1`, and introduced a separate `applySchemaDeltaV2`. Note that `applySchemaDeltaV2` applies changes in memory, and works in conjunction with a `ReadModifyWriteTableSchemasToCatalog` method to support transactions with read-modify-write patterns. - Added a feature flag (`PEERDB_APPLY_SCHEMA_DELTA_TO_CATALOG`) and logic to choose between the legacy (v1) and new (v2) approaches for applying schema deltas to the catalog. For now the FF is **disabled** by default, which means the old approach continues to be used as the source of truth. - The new v2 method is shadow-applied. This is done by adding temporary validation utilities to compare the results of the v1 and v2 schema delta approaches, logging discrepancies and reporting metrics for monitoring. - Improved end-to-end tests to cover a race condition where columns added without subsequent DML operations could be lost in the destination schema, verifying catalog correctness after each sync.
1 parent 18eb7f7 commit 7535f5f

File tree

6 files changed

+683
-2
lines changed

6 files changed

+683
-2
lines changed

flow/activities/flowable_core.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,64 @@ func (a *FlowableActivity) applySchemaDeltas(
6767
config *protos.FlowConnectionConfigsCore,
6868
options *protos.SyncFlowOptions,
6969
schemaDeltas []*protos.TableSchemaDelta,
70+
) error {
71+
logger := internal.LoggerFromCtx(ctx)
72+
73+
dstTableNamesInDeltas := make([]string, 0, len(schemaDeltas))
74+
for _, schemaDelta := range schemaDeltas {
75+
dstTableNamesInDeltas = append(dstTableNamesInDeltas, schemaDelta.DstTableName)
76+
}
77+
78+
applyV2, err := internal.PeerDBApplySchemaDeltaToCatalogEnabled(ctx, config.Env)
79+
if err != nil {
80+
logger.Warn("failed to check if schema delta v2 is enabled, defaulting to false", slog.Any("error", err))
81+
applyV2 = false
82+
}
83+
84+
if applyV2 {
85+
err := internal.ReadModifyWriteTableSchemasToCatalog(
86+
ctx,
87+
a.CatalogPool,
88+
logger,
89+
config.FlowJobName,
90+
dstTableNamesInDeltas,
91+
// use a closure to keep ReadModifyWriteTableSchemasToCatalog's `modifyFn` flexible
92+
func(schemas map[string]*protos.TableSchema) (map[string]*protos.TableSchema, error) {
93+
return applySchemaDeltaV2(ctx, schemas, schemaDeltas)
94+
},
95+
)
96+
if err != nil {
97+
return fmt.Errorf("failed to update table schemas in catalog: %w", err)
98+
}
99+
return nil
100+
} else {
101+
skipValidate := false
102+
baseSchema, err := internal.LoadTableSchemasFromCatalog(ctx, a.CatalogPool, config.FlowJobName, dstTableNamesInDeltas)
103+
if err != nil {
104+
logger.Warn("skipping v2 validation: cannot load base schemas", slog.Any("error", err))
105+
skipValidate = true
106+
}
107+
108+
if err := a.applySchemaDeltasV1(ctx, config, options, schemaDeltas); err != nil {
109+
return err
110+
}
111+
112+
if !skipValidate {
113+
validateV2AgainstV1(ctx, a.CatalogPool, config.FlowJobName, baseSchema, schemaDeltas, dstTableNamesInDeltas)
114+
}
115+
}
116+
return nil
117+
}
118+
119+
// existing approach to applying schemaDeltas. `applySchemaDeltas` is actually
120+
// a bit misleading, as we are fetching the latest schema from the source database.
121+
// schemaDeltas is only used to identify matching tables.
122+
// This approach has a race condition where schema deltas do not get correctly
123+
// applied because the latest schema from source db includes the added column.
124+
func (a *FlowableActivity) applySchemaDeltasV1(ctx context.Context,
125+
config *protos.FlowConnectionConfigsCore,
126+
options *protos.SyncFlowOptions,
127+
schemaDeltas []*protos.TableSchemaDelta,
70128
) error {
71129
filteredTableMappings := make([]*protos.TableMapping, 0, len(schemaDeltas))
72130
for _, tableMapping := range options.TableMappings {
@@ -93,6 +151,49 @@ func (a *FlowableActivity) applySchemaDeltas(
93151
return nil
94152
}
95153

154+
// this is the updated approach of applying schema deltas to catalog. Unlike v1,
155+
// we use `table_schema_mapping` from catalog as base, and add new columns from
156+
// schemaDeltas that are not in the existing mapping. This function returns
157+
// a copy of schemasInCatalog with schemaDeltas applied.
158+
func applySchemaDeltaV2(
159+
ctx context.Context,
160+
schemasInCatalog map[string]*protos.TableSchema,
161+
schemaDeltas []*protos.TableSchemaDelta,
162+
) (map[string]*protos.TableSchema, error) {
163+
logger := internal.LoggerFromCtx(ctx)
164+
165+
// deep copy to avoid mutating input
166+
schemasInCatalogCopy := make(map[string]*protos.TableSchema, len(schemasInCatalog))
167+
for tableName, schema := range schemasInCatalog {
168+
if schema == nil {
169+
return nil, fmt.Errorf("failed to deep copy table schema from catalog: table %s has nil schema", tableName)
170+
}
171+
schemasInCatalogCopy[tableName] = proto.CloneOf(schema)
172+
}
173+
174+
for _, schemaDelta := range schemaDeltas {
175+
if schema, exists := schemasInCatalogCopy[schemaDelta.DstTableName]; exists {
176+
columnNames := make(map[string]struct{}, len(schema.GetColumns()))
177+
for _, col := range schema.GetColumns() {
178+
columnNames[col.Name] = struct{}{}
179+
}
180+
for _, newCol := range schemaDelta.GetAddedColumns() {
181+
// only add columns that don't already exist
182+
if _, exists := columnNames[newCol.Name]; !exists {
183+
schema.Columns = append(schema.Columns, newCol)
184+
columnNames[newCol.Name] = struct{}{}
185+
} else {
186+
logger.Warn(fmt.Sprintf("skip adding duplicated column '%s' (type '%s') in table %s",
187+
newCol.Name, newCol.Type, schemaDelta.DstTableName))
188+
}
189+
}
190+
} else {
191+
logger.Warn(fmt.Sprintf("skip adding columns for table '%s' because it's not in catalog", schemaDelta.DstTableName))
192+
}
193+
}
194+
return schemasInCatalogCopy, nil
195+
}
196+
96197
func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncConnectorCore, Items model.Items](
97198
ctx context.Context,
98199
a *FlowableActivity,
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
// TODO: This file contains temporary validation code used during the migration from v1 to v2 schema delta approaches.
2+
// Once the v2 approach is fully rolled out and v1 is removed, this entire file should be deleted.
3+
// The validation ensures that v2 produces identical results to v1 during the transition period.
4+
//
5+
// Related cleanup tasks when deleting this file:
6+
// - Remove applySchemaDeltasV1() function once v1 is deprecated
7+
// - Clean up any references to this validation logic
8+
// - Clean up `PEERDB_APPLY_SCHEMA_DELTA_TO_CATALOG` in dynconf.go
9+
package activities
10+
11+
import (
12+
"context"
13+
"fmt"
14+
"log/slog"
15+
"slices"
16+
17+
"go.opentelemetry.io/otel/attribute"
18+
"go.opentelemetry.io/otel/metric"
19+
20+
"github.com/PeerDB-io/peerdb/flow/generated/protos"
21+
"github.com/PeerDB-io/peerdb/flow/internal"
22+
"github.com/PeerDB-io/peerdb/flow/otel_metrics"
23+
"github.com/PeerDB-io/peerdb/flow/shared"
24+
)
25+
26+
// validateV2AgainstV1 compares v1 and v2 schema delta approaches for validation.
27+
// Failures in this validation are logged but don't affect production behavior.
28+
func validateV2AgainstV1(
29+
ctx context.Context,
30+
pool shared.CatalogPool,
31+
flowName string,
32+
baseSchema map[string]*protos.TableSchema,
33+
schemaDeltas []*protos.TableSchemaDelta,
34+
affectedTables []string,
35+
) {
36+
logger := internal.LoggerFromCtx(ctx)
37+
38+
schemaAfterV1, err := internal.LoadTableSchemasFromCatalog(ctx, pool, flowName, affectedTables)
39+
if err != nil {
40+
logger.Warn("skipping v1/v2 validation: cannot load post-v1 schemas", slog.Any("error", err))
41+
return
42+
}
43+
44+
schemaAfterV2, err := applySchemaDeltaV2(ctx, baseSchema, schemaDeltas)
45+
if err != nil {
46+
logger.Warn("skipping v1/v2 validation: applySchemaDeltaV2 failed", slog.Any("error", err))
47+
return
48+
}
49+
50+
reportUnexpectedSchemaDiffs(ctx, flowName, baseSchema, schemaAfterV1, schemaAfterV2, schemaDeltas)
51+
}
52+
53+
func reportUnexpectedSchemaDiffs(
54+
ctx context.Context,
55+
flowName string,
56+
baseSchemas map[string]*protos.TableSchema,
57+
schemasAfterV1 map[string]*protos.TableSchema,
58+
schemasAfterV2 map[string]*protos.TableSchema,
59+
schemaDeltas []*protos.TableSchemaDelta,
60+
) {
61+
logger := internal.LoggerFromCtx(ctx)
62+
63+
for _, schemaDelta := range schemaDeltas {
64+
tableName := schemaDelta.DstTableName
65+
baseSchema := baseSchemas[tableName]
66+
schemaV1 := schemasAfterV1[tableName]
67+
schemaV2 := schemasAfterV2[tableName]
68+
69+
if baseSchema == nil || schemaV1 == nil || schemaV2 == nil {
70+
logger.Warn("skipping validation for table due to missing schema",
71+
slog.String("table", tableName),
72+
slog.Bool("hasBase", baseSchema != nil),
73+
slog.Bool("hasV1", schemaV1 != nil),
74+
slog.Bool("hasV2", schemaV2 != nil))
75+
continue
76+
}
77+
78+
var issues []string
79+
80+
v1ColumnMap := make(map[string]*protos.FieldDescription)
81+
for _, col := range schemaV1.Columns {
82+
v1ColumnMap[col.Name] = col
83+
}
84+
85+
v2ColumnMap := make(map[string]*protos.FieldDescription)
86+
for _, col := range schemaV2.Columns {
87+
v2ColumnMap[col.Name] = col
88+
}
89+
90+
// Validate existing columns match in v1 and v2
91+
for _, baseCol := range baseSchema.Columns {
92+
v1Col, existsInV1 := v1ColumnMap[baseCol.Name]
93+
v2Col, existsInV2 := v2ColumnMap[baseCol.Name]
94+
95+
// Column missing in v1 is expected:
96+
// - in applySchemaDeltaV1, we were syncing catalog with schema from source.
97+
// DROP COLUMN DDLs are ignored and do not trigger a schema fetch,
98+
// but subsequent ADD COLUMN DDLs would fetch the latest schema from source
99+
// with dropped columns removed.
100+
// - in applySchemaDeltaV2, we never fetch schema from source, therefore it's
101+
// expected that deleted columns are in v2, but not in v1.
102+
// Only report error if column exists in v1 but not in v2.
103+
if existsInV1 && !existsInV2 {
104+
issues = append(issues, fmt.Sprintf("existing column '%s' missing in v2", baseCol.Name))
105+
}
106+
107+
if existsInV1 && existsInV2 && !fieldDescriptionEqual(v1Col, v2Col) {
108+
issues = append(issues, fmt.Sprintf("existing column '%s' differs: v1=%+v, v2=%+v",
109+
baseCol.Name, v1Col, v2Col))
110+
}
111+
}
112+
113+
// Validate new columns match in v1 and v2
114+
for _, newCol := range schemaDelta.AddedColumns {
115+
v1Col, existsInV1 := v1ColumnMap[newCol.Name]
116+
v2Col, existsInV2 := v2ColumnMap[newCol.Name]
117+
118+
if !existsInV1 {
119+
issues = append(issues, fmt.Sprintf("new column '%s' missing in v1", newCol.Name))
120+
}
121+
if !existsInV2 {
122+
issues = append(issues, fmt.Sprintf("new column '%s' missing in v2", newCol.Name))
123+
}
124+
125+
if existsInV1 && existsInV2 && !fieldDescriptionEqual(v1Col, v2Col) {
126+
issues = append(issues, fmt.Sprintf("new column '%s' differs: v1=%+v, v2=%+v",
127+
newCol.Name, v1Col, v2Col))
128+
}
129+
}
130+
131+
// Note: v1 may have extra columns due to race condition (fetching latest schema from source)
132+
// This is expected and not considered an error.
133+
134+
// Check all other TableSchema fields match
135+
if schemaV1.TableIdentifier != schemaV2.TableIdentifier {
136+
issues = append(issues, fmt.Sprintf("table_identifier differs: v1='%s', v2='%s'",
137+
schemaV1.TableIdentifier, schemaV2.TableIdentifier))
138+
}
139+
if !slices.Equal(schemaV1.PrimaryKeyColumns, schemaV2.PrimaryKeyColumns) {
140+
issues = append(issues, fmt.Sprintf("primary_key_columns differs: v1=%v, v2=%v",
141+
schemaV1.PrimaryKeyColumns, schemaV2.PrimaryKeyColumns))
142+
}
143+
if schemaV1.IsReplicaIdentityFull != schemaV2.IsReplicaIdentityFull {
144+
issues = append(issues, fmt.Sprintf("is_replica_identity_full differs: v1=%v, v2=%v",
145+
schemaV1.IsReplicaIdentityFull, schemaV2.IsReplicaIdentityFull))
146+
}
147+
if schemaV1.System != schemaV2.System {
148+
issues = append(issues, fmt.Sprintf("system differs: v1=%v, v2=%v",
149+
schemaV1.System, schemaV2.System))
150+
}
151+
if schemaV1.NullableEnabled != schemaV2.NullableEnabled {
152+
issues = append(issues, fmt.Sprintf("nullable_enabled differs: v1=%v, v2=%v",
153+
schemaV1.NullableEnabled, schemaV2.NullableEnabled))
154+
}
155+
if schemaV1.TableOid != schemaV2.TableOid {
156+
issues = append(issues, fmt.Sprintf("table_oid differs: v1=%v, v2=%v",
157+
schemaV1.TableOid, schemaV2.TableOid))
158+
}
159+
160+
// summarize the findings in logs
161+
if len(issues) > 0 {
162+
logger.Warn("schema validation issues found: v1 and v2 differ",
163+
slog.String("flowName", flowName),
164+
slog.String("table", tableName),
165+
slog.Int("issueCount", len(issues)),
166+
slog.Any("issues", issues))
167+
168+
// if there is a discrepancy, report this as a metric to code_notification
169+
otel_metrics.CodeNotificationCounter.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(
170+
attribute.String("message", fmt.Sprintf("Schema delta v2 validation failed for flow=%s table=%s: %d issues",
171+
flowName, tableName, len(issues))),
172+
attribute.String("flowName", flowName),
173+
attribute.String("tableName", tableName),
174+
)))
175+
}
176+
}
177+
}
178+
179+
func fieldDescriptionEqual(a, b *protos.FieldDescription) bool {
180+
if a == nil || b == nil {
181+
return a == b
182+
}
183+
return a.Name == b.Name &&
184+
a.Type == b.Type &&
185+
a.TypeModifier == b.TypeModifier &&
186+
a.Nullable == b.Nullable
187+
}

0 commit comments

Comments
 (0)