Skip to content

Commit 6033561

Browse files
author
James Cor
committed
handle duplicates and merge order
1 parent 4ea2b94 commit 6033561

File tree

1 file changed

+32
-10
lines changed

1 file changed

+32
-10
lines changed

sql/planbuilder/ddl.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,8 @@ func (b *Builder) buildCreateTableLike(inScope *scope, ct *ast.DDL) *scope {
345345

346346
var ok bool
347347
var pkOrdinals []int
348-
newSch := pkSch.Schema
348+
var newSch sql.Schema
349+
newSchMap := make(map[string]struct{})
349350
var idxDefs sql.IndexDefs
350351
var checkDefs []*sql.CheckConstraint
351352
for _, likeTable := range ct.OptLike.LikeTables {
@@ -367,20 +368,34 @@ func (b *Builder) buildCreateTableLike(inScope *scope, ct *ast.DDL) *scope {
367368
comment = lTable.Comment()
368369
}
369370

370-
// Copy over primary key schema ordinals
371-
if pkTable, isPkTable := lTable.Table.(sql.PrimaryKeyTable); isPkTable {
372-
for _, pkOrd := range pkTable.PrimaryKeySchema().PkOrdinals {
373-
pkOrdinals = append(pkOrdinals, len(newSch)+pkOrd)
374-
}
375-
}
376-
377-
// Deep copy columns from schema
371+
schOff := len(newSch)
372+
hasSkippedCols := false
378373
for _, col := range lTable.Schema() {
379374
newCol := *col
375+
name := strings.ToLower(newCol.Name)
376+
if _, ok := newSchMap[name]; ok {
377+
// TODO: throw warning
378+
hasSkippedCols = true
379+
continue
380+
}
381+
newSchMap[name] = struct{}{}
380382
newCol.Source = newTableName
381383
newSch = append(newSch, &newCol)
382384
}
383385

386+
// if a column was skipped due to duplicates, don't copy over PK ords, idxDefs, or checkDefs
387+
// since they might be incorrect
388+
if hasSkippedCols {
389+
continue
390+
}
391+
392+
// Copy over primary key schema ordinals
393+
if pkTable, isPkTable := lTable.Table.(sql.PrimaryKeyTable); isPkTable {
394+
for _, pkOrd := range pkTable.PrimaryKeySchema().PkOrdinals {
395+
pkOrdinals = append(pkOrdinals, schOff+pkOrd)
396+
}
397+
}
398+
384399
// Load index definitions
385400
if idxTbl, isIdxTbl := lTable.Table.(sql.IndexAddressableTable); isIdxTbl {
386401
idxs, err := idxTbl.GetIndexes(b.ctx)
@@ -403,7 +418,7 @@ func (b *Builder) buildCreateTableLike(inScope *scope, ct *ast.DDL) *scope {
403418

404419
columns := make([]sql.IndexColumn, len(idx.Expressions()))
405420
for i, col := range idx.Expressions() {
406-
//TODO: find a better way to get only the column name if the table is present
421+
// TODO: find a better way to get only the column name if the table is present
407422
col = strings.TrimPrefix(col, idxTbl.Name()+".")
408423
columns[i] = sql.IndexColumn{Name: col}
409424
}
@@ -437,6 +452,13 @@ func (b *Builder) buildCreateTableLike(inScope *scope, ct *ast.DDL) *scope {
437452
}
438453
}
439454

455+
for _, col := range pkSch.Schema {
456+
newSch = append(newSch, col)
457+
}
458+
for _, pkOrd := range pkSch.PkOrdinals {
459+
pkOrdinals = append(pkOrdinals, len(newSch) + pkOrd)
460+
}
461+
440462
pkSchema := sql.NewPrimaryKeySchema(newSch, pkOrdinals...)
441463
pkSchema.Schema = b.resolveSchemaDefaults(outScope, pkSchema.Schema)
442464

0 commit comments

Comments
 (0)