@@ -331,85 +331,136 @@ func assignColumnIndexesInSchema(schema sql.Schema) sql.Schema {
331331 return newSch
332332}
333333
334- func (b * Builder ) buildCreateTableLike ( inScope * scope , ct * ast. DDL ) * scope {
335- outScope , ok := b . buildTablescan ( inScope , ct . OptLike . LikeTable , nil )
336- if ! ok {
337- b . handleErr ( sql . ErrTableNotFound . New ( ct . OptLike . LikeTable . Name . String ()))
334+ func (b * Builder ) getIndexDefs ( table sql. Table ) sql. IndexDefs {
335+ idxTbl , isIdxTbl := table .(sql. IndexAddressableTable )
336+ if ! isIdxTbl {
337+ return nil
338338 }
339-
340- likeTable , ok := outScope .node .(* plan.ResolvedTable )
341- if ! ok {
342- err := fmt .Errorf ("expected resolved table: %s" , ct .OptLike .LikeTable .Name .String ())
339+ var idxDefs sql.IndexDefs
340+ idxs , err := idxTbl .GetIndexes (b .ctx )
341+ if err != nil {
343342 b .handleErr (err )
344343 }
344+ for _ , idx := range idxs {
345+ if idx .IsGenerated () {
346+ continue
347+ }
348+ constraint := sql .IndexConstraint_None
349+ if idx .IsUnique () {
350+ if idx .ID () == "PRIMARY" {
351+ constraint = sql .IndexConstraint_Primary
352+ } else {
353+ constraint = sql .IndexConstraint_Unique
354+ }
355+ }
356+ columns := make ([]sql.IndexColumn , len (idx .Expressions ()))
357+ for i , col := range idx .Expressions () {
358+ // TODO: find a better way to get only the column name if the table is present
359+ col = strings .TrimPrefix (col , idxTbl .Name ()+ "." )
360+ columns [i ] = sql.IndexColumn {Name : col }
361+ }
362+ idxDefs = append (idxDefs , & sql.IndexDef {
363+ Name : idx .ID (),
364+ Storage : sql .IndexUsing_Default ,
365+ Constraint : constraint ,
366+ Columns : columns ,
367+ Comment : idx .Comment (),
368+ })
369+ }
370+ return idxDefs
371+ }
345372
373+ func (b * Builder ) buildCreateTableLike (inScope * scope , ct * ast.DDL ) * scope {
374+ database := b .resolveDbForTable (ct .Table )
346375 newTableName := strings .ToLower (ct .Table .Name .String ())
347- outScope .setTableAlias (newTableName )
348376
377+ var pkSch sql.PrimaryKeySchema
378+ var coll sql.CollationID
379+ var comment string
380+ outScope := inScope .push ()
381+ if ct .TableSpec != nil {
382+ pkSch , coll , _ = b .tableSpecToSchema (inScope , outScope , database , strings .ToLower (ct .Table .Name .String ()), ct .TableSpec , false )
383+ }
384+
385+ var ok bool
386+ var pkOrdinals []int
387+ var newSch sql.Schema
388+ newSchMap := make (map [string ]struct {})
349389 var idxDefs sql.IndexDefs
350- if indexableTable , ok := likeTable .Table .(sql.IndexAddressableTable ); ok {
351- indexes , err := indexableTable .GetIndexes (b .ctx )
352- if err != nil {
390+ var checkDefs []* sql.CheckConstraint
391+ for _ , likeTable := range ct .OptLike .LikeTables {
392+ outScope , ok = b .buildTablescan (outScope , likeTable , nil )
393+ if ! ok {
394+ b .handleErr (sql .ErrTableNotFound .New (likeTable .Name .String ()))
395+ }
396+ lTable , isResTbl := outScope .node .(* plan.ResolvedTable )
397+ if ! isResTbl {
398+ err := fmt .Errorf ("expected resolved table: %s" , likeTable .Name .String ())
353399 b .handleErr (err )
354400 }
355- for _ , index := range indexes {
356- if index .IsGenerated () {
357- continue
358- }
359- constraint := sql .IndexConstraint_None
360- if index .IsUnique () {
361- if index .ID () == "PRIMARY" {
362- constraint = sql .IndexConstraint_Primary
363- } else {
364- constraint = sql .IndexConstraint_Unique
365- }
366- }
367401
368- columns := make ([]sql.IndexColumn , len (index .Expressions ()))
369- for i , col := range index .Expressions () {
370- //TODO: find a better way to get only the column name if the table is present
371- col = strings .TrimPrefix (col , indexableTable .Name ()+ "." )
372- columns [i ] = sql.IndexColumn {Name : col }
373- }
374- idxDefs = append (idxDefs , & sql.IndexDef {
375- Name : index .ID (),
376- Storage : sql .IndexUsing_Default ,
377- Constraint : constraint ,
378- Columns : columns ,
379- Comment : index .Comment (),
380- })
402+ if coll == sql .Collation_Unspecified {
403+ coll = lTable .Collation ()
381404 }
382- }
383- origSch := likeTable .Schema ()
384- newSch := make (sql.Schema , len (origSch ))
385- for i , col := range origSch {
386- tempCol := * col
387- tempCol .Source = newTableName
388- newSch [i ] = & tempCol
389- }
390405
391- var pkOrdinals []int
392- if pkTable , ok := likeTable .Table .(sql.PrimaryKeyTable ); ok {
393- pkOrdinals = pkTable .PrimaryKeySchema ().PkOrdinals
394- }
406+ if comment == "" {
407+ comment = lTable .Comment ()
408+ }
395409
396- var checkDefs []* sql.CheckConstraint
397- if checksTable , ok := likeTable .Table .(sql.CheckTable ); ok {
398- checks , err := checksTable .GetChecks (b .ctx )
399- if err != nil {
400- b .handleErr (err )
410+ schOff := len (newSch )
411+ hasSkippedCols := false
412+ for _ , col := range lTable .Schema () {
413+ newCol := * col
414+ name := strings .ToLower (newCol .Name )
415+ if _ , ok := newSchMap [name ]; ok {
416+ // TODO: throw warning
417+ hasSkippedCols = true
418+ continue
419+ }
420+ newSchMap [name ] = struct {}{}
421+ newCol .Source = newTableName
422+ newSch = append (newSch , & newCol )
401423 }
402424
403- for _ , check := range checks {
404- checkConstraint := b .buildCheckConstraint (outScope , & check )
405- if err != nil {
406- b .handleErr (err )
425+ // if a column was skipped due to duplicates, don't copy over PK ords, idxDefs, or checkDefs
426+ // since they might be incorrect
427+ if hasSkippedCols {
428+ continue
429+ }
430+
431+ // Copy over primary key schema ordinals
432+ if pkTable , isPkTable := lTable .Table .(sql.PrimaryKeyTable ); isPkTable {
433+ for _ , pkOrd := range pkTable .PrimaryKeySchema ().PkOrdinals {
434+ pkOrdinals = append (pkOrdinals , schOff + pkOrd )
407435 }
436+ }
437+
438+ // Load index definitions
439+ idxDefs = append (idxDefs , b .getIndexDefs (lTable .Table )... )
408440
441+ // Load check constraints
442+ newCheckDefs := b .loadChecksFromTable (outScope , lTable .Table )
443+ for _ , check := range newCheckDefs {
409444 // Prevent a name collision between old and new checks.
410- // New check will be assigned a name during building.
411- checkConstraint .Name = ""
412- checkDefs = append (checkDefs , checkConstraint )
445+ // New check name will be assigned a name during building.
446+ check .Name = ""
447+ }
448+ checkDefs = append (checkDefs , newCheckDefs ... )
449+ }
450+
451+ var hasSkippedCols bool
452+ for _ , col := range pkSch .Schema {
453+ name := strings .ToLower (col .Name )
454+ if _ , ok := newSchMap [name ]; ok {
455+ // TODO: throw warning
456+ hasSkippedCols = true
457+ continue
458+ }
459+ newSch = append (newSch , col )
460+ }
461+ if ! hasSkippedCols {
462+ for _ , pkOrd := range pkSch .PkOrdinals {
463+ pkOrdinals = append (pkOrdinals , len (newSch )+ pkOrd )
413464 }
414465 }
415466
@@ -420,13 +471,13 @@ func (b *Builder) buildCreateTableLike(inScope *scope, ct *ast.DDL) *scope {
420471 Schema : pkSchema ,
421472 IdxDefs : idxDefs ,
422473 ChDefs : checkDefs ,
423- Collation : likeTable . Collation () ,
424- Comment : likeTable . Comment () ,
474+ Collation : coll ,
475+ Comment : comment ,
425476 }
426477
427- database := b .resolveDbForTable (ct .Table )
428-
429478 b .qFlags .Set (sql .QFlagSetDatabase )
479+
480+ outScope .setTableAlias (newTableName )
430481 outScope .node = plan .NewCreateTable (database , newTableName , ct .IfNotExists , ct .Temporary , tableSpec )
431482 return outScope
432483}
0 commit comments