Skip to content

Commit 412346e

Browse files
committed
Use query builders for DML event queries
1 parent 9af3a07 commit 412346e

File tree

4 files changed

+223
-8
lines changed

4 files changed

+223
-8
lines changed

go/logic/applier.go

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ type Applier struct {
6060
migrationContext *base.MigrationContext
6161
finishedMigrating int64
6262
name string
63+
64+
dmlDeleteQueryBuilder *sql.DMLDeleteQueryBuilder
65+
dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder
66+
dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder
6367
}
6468

6569
func NewApplier(migrationContext *base.MigrationContext) *Applier {
@@ -106,6 +110,37 @@ func (this *Applier) InitDBConnections() (err error) {
106110
return nil
107111
}
108112

113+
func (this *Applier) prepareQueries() (err error) {
114+
if this.dmlDeleteQueryBuilder, err = sql.NewDMLDeleteQueryBuilder(
115+
this.migrationContext.DatabaseName,
116+
this.migrationContext.GetGhostTableName(),
117+
this.migrationContext.OriginalTableColumns,
118+
&this.migrationContext.UniqueKey.Columns,
119+
); err != nil {
120+
return err
121+
}
122+
if this.dmlInsertQueryBuilder, err = sql.NewDMLInsertQueryBuilder(
123+
this.migrationContext.DatabaseName,
124+
this.migrationContext.GetGhostTableName(),
125+
this.migrationContext.OriginalTableColumns,
126+
this.migrationContext.SharedColumns,
127+
this.migrationContext.MappedSharedColumns,
128+
); err != nil {
129+
return err
130+
}
131+
if this.dmlUpdateQueryBuilder, err = sql.NewDMLUpdateQueryBuilder(
132+
this.migrationContext.DatabaseName,
133+
this.migrationContext.GetGhostTableName(),
134+
this.migrationContext.OriginalTableColumns,
135+
this.migrationContext.SharedColumns,
136+
this.migrationContext.MappedSharedColumns,
137+
&this.migrationContext.UniqueKey.Columns,
138+
); err != nil {
139+
return err
140+
}
141+
return nil
142+
}
143+
109144
// validateAndReadGlobalVariables potentially reads server global variables, such as the time_zone and wait_timeout.
110145
func (this *Applier) validateAndReadGlobalVariables() error {
111146
query := `select /* gh-ost */ @@global.time_zone, @@global.wait_timeout`
@@ -1135,35 +1170,36 @@ func (this *Applier) updateModifiesUniqueKeyColumns(dmlEvent *binlog.BinlogDMLEv
11351170

11361171
// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
11371172
// event entry on the original table.
1138-
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (results [](*dmlBuildResult)) {
1173+
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlBuildResult {
11391174
switch dmlEvent.DML {
11401175
case binlog.DeleteDML:
11411176
{
1142-
query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
1143-
return append(results, newDmlBuildResult(query, uniqueKeyArgs, -1, err))
1177+
query, uniqueKeyArgs, err := this.dmlDeleteQueryBuilder.BuildQuery(dmlEvent.WhereColumnValues.AbstractValues())
1178+
return []*dmlBuildResult{newDmlBuildResult(query, uniqueKeyArgs, -1, err)}
11441179
}
11451180
case binlog.InsertDML:
11461181
{
1147-
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
1148-
return append(results, newDmlBuildResult(query, sharedArgs, 1, err))
1182+
query, sharedArgs, err := this.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues())
1183+
return []*dmlBuildResult{newDmlBuildResult(query, sharedArgs, 1, err)}
11491184
}
11501185
case binlog.UpdateDML:
11511186
{
11521187
if _, isModified := this.updateModifiesUniqueKeyColumns(dmlEvent); isModified {
1188+
results := make([]*dmlBuildResult, 0, 2)
11531189
dmlEvent.DML = binlog.DeleteDML
11541190
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
11551191
dmlEvent.DML = binlog.InsertDML
11561192
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
11571193
return results
11581194
}
1159-
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
1195+
query, sharedArgs, uniqueKeyArgs, err := this.dmlUpdateQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
11601196
args := sqlutils.Args()
11611197
args = append(args, sharedArgs...)
11621198
args = append(args, uniqueKeyArgs...)
1163-
return append(results, newDmlBuildResult(query, args, 0, err))
1199+
return []*dmlBuildResult{newDmlBuildResult(query, args, 0, err)}
11641200
}
11651201
}
1166-
return append(results, newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)))
1202+
return []*dmlBuildResult{newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))}
11671203
}
11681204

11691205
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table

go/logic/applier_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
100100
columnValues := sql.ToColumnValues([]interface{}{123456, 42})
101101

102102
migrationContext := base.NewMigrationContext()
103+
migrationContext.DatabaseName = "test"
103104
migrationContext.OriginalTableName = "test"
104105
migrationContext.OriginalTableColumns = columns
105106
migrationContext.SharedColumns = columns
@@ -110,6 +111,7 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
110111
}
111112

112113
applier := NewApplier(migrationContext)
114+
applier.prepareQueries()
113115

114116
t.Run("delete", func(t *testing.T) {
115117
binlogEvent := &binlog.BinlogDMLEvent{
@@ -290,8 +292,13 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
290292
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"})
291293
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"})
292294
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"})
295+
migrationContext.UniqueKey = &sql.UniqueKey{
296+
Name: "primary_key",
297+
Columns: *sql.NewColumnList([]string{"id"}),
298+
}
293299

294300
applier := NewApplier(migrationContext)
301+
suite.Require().NoError(applier.prepareQueries())
295302
defer applier.Teardown()
296303

297304
err = applier.InitDBConnections()

go/logic/migrator.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,10 @@ func (this *Migrator) Migrate() (err error) {
386386
if err := this.inspector.inspectOriginalAndGhostTables(); err != nil {
387387
return err
388388
}
389+
// We can prepare some of the queries on the applier
390+
if err := this.applier.prepareQueries(); err != nil {
391+
return err
392+
}
389393
// Validation complete! We're good to execute this migration
390394
if err := this.hooksExecutor.onValidated(); err != nil {
391395
return err

go/sql/builder.go

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,3 +530,171 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol
530530
)
531531
return result, sharedArgs, uniqueKeyArgs, nil
532532
}
533+
534+
type DMLDeleteQueryBuilder struct {
535+
tableColumns, uniqueKeyColumns *ColumnList
536+
preparedStatement string
537+
}
538+
539+
func NewDMLDeleteQueryBuilder(databaseName, tableName string, tableColumns, uniqueKeyColumns *ColumnList) (*DMLDeleteQueryBuilder, error) {
540+
if uniqueKeyColumns.Len() == 0 {
541+
return nil, fmt.Errorf("no unique key columns found in NewDMLDeleteQueryBuilder")
542+
}
543+
databaseName = EscapeName(databaseName)
544+
tableName = EscapeName(tableName)
545+
equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names())
546+
if err != nil {
547+
return nil, err
548+
}
549+
550+
stmt := fmt.Sprintf(`
551+
delete /* gh-ost %s.%s */
552+
from
553+
%s.%s
554+
where
555+
%s`,
556+
databaseName, tableName,
557+
databaseName, tableName,
558+
equalsComparison,
559+
)
560+
561+
b := &DMLDeleteQueryBuilder{
562+
tableColumns: tableColumns,
563+
uniqueKeyColumns: uniqueKeyColumns,
564+
preparedStatement: stmt,
565+
}
566+
return b, nil
567+
}
568+
569+
func (b *DMLDeleteQueryBuilder) BuildQuery(args []interface{}) (string, []interface{}, error) {
570+
if len(args) != b.tableColumns.Len() {
571+
return "", nil, fmt.Errorf("args count differs from table column count in BuildDMLDeleteQuery")
572+
}
573+
uniqueKeyArgs := make([]interface{}, 0, b.uniqueKeyColumns.Len())
574+
for _, column := range b.uniqueKeyColumns.Columns() {
575+
tableOrdinal := b.tableColumns.Ordinals[column.Name]
576+
arg := column.convertArg(args[tableOrdinal], true)
577+
uniqueKeyArgs = append(uniqueKeyArgs, arg)
578+
}
579+
return b.preparedStatement, uniqueKeyArgs, nil
580+
}
581+
582+
type DMLInsertQueryBuilder struct {
583+
tableColumns, sharedColumns *ColumnList
584+
preparedStatement string
585+
}
586+
587+
func NewDMLInsertQueryBuilder(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns *ColumnList) (*DMLInsertQueryBuilder, error) {
588+
if !sharedColumns.IsSubsetOf(tableColumns) {
589+
return nil, fmt.Errorf("shared columns is not a subset of table columns in NewDMLInsertQueryBuilder")
590+
}
591+
if sharedColumns.Len() == 0 {
592+
return nil, fmt.Errorf("no shared columns found in NewDMLInsertQueryBuilder")
593+
}
594+
databaseName = EscapeName(databaseName)
595+
tableName = EscapeName(tableName)
596+
mappedSharedColumnNames := duplicateNames(mappedSharedColumns.Names())
597+
for i := range mappedSharedColumnNames {
598+
mappedSharedColumnNames[i] = EscapeName(mappedSharedColumnNames[i])
599+
}
600+
preparedValues := buildColumnsPreparedValues(mappedSharedColumns)
601+
602+
stmt := fmt.Sprintf(`
603+
replace /* gh-ost %s.%s */
604+
into
605+
%s.%s
606+
(%s)
607+
values
608+
(%s)`,
609+
databaseName, tableName,
610+
databaseName, tableName,
611+
strings.Join(mappedSharedColumnNames, ", "),
612+
strings.Join(preparedValues, ", "),
613+
)
614+
615+
return &DMLInsertQueryBuilder{
616+
tableColumns: tableColumns,
617+
sharedColumns: sharedColumns,
618+
preparedStatement: stmt,
619+
}, nil
620+
}
621+
622+
func (b *DMLInsertQueryBuilder) BuildQuery(args []interface{}) (string, []interface{}, error) {
623+
if len(args) != b.tableColumns.Len() {
624+
return "", nil, fmt.Errorf("args count differs from table column count in BuildDMLInsertQuery")
625+
}
626+
sharedArgs := make([]interface{}, 0, b.sharedColumns.Len())
627+
for _, column := range b.sharedColumns.Columns() {
628+
tableOrdinal := b.tableColumns.Ordinals[column.Name]
629+
arg := column.convertArg(args[tableOrdinal], false)
630+
sharedArgs = append(sharedArgs, arg)
631+
}
632+
return b.preparedStatement, sharedArgs, nil
633+
}
634+
635+
type DMLUpdateQueryBuilder struct {
636+
tableColumns, sharedColumns, uniqueKeyColumns *ColumnList
637+
preparedStatement string
638+
}
639+
640+
func NewDMLUpdateQueryBuilder(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns, uniqueKeyColumns *ColumnList) (*DMLUpdateQueryBuilder, error) {
641+
if !sharedColumns.IsSubsetOf(tableColumns) {
642+
return nil, fmt.Errorf("shared columns is not a subset of table columns in NewDMLUpdateQueryBuilder")
643+
}
644+
if !uniqueKeyColumns.IsSubsetOf(sharedColumns) {
645+
return nil, fmt.Errorf("unique key columns is not a subset of shared columns in NewDMLUpdateQueryBuilder")
646+
}
647+
if sharedColumns.Len() == 0 {
648+
return nil, fmt.Errorf("no shared columns found in NewDMLUpdateQueryBuilder")
649+
}
650+
if uniqueKeyColumns.Len() == 0 {
651+
return nil, fmt.Errorf("no unique key columns found in NewDMLUpdateQueryBuilder")
652+
}
653+
databaseName = EscapeName(databaseName)
654+
tableName = EscapeName(tableName)
655+
setClause, err := BuildSetPreparedClause(mappedSharedColumns)
656+
if err != nil {
657+
return nil, err
658+
}
659+
660+
equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names())
661+
if err != nil {
662+
return nil, err
663+
}
664+
stmt := fmt.Sprintf(`
665+
update /* gh-ost %s.%s */
666+
%s.%s
667+
set
668+
%s
669+
where
670+
%s`,
671+
databaseName, tableName,
672+
databaseName, tableName,
673+
setClause,
674+
equalsComparison,
675+
)
676+
return &DMLUpdateQueryBuilder{
677+
tableColumns: tableColumns,
678+
sharedColumns: sharedColumns,
679+
uniqueKeyColumns: uniqueKeyColumns,
680+
preparedStatement: stmt,
681+
}, nil
682+
}
683+
684+
func (b *DMLUpdateQueryBuilder) BuildQuery(valueArgs, whereArgs []interface{}) (string, []interface{}, []interface{}, error) {
685+
sharedArgs := make([]interface{}, 0, b.sharedColumns.Len())
686+
for _, column := range b.sharedColumns.Columns() {
687+
tableOrdinal := b.tableColumns.Ordinals[column.Name]
688+
arg := column.convertArg(valueArgs[tableOrdinal], false)
689+
sharedArgs = append(sharedArgs, arg)
690+
}
691+
692+
uniqueKeyArgs := make([]interface{}, 0, b.uniqueKeyColumns.Len())
693+
for _, column := range b.uniqueKeyColumns.Columns() {
694+
tableOrdinal := b.tableColumns.Ordinals[column.Name]
695+
arg := column.convertArg(whereArgs[tableOrdinal], true)
696+
uniqueKeyArgs = append(uniqueKeyArgs, arg)
697+
}
698+
699+
return b.preparedStatement, sharedArgs, uniqueKeyArgs, nil
700+
}

0 commit comments

Comments
 (0)