Skip to content

Commit adbe443

Browse files
mohityadav766harshachpmbrull
committed
Fix FlyWay Migration Issue (#24985)
* Fix FlyWay Migration Issue * Separate drop flyway to it's own method * Fix critical issue in calling getSqlQuery; Add recovery options for --force option * fix hash <> version in getSqlQuery * Add recover option for openmetadata-ops.sh * Add recover option for openmetadata-ops.sh * naming * Add tests recover option * handle all bots --------- Co-authored-by: Sriharsha Chintalapani <[email protected]> Co-authored-by: Pere Miquel Brull <[email protected]> (cherry picked from commit f0ecc2f)
1 parent e3512e3 commit adbe443

File tree

9 files changed

+1396
-117
lines changed

9 files changed

+1396
-117
lines changed

openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationProcessImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public static Map<String, QueryStatus> performSqlExecutionAndUpdate(
120120
try {
121121
String previouslyRanSql = null;
122122
try {
123-
previouslyRanSql = migrationDAO.getSqlQuery(hash(sql), version);
123+
previouslyRanSql = migrationDAO.getSqlQuery(version, hash(sql));
124124
} catch (Exception dbException) {
125125
// If SERVER_MIGRATION_SQL_LOGS table doesn't exist yet, assume query hasn't run
126126
previouslyRanSql = null;

openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,31 @@
11
package org.openmetadata.service.migration.api;
22

33
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
4+
import static org.openmetadata.service.util.EntityUtil.hash;
45
import static org.openmetadata.service.util.OpenMetadataOperations.printToAsciiTable;
56

67
import java.io.File;
8+
import java.nio.charset.StandardCharsets;
79
import java.util.ArrayList;
810
import java.util.Arrays;
911
import java.util.List;
1012
import java.util.Map;
1113
import java.util.Objects;
1214
import java.util.Optional;
1315
import java.util.UUID;
16+
import java.util.regex.Matcher;
17+
import java.util.regex.Pattern;
1418
import java.util.stream.Stream;
1519
import lombok.Getter;
1620
import lombok.extern.slf4j.Slf4j;
21+
import org.flywaydb.core.api.configuration.ClassicConfiguration;
22+
import org.flywaydb.core.api.configuration.Configuration;
23+
import org.flywaydb.core.internal.database.postgresql.PostgreSQLParser;
24+
import org.flywaydb.core.internal.parser.Parser;
25+
import org.flywaydb.core.internal.parser.ParsingContext;
26+
import org.flywaydb.core.internal.resource.filesystem.FileSystemResource;
27+
import org.flywaydb.core.internal.sqlscript.SqlStatementIterator;
28+
import org.flywaydb.database.mysql.MySQLParser;
1729
import org.jdbi.v3.core.Handle;
1830
import org.jdbi.v3.core.Jdbi;
1931
import org.json.JSONObject;
@@ -63,6 +75,17 @@ public MigrationWorkflow(
6375
}
6476

6577
public void loadMigrations() {
78+
// Migrate Flyway history if this is a force migration on an existing database that was
79+
// previously managed by Flyway. This must happen BEFORE parsing SQL files.
80+
// 1. Migrate DATABASE_CHANGE_LOG entries to SERVER_CHANGE_LOG
81+
// 2. Pre-populate SERVER_MIGRATION_SQL_LOGS so flyway queries don't re-execute
82+
// NOTE: DO NOT REMOVE
83+
if (hasExistingFlywayHistory()) {
84+
migrateFlywayToServerChangeLogs();
85+
prePopulateFlywayMigrationSQLLogs();
86+
dropFlywayTable();
87+
}
88+
6689
// Sort Migration on the basis of version
6790
List<MigrationFile> availableMigrations =
6891
getMigrationFiles(
@@ -427,4 +450,209 @@ public void updateMigrationStepInDB(
427450
UUID.randomUUID().toString(),
428451
metrics.toString());
429452
}
453+
454+
private boolean hasExistingFlywayHistory() {
455+
try (Handle handle = jdbi.open()) {
456+
String checkTableQuery =
457+
connectionType == ConnectionType.MYSQL
458+
? "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name = 'DATABASE_CHANGE_LOG'"
459+
: "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = current_schema() AND table_name = 'DATABASE_CHANGE_LOG'";
460+
461+
Integer tableExists = handle.createQuery(checkTableQuery).mapTo(Integer.class).one();
462+
if (tableExists == null || tableExists == 0) {
463+
return false;
464+
}
465+
466+
String countQuery =
467+
connectionType == ConnectionType.MYSQL
468+
? "SELECT COUNT(*) FROM DATABASE_CHANGE_LOG WHERE success = true"
469+
: "SELECT COUNT(*) FROM \"DATABASE_CHANGE_LOG\" WHERE success = true";
470+
471+
Integer count = handle.createQuery(countQuery).mapTo(Integer.class).one();
472+
return count != null && count > 0;
473+
} catch (Exception e) {
474+
LOG.debug("Error checking for existing Flyway history: {}", e.getMessage());
475+
return false;
476+
}
477+
}
478+
479+
private void dropFlywayTable() {
480+
// Drop the old DATABASE_CHANGE_LOG table after successful migration
481+
try (Handle handle = jdbi.open()) {
482+
String dropTableQuery =
483+
connectionType == ConnectionType.MYSQL
484+
? "DROP TABLE IF EXISTS DATABASE_CHANGE_LOG"
485+
: "DROP TABLE IF EXISTS \"DATABASE_CHANGE_LOG\"";
486+
487+
try {
488+
handle.createUpdate(dropTableQuery).execute();
489+
LOG.info("Dropped legacy DATABASE_CHANGE_LOG table");
490+
} catch (Exception e) {
491+
LOG.warn("Could not drop DATABASE_CHANGE_LOG table: {}", e.getMessage());
492+
}
493+
}
494+
}
495+
496+
private void migrateFlywayToServerChangeLogs() {
497+
LOG.info("Migrating Flyway history from DATABASE_CHANGE_LOG to SERVER_CHANGE_LOG");
498+
499+
try (Handle handle = jdbi.open()) {
500+
// Check if Flyway records have already been migrated
501+
String checkMigratedQuery =
502+
connectionType == ConnectionType.MYSQL
503+
? """
504+
SELECT COUNT(*) FROM SERVER_CHANGE_LOG scl
505+
INNER JOIN DATABASE_CHANGE_LOG dcl ON CONCAT('0.0.', CAST(dcl.version AS UNSIGNED)) = scl.version
506+
WHERE scl.migrationfilename LIKE '%flyway%'
507+
"""
508+
: """
509+
SELECT COUNT(*) FROM SERVER_CHANGE_LOG scl
510+
INNER JOIN "DATABASE_CHANGE_LOG" dcl ON '0.0.' || CAST(dcl.version AS INTEGER) = scl.version
511+
WHERE scl.migrationfilename LIKE '%flyway%'
512+
""";
513+
514+
try {
515+
Integer alreadyMigrated = handle.createQuery(checkMigratedQuery).mapTo(Integer.class).one();
516+
if (alreadyMigrated != null && alreadyMigrated > 0) {
517+
LOG.info("Flyway records already migrated to SERVER_CHANGE_LOG, skipping");
518+
return;
519+
}
520+
} catch (Exception e) {
521+
// SERVER_CHANGE_LOG might not exist yet, continue with migration
522+
LOG.debug("Could not check if already migrated: {}", e.getMessage());
523+
}
524+
525+
// Insert v0.0.0 baseline record if not present
526+
String insertBaselineQuery =
527+
connectionType == ConnectionType.MYSQL
528+
? """
529+
INSERT IGNORE INTO SERVER_CHANGE_LOG (version, migrationfilename, checksum, installed_on, metrics)
530+
VALUES ('0.0.0', 'bootstrap/sql/migrations/flyway/com.mysql.cj.jdbc.Driver/v000__create_server_change_log.sql', '0', NOW(), NULL)
531+
"""
532+
: """
533+
INSERT INTO SERVER_CHANGE_LOG (version, migrationfilename, checksum, installed_on, metrics)
534+
VALUES ('0.0.0', 'bootstrap/sql/migrations/flyway/org.postgresql.Driver/v000__create_server_change_log.sql', '0', current_timestamp, NULL)
535+
ON CONFLICT (version) DO NOTHING
536+
""";
537+
538+
try {
539+
handle.createUpdate(insertBaselineQuery).execute();
540+
} catch (Exception e) {
541+
LOG.debug("Could not insert baseline record: {}", e.getMessage());
542+
}
543+
544+
// Migrate Flyway migration records to SERVER_CHANGE_LOG
545+
String dbDir = connectionType == ConnectionType.MYSQL ? "mysql" : "postgres";
546+
String insertQuery =
547+
connectionType == ConnectionType.MYSQL
548+
? String.format(
549+
"""
550+
INSERT INTO SERVER_CHANGE_LOG (version, migrationfilename, checksum, installed_on, metrics)
551+
SELECT CONCAT('0.0.', CAST(version AS UNSIGNED)) as version,
552+
CASE
553+
WHEN script LIKE 'v%%__.sql' THEN CONCAT('bootstrap/sql/migrations/flyway/%s/', script)
554+
ELSE CONCAT('bootstrap/sql/migrations/flyway/%s/v', version, '__', REPLACE(LOWER(description), ' ', '_'), '.sql')
555+
END as migrationfilename,
556+
'0' as checksum,
557+
installed_on,
558+
NULL as metrics
559+
FROM DATABASE_CHANGE_LOG
560+
WHERE CONCAT('0.0.', CAST(version AS UNSIGNED)) NOT IN (SELECT version FROM SERVER_CHANGE_LOG)
561+
AND success = true
562+
""",
563+
"com.mysql.cj.jdbc.Driver", "com.mysql.cj.jdbc.Driver")
564+
: String.format(
565+
"""
566+
INSERT INTO SERVER_CHANGE_LOG (version, migrationfilename, checksum, installed_on, metrics)
567+
SELECT '0.0.' || CAST(version AS INTEGER) as version,
568+
CASE
569+
WHEN script LIKE 'v%%__.sql' THEN 'bootstrap/sql/migrations/flyway/%s/' || script
570+
ELSE 'bootstrap/sql/migrations/flyway/%s/v' || version || '__' || REPLACE(LOWER(description), ' ', '_') || '.sql'
571+
END as migrationfilename,
572+
'0' as checksum,
573+
installed_on,
574+
NULL as metrics
575+
FROM "DATABASE_CHANGE_LOG"
576+
WHERE '0.0.' || CAST(version AS INTEGER) NOT IN (SELECT version FROM SERVER_CHANGE_LOG)
577+
AND success = true
578+
""",
579+
"org.postgresql.Driver", "org.postgresql.Driver");
580+
581+
int migratedCount = handle.createUpdate(insertQuery).execute();
582+
LOG.info("Migrated {} Flyway records to SERVER_CHANGE_LOG", migratedCount);
583+
} catch (Exception e) {
584+
LOG.error("Error during Flyway history migration to SERVER_CHANGE_LOG", e);
585+
}
586+
}
587+
588+
private void prePopulateFlywayMigrationSQLLogs() {
589+
LOG.info("Pre-populating SERVER_MIGRATION_SQL_LOGS with existing Flyway SQL statements");
590+
591+
if (flywayPath == null || flywayPath.isEmpty()) {
592+
return;
593+
}
594+
595+
String dbSubDir =
596+
connectionType == ConnectionType.MYSQL
597+
? "com.mysql.cj.jdbc.Driver"
598+
: "org.postgresql.Driver";
599+
File flywayDir = new File(flywayPath, dbSubDir);
600+
601+
if (!flywayDir.exists() || !flywayDir.isDirectory()) {
602+
LOG.info("Flyway migration directory does not exist: {}", flywayDir.getPath());
603+
return;
604+
}
605+
606+
File[] sqlFiles = flywayDir.listFiles((dir, name) -> name.endsWith(".sql"));
607+
if (sqlFiles == null || sqlFiles.length == 0) {
608+
return;
609+
}
610+
611+
Pattern versionPattern = Pattern.compile("v(\\d+)__.*\\.sql");
612+
ParsingContext parsingContext = new ParsingContext();
613+
Configuration configuration = new ClassicConfiguration();
614+
Parser parser =
615+
connectionType == ConnectionType.MYSQL
616+
? new MySQLParser(configuration, parsingContext)
617+
: new PostgreSQLParser(configuration, parsingContext);
618+
619+
int totalStatements = 0;
620+
for (File sqlFile : sqlFiles) {
621+
Matcher matcher = versionPattern.matcher(sqlFile.getName());
622+
if (!matcher.matches()) {
623+
continue;
624+
}
625+
626+
String flywayVersion = matcher.group(1);
627+
// Parse as integer to remove leading zeros (e.g., "001" -> 1)
628+
String omVersion = "0.0." + Integer.parseInt(flywayVersion);
629+
630+
try (SqlStatementIterator iterator =
631+
parser.parse(
632+
new FileSystemResource(
633+
null, sqlFile.getAbsolutePath(), StandardCharsets.UTF_8, true))) {
634+
while (iterator.hasNext()) {
635+
String sql = iterator.next().getSql();
636+
if (sql != null && !sql.isBlank()) {
637+
String checksum = hash(sql);
638+
try {
639+
String existingQuery = migrationDAO.checkIfQueryPreviouslyRan(checksum);
640+
if (existingQuery == null) {
641+
migrationDAO.upsertServerMigrationSQL(omVersion, sql, checksum);
642+
totalStatements++;
643+
}
644+
} catch (Exception e) {
645+
LOG.debug(
646+
"Error inserting SQL statement from {}: {}", sqlFile.getName(), e.getMessage());
647+
}
648+
}
649+
}
650+
} catch (Exception e) {
651+
LOG.warn("Failed to parse SQL file {}: {}", sqlFile.getName(), e.getMessage());
652+
}
653+
}
654+
655+
LOG.info(
656+
"Pre-populated {} Flyway SQL statements into SERVER_MIGRATION_SQL_LOGS", totalStatements);
657+
}
430658
}

openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v1110/Migration.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public Map<String, QueryStatus> runPostDDLScripts(boolean isForceMigration) {
3434
@Override
3535
@SneakyThrows
3636
public void runDataMigration() {
37-
this.migrationUtil.migrateFlywayHistory(handle);
37+
// Flyway history migration is now handled in MigrationWorkflow.loadMigrations()
38+
// before parsing SQL files, to ensure it runs before flyway migrations in force mode
3839
}
3940
}

openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v1110/Migration.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public Map<String, QueryStatus> runPostDDLScripts(boolean isForceMigration) {
3434
@Override
3535
@SneakyThrows
3636
public void runDataMigration() {
37-
this.migrationUtil.migrateFlywayHistory(handle);
37+
// Flyway history migration is now handled in MigrationWorkflow.loadMigrations()
38+
// before parsing SQL files, to ensure it runs before flyway migrations in force mode
3839
}
3940
}

0 commit comments

Comments
 (0)