Skip to content
This repository was archived by the owner on Aug 30, 2025. It is now read-only.

Commit 2914ae6

Browse files
committed
wip on hydrating job mappings
1 parent 1c2a9d7 commit 2914ae6

File tree

1 file changed

+248
-0
lines changed
  • internal/benthos/benthos-builder/builders

1 file changed

+248
-0
lines changed

internal/benthos/benthos-builder/builders/sql.go

Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"iter"
78
"log/slog"
9+
"maps"
10+
"slices"
811
"strings"
912

1013
mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
@@ -59,6 +62,244 @@ func NewSqlSyncBuilder(
5962
}
6063
}
6164

65+
type JobTransformationMapping struct {
66+
*mgmtv1alpha1.JobMapping
67+
68+
DestinationSchema string
69+
DestinationTable string
70+
}
71+
72+
type CascadeSchemaSettings struct {
73+
config *mgmtv1alpha1.JobTypeConfig_JobTypeSync
74+
}
75+
76+
func (c *CascadeSchemaSettings) GetSchemaStrategy() *mgmtv1alpha1.JobTypeConfig_JobTypeSync_SchemaStrategy {
77+
return c.config.GetSchemaChange().GetSchemaStrategy()
78+
}
79+
80+
func (c *CascadeSchemaSettings) GetTableStrategy(schemaName string) *mgmtv1alpha1.JobTypeConfig_JobTypeSync_TableStrategy {
81+
for _, schemaMapping := range c.config.GetSchemaMappings() {
82+
if schemaMapping.GetSchema() == schemaName {
83+
ts := schemaMapping.GetTableStrategy()
84+
if ts != nil {
85+
return ts
86+
} else {
87+
break // fall back to global table strategy
88+
}
89+
}
90+
}
91+
return c.config.GetSchemaChange().GetTableStrategy()
92+
}
93+
94+
func (c *CascadeSchemaSettings) GetColumnStrategy(schemaName, tableName string) *mgmtv1alpha1.JobTypeConfig_JobTypeSync_ColumnStrategy {
95+
for _, schemaMapping := range c.config.GetSchemaMappings() {
96+
if schemaMapping.GetSchema() == schemaName {
97+
for _, tableMapping := range schemaMapping.GetTableMappings() {
98+
if tableMapping.GetTable() == tableName {
99+
tableLevelColumnStrategy := tableMapping.GetColumnStrategy()
100+
if tableLevelColumnStrategy != nil {
101+
return tableLevelColumnStrategy
102+
}
103+
break // fall back to schema level column strategy
104+
}
105+
}
106+
schemaLevelColumnStrategy := schemaMapping.GetColumnStrategy()
107+
if schemaLevelColumnStrategy != nil {
108+
return schemaLevelColumnStrategy
109+
}
110+
break // fall back to global column strategy
111+
}
112+
}
113+
return c.config.GetSchemaChange().GetColumnStrategy()
114+
}
115+
116+
func (c *CascadeSchemaSettings) GetDefinedSchemas() []string {
117+
output := map[string]bool{}
118+
for _, schemaMapping := range c.config.GetSchemaMappings() {
119+
output[schemaMapping.GetSchema()] = true
120+
}
121+
return slices.Collect(maps.Keys(output))
122+
}
123+
124+
func (c *CascadeSchemaSettings) GetDefinedTables(schemaName string) []string {
125+
output := map[string]bool{}
126+
for _, schemaMapping := range c.config.GetSchemaMappings() {
127+
if schemaMapping.GetSchema() == schemaName {
128+
for _, tableMapping := range schemaMapping.GetTableMappings() {
129+
output[tableMapping.GetTable()] = true
130+
}
131+
}
132+
}
133+
return slices.Collect(maps.Keys(output))
134+
}
135+
136+
func (c *CascadeSchemaSettings) GetColumnTransforms(schemaName, tableName string, sourceColumnMap map[string]*sqlmanager_shared.DatabaseSchemaRow) map[string]*mgmtv1alpha1.TransformerConfig {
137+
colStrategy := c.GetColumnStrategy(schemaName, tableName)
138+
columnAddStrat := colStrategy.GetMapAllColumns().GetColumnAdditionStrategy()
139+
columnRemStrat := colStrategy.GetMapAllColumns().GetColumnRemovalStrategy()
140+
141+
output := map[string]*mgmtv1alpha1.TransformerConfig{}
142+
143+
switch columnAddStrat.GetStrategy().(type) {
144+
case *mgmtv1alpha1.JobTypeConfig_JobTypeSync_ColumnStrategy_MapAllColumns_ColumnAdditionStrategy_AutoMap_:
145+
for _, columnRow := range sourceColumnMap {
146+
output[columnRow.ColumnName] = &mgmtv1alpha1.TransformerConfig{
147+
Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{ // todo: this should configure the correct transformer based on the column type
148+
PassthroughConfig: &mgmtv1alpha1.Passthrough{},
149+
},
150+
}
151+
}
152+
case *mgmtv1alpha1.JobTypeConfig_JobTypeSync_ColumnStrategy_MapAllColumns_ColumnAdditionStrategy_Passthrough_:
153+
for _, columnRow := range sourceColumnMap {
154+
output[columnRow.ColumnName] = &mgmtv1alpha1.TransformerConfig{
155+
Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{
156+
PassthroughConfig: &mgmtv1alpha1.Passthrough{},
157+
},
158+
}
159+
}
160+
161+
case *mgmtv1alpha1.JobTypeConfig_JobTypeSync_ColumnStrategy_MapAllColumns_ColumnAdditionStrategy_Halt_:
162+
// do nothing for now, need to determine what the schema mappings are for the table
163+
}
164+
165+
// todo: may need to know the direct columns in order to properly halt
166+
maps.Insert(output, c.getDirectColumnTransforms(schemaName, tableName))
167+
168+
// this may just be called by the caller
169+
switch columnRemStrat.GetStrategy().(type) {
170+
case *mgmtv1alpha1.JobTypeConfig_JobTypeSync_ColumnStrategy_MapAllColumns_ColumnRemovalStrategy_Continue_:
171+
// do nothing
172+
case *mgmtv1alpha1.JobTypeConfig_JobTypeSync_ColumnStrategy_MapAllColumns_ColumnRemovalStrategy_Halt_:
173+
// do nothing for now
174+
}
175+
176+
// todo: handle halt here? not sure, maybe it should be handled in the caller
177+
return output
178+
}
179+
180+
func (c *CascadeSchemaSettings) getDirectColumnTransforms(schemaName, tableName string) iter.Seq2[string, *mgmtv1alpha1.TransformerConfig] {
181+
return func(yield func(string, *mgmtv1alpha1.TransformerConfig) bool) {
182+
for _, schemaMapping := range c.config.GetSchemaMappings() {
183+
if schemaMapping.GetSchema() == schemaName {
184+
for _, tableMapping := range schemaMapping.GetTableMappings() {
185+
if tableMapping.GetTable() == tableName {
186+
for _, columnMapping := range tableMapping.GetColumnMappings() {
187+
if !yield(columnMapping.GetColumn(), columnMapping.GetTransformer()) {
188+
return
189+
}
190+
}
191+
}
192+
}
193+
}
194+
}
195+
}
196+
}
197+
198+
func (b *sqlSyncBuilder) hydrateJobMappings(
199+
job *mgmtv1alpha1.Job,
200+
groupedColumnInfo map[string]map[string]*sqlmanager_shared.DatabaseSchemaRow,
201+
logger *slog.Logger,
202+
) ([]JobTransformationMapping, error) {
203+
legacyMappings := job.GetMappings()
204+
if len(legacyMappings) > 0 {
205+
jobMappings := make([]JobTransformationMapping, len(legacyMappings))
206+
for i, mapping := range legacyMappings {
207+
jobMappings[i] = JobTransformationMapping{
208+
JobMapping: mapping,
209+
}
210+
}
211+
return jobMappings, nil
212+
}
213+
214+
syncConfig := job.GetJobType().GetSync()
215+
if syncConfig == nil {
216+
return nil, fmt.Errorf("unable to hydrate job mappings: sync config not found")
217+
}
218+
219+
schemaTableColumnDbInfo := getSchemaTableColumnDbInfo(groupedColumnInfo)
220+
221+
// mapAllSchemas := true
222+
// switch syncConfig.GetSchemaChange().GetSchemaStrategy().GetStrategy().(type) {
223+
// case *mgmtv1alpha1.JobTypeConfig_JobTypeSync_SchemaStrategy_MapAllSchemas_:
224+
// logger.Debug("hydrating job mappings: map all schemas")
225+
// case *mgmtv1alpha1.JobTypeConfig_JobTypeSync_SchemaStrategy_MapDefinedSchemas_:
226+
// logger.Debug("hydrating job mappings: map defined schemas")
227+
// mapAllSchemas = false
228+
// default:
229+
// //
230+
// logger.Debug("hydrating job mappings: map all schemas via default fallback")
231+
// }
232+
233+
// mapAllTables := true
234+
// _ = mapAllTables
235+
// switch syncConfig.GetSchemaChange().GetTableStrategy().GetStrategy().(type) {
236+
// case *mgmtv1alpha1.JobTypeConfig_JobTypeSync_TableStrategy_MapAllTables_:
237+
// logger.Debug("hydrating job mappings: map all tables")
238+
// case *mgmtv1alpha1.JobTypeConfig_JobTypeSync_TableStrategy_MapDefinedTables_:
239+
// logger.Debug("hydrating job mappings: map defined tables")
240+
// mapAllTables = false
241+
// default:
242+
// logger.Debug("hydrating job mappings: map all tables via default fallback")
243+
// }
244+
245+
// globalColumnStrategy := syncConfig.GetSchemaChange().GetColumnStrategy()
246+
// _ = globalColumnStrategy
247+
248+
// for _, schemaMapping := range syncConfig.GetSchemaMappings() {
249+
// schema := schemaMapping.GetSchema()
250+
// schemaMapping.GetTableStrategy()
251+
// for _, tableMapping := range schemaMapping.GetTableMappings() {
252+
// table := tableMapping.GetTable()
253+
254+
// }
255+
// }
256+
257+
// jobMappings := []JobTransformationMapping{}
258+
259+
// if mapAllSchemas {
260+
// for schema, tableMap := range schemaTableColumnDbInfo {
261+
// for table, columnDetails := range tableMap {
262+
// for _, columnDetail := range columnDetails {
263+
// jobMappings = append(jobMappings, JobTransformationMapping{
264+
// JobMapping: &mgmtv1alpha1.JobMapping{
265+
// Schema: schema,
266+
// Table: table,
267+
// Column: columnDetail.ColumnName,
268+
// Transformer: &mgmtv1alpha1.JobMappingTransformer{
269+
// Config: &mgmtv1alpha1.TransformerConfig{
270+
// Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{},
271+
// },
272+
// },
273+
// },
274+
// DestinationSchema: schema,
275+
// DestinationTable: table,
276+
// })
277+
// }
278+
// }
279+
// }
280+
// }
281+
282+
return jobMappings, nil
283+
}
284+
285+
// outputs schema -> table -> []column info
286+
func getSchemaTableColumnDbInfo(groupedColumnInfo map[string]map[string]*sqlmanager_shared.DatabaseSchemaRow) map[string]map[string][]*sqlmanager_shared.DatabaseSchemaRow {
287+
output := map[string]map[string][]*sqlmanager_shared.DatabaseSchemaRow{}
288+
289+
for _, columnMap := range groupedColumnInfo {
290+
for _, row := range columnMap {
291+
tableMap, ok := output[row.TableSchema]
292+
if !ok {
293+
tableMap = map[string][]*sqlmanager_shared.DatabaseSchemaRow{}
294+
}
295+
tableMap[row.TableName] = append(tableMap[row.TableName], row)
296+
output[row.TableSchema] = tableMap
297+
}
298+
}
299+
300+
return output
301+
}
302+
62303
func (b *sqlSyncBuilder) BuildSourceConfigs(ctx context.Context, params *bb_internal.SourceParams) ([]*bb_internal.BenthosSourceConfig, error) {
63304
sourceConnection := params.SourceConnection
64305
job := params.Job
@@ -84,6 +325,13 @@ func (b *sqlSyncBuilder) BuildSourceConfigs(ctx context.Context, params *bb_inte
84325
return nil, fmt.Errorf("unable to get database schema for connection: %w", err)
85326
}
86327

328+
job.Mappings, err = b.hydrateJobMappings(job, groupedColumnInfo)
329+
if err != nil {
330+
return nil, fmt.Errorf("unable to hydrate job mappings: %w", err)
331+
}
332+
333+
// I think here is where I need to hydrate the mappings if it's a v2 job
334+
87335
b.sqlSourceSchemaColumnInfoMap = groupedColumnInfo
88336
if sqlSourceOpts != nil && sqlSourceOpts.HaltOnNewColumnAddition {
89337
newColumns, shouldHalt := shouldHaltOnSchemaAddition(groupedColumnInfo, job.Mappings)

0 commit comments

Comments
 (0)