Skip to content

Commit 6d045b1

Browse files
committed
Dealing with the removed tableMappings - pt2; build is passing
1 parent e6add9b commit 6d045b1

File tree

8 files changed

+59
-22
lines changed

8 files changed

+59
-22
lines changed

flow/activities/flowable.go

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,12 @@ func (a *FlowableActivity) EnsurePullability(
132132
if err != nil {
133133
return nil, err
134134
}
135-
config.SourceTableIdentifiers = slices.Sorted(maps.Keys(internal.TableNameMapping(cfg.TableMappings, cfg.Resync)))
135+
tableMappings, err := internal.FetchTableMappingsFromDB(ctx, config.FlowJobName, cfg.TableMappingVersion)
136+
if err != nil {
137+
return nil, err
138+
}
139+
140+
config.SourceTableIdentifiers = slices.Sorted(maps.Keys(internal.TableNameMapping(tableMappings, cfg.Resync)))
136141

137142
output, err := srcConn.EnsurePullability(ctx, config)
138143
if err != nil {
@@ -154,12 +159,6 @@ func (a *FlowableActivity) CreateRawTable(
154159
}
155160
defer connectors.CloseConnector(ctx, dstConn)
156161

157-
cfg, err := internal.FetchConfigFromDB(config.FlowJobName, ctx)
158-
if err != nil {
159-
return nil, err
160-
}
161-
config.TableNameMapping = internal.TableNameMapping(cfg.TableMappings, cfg.Resync)
162-
163162
res, err := dstConn.CreateRawTable(ctx, config)
164163
if err != nil {
165164
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
@@ -193,11 +192,15 @@ func (a *FlowableActivity) SetupTableSchema(
193192
if err != nil {
194193
return err
195194
}
196-
tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, cfg.TableMappings)
195+
tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion)
196+
if err != nil {
197+
return err
198+
}
199+
tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, tableMappings)
197200
if err != nil {
198201
return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get GetTableSchemaConnector: %w", err))
199202
}
200-
processed := internal.BuildProcessedSchemaMapping(cfg.TableMappings, tableNameSchemaMapping, logger)
203+
processed := internal.BuildProcessedSchemaMapping(tableMappings, tableNameSchemaMapping, logger)
201204

202205
tx, err := a.CatalogPool.BeginTx(ctx, pgx.TxOptions{})
203206
if err != nil {
@@ -266,9 +269,13 @@ func (a *FlowableActivity) CreateNormalizedTable(
266269
if err != nil {
267270
return nil, err
268271
}
272+
tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion)
273+
if err != nil {
274+
return nil, err
275+
}
269276
numTablesToSetup.Store(int32(len(tableNameSchemaMapping)))
270277
tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping))
271-
for _, tableMapping := range cfg.TableMappings {
278+
for _, tableMapping := range tableMappings {
272279
tableIdentifier := tableMapping.DestinationTableIdentifier
273280
tableSchema := tableNameSchemaMapping[tableIdentifier]
274281
existing, err := conn.SetupNormalizedTable(
@@ -321,9 +328,14 @@ func (a *FlowableActivity) SyncFlow(
321328
return err
322329
}
323330

331+
tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion)
332+
if err != nil {
333+
return err
334+
}
335+
324336
// Override config with DB values to deal with the large fields.
325337
config = cfg
326-
options.TableMappings = cfg.TableMappings
338+
options.TableMappings = tableMappings
327339

328340
syncState.Store(shared.Ptr("setup"))
329341
shutdown := heartbeatRoutine(ctx, func() string {
@@ -967,7 +979,8 @@ func (a *FlowableActivity) RecordMetricsCritical(ctx context.Context) error {
967979
if isActive {
968980
activeFlows = append(activeFlows, info)
969981
}
970-
a.OtelManager.Metrics.SyncedTablesGauge.Record(ctx, int64(len(info.config.TableMappings)))
982+
//TODO: this will need a special query as we can extract this straight from the DB.
983+
//a.OtelManager.Metrics.SyncedTablesGauge.Record(ctx, int64(len(info.config.TableMappings)))
971984
a.OtelManager.Metrics.FlowStatusGauge.Record(ctx, 1, metric.WithAttributeSet(attribute.NewSet(
972985
attribute.String(otel_metrics.FlowStatusKey, info.status.String()),
973986
attribute.Bool(otel_metrics.IsFlowActiveKey, isActive),

flow/activities/flowable_core.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -682,13 +682,18 @@ func (a *FlowableActivity) startNormalize(
682682
return fmt.Errorf("failed to get table name schema mapping: %w", err)
683683
}
684684

685+
tableMappings, err := internal.FetchTableMappingsFromDB(ctx, config.FlowJobName, config.TableMappingVersion)
686+
if err != nil {
687+
return err
688+
}
689+
685690
for {
686691
logger.Info("normalizing batch", slog.Int64("syncBatchID", batchID))
687692
res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{
688693
FlowJobName: config.FlowJobName,
689694
Env: config.Env,
690695
TableNameSchemaMapping: tableNameSchemaMapping,
691-
TableMappings: config.TableMappings,
696+
TableMappings: tableMappings,
692697
SoftDeleteColName: config.SoftDeleteColName,
693698
SyncedAtColName: config.SyncedAtColName,
694699
SyncBatchID: batchID,

flow/activities/snapshot_activity.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,11 @@ func (a *SnapshotActivity) SetupReplication(
7171
if err != nil {
7272
return nil, err
7373
}
74-
config.TableNameMapping = internal.TableNameMapping(cfg.TableMappings, cfg.Resync)
74+
tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion)
75+
if err != nil {
76+
return nil, err
77+
}
78+
config.TableNameMapping = internal.TableNameMapping(tableMappings, cfg.Resync)
7579

7680
logger.Info("waiting for slot to be created...")
7781
slotInfo, err := conn.SetupReplication(ctx, config)
@@ -192,8 +196,12 @@ func (a *SnapshotActivity) GetDefaultPartitionKeyForTables(
192196
if err != nil {
193197
return nil, err
194198
}
199+
tableMappings, err := internal.FetchTableMappingsFromDB(ctx, cfg.FlowJobName, cfg.TableMappingVersion)
200+
if err != nil {
201+
return nil, err
202+
}
195203
output, err := connector.GetDefaultPartitionKeyForTables(ctx, &protos.GetDefaultPartitionKeyForTablesInput{
196-
TableMappings: cfg.TableMappings,
204+
TableMappings: tableMappings,
197205
})
198206
if err != nil {
199207
return nil, a.Alerter.LogFlowError(ctx, input.FlowJobName, fmt.Errorf("failed to check if tables can parallel load: %w", err))

flow/cmd/handler.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -468,8 +468,7 @@ func (h *FlowRequestHandler) FlowStateChange(
468468
if err != nil {
469469
return nil, NewInternalApiError(fmt.Errorf("unable to get flow config: %w", err))
470470
}
471-
tableMappings, err := internal.FetchTableMappingsFromDB(ctx, h.pool, req.FlowJobName)
472-
471+
tableMappings, err := internal.FetchTableMappingsFromDB(ctx, config.FlowJobName, config.TableMappingVersion)
473472
if err != nil {
474473
return nil, NewInternalApiError(fmt.Errorf("unable to get table mappings: %w", err))
475474
}

flow/cmd/mirror_status.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ func (h *FlowRequestHandler) cdcFlowStatus(
163163
if state.SyncFlowOptions != nil {
164164
config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
165165
config.MaxBatchSize = state.SyncFlowOptions.BatchSize
166-
config.TableMappings = state.SyncFlowOptions.TableMappings
166+
//TODO: this will need to fetch from the DB?
167+
//config.TableMappings = state.SyncFlowOptions.TableMappings
167168
}
168169

169170
srcType, err := connectors.LoadPeerType(ctx, h.pool, config.SourceName)

flow/cmd/validate_mirror.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func (h *FlowRequestHandler) validateCDCMirrorImpl(
8282
}
8383
defer connectors.CloseConnector(ctx, srcConn)
8484

85-
if err := srcConn.ValidateMirrorSource(ctx, req.ConnectionConfigs); err != nil {
85+
if err := srcConn.ValidateMirrorSource(ctx, req.ConnectionConfigs, req.TableMappings); err != nil {
8686
return nil, NewFailedPreconditionApiError(
8787
fmt.Errorf("failed to validate source connector %s: %w", req.ConnectionConfigs.SourceName, err))
8888
}
@@ -107,7 +107,7 @@ func (h *FlowRequestHandler) validateCDCMirrorImpl(
107107
return nil, NewFailedPreconditionApiError(fmt.Errorf("failed to get source table schema: %w", getTableSchemaError))
108108
}
109109
}
110-
if err := dstConn.ValidateMirrorDestination(ctx, req.ConnectionConfigs, tableSchemaMap); err != nil {
110+
if err := dstConn.ValidateMirrorDestination(ctx, req.ConnectionConfigs, tableSchemaMap, req.TableMappings); err != nil {
111111
return nil, NewFailedPreconditionApiError(
112112
fmt.Errorf("failed to validate destination connector %s: %w", req.ConnectionConfigs.DestinationName, err))
113113
}

flow/workflows/cdc_flow.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,11 @@ func processTableAdditions(
305305
additionalTablesCfg := proto.CloneOf(cfg)
306306
additionalTablesCfg.DoInitialSnapshot = !flowConfigUpdate.SkipInitialSnapshotForTableAdditions
307307
additionalTablesCfg.InitialSnapshotOnly = true
308-
additionalTablesCfg.TableMappings = flowConfigUpdate.AdditionalTables
308+
// TODO: thought - maybe we need to pass additionalTables and then in the
309+
// CDCWorkflow we persist them to the DB and send an incremented `tableMappingVersion`
310+
// to the new workflow?
311+
//additionalTablesCfg.TableMappings = flowConfigUpdate.AdditionalTables
312+
309313
additionalTablesCfg.Resync = false
310314
if state.SnapshotNumRowsPerPartition > 0 {
311315
additionalTablesCfg.SnapshotNumRowsPerPartition = state.SnapshotNumRowsPerPartition

flow/workflows/snapshot_flow.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,14 @@ func (s *SnapshotFlowExecution) cloneTables(
287287
return err
288288
}
289289

290-
for _, v := range cfg.TableMappings {
290+
tableMappingsCtx := context.Background()
291+
defer tableMappingsCtx.Done()
292+
tableMappings, err := internal.FetchTableMappingsFromDB(tableMappingsCtx, cfg.FlowJobName, cfg.TableMappingVersion)
293+
if err != nil {
294+
return err
295+
}
296+
297+
for _, v := range tableMappings {
291298
source := v.SourceTableIdentifier
292299
destination := v.DestinationTableIdentifier
293300
s.logger.Info(

0 commit comments

Comments
 (0)