Skip to content

Commit e2cce3a

Browse files
committed
Proposal: Activity to fetch config from the DB
This uses a new helper, `internal.FetchConfigFromDB`, to fetch a fully hydrated `protos.FlowConnectionConfigs`. When passing this config to Temporal, we strip the `tableMappings` array, which can cause the payload to exceed the 2MB limit. This contains a subset of the changes originally proposed in #3407.
1 parent 18fc7f0 commit e2cce3a

File tree

5 files changed

+88
-54
lines changed

5 files changed

+88
-54
lines changed

flow/activities/flowable.go

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import (
55
"errors"
66
"fmt"
77
"log/slog"
8+
"maps"
89
"net"
910
"os"
11+
"slices"
1012
"strconv"
1113
"sync/atomic"
1214
"time"
@@ -125,6 +127,13 @@ func (a *FlowableActivity) EnsurePullability(
125127
}
126128
defer connectors.CloseConnector(ctx, srcConn)
127129

130+
// We can fetch from the DB, as we are in the activity
131+
cfg, err := internal.FetchConfigFromDB(config.FlowJobName, ctx)
132+
if err != nil {
133+
return nil, err
134+
}
135+
config.SourceTableIdentifiers = slices.Sorted(maps.Keys(internal.TableNameMapping(cfg.TableMappings, cfg.Resync)))
136+
128137
output, err := srcConn.EnsurePullability(ctx, config)
129138
if err != nil {
130139
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to ensure pullability: %w", err))
@@ -145,6 +154,12 @@ func (a *FlowableActivity) CreateRawTable(
145154
}
146155
defer connectors.CloseConnector(ctx, dstConn)
147156

157+
cfg, err := internal.FetchConfigFromDB(config.FlowJobName, ctx)
158+
if err != nil {
159+
return nil, err
160+
}
161+
config.TableNameMapping = internal.TableNameMapping(cfg.TableMappings, cfg.Resync)
162+
148163
res, err := dstConn.CreateRawTable(ctx, config)
149164
if err != nil {
150165
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
@@ -174,11 +189,15 @@ func (a *FlowableActivity) SetupTableSchema(
174189
}
175190
defer connectors.CloseConnector(ctx, srcConn)
176191

177-
tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, config.TableMappings)
192+
cfg, err := internal.FetchConfigFromDB(config.FlowName, ctx)
193+
if err != nil {
194+
return err
195+
}
196+
tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, cfg.TableMappings)
178197
if err != nil {
179198
return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get GetTableSchemaConnector: %w", err))
180199
}
181-
processed := internal.BuildProcessedSchemaMapping(config.TableMappings, tableNameSchemaMapping, logger)
200+
processed := internal.BuildProcessedSchemaMapping(cfg.TableMappings, tableNameSchemaMapping, logger)
182201

183202
tx, err := a.CatalogPool.BeginTx(ctx, pgx.TxOptions{})
184203
if err != nil {
@@ -243,9 +262,13 @@ func (a *FlowableActivity) CreateNormalizedTable(
243262
return nil, err
244263
}
245264

265+
cfg, err := internal.FetchConfigFromDB(config.FlowName, ctx)
266+
if err != nil {
267+
return nil, err
268+
}
246269
numTablesToSetup.Store(int32(len(tableNameSchemaMapping)))
247270
tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping))
248-
for _, tableMapping := range config.TableMappings {
271+
for _, tableMapping := range cfg.TableMappings {
249272
tableIdentifier := tableMapping.DestinationTableIdentifier
250273
tableSchema := tableNameSchemaMapping[tableIdentifier]
251274
existing, err := conn.SetupNormalizedTable(
@@ -292,6 +315,16 @@ func (a *FlowableActivity) SyncFlow(
292315
var normalizeWaiting atomic.Bool
293316
var syncingBatchID atomic.Int64
294317
var syncState atomic.Pointer[string]
318+
319+
cfg, err := internal.FetchConfigFromDB(config.FlowJobName, ctx)
320+
if err != nil {
321+
return err
322+
}
323+
324+
// Override config with DB values to deal with the large fields.
325+
config = cfg
326+
options.TableMappings = cfg.TableMappings
327+
295328
syncState.Store(shared.Ptr("setup"))
296329
shutdown := heartbeatRoutine(ctx, func() string {
297330
// Must load Waiting after BatchID to avoid race saying we're waiting on currently processing batch

flow/activities/snapshot_activity.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,14 @@ func (a *SnapshotActivity) SetupReplication(
6565
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get connector: %w", err))
6666
}
6767

68+
configCtx := context.Background()
69+
defer configCtx.Done()
70+
cfg, err := internal.FetchConfigFromDB(config.FlowJobName, configCtx)
71+
if err != nil {
72+
return nil, err
73+
}
74+
config.TableNameMapping = internal.TableNameMapping(cfg.TableMappings, cfg.Resync)
75+
6876
logger.Info("waiting for slot to be created...")
6977
slotInfo, err := conn.SetupReplication(ctx, config)
7078

@@ -180,8 +188,12 @@ func (a *SnapshotActivity) GetDefaultPartitionKeyForTables(
180188
}
181189
defer connectors.CloseConnector(ctx, connector)
182190

191+
cfg, err := internal.FetchConfigFromDB(input.FlowJobName, ctx)
192+
if err != nil {
193+
return nil, err
194+
}
183195
output, err := connector.GetDefaultPartitionKeyForTables(ctx, &protos.GetDefaultPartitionKeyForTablesInput{
184-
TableMappings: input.TableMappings,
196+
TableMappings: cfg.TableMappings,
185197
})
186198
if err != nil {
187199
return nil, a.Alerter.LogFlowError(ctx, input.FlowJobName, fmt.Errorf("failed to check if tables can parallel load: %w", err))

flow/workflows/cdc_flow.go

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ func processTableAdditions(
296296
alterPublicationAddAdditionalTablesFuture := workflow.ExecuteActivity(
297297
alterPublicationAddAdditionalTablesCtx,
298298
flowable.AddTablesToPublication,
299-
cfg, flowConfigUpdate.AdditionalTables)
299+
internal.MinimizeFlowConfiguration(cfg), flowConfigUpdate.AdditionalTables)
300300

301301
var res *CDCFlowWorkflowResult
302302
var addTablesFlowErr error
@@ -338,6 +338,7 @@ func processTableAdditions(
338338
childAddTablesCDCFlowFuture := workflow.ExecuteChildWorkflow(
339339
childAddTablesCDCFlowCtx,
340340
CDCFlowWorkflow,
341+
// TODO: `additionalTablesCfg` cannot be minimized in the main branch, but the limitation is minimal.
341342
additionalTablesCfg,
342343
nil,
343344
)
@@ -412,7 +413,7 @@ func processTableRemovals(
412413
rawTableCleanupFuture := workflow.ExecuteActivity(
413414
removeTablesCtx,
414415
flowable.RemoveTablesFromRawTable,
415-
cfg, state.FlowConfigUpdate.RemovedTables)
416+
internal.MinimizeFlowConfiguration(cfg), state.FlowConfigUpdate.RemovedTables)
416417
removeTablesSelector.AddFuture(rawTableCleanupFuture, func(f workflow.Future) {
417418
if err := f.Get(ctx, nil); err != nil {
418419
logger.Error("failed to clean up raw table for removed tables", slog.Any("error", err))
@@ -424,7 +425,7 @@ func processTableRemovals(
424425
removeTablesFromCatalogFuture := workflow.ExecuteActivity(
425426
removeTablesCtx,
426427
flowable.RemoveTablesFromCatalog,
427-
cfg, state.FlowConfigUpdate.RemovedTables)
428+
internal.MinimizeFlowConfiguration(cfg), state.FlowConfigUpdate.RemovedTables)
428429
removeTablesSelector.AddFuture(removeTablesFromCatalogFuture, func(f workflow.Future) {
429430
if err := f.Get(ctx, nil); err != nil {
430431
logger.Error("failed to clean up raw table for removed tables", slog.Any("error", err))
@@ -594,7 +595,7 @@ func CDCFlowWorkflow(
594595

595596
logger.Info("mirror resumed", slog.Duration("after", time.Since(startTime)))
596597
state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING)
597-
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state)
598+
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.MinimizeFlowConfiguration(cfg), state)
598599
}
599600

600601
originalRunID := workflow.GetInfo(ctx).OriginalRunID
@@ -627,22 +628,6 @@ func CDCFlowWorkflow(
627628
// for safety, rely on the idempotency of SetupFlow instead
628629
// also, no signals are being handled until the loop starts, so no PAUSE/DROP will take here.
629630
if state.CurrentFlowStatus != protos.FlowStatus_STATUS_RUNNING {
630-
originalTableMappings := make([]*protos.TableMapping, 0, len(cfg.TableMappings))
631-
for _, tableMapping := range cfg.TableMappings {
632-
originalTableMappings = append(originalTableMappings, proto.CloneOf(tableMapping))
633-
}
634-
// if resync is true, alter the table name schema mapping to temporarily add
635-
// a suffix to the table names.
636-
if cfg.Resync {
637-
for _, mapping := range state.SyncFlowOptions.TableMappings {
638-
if mapping.Engine != protos.TableEngine_CH_ENGINE_NULL {
639-
mapping.DestinationTableIdentifier += "_resync"
640-
}
641-
}
642-
// because we have renamed the tables.
643-
cfg.TableMappings = state.SyncFlowOptions.TableMappings
644-
}
645-
646631
// start the SetupFlow workflow as a child workflow, and wait for it to complete
647632
// it should return the table schema for the source peer
648633
setupFlowID := GetChildWorkflowID("setup-flow", cfg.FlowJobName, originalRunID)
@@ -666,7 +651,6 @@ func CDCFlowWorkflow(
666651
state.ActiveSignal = model.ResyncSignal
667652
cfg.Resync = true
668653
cfg.DoInitialSnapshot = true
669-
cfg.TableMappings = originalTableMappings
670654
// this is the only place where we can have a resync during a resync
671655
// so we need to NOT sync the tableMappings to catalog to preserve original names
672656
uploadConfigToCatalog(ctx, cfg)
@@ -690,7 +674,10 @@ func CDCFlowWorkflow(
690674
WaitForCancellation: true,
691675
}
692676
setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts)
693-
setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg)
677+
// Resync will rely on the `cfg.Resync` flag to rename the tables
678+
// during the snapshot process. This is how we're able to also remove the need
679+
// to sync the config back into the DB / not rely on the `state.TableMappings`.
680+
setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, internal.MinimizeFlowConfiguration(cfg))
694681

695682
var setupFlowOutput *protos.SetupFlowOutput
696683
var setupFlowError error
@@ -826,7 +813,7 @@ func CDCFlowWorkflow(
826813
logger.Info("executed setup flow and snapshot flow, start running")
827814
state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING)
828815
}
829-
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state)
816+
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.MinimizeFlowConfiguration(cfg), state)
830817
}
831818

832819
var finished bool
@@ -837,7 +824,8 @@ func CDCFlowWorkflow(
837824
WaitForCancellation: true,
838825
RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1},
839826
}))
840-
syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions)
827+
state.SyncFlowOptions.TableMappings = []*protos.TableMapping{}
828+
syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, internal.MinimizeFlowConfiguration(cfg), state.SyncFlowOptions)
841829

842830
mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop")
843831
mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
@@ -958,7 +946,7 @@ func CDCFlowWorkflow(
958946
if state.ActiveSignal == model.TerminateSignal || state.ActiveSignal == model.ResyncSignal {
959947
return state, workflow.NewContinueAsNewError(ctx, DropFlowWorkflow, state.DropFlowInput)
960948
}
961-
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state)
949+
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.MinimizeFlowConfiguration(cfg), state)
962950
}
963951
}
964952
}

flow/workflows/setup_flow.go

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,17 @@ type SetupFlowExecution struct {
3636
tableNameMapping map[string]string
3737
cdcFlowName string
3838
executionID string
39+
resync bool
3940
}
4041

4142
// NewSetupFlowExecution creates a new instance of SetupFlowExecution.
42-
func NewSetupFlowExecution(ctx workflow.Context, tableNameMapping map[string]string, cdcFlowName string) *SetupFlowExecution {
43+
func NewSetupFlowExecution(ctx workflow.Context, tableNameMapping map[string]string, cdcFlowName string, resync bool) *SetupFlowExecution {
4344
return &SetupFlowExecution{
4445
Logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cdcFlowName)),
4546
tableNameMapping: tableNameMapping,
4647
cdcFlowName: cdcFlowName,
4748
executionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
49+
resync: resync,
4850
}
4951
}
5052

@@ -119,10 +121,9 @@ func (s *SetupFlowExecution) ensurePullability(
119121

120122
// create EnsurePullabilityInput for the srcTableName
121123
ensurePullabilityInput := &protos.EnsurePullabilityBatchInput{
122-
PeerName: config.SourceName,
123-
FlowJobName: s.cdcFlowName,
124-
SourceTableIdentifiers: slices.Sorted(maps.Keys(s.tableNameMapping)),
125-
CheckConstraints: checkConstraints,
124+
PeerName: config.SourceName,
125+
FlowJobName: s.cdcFlowName,
126+
CheckConstraints: checkConstraints,
126127
}
127128

128129
future := workflow.ExecuteActivity(ctx, flowable.EnsurePullability, ensurePullabilityInput)
@@ -162,9 +163,8 @@ func (s *SetupFlowExecution) createRawTable(
162163

163164
// attempt to create the tables.
164165
createRawTblInput := &protos.CreateRawTableInput{
165-
PeerName: config.DestinationName,
166-
FlowJobName: s.cdcFlowName,
167-
TableNameMapping: s.tableNameMapping,
166+
PeerName: config.DestinationName,
167+
FlowJobName: s.cdcFlowName,
168168
}
169169

170170
rawTblFuture := workflow.ExecuteActivity(ctx, flowable.CreateRawTable, createRawTblInput)
@@ -191,12 +191,11 @@ func (s *SetupFlowExecution) setupNormalizedTables(
191191
})
192192

193193
tableSchemaInput := &protos.SetupTableSchemaBatchInput{
194-
PeerName: flowConnectionConfigs.SourceName,
195-
TableMappings: flowConnectionConfigs.TableMappings,
196-
FlowName: s.cdcFlowName,
197-
System: flowConnectionConfigs.System,
198-
Env: flowConnectionConfigs.Env,
199-
Version: flowConnectionConfigs.Version,
194+
PeerName: flowConnectionConfigs.SourceName,
195+
FlowName: s.cdcFlowName,
196+
System: flowConnectionConfigs.System,
197+
Env: flowConnectionConfigs.Env,
198+
Version: flowConnectionConfigs.Version,
200199
}
201200

202201
if err := workflow.ExecuteActivity(ctx, flowable.SetupTableSchema, tableSchemaInput).Get(ctx, nil); err != nil {
@@ -207,7 +206,6 @@ func (s *SetupFlowExecution) setupNormalizedTables(
207206
s.Info("setting up normalized tables on destination peer", slog.String("destination", flowConnectionConfigs.DestinationName))
208207
setupConfig := &protos.SetupNormalizedTableBatchInput{
209208
PeerName: flowConnectionConfigs.DestinationName,
210-
TableMappings: flowConnectionConfigs.TableMappings,
211209
SoftDeleteColName: flowConnectionConfigs.SoftDeleteColName,
212210
SyncedAtColName: flowConnectionConfigs.SyncedAtColName,
213211
FlowName: flowConnectionConfigs.FlowJobName,
@@ -267,7 +265,7 @@ func SetupFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfig
267265
}
268266

269267
// create the setup flow execution
270-
setupFlowExecution := NewSetupFlowExecution(ctx, tblNameMapping, config.FlowJobName)
268+
setupFlowExecution := NewSetupFlowExecution(ctx, tblNameMapping, config.FlowJobName, config.Resync)
271269

272270
// execute the setup flow
273271
setupFlowOutput, err := setupFlowExecution.executeSetupFlow(ctx, config)

flow/workflows/snapshot_flow.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package peerflow
22

33
import (
4+
"context"
45
"fmt"
56
"log/slog"
67
"slices"
@@ -55,15 +56,10 @@ func (s *SnapshotFlowExecution) setupReplication(
5556
},
5657
})
5758

58-
tblNameMapping := make(map[string]string, len(s.config.TableMappings))
59-
for _, v := range s.config.TableMappings {
60-
tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier
61-
}
62-
6359
setupReplicationInput := &protos.SetupReplicationInput{
64-
PeerName: s.config.SourceName,
65-
FlowJobName: flowName,
66-
TableNameMapping: tblNameMapping,
60+
PeerName: s.config.SourceName,
61+
FlowJobName: flowName,
62+
// TableNameMapping: tblNameMapping,
6763
DoInitialSnapshot: s.config.DoInitialSnapshot,
6864
ExistingPublicationName: s.config.PublicationName,
6965
ExistingReplicationSlotName: s.config.ReplicationSlotName,
@@ -284,7 +280,14 @@ func (s *SnapshotFlowExecution) cloneTables(
284280
return err
285281
}
286282

287-
for _, v := range s.config.TableMappings {
283+
configCtx := context.Background()
284+
defer configCtx.Done()
285+
cfg, err := internal.FetchConfigFromDB(s.config.FlowJobName, configCtx)
286+
if err != nil {
287+
return err
288+
}
289+
290+
for _, v := range cfg.TableMappings {
288291
source := v.SourceTableIdentifier
289292
destination := v.DestinationTableIdentifier
290293
s.logger.Info(

0 commit comments

Comments
 (0)