@@ -132,7 +132,12 @@ func (a *FlowableActivity) EnsurePullability(
132132 if err != nil {
133133 return nil , err
134134 }
135- config .SourceTableIdentifiers = slices .Sorted (maps .Keys (internal .TableNameMapping (cfg .TableMappings , cfg .Resync )))
135+ tableMappings , err := internal .FetchTableMappingsFromDB (ctx , config .FlowJobName , cfg .TableMappingVersion )
136+ if err != nil {
137+ return nil , err
138+ }
139+
140+ config .SourceTableIdentifiers = slices .Sorted (maps .Keys (internal .TableNameMapping (tableMappings , cfg .Resync )))
136141
137142 output , err := srcConn .EnsurePullability (ctx , config )
138143 if err != nil {
@@ -154,12 +159,6 @@ func (a *FlowableActivity) CreateRawTable(
154159 }
155160 defer connectors .CloseConnector (ctx , dstConn )
156161
157- cfg , err := internal .FetchConfigFromDB (config .FlowJobName , ctx )
158- if err != nil {
159- return nil , err
160- }
161- config .TableNameMapping = internal .TableNameMapping (cfg .TableMappings , cfg .Resync )
162-
163162 res , err := dstConn .CreateRawTable (ctx , config )
164163 if err != nil {
165164 return nil , a .Alerter .LogFlowError (ctx , config .FlowJobName , err )
@@ -193,11 +192,15 @@ func (a *FlowableActivity) SetupTableSchema(
193192 if err != nil {
194193 return err
195194 }
196- tableNameSchemaMapping , err := srcConn .GetTableSchema (ctx , config .Env , config .Version , config .System , cfg .TableMappings )
195+ tableMappings , err := internal .FetchTableMappingsFromDB (ctx , cfg .FlowJobName , cfg .TableMappingVersion )
196+ if err != nil {
197+ return err
198+ }
199+ tableNameSchemaMapping , err := srcConn .GetTableSchema (ctx , config .Env , config .Version , config .System , tableMappings )
197200 if err != nil {
198201 return a .Alerter .LogFlowError (ctx , config .FlowName , fmt .Errorf ("failed to get GetTableSchemaConnector: %w" , err ))
199202 }
200- processed := internal .BuildProcessedSchemaMapping (cfg . TableMappings , tableNameSchemaMapping , logger )
203+ processed := internal .BuildProcessedSchemaMapping (tableMappings , tableNameSchemaMapping , logger )
201204
202205 tx , err := a .CatalogPool .BeginTx (ctx , pgx.TxOptions {})
203206 if err != nil {
@@ -266,9 +269,13 @@ func (a *FlowableActivity) CreateNormalizedTable(
266269 if err != nil {
267270 return nil , err
268271 }
272+ tableMappings , err := internal .FetchTableMappingsFromDB (ctx , cfg .FlowJobName , cfg .TableMappingVersion )
273+ if err != nil {
274+ return nil , err
275+ }
269276 numTablesToSetup .Store (int32 (len (tableNameSchemaMapping )))
270277 tableExistsMapping := make (map [string ]bool , len (tableNameSchemaMapping ))
271- for _ , tableMapping := range cfg . TableMappings {
278+ for _ , tableMapping := range tableMappings {
272279 tableIdentifier := tableMapping .DestinationTableIdentifier
273280 tableSchema := tableNameSchemaMapping [tableIdentifier ]
274281 existing , err := conn .SetupNormalizedTable (
@@ -321,9 +328,14 @@ func (a *FlowableActivity) SyncFlow(
321328 return err
322329 }
323330
331+ tableMappings , err := internal .FetchTableMappingsFromDB (ctx , cfg .FlowJobName , cfg .TableMappingVersion )
332+ if err != nil {
333+ return err
334+ }
335+
324336 // Override config with DB values to deal with the large fields.
325337 config = cfg
326- options .TableMappings = cfg . TableMappings
338+ options .TableMappings = tableMappings
327339
328340 syncState .Store (shared .Ptr ("setup" ))
329341 shutdown := heartbeatRoutine (ctx , func () string {
@@ -1073,7 +1085,8 @@ func (a *FlowableActivity) RecordMetricsCritical(ctx context.Context) error {
10731085 if isActive {
10741086 activeFlows = append (activeFlows , info )
10751087 }
1076- a .OtelManager .Metrics .SyncedTablesGauge .Record (ctx , int64 (len (info .config .TableMappings )))
1088+ //TODO: this will need a special query as we can extract this straight from the DB.
1089+ //a.OtelManager.Metrics.SyncedTablesGauge.Record(ctx, int64(len(info.config.TableMappings)))
10771090 a .OtelManager .Metrics .FlowStatusGauge .Record (ctx , 1 , metric .WithAttributeSet (attribute .NewSet (
10781091 attribute .String (otel_metrics .FlowStatusKey , info .status .String ()),
10791092 attribute .Bool (otel_metrics .IsFlowActiveKey , isActive ),
0 commit comments