107107import org .openmetadata .service .jdbi3 .TypeRepository ;
108108import org .openmetadata .service .jdbi3 .UserRepository ;
109109import org .openmetadata .service .jdbi3 .locator .ConnectionType ;
110+ import org .openmetadata .service .migration .MigrationValidationClient ;
110111import org .openmetadata .service .migration .api .MigrationWorkflow ;
111112import org .openmetadata .service .resources .CollectionRegistry ;
112113import org .openmetadata .service .resources .apps .AppMapper ;
@@ -174,6 +175,136 @@ public Integer call() {
174175 return 0 ;
175176 }
176177
178+ @ Command (
179+ name = "info" ,
180+ description =
181+ "Shows the list of migrations applied and the pending migration "
182+ + "waiting to be applied on the target database" )
183+ public Integer info () {
184+ try {
185+ parseConfig ();
186+
187+ // Then get the native migration info from SERVER_CHANGE_LOG and SERVER_MIGRATION_SQL_LOGS
188+ LOG .info ("Native System Data Migrations:" );
189+ MigrationDAO migrationDAO = jdbi .onDemand (MigrationDAO .class );
190+ List <MigrationDAO .ServerChangeLog > serverChangeLogs =
191+ migrationDAO .listMetricsFromDBMigrations ();
192+
193+ // Create a formatted display for native migrations
194+ Set <String > columns = new LinkedHashSet <>(Set .of ("version" , "installedOn" , "status" ));
195+ List <List <String >> rows = new ArrayList <>();
196+
197+ for (MigrationDAO .ServerChangeLog serverChangeLog : serverChangeLogs ) {
198+ List <String > row = new ArrayList <>();
199+ row .add (serverChangeLog .getVersion ());
200+ row .add (serverChangeLog .getInstalledOn ());
201+
202+ if (serverChangeLog .getMetrics () != null ) {
203+ JsonObject metricsJson =
204+ new Gson ().fromJson (serverChangeLog .getMetrics (), JsonObject .class );
205+ for (Map .Entry <String , JsonElement > entry : metricsJson .entrySet ()) {
206+ if (!columns .contains (entry .getKey ())) {
207+ columns .add (entry .getKey ());
208+ }
209+ row .add (entry .getValue ().toString ());
210+ }
211+ }
212+ rows .add (row );
213+ }
214+
215+ printToAsciiTable (columns .stream ().toList (), rows , "No Native Migrations Found" );
216+
217+ return 0 ;
218+ } catch (Exception e ) {
219+ LOG .error ("Failed due to " , e );
220+ return 1 ;
221+ }
222+ }
223+
224+ @ Command (
225+ name = "validate" ,
226+ description =
227+ "Checks if the all the migrations haven been applied " + "on the target database." )
228+ public Integer validate () {
229+ try {
230+ parseConfig ();
231+ // Validate native migrations
232+ ConnectionType connType = ConnectionType .from (config .getDataSourceFactory ().getDriverClass ());
233+ DatasourceConfig .initialize (connType .label );
234+ MigrationWorkflow workflow =
235+ new MigrationWorkflow (
236+ jdbi ,
237+ config .getMigrationConfiguration ().getNativePath (),
238+ connType ,
239+ config .getMigrationConfiguration ().getExtensionPath (),
240+ config .getMigrationConfiguration ().getFlywayPath (),
241+ config ,
242+ false );
243+ workflow .loadMigrations ();
244+ workflow .validateMigrationsForServer ();
245+ return 0 ;
246+ } catch (Exception e ) {
247+ LOG .error ("Database migration validation failed due to " , e );
248+ return 1 ;
249+ }
250+ }
251+
252+ @ Command (
253+ name = "repair" ,
254+ description =
255+ "Repairs the SERVER_MIGRATION_SQL_LOGS and SERVER_CHANGE_LOG tables which are used to track "
256+ + "all the migrations on the target database This involves removing entries for the failed migrations and update"
257+ + "the checksum of migrations already applied on the target database" )
258+ public Integer repair () {
259+ try {
260+ parseConfig ();
261+ // Get the migration workflow to repair native migrations
262+ ConnectionType connType = ConnectionType .from (config .getDataSourceFactory ().getDriverClass ());
263+ DatasourceConfig .initialize (connType .label );
264+
265+ // Handle repair of SERVER_MIGRATION_SQL_LOGS and SERVER_CHANGE_LOG tables
266+ try {
267+ List <String > failedVersions =
268+ jdbi .withHandle (
269+ handle ->
270+ handle
271+ .createQuery (
272+ "SELECT version FROM SERVER_CHANGE_LOG WHERE status = 'FAILED'" )
273+ .mapTo (String .class )
274+ .list ());
275+
276+ if (!failedVersions .isEmpty ()) {
277+ LOG .info ("Found {} failed migrations in SERVER_CHANGE_LOG" , failedVersions .size ());
278+
279+ // Remove failed migrations from SERVER_CHANGE_LOG
280+ jdbi .useHandle (
281+ handle ->
282+ handle
283+ .createUpdate ("DELETE FROM SERVER_CHANGE_LOG WHERE status = 'FAILED'" )
284+ .execute ());
285+
286+ // Clean up related entries in SERVER_MIGRATION_SQL_LOGS
287+ for (String version : failedVersions ) {
288+ jdbi .useHandle (
289+ handle ->
290+ handle
291+ .createUpdate (
292+ "DELETE FROM SERVER_MIGRATION_SQL_LOGS WHERE version = :version" )
293+ .bind ("version" , version )
294+ .execute ());
295+ }
296+ }
297+ } catch (Exception e ) {
298+ LOG .error ("Error repairing SERVER_CHANGE_LOG and SERVER_MIGRATION_SQL_LOGS tables" , e );
299+ throw e ;
300+ }
301+ return 0 ;
302+ } catch (Exception e ) {
303+ LOG .error ("Repair of migration tables failed due to " , e );
304+ return 1 ;
305+ }
306+ }
307+
177308 @ Command (
178309 name = "setOpenMetadataUrl" ,
179310 description = "Set or update the OpenMetadata URL in the system repository" )
@@ -537,6 +668,40 @@ private void installApplication(String appName, AppRepository appRepository) thr
537668 public Integer checkConnection () {
538669 try {
539670 parseConfig ();
671+ // Check native tables
672+ try {
673+ jdbi .withHandle (
674+ handle -> {
675+ try {
676+ handle
677+ .createQuery ("SELECT COUNT(*) FROM SERVER_CHANGE_LOG" )
678+ .mapTo (Integer .class )
679+ .findOne ();
680+ return true ;
681+ } catch (Exception e ) {
682+ LOG .warn ("Could not access SERVER_CHANGE_LOG table: {}" , e .getMessage ());
683+ return false ;
684+ }
685+ });
686+
687+ // querying SERVER_MIGRATION_SQL_LOGS table
688+ jdbi .withHandle (
689+ handle -> {
690+ try {
691+ handle
692+ .createQuery ("SELECT COUNT(*) FROM SERVER_MIGRATION_SQL_LOGS" )
693+ .mapTo (Integer .class )
694+ .findOne ();
695+ return true ;
696+ } catch (Exception e ) {
697+ LOG .warn ("Could not access SERVER_MIGRATION_SQL_LOGS table: {}" , e .getMessage ());
698+ return false ;
699+ }
700+ });
701+
702+ } catch (Exception e ) {
703+ LOG .warn ("Error checking migration tables: {}" , e .getMessage ());
704+ }
540705 jdbi .open ().getConnection ();
541706 return 0 ;
542707 } catch (Exception e ) {
@@ -2056,6 +2221,15 @@ public void parseConfig() throws Exception {
20562221
20572222 jdbi = JdbiUtils .createAndSetupJDBI (dataSourceFactory );
20582223
2224+ // Initialize the MigrationValidationClient, used in the Settings Repository
2225+ MigrationValidationClient .initialize (jdbi .onDemand (MigrationDAO .class ), config );
2226+ // Init repos
2227+ collectionDAO = jdbi .onDemand (CollectionDAO .class );
2228+ Entity .setJdbi (jdbi );
2229+ Entity .setCollectionDAO (collectionDAO );
2230+ Entity .setEntityRelationshipRepository (new EntityRelationshipRepository (collectionDAO ));
2231+ Entity .setSystemRepository (new SystemRepository ());
2232+
20592233 searchRepository =
20602234 SearchRepositoryFactory .createSearchRepository (
20612235 config .getElasticSearchConfiguration (), config .getDataSourceFactory ().getMaxSize ());
@@ -2065,12 +2239,7 @@ public void parseConfig() throws Exception {
20652239 SecretsManagerFactory .createSecretsManager (
20662240 config .getSecretsManagerConfiguration (), config .getClusterName ());
20672241
2068- collectionDAO = jdbi .onDemand (CollectionDAO .class );
20692242 Entity .setSearchRepository (searchRepository );
2070- Entity .setJdbi (jdbi );
2071- Entity .setCollectionDAO (collectionDAO );
2072- Entity .setEntityRelationshipRepository (new EntityRelationshipRepository (collectionDAO ));
2073- Entity .setSystemRepository (new SystemRepository ());
20742243 Entity .initializeRepositories (config , jdbi );
20752244 ConnectionType connType = ConnectionType .from (config .getDataSourceFactory ().getDriverClass ());
20762245 DatasourceConfig .initialize (connType .label );
0 commit comments