Commit 9f98479

mohityadav766, gitar-bot, and Copilot committed
Fix Processing Partition picked up without active workers (#25989)
* Fix Processing Partition picked up without active workers
* For Active Fixes
* fix: remove dead code and duplicate failPartition call in search index
* Fix Processing Partition picked up without active workers (#26001)
  * Initial plan
  * Apply spotless formatting to fix Java checkstyle failures
* Fix OpenSearch toJsonString
* Apply Review Comments

Co-authored-by: Gitar <noreply@gitar.ai>
Co-authored-by: mohityadav766 <mohityadav766@users.noreply.github.com>
Co-authored-by: mohityadav766 <105265192+mohityadav766@users.noreply.github.com>
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>

(cherry picked from commit e1b1f4b)
1 parent 09754b8 commit 9f98479

10 files changed: +207 -108 lines

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OrphanedIndexCleaner.java

Lines changed: 21 additions & 0 deletions
@@ -36,6 +36,7 @@
 public class OrphanedIndexCleaner {

   private static final String REBUILD_PATTERN = "_rebuild_";
+  private static final long MIN_AGE_MS = 30 * 60 * 1000L; // 30 minutes

   public record OrphanedIndex(String indexName, Set<String> aliases) {
     public boolean isOrphaned() {
@@ -53,8 +54,15 @@ public List<OrphanedIndex> findOrphanedRebuildIndices(SearchClient client) {

     LOG.info("Found {} rebuild indices to check for orphans", allRebuildIndices.size());

+    long now = System.currentTimeMillis();
+
     for (String indexName : allRebuildIndices) {
       try {
+        if (!isOldEnough(indexName, now)) {
+          LOG.debug("Index {} is too recent, skipping", indexName);
+          continue;
+        }
+
         Set<String> aliases = client.getAliases(indexName);
         if (aliases == null || aliases.isEmpty()) {
           orphaned.add(new OrphanedIndex(indexName, aliases));
@@ -136,6 +144,19 @@ private Set<String> findAllRebuildIndices(SearchClient client) {
     return rebuildIndices;
   }

+  private boolean isOldEnough(String indexName, long now) {
+    int lastUnderscore = indexName.lastIndexOf('_');
+    if (lastUnderscore < 0) {
+      return true;
+    }
+    try {
+      long timestamp = Long.parseLong(indexName.substring(lastUnderscore + 1));
+      return (now - timestamp) > MIN_AGE_MS;
+    } catch (NumberFormatException e) {
+      return true;
+    }
+  }
+
   private boolean isOrphaned(SearchClient client, String indexName) {
     if (!indexName.contains(REBUILD_PATTERN)) {
       return false;
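The new isOldEnough guard keys off the trailing _<epochMillis> suffix that rebuild index names carry. A minimal, self-contained sketch of that age gate with a runnable main; the class name and sample index names are illustrative, not the project's API:

public final class AgeGateSketch {
  // Same 30-minute grace period the patch uses.
  private static final long MIN_AGE_MS = 30 * 60 * 1000L;

  static boolean isOldEnough(String indexName, long nowMillis) {
    int lastUnderscore = indexName.lastIndexOf('_');
    if (lastUnderscore < 0) {
      return true; // no timestamp suffix: err toward treating the index as old
    }
    try {
      long createdAt = Long.parseLong(indexName.substring(lastUnderscore + 1));
      return (nowMillis - createdAt) > MIN_AGE_MS;
    } catch (NumberFormatException e) {
      return true; // non-numeric suffix: same fallback
    }
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    System.out.println(isOldEnough("table_rebuild_" + now, now));                // false: brand new
    System.out.println(isOldEnough("table_rebuild_" + (now - 3_600_000L), now)); // true: an hour old
  }
}

The grace period exists because a rebuild index legitimately has no alias while it is still being staged; without the age check, the cleaner could race the reindex job and delete an index that is about to be promoted.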

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java

Lines changed: 20 additions & 16 deletions
@@ -92,7 +92,7 @@ public ReindexingException(String message, Throwable cause) {
   private DistributedSearchIndexExecutor distributedExecutor;
   private ReindexContext recreateContext;
   private RecreateIndexHandler recreateIndexHandler;
-  private BulkSink searchIndexSink;
+  private volatile BulkSink searchIndexSink;

   public SearchIndexApp(CollectionDAO collectionDAO, SearchRepository searchRepository) {
     super(collectionDAO, searchRepository);
@@ -814,9 +814,11 @@ private void finalizeEntityReindex(String entityType, boolean success) {
   }

   private void handleExecutionException(Exception ex) {
-    if (searchIndexSink != null) {
+    BulkSink sink = searchIndexSink;
+    if (sink != null) {
+      searchIndexSink = null;
       try {
-        searchIndexSink.close();
+        sink.close();
       } catch (Exception e) {
         LOG.error("Error closing search index sink", e);
       }
@@ -885,10 +887,13 @@ public void updateRecordToDbAndNotify(JobExecutionContext jobExecutionContext) {
     SuccessContext successContext =
         new SuccessContext().withAdditionalProperty("stats", jobData.getStats());

+    SearchIndexJob distributedJob =
+        distributedExecutor != null ? distributedExecutor.getJobWithFreshStats() : null;
+
     try {
       String jobIdStr =
-          distributedExecutor != null
-              ? distributedExecutor.getJobWithFreshStats().getId().toString()
+          distributedJob != null
+              ? distributedJob.getId().toString()
               : getApp().getId().toString();
       int failureCount = collectionDAO.searchIndexFailureDAO().countByJobId(jobIdStr);
       if (failureCount > 0) {
@@ -898,15 +903,12 @@ public void updateRecordToDbAndNotify(JobExecutionContext jobExecutionContext) {
       LOG.debug("Could not get failure count", e);
     }

-    if (distributedExecutor != null) {
-      SearchIndexJob distributedJob = distributedExecutor.getJobWithFreshStats();
-      if (distributedJob != null && distributedJob.getServerStats() != null) {
-        successContext.withAdditionalProperty("serverStats", distributedJob.getServerStats());
-        successContext.withAdditionalProperty(
-            "serverCount", distributedJob.getServerStats().size());
-        successContext.withAdditionalProperty(
-            "distributedJobId", distributedJob.getId().toString());
-      }
+    if (distributedJob != null && distributedJob.getServerStats() != null) {
+      successContext.withAdditionalProperty("serverStats", distributedJob.getServerStats());
+      successContext.withAdditionalProperty(
+          "serverCount", distributedJob.getServerStats().size());
+      successContext.withAdditionalProperty(
+          "distributedJobId", distributedJob.getId().toString());
     }

     appRecord.setSuccessContext(successContext);
@@ -952,9 +954,11 @@ public void stop() {
       sendUpdates(jobExecutionContext, true);
     }

-    if (searchIndexSink != null) {
+    BulkSink sink = searchIndexSink;
+    if (sink != null) {
+      searchIndexSink = null;
       try {
-        searchIndexSink.close();
+        sink.close();
       } catch (Exception e) {
         LOG.error("Error closing search index sink", e);
       }
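Both handleExecutionException and stop now snapshot the volatile searchIndexSink into a local, clear the field, and close the snapshot. A schematic sketch of the pattern, using AutoCloseable as a stand-in for BulkSink: the local copy guarantees the closing thread never dereferences a field another thread has just cleared, and clearing before close() means later callers skip the close. Two threads can still, in a narrow window, both observe a non-null field; a fully race-free variant uses AtomicReference.getAndSet, shown second.

import java.util.concurrent.atomic.AtomicReference;

class SinkHolder {
  private volatile AutoCloseable sink; // as in the patch: a volatile field

  void closeOnce() {
    AutoCloseable local = sink; // single read of the volatile field
    if (local != null) {
      sink = null; // later callers see null and skip the close
      try {
        local.close();
      } catch (Exception e) {
        System.err.println("Error closing sink: " + e);
      }
    }
  }

  // Stricter variant: getAndSet claims and clears in one atomic step, so even
  // two threads arriving at the same instant close exactly once.
  private final AtomicReference<AutoCloseable> sinkRef = new AtomicReference<>();

  void closeOnceAtomic() {
    AutoCloseable local = sinkRef.getAndSet(null);
    if (local != null) {
      try {
        local.close();
      } catch (Exception e) {
        System.err.println("Error closing sink: " + e);
      }
    }
  }
}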

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexExecutor.java

Lines changed: 2 additions & 5 deletions
@@ -913,20 +913,18 @@ private void promoteEntityIndexIfReady(String entityType) {
       return;
     }

-    // Check if already promoted (avoid double promotion)
-    if (promotedEntities.contains(entityType)) {
+    if (!promotedEntities.add(entityType)) {
       LOG.debug("Entity '{}' already promoted, skipping.", entityType);
       return;
     }

-    // Determine success based on whether there were any batch failures
     AtomicInteger failures = entityBatchFailures.get(entityType);
     boolean entitySuccess = failures == null || failures.get() == 0;

-    // Build entity context and promote
     Optional<String> stagedIndexOpt = recreateContext.getStagedIndex(entityType);
     if (stagedIndexOpt.isEmpty()) {
       LOG.debug("No staged index found for entity '{}', skipping promotion.", entityType);
+      promotedEntities.remove(entityType);
       return;
     }

@@ -938,7 +936,6 @@ private void promoteEntityIndexIfReady(String entityType) {
           entitySuccess,
           stagedIndexOpt.get());
       defaultHandler.promoteEntityIndex(entityContext, entitySuccess);
-      promotedEntities.add(entityType);
     }
   }
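The promotion guard changes from a contains() check followed by a later add() into a single add() up front. On a concurrent set, add() is an atomic test-and-set: exactly one caller gets true and owns the promotion, and the new remove() releases the claim when the staged index is missing so a later attempt can retry. A sketch with illustrative names:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class PromotionGuard {
  private final Set<String> promoted = ConcurrentHashMap.newKeySet();

  boolean tryPromote(String entityType, boolean stagedIndexExists) {
    if (!promoted.add(entityType)) {
      return false; // another caller already claimed (or finished) this promotion
    }
    if (!stagedIndexExists) {
      promoted.remove(entityType); // release the claim so promotion can be retried
      return false;
    }
    // ... perform the actual promotion here ...
    return true;
  }
}

Without the rollback, an entity whose staged index was briefly unavailable would stay marked as promoted forever and never be retried; without moving add() to the front, two threads could pass the contains() check together and promote twice.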

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedJobParticipant.java

Lines changed: 12 additions & 19 deletions
@@ -316,26 +316,19 @@ private void processJobPartitions(SearchIndexJob job) {
           partition.getId(),
           partition.getEntityType());

-      try {
-        PartitionWorker.PartitionResult result = worker.processPartition(partition);
-        partitionsProcessed++;
-        totalReaderSuccess += result.successCount();
-        totalReaderFailed += result.readerFailed();
-        totalReaderWarnings += result.readerWarnings();
-
-        LOG.info(
-            "Participant completed partition {} (success: {}, failed: {}, readerFailed: {}, readerWarnings: {})",
-            partition.getId(),
-            result.successCount(),
-            result.failedCount(),
-            result.readerFailed(),
-            result.readerWarnings());
-
-        // Stats are tracked per-entityType by StageStatsTracker in PartitionWorker
+      PartitionWorker.PartitionResult result = worker.processPartition(partition);
+      partitionsProcessed++;
+      totalReaderSuccess += result.successCount();
+      totalReaderFailed += result.readerFailed();
+      totalReaderWarnings += result.readerWarnings();

-      } catch (Exception e) {
-        LOG.error("Error processing partition {}", partition.getId(), e);
-      }
+      LOG.info(
+          "Participant completed partition {} (success: {}, failed: {}, readerFailed: {}, readerWarnings: {})",
+          partition.getId(),
+          result.successCount(),
+          result.failedCount(),
+          result.readerFailed(),
+          result.readerWarnings());
     }

     // Flush sink and wait for all pending bulk requests to complete
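Previously the participant caught any exception from processPartition, logged it, and moved on, which could leave a partition neither completed nor failed at this level; now the exception propagates to the surrounding error handling. A schematic contrast with stand-in types (the real failure bookkeeping lives in PartitionWorker and its callers, which this sketch does not reproduce):

import java.util.List;

class ParticipantLoopSketch {
  interface Partition { String id(); }
  interface FailureRecorder { void markFailed(String partitionId, Exception cause); }

  // The anti-pattern the patch removes: a blanket catch that logs and continues,
  // so nothing above this loop ever learns the partition failed.
  void swallowing(List<Partition> partitions) {
    for (Partition p : partitions) {
      try {
        doWork(p);
      } catch (Exception e) {
        System.err.println("Error processing partition " + p.id()); // ...and nothing else
      }
    }
  }

  // The direction the patch takes: record the failure with a single owner
  // (here a hypothetical FailureRecorder) and let the exception escape.
  void propagating(List<Partition> partitions, FailureRecorder recorder) {
    for (Partition p : partitions) {
      try {
        doWork(p);
      } catch (RuntimeException e) {
        recorder.markFailed(p.id(), e);
        throw e;
      }
    }
  }

  void doWork(Partition p) { /* indexing work */ }
}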

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedSearchIndexExecutor.java

Lines changed: 37 additions & 23 deletions
@@ -23,6 +23,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -100,6 +101,7 @@ public static boolean isCoordinatingJob(UUID jobId) {
   @Getter private SearchIndexJob currentJob;
   private DistributedJobStatsAggregator statsAggregator;
   private ExecutorService workerExecutor;
+  private final Set<UUID> activePartitions = ConcurrentHashMap.newKeySet();
   private final List<PartitionWorker> activeWorkers = new ArrayList<>();
   private Thread lockRefreshThread;
   private Thread partitionHeartbeatThread;
@@ -620,29 +622,36 @@ private void runWorkerLoop(
             partition.getId(),
             partition.getEntityType());

-        PartitionWorker.PartitionResult result = worker.processPartition(partition);
-        totalSuccess.addAndGet(result.successCount());
-        totalFailed.addAndGet(result.failedCount());
+        activePartitions.add(partition.getId());
+        try {
+          PartitionWorker.PartitionResult result = worker.processPartition(partition);
+          totalSuccess.addAndGet(result.successCount());
+          totalFailed.addAndGet(result.failedCount());

-        // Accumulate into coordinator stats for persistence
-        // readerSuccess = entities successfully processed through reader
-        // readerFailed = specifically reader failures (not sink failures)
-        // readerWarnings = warnings from reader (e.g., stale references)
-        coordinatorReaderSuccess.addAndGet(result.successCount());
-        coordinatorReaderFailed.addAndGet(result.readerFailed());
-        coordinatorReaderWarnings.addAndGet(result.readerWarnings());
-        coordinatorPartitionsCompleted.incrementAndGet();
+          coordinatorReaderSuccess.addAndGet(result.successCount());
+          coordinatorReaderFailed.addAndGet(result.readerFailed());
+          coordinatorReaderWarnings.addAndGet(result.readerWarnings());
+          coordinatorPartitionsCompleted.incrementAndGet();

-        LOG.info(
-            "Worker {} completed partition {} (success: {}, failed: {}, readerFailed: {}, readerWarnings: {})",
-            workerId,
-            partition.getId(),
-            result.successCount(),
-            result.failedCount(),
-            result.readerFailed(),
-            result.readerWarnings());
-
-        // Stats are tracked per-entityType by StageStatsTracker in PartitionWorker
+          LOG.info(
+              "Worker {} completed partition {} (success: {}, failed: {}, readerFailed: {}, readerWarnings: {})",
+              workerId,
+              partition.getId(),
+              result.successCount(),
+              result.failedCount(),
+              result.readerFailed(),
+              result.readerWarnings());
+        } catch (Exception e) {
+          LOG.error(
+              "Worker {} failed partition {} for {}: {}",
+              workerId,
+              partition.getId(),
+              partition.getEntityType(),
+              e.getMessage(),
+              e);
+        } finally {
+          activePartitions.remove(partition.getId());
+        }
       }
     } finally {
       synchronized (activeWorkers) {
@@ -670,6 +679,11 @@ private void runStaleReclaimerLoop(UUID jobId) {
           LOG.info("Reclaimed {} stale partitions for job {}", reclaimed, jobId);
         }

+        if (entityTracker != null) {
+          List<SearchIndexPartition> allPartitions = coordinator.getPartitions(jobId, null);
+          entityTracker.reconcileFromDatabase(allPartitions);
+        }
+
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         break;
@@ -738,8 +752,8 @@ private void runPartitionHeartbeatLoop() {
       int updated = 0;
       long now = System.currentTimeMillis();
       for (SearchIndexPartition partition : processing) {
-        if (serverId.equals(partition.getAssignedServer())) {
-          // Update the heartbeat for this partition
+        if (serverId.equals(partition.getAssignedServer())
+            && activePartitions.contains(partition.getId())) {
           collectionDAO
               .searchIndexPartitionDAO()
               .updateHeartbeat(partition.getId().toString(), now);
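This is the heart of the fix in the commit title. The server keeps a concurrent set of partition IDs that currently have a live worker inside processPartition, and the heartbeat loop refreshes only those. Before this change every partition assigned to the server was heartbeated, so a partition whose worker had died kept looking healthy and was never reclaimed: a processing partition with no active worker. A condensed sketch of the mechanism, with stand-in types:

import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class HeartbeatGateSketch {
  private final Set<UUID> activePartitions = ConcurrentHashMap.newKeySet();

  // Wrap partition work so membership in the set exactly brackets real activity.
  void runPartition(UUID partitionId, Runnable work) {
    activePartitions.add(partitionId);
    try {
      work.run();
    } finally {
      activePartitions.remove(partitionId); // stop heartbeating however the work ends
    }
  }

  // Called periodically; assignedToMe is what the DB says this server owns,
  // and touch refreshes one partition's heartbeat row.
  void heartbeat(Iterable<UUID> assignedToMe, Consumer<UUID> touch) {
    for (UUID id : assignedToMe) {
      if (activePartitions.contains(id)) { // only partitions with a live worker
        touch.accept(id);
      }
    }
  }
}

A partition that is assigned but not in the set simply stops receiving heartbeats, goes stale, and becomes eligible for the stale-partition reclaimer.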

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/EntityCompletionTracker.java

Lines changed: 63 additions & 22 deletions
@@ -13,12 +13,14 @@

 package org.openmetadata.service.apps.bundles.searchIndex.distributed;

+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;

 /**
@@ -100,29 +102,9 @@ public void recordPartitionComplete(String entityType, boolean partitionFailed)
         jobId);

     if (newCompleted >= totalCount) {
-      promoteIfReady(entityType);
-    }
-  }
-
-  private void promoteIfReady(String entityType) {
-    if (promotedEntities.add(entityType)) {
       AtomicInteger failed = failedPartitions.get(entityType);
-      boolean success = (failed == null || failed.get() == 0);
-
-      LOG.info(
-          "Entity '{}' all partitions complete (success={}, failedPartitions={}, job {})",
-          entityType,
-          success,
-          failed != null ? failed.get() : 0,
-          jobId);
-
-      if (onEntityComplete != null) {
-        try {
-          onEntityComplete.accept(entityType, success);
-        } catch (Exception e) {
-          LOG.error("Error in entity completion callback for '{}' (job {})", entityType, jobId, e);
-        }
-      }
+      boolean hasFailed = failed != null && failed.get() > 0;
+      promoteIfReady(entityType, hasFailed);
     }
   }

@@ -154,6 +136,65 @@ public UUID getJobId() {
     return jobId;
   }

+  /**
+   * Reconcile entity completion state from the database. This catches completions that were missed
+   * by in-memory tracking (e.g., partitions completed by participant servers or failed by the stale
+   * reclaimer SQL, both of which bypass the in-memory EntityCompletionTracker).
+   *
+   * @param partitions All partitions for the job (from DB)
+   */
+  public void reconcileFromDatabase(List<SearchIndexPartition> partitions) {
+    Map<String, List<SearchIndexPartition>> byEntity =
+        partitions.stream().collect(Collectors.groupingBy(SearchIndexPartition::getEntityType));
+
+    for (Map.Entry<String, List<SearchIndexPartition>> entry : byEntity.entrySet()) {
+      String entityType = entry.getKey();
+      List<SearchIndexPartition> entityPartitions = entry.getValue();
+
+      if (promotedEntities.contains(entityType)) {
+        continue;
+      }
+
+      boolean allDone =
+          entityPartitions.stream()
+              .allMatch(
+                  p ->
+                      p.getStatus() == PartitionStatus.COMPLETED
+                          || p.getStatus() == PartitionStatus.FAILED);
+
+      if (allDone && !entityPartitions.isEmpty()) {
+        boolean hasFailed =
+            entityPartitions.stream().anyMatch(p -> p.getStatus() == PartitionStatus.FAILED);
+
+        LOG.info(
+            "DB reconciliation: entity '{}' all {} partitions done (hasFailed={}, job {})",
+            entityType,
+            entityPartitions.size(),
+            hasFailed,
+            jobId);
+
+        promoteIfReady(entityType, hasFailed);
+      }
+    }
+  }
+
+  private void promoteIfReady(String entityType, boolean hasFailed) {
+    if (promotedEntities.add(entityType)) {
+      boolean success = !hasFailed;
+
+      LOG.info(
+          "Entity '{}' all partitions complete (success={}, job {})", entityType, success, jobId);
+
+      if (onEntityComplete != null) {
+        try {
+          onEntityComplete.accept(entityType, success);
+        } catch (Exception e) {
+          LOG.error("Error in entity completion callback for '{}' (job {})", entityType, jobId, e);
+        }
+      }
+    }
+  }
+
   /**
    * Get the completion status for an entity.
    *
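reconcileFromDatabase makes the partition table, not the in-memory counters, the source of truth for completion: the reclaimer loop calls it each pass, so partitions finished on other servers or failed directly by reclaimer SQL still trigger promotion. A minimal sketch of the grouping logic, with a stand-in record and status enum:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

enum Status { PENDING, PROCESSING, COMPLETED, FAILED }

record Row(String entityType, Status status) {}

class ReconcileSketch {
  // Returns entityType -> success flag for every entity whose partitions have
  // all reached a terminal state (COMPLETED or FAILED).
  static Map<String, Boolean> readyToPromote(List<Row> rows) {
    return rows.stream()
        .collect(Collectors.groupingBy(Row::entityType))
        .entrySet()
        .stream()
        .filter(e -> e.getValue().stream()
            .allMatch(r -> r.status() == Status.COMPLETED || r.status() == Status.FAILED))
        .collect(Collectors.toMap(
            Map.Entry::getKey,
            e -> e.getValue().stream().noneMatch(r -> r.status() == Status.FAILED)));
  }
}

Pairing this with the idempotent promoteIfReady above makes the periodic reconciliation safe to run repeatedly: promotedEntities.add() ensures each entity is promoted at most once no matter how many passes observe it as complete.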
