Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ object FlintMetadataLogEntry {
val FAILED: IndexState.Value = Value("failed")
val RECOVERING: IndexState.Value = Value("recovering")
val VACUUMING: IndexState.Value = Value("vacuuming")
val CORRUPTED: IndexState.Value = Value("corrupted")
val UNKNOWN: IndexState.Value = Value("unknown")

def from(s: String): IndexState.Value = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
package org.opensearch.flint.spark

import org.opensearch.flint.common.metadata.log.{FlintMetadataLogService, OptimisticTransaction}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{CREATING, EMPTY, VACUUMING}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{CORRUPTED, CREATING, EMPTY, VACUUMING}
import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.core.FlintClient
import org.opensearch.flint.spark.scheduler.FlintSparkJobSchedulingService

import org.apache.spark.internal.Logging

Expand Down Expand Up @@ -111,10 +112,26 @@ trait FlintSparkTransactionSupport extends Logging {
}

private def cleanupCorruptedIndex(indexName: String): Unit = {
flintMetadataLogService
.startTransaction(indexName)
.initialLog(_ => true)
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {})
val logEntry = flintMetadataLogService
.getIndexMetadataLog(indexName)
.flatMap(_.getLatest)

if (logEntry.isPresent && logEntry.get().state == CORRUPTED) {
// If already in CORRUPTED state (logically corrupted), purge the log entry
flintMetadataLogService
.startTransaction(indexName)
.initialLog(_ => true)
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {})
logInfo(s"Purged log entry for logically corrupted index: $indexName")
} else {
// If not already in CORRUPTED state (physically corrupted), update to CORRUPTED'
flintMetadataLogService
.startTransaction(indexName)
.initialLog(_ => true)
.finalLog(latest => latest.copy(state = CORRUPTED))
.commit(_ => {})
logInfo(s"Marked physically corrupted index as CORRUPTED: $indexName")
}
}
}
Loading