Skip to content

Commit d6cc88d

Browse files
author
James Cor
committed
support multi-like statements
1 parent 3281d09 commit d6cc88d

File tree

1 file changed

+89
-65
lines changed

1 file changed

+89
-65
lines changed

sql/planbuilder/ddl.go

Lines changed: 89 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -332,84 +332,108 @@ func assignColumnIndexesInSchema(schema sql.Schema) sql.Schema {
332332
}
333333

334334
func (b *Builder) buildCreateTableLike(inScope *scope, ct *ast.DDL) *scope {
335-
outScope, ok := b.buildTablescan(inScope, ct.OptLike.LikeTable, nil)
336-
if !ok {
337-
b.handleErr(sql.ErrTableNotFound.New(ct.OptLike.LikeTable.Name.String()))
338-
}
335+
database := b.resolveDbForTable(ct.Table)
336+
newTableName := strings.ToLower(ct.Table.Name.String())
339337

340-
likeTable, ok := outScope.node.(*plan.ResolvedTable)
341-
if !ok {
342-
err := fmt.Errorf("expected resolved table: %s", ct.OptLike.LikeTable.Name.String())
343-
b.handleErr(err)
338+
var pkSch sql.PrimaryKeySchema
339+
var coll sql.CollationID
340+
var comment string
341+
outScope := inScope.push()
342+
if ct.TableSpec != nil {
343+
pkSch, coll, _ = b.tableSpecToSchema(inScope, outScope, database, strings.ToLower(ct.Table.Name.String()), ct.TableSpec, false)
344344
}
345345

346-
newTableName := strings.ToLower(ct.Table.Name.String())
347-
outScope.setTableAlias(newTableName)
348-
346+
var ok bool
347+
var pkOrdinals []int
348+
newSch := pkSch.Schema
349349
var idxDefs sql.IndexDefs
350-
if indexableTable, ok := likeTable.Table.(sql.IndexAddressableTable); ok {
351-
indexes, err := indexableTable.GetIndexes(b.ctx)
352-
if err != nil {
350+
var checkDefs []*sql.CheckConstraint
351+
for _, likeTable := range ct.OptLike.LikeTables {
352+
outScope, ok = b.buildTablescan(outScope, likeTable, nil)
353+
if !ok {
354+
b.handleErr(sql.ErrTableNotFound.New(likeTable.Name.String()))
355+
}
356+
lTable, isResTbl := outScope.node.(*plan.ResolvedTable)
357+
if !isResTbl {
358+
err := fmt.Errorf("expected resolved table: %s", likeTable.Name.String())
353359
b.handleErr(err)
354360
}
355-
for _, index := range indexes {
356-
if index.IsGenerated() {
357-
continue
358-
}
359-
constraint := sql.IndexConstraint_None
360-
if index.IsUnique() {
361-
if index.ID() == "PRIMARY" {
362-
constraint = sql.IndexConstraint_Primary
363-
} else {
364-
constraint = sql.IndexConstraint_Unique
365-
}
366-
}
367361

368-
columns := make([]sql.IndexColumn, len(index.Expressions()))
369-
for i, col := range index.Expressions() {
370-
//TODO: find a better way to get only the column name if the table is present
371-
col = strings.TrimPrefix(col, indexableTable.Name()+".")
372-
columns[i] = sql.IndexColumn{Name: col}
362+
if coll == sql.Collation_Unspecified {
363+
coll = lTable.Collation()
364+
}
365+
366+
if comment == "" {
367+
comment = lTable.Comment()
368+
}
369+
370+
// Copy over primary key schema ordinals
371+
if pkTable, isPkTable := lTable.Table.(sql.PrimaryKeyTable); isPkTable {
372+
for _, pkOrd := range pkTable.PrimaryKeySchema().PkOrdinals {
373+
pkOrdinals = append(pkOrdinals, len(newSch) + pkOrd)
373374
}
374-
idxDefs = append(idxDefs, &sql.IndexDef{
375-
Name: index.ID(),
376-
Storage: sql.IndexUsing_Default,
377-
Constraint: constraint,
378-
Columns: columns,
379-
Comment: index.Comment(),
380-
})
381375
}
382-
}
383-
origSch := likeTable.Schema()
384-
newSch := make(sql.Schema, len(origSch))
385-
for i, col := range origSch {
386-
tempCol := *col
387-
tempCol.Source = newTableName
388-
newSch[i] = &tempCol
389-
}
390376

391-
var pkOrdinals []int
392-
if pkTable, ok := likeTable.Table.(sql.PrimaryKeyTable); ok {
393-
pkOrdinals = pkTable.PrimaryKeySchema().PkOrdinals
394-
}
377+
// Deep copy columns from schema
378+
for _, col := range lTable.Schema() {
379+
newCol := *col
380+
newCol.Source = newTableName
381+
newSch = append(newSch, &newCol)
382+
}
395383

396-
var checkDefs []*sql.CheckConstraint
397-
if checksTable, ok := likeTable.Table.(sql.CheckTable); ok {
398-
checks, err := checksTable.GetChecks(b.ctx)
399-
if err != nil {
400-
b.handleErr(err)
384+
// Load index definitions
385+
if idxTbl, isIdxTbl := lTable.Table.(sql.IndexAddressableTable); isIdxTbl {
386+
idxs, err := idxTbl.GetIndexes(b.ctx)
387+
if err != nil {
388+
b.handleErr(err)
389+
}
390+
for _, idx := range idxs {
391+
if idx.IsGenerated() {
392+
continue
393+
}
394+
constraint := sql.IndexConstraint_None
395+
if idx.IsUnique() {
396+
if idx.ID() == "PRIMARY" {
397+
// TODO: deal with multiple primary key constraints?
398+
constraint = sql.IndexConstraint_Primary
399+
} else {
400+
constraint = sql.IndexConstraint_Unique
401+
}
402+
}
403+
404+
columns := make([]sql.IndexColumn, len(idx.Expressions()))
405+
for i, col := range idx.Expressions() {
406+
//TODO: find a better way to get only the column name if the table is present
407+
col = strings.TrimPrefix(col, idxTbl.Name()+".")
408+
columns[i] = sql.IndexColumn{Name: col}
409+
}
410+
idxDefs = append(idxDefs, &sql.IndexDef{
411+
Name: idx.ID(),
412+
Storage: sql.IndexUsing_Default,
413+
Constraint: constraint,
414+
Columns: columns,
415+
Comment: idx.Comment(),
416+
})
417+
}
401418
}
402419

403-
for _, check := range checks {
404-
checkConstraint := b.buildCheckConstraint(outScope, &check)
420+
// Load check constraints
421+
if chkTable, isChkTable := lTable.Table.(sql.CheckTable); isChkTable {
422+
checks, err := chkTable.GetChecks(b.ctx)
405423
if err != nil {
406424
b.handleErr(err)
407425
}
426+
for _, check := range checks {
427+
checkConstraint := b.buildCheckConstraint(outScope, &check)
428+
if err != nil {
429+
b.handleErr(err)
430+
}
408431

409-
// Prevent a name collision between old and new checks.
410-
// New check will be assigned a name during building.
411-
checkConstraint.Name = ""
412-
checkDefs = append(checkDefs, checkConstraint)
432+
// Prevent a name collision between old and new checks.
433+
// New check name will be assigned a name during building.
434+
checkConstraint.Name = ""
435+
checkDefs = append(checkDefs, checkConstraint)
436+
}
413437
}
414438
}
415439

@@ -420,13 +444,13 @@ func (b *Builder) buildCreateTableLike(inScope *scope, ct *ast.DDL) *scope {
420444
Schema: pkSchema,
421445
IdxDefs: idxDefs,
422446
ChDefs: checkDefs,
423-
Collation: likeTable.Collation(),
424-
Comment: likeTable.Comment(),
447+
Collation: coll,
448+
Comment: comment,
425449
}
426450

427-
database := b.resolveDbForTable(ct.Table)
428-
429451
b.qFlags.Set(sql.QFlagSetDatabase)
452+
453+
outScope.setTableAlias(newTableName)
430454
outScope.node = plan.NewCreateTable(database, newTableName, ct.IfNotExists, ct.Temporary, tableSpec)
431455
return outScope
432456
}

0 commit comments

Comments
 (0)