@@ -354,20 +354,25 @@ func dataDiffForAllTableRows(sdb *sqlite.Conn, schemaName string, tableName stri
354
354
// schemas match.
355
355
func dataDiffForModifiedTableRows (sdb * sqlite.Conn , tableName string , merge MergeStrategy ) (diff []DataDiff , err error ) {
356
356
// Retrieve a list of all primary key columns and other columns in this table
357
- pk , _ , other_columns , err := GetPrimaryKeyAndOtherColumns (sdb , "aux" , tableName )
357
+ pk , implicitPk , other_columns , err := GetPrimaryKeyAndOtherColumns (sdb , "aux" , tableName )
358
358
if err != nil {
359
359
return nil , err
360
360
}
361
361
362
- // Escape all column names
363
- var pk_escaped , other_escaped []string
364
- for _ , v := range pk {
365
- pk_escaped = append (pk_escaped , EscapeId (v ))
366
- }
367
- for _ , v := range other_columns {
368
- other_escaped = append (other_escaped , EscapeId (v ))
362
+ // If we need to produce merge statements using the NewPkMerge strategy we need to know if we can rely on SQLite
363
+ // to generate new primary keys or if we must generate them on our own.
364
+ var incrementingPk bool
365
+ if merge == NewPkMerge {
366
+ incrementingPk , err = hasIncrementingIntPk (sdb , "aux" , tableName )
367
+ if err != nil {
368
+ return nil , err
369
+ }
369
370
}
370
371
372
+ // Escape all column names
373
+ pkEscaped := EscapeIds (pk )
374
+ otherEscaped := EscapeIds (other_columns )
375
+
371
376
// Build query for getting differences. This is based on the query produced by the sqldiff utility for SQLite.
372
377
// The resulting query returns n+1+m*2 number of rows where n is the number of columns in the primary key and
373
378
// m is the number of columns in the table which are not part of the primary key. The extra column between the
@@ -381,22 +386,22 @@ func dataDiffForModifiedTableRows(sdb *sqlite.Conn, tableName string, merge Merg
381
386
// There can only be updated rows in tables with more columns than the primary key columns
382
387
if len (other_columns ) > 0 {
383
388
query = "SELECT "
384
- for _ , c := range pk_escaped { // Primary key columns first
389
+ for _ , c := range pkEscaped { // Primary key columns first
385
390
query += "B." + c + ","
386
391
}
387
392
query += "'" + string (ACTION_MODIFY ) + "'" // Updated row
388
- for _ , c := range other_escaped { // Other columns last
393
+ for _ , c := range otherEscaped { // Other columns last
389
394
query += ",A." + c + " IS NOT B." + c + ",B." + c
390
395
}
391
396
392
397
query += " FROM main." + EscapeId (tableName ) + " A, aux." + EscapeId (tableName ) + " B WHERE "
393
398
394
- for _ , c := range pk_escaped { // Where all primary key columns equal
399
+ for _ , c := range pkEscaped { // Where all primary key columns equal
395
400
query += "A." + c + "=B." + c + " AND "
396
401
}
397
402
398
403
query += "(" // And at least one of the other columns differs
399
- for _ , c := range other_escaped {
404
+ for _ , c := range otherEscaped {
400
405
query += "A." + c + " IS NOT B." + c + " OR "
401
406
}
402
407
query = strings .TrimSuffix (query , " OR " ) + ")"
@@ -406,34 +411,34 @@ func dataDiffForModifiedTableRows(sdb *sqlite.Conn, tableName string, merge Merg
406
411
407
412
// Deleted rows
408
413
query += "SELECT "
409
- for _ , c := range pk_escaped { // Primary key columns first. This needs to be from the first table for deleted rows
414
+ for _ , c := range pkEscaped { // Primary key columns first. This needs to be from the first table for deleted rows
410
415
query += "A." + c + ","
411
416
}
412
- query += "'" + string (ACTION_DELETE ) + "'" // Deleted row
413
- query += strings .Repeat (",NULL" , len (other_escaped )* 2 ) // Just NULL for all the other columns. They don't matter for deleted rows
417
+ query += "'" + string (ACTION_DELETE ) + "'" // Deleted row
418
+ query += strings .Repeat (",NULL" , len (otherEscaped )* 2 ) // Just NULL for all the other columns. They don't matter for deleted rows
414
419
415
420
query += " FROM main." + EscapeId (tableName ) + " A WHERE "
416
421
417
422
query += "NOT EXISTS(SELECT 1 FROM aux." + EscapeId (tableName ) + " B WHERE " // Where a row with the same primary key doesn't exist in the second table
418
- for _ , c := range pk_escaped {
423
+ for _ , c := range pkEscaped {
419
424
query += "A." + c + " IS B." + c + " AND "
420
425
}
421
426
query = strings .TrimSuffix (query , " AND " ) + ") UNION ALL "
422
427
423
428
// Inserted rows
424
429
query += "SELECT "
425
- for _ , c := range pk_escaped { // Primary key columns first. This needs to be from the second table for inserted rows
430
+ for _ , c := range pkEscaped { // Primary key columns first. This needs to be from the second table for inserted rows
426
431
query += "B." + c + ","
427
432
}
428
433
query += "'" + string (ACTION_ADD ) + "'" // Inserted row
429
- for _ , c := range other_escaped { // Other columns last. Always set the modified flag for inserted rows
434
+ for _ , c := range otherEscaped { // Other columns last. Always set the modified flag for inserted rows
430
435
query += ",1,B." + c
431
436
}
432
437
433
438
query += " FROM aux." + EscapeId (tableName ) + " B WHERE "
434
439
435
440
query += "NOT EXISTS(SELECT 1 FROM main." + EscapeId (tableName ) + " A WHERE " // Where a row with the same primary key doesn't exist in the first table
436
- for _ , c := range pk_escaped {
441
+ for _ , c := range pkEscaped {
437
442
query += "A." + c + " IS B." + c + " AND "
438
443
}
439
444
query = strings .TrimSuffix (query , " AND " ) + ")"
@@ -446,7 +451,8 @@ func dataDiffForModifiedTableRows(sdb *sqlite.Conn, tableName string, merge Merg
446
451
// update, insert or delete. While the primary key bit of the DataDiff object we create for each row can
447
452
// be taken directly from the first couple of columns and the action type of the DataDiff object can be
448
453
// deduced from the type column in a straightforward way, the generated SQL statements for merging highly
449
- // depend on the diff type.
454
+ // depend on the diff type. Additionally, we need to respect the merge strategy when producing the SQL in
455
+ // the DataDiff object.
450
456
451
457
// Retrieve data and generate a new DataDiff object for each row
452
458
_ , _ , data , err := SQLiteRunQuery (sdb , Internal , query , false , false )
@@ -465,8 +471,186 @@ func dataDiffForModifiedTableRows(sdb *sqlite.Conn, tableName string, merge Merg
465
471
d .Pk = append (d .Pk , row [i ])
466
472
}
467
473
474
+ // Produce the SQL statement for merging
475
+ if merge != NoMerge {
476
+ if d .ActionType == ACTION_MODIFY || d .ActionType == ACTION_DELETE {
477
+ // For updated and deleted rows the merge strategy doesn't matter
478
+
479
+ // The first part of the UPDATE and DELETE statements is different
480
+ if d .ActionType == ACTION_MODIFY {
481
+ d .Sql = "UPDATE " + EscapeId (tableName ) + " SET "
482
+
483
+ // For figuring out which values to set, start with the first column after the diff type column.
484
+ // It specifies whether the value of the first data column has changed. If it has, we set that
485
+ // column to the new value which is stored in the following column of the row. Because each
486
+ // comparison takes two fields (one for marking differences and one for the new value), we move
487
+ // forward in steps of two columns.
488
+ for i := len (pk ) + 1 ; i < len (row ); i += 2 {
489
+ // Only include field when it was updated
490
+ if row [i ].Value == "1" {
491
+ // From the column number in the results of the difference query we calculate the
492
+ // corresponding array index in the array of non-primary key columns. The new
493
+ // value is stored in the next column of the result set.
494
+ d .Sql += otherEscaped [(i - len (pk )- 1 )/ 2 ] + "=" + EscapeValue (row [i + 1 ]) + ","
495
+ }
496
+ }
497
+ d .Sql = strings .TrimSuffix (d .Sql , "," )
498
+ } else {
499
+ d .Sql = "DELETE FROM " + EscapeId (tableName )
500
+ }
501
+
502
+ d .Sql += " WHERE "
503
+
504
+ // The last part of the UPDATE and DELETE statements is the same
505
+ for _ , p := range d .Pk {
506
+ if p .Type == Null {
507
+ d .Sql += EscapeId (p .Name ) + " IS NULL"
508
+ } else {
509
+ d .Sql += EscapeId (p .Name ) + "=" + EscapeValue (p )
510
+ }
511
+ d .Sql += " AND "
512
+ }
513
+ d .Sql = strings .TrimSuffix (d .Sql , " AND " ) + ";"
514
+ } else if d .ActionType == ACTION_ADD {
515
+ // For inserted rows the merge strategy actually does matter. The PreservePkMerge strategy is simple:
516
+ // We just include all columns, no matter whether primary key or not, in the INSERT statement as-is.
517
+ // For tables which don't have a primary key the same applies even when using the NewPkMerge strategy.
518
+ // Finally for tables with an incrementing primary key we must omit the primary key columns too to make
519
+ // SQLite generate a new value for us.
520
+
521
+ d .Sql = "INSERT INTO " + EscapeId (tableName ) + "("
522
+
523
+ if merge == PreservePkMerge || implicitPk {
524
+ // Include all data we have in the INSERT statement but don't include the rowid column, the
525
+ // implicit primary key
526
+
527
+ // Add the explicit primary key columns first if any, then the other fields
528
+ if ! implicitPk {
529
+ d .Sql += strings .Join (pkEscaped , "," ) + ","
530
+ }
531
+ d .Sql += strings .Join (otherEscaped , "," ) + ") VALUES ("
532
+
533
+ // If there is an explicit primary key, add the values of that first
534
+ if ! implicitPk {
535
+ for i := 0 ; i < len (pk ); i ++ {
536
+ d .Sql += EscapeValue (row [i ]) + ","
537
+ }
538
+ }
539
+
540
+ // For the other columns start at the first data column after the diff type column and the first
541
+ // modified flag column and skip to the next data columns.
542
+ for i := len (pk ) + 2 ; i < len (row ); i += 2 {
543
+ d .Sql += EscapeValue (row [i ]) + ","
544
+ }
545
+ } else {
546
+ // For the NewPkMerge strategy for tables with an explicit primary key the generated INSERT
547
+ // statement depends on whether we can rely on SQLite to generate a new primary key value
548
+ // or whether we must generate a new value on our own.
549
+
550
+ if incrementingPk {
551
+ // SQLite can generate a new key for us if we omit the primary key columns from the
552
+ // INSERT statement.
553
+
554
+ d .Sql += strings .Join (otherEscaped , "," ) + ") VALUES ("
555
+
556
+ // For the other columns start at the first data column after the diff type column and the first
557
+ // modified flag column and skip to the next data columns.
558
+ for i := len (pk ) + 2 ; i < len (row ); i += 2 {
559
+ d .Sql += EscapeValue (row [i ]) + ","
560
+ }
561
+ } else {
562
+ // We need to generate a new key by ourselves by including the primary key columns in
563
+ // the INSERT statement and producing a new value.
564
+
565
+ // Add the (explicit) primary key columns first, then the other fields
566
+ d .Sql += strings .Join (pkEscaped , "," ) + ","
567
+ d .Sql += strings .Join (otherEscaped , "," ) + ") VALUES ("
568
+
569
+ // Add the (explicit) primary key values first using a SELECT statement which generates a
570
+ // new value for the first key column
571
+ d .Sql += "(SELECT max(" + pkEscaped [0 ] + ")+1 FROM " + EscapeId (tableName ) + "),"
572
+ for i := 1 ; i < len (pk ); i ++ {
573
+ d .Sql += EscapeValue (row [i ]) + ","
574
+ }
575
+
576
+ // For the other columns start at the first data column after the diff type column and the first
577
+ // modified flag column and skip to the next data columns.
578
+ for i := len (pk ) + 2 ; i < len (row ); i += 2 {
579
+ d .Sql += EscapeValue (row [i ]) + ","
580
+ }
581
+ }
582
+ }
583
+
584
+ d .Sql = strings .TrimSuffix (d .Sql , "," ) + ");"
585
+ }
586
+ }
587
+
468
588
diff = append (diff , d )
469
589
}
470
590
471
591
return diff , nil
472
592
}
593
+
594
+ // hasIncrementingIntPk returns true if the table with name tableName has a primary key of integer type which increments
595
+ // automatically. Note that in SQLite this does not require the AUTOINCREMENT specifier. It merely requires a column of
596
+ // type INTEGER which is used as the primary key of the table. The only other constraint is that the table must not be a
597
+ // WITHOUT ROWID table
598
+ func hasIncrementingIntPk (sdb * sqlite.Conn , schemaName string , tableName string ) (bool , error ) {
599
+ // Get column list
600
+ columns , err := sdb .Columns (schemaName , tableName )
601
+ if err != nil {
602
+ return false , err
603
+ }
604
+
605
+ // Check if there is an INTEGER column used as primary key
606
+ var numIntPks int
607
+ var hasColumnRowid , hasColumn_Rowid_ , hasColumnOid bool
608
+ for _ , c := range columns {
609
+ if c .DataType == "INTEGER" && c .Pk > 0 {
610
+ // If the column has also the AUTOINCREMENT specifier set, we don't need any extra checks and
611
+ // can return early
612
+ if c .Autoinc {
613
+ return true , nil
614
+ }
615
+
616
+ numIntPks += 1
617
+ }
618
+
619
+ // While here check if there are any columns called rowid or similar in this table
620
+ if c .Name == "rowid" {
621
+ hasColumnRowid = true
622
+ } else if c .Name == "_rowid_" {
623
+ hasColumn_Rowid_ = true
624
+ } else if c .Name == "oid" {
625
+ hasColumnOid = true
626
+ }
627
+ }
628
+
629
+ // Only exactly one integer primary key column works
630
+ if numIntPks != 1 {
631
+ return false , nil
632
+ }
633
+
634
+ // Check if this is a WITHOUT ROWID table. We do this by selecting the rowid column. If this produces an error
635
+ // this probably means there is no rowid column
636
+ var rowid string
637
+ if ! hasColumnRowid {
638
+ rowid = "rowid"
639
+ } else if ! hasColumn_Rowid_ {
640
+ rowid = "_rowid_"
641
+ } else if ! hasColumnOid {
642
+ rowid = "oid"
643
+ } else {
644
+ return false , nil
645
+ }
646
+ err = sdb .OneValue ("SELECT " + rowid + " FROM " + EscapeId (schemaName )+ "." + EscapeId (tableName )+ " LIMIT 1;" , nil )
647
+
648
+ // An error other than io.EOF (which just means there is no row in the table) means that there is no rowid column.
649
+ // So this would be a WITHOUT ROWID table which doesn't increment its primary key automatically. Otherwise this is
650
+ // a table with an incrementing primary key.
651
+ if err != nil && err != io .EOF {
652
+ return false , nil
653
+ } else {
654
+ return true , nil
655
+ }
656
+ }
0 commit comments