Skip to content

Commit c00cbbf

Browse files
committed
Fix Processing Partition picked up without active workers
1 parent 6724762 commit c00cbbf

File tree

1 file changed

+37
-23
lines changed

1 file changed

+37
-23
lines changed

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedSearchIndexExecutor.java

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Optional;
2424
import java.util.Set;
2525
import java.util.UUID;
26+
import java.util.concurrent.ConcurrentHashMap;
2627
import java.util.concurrent.CountDownLatch;
2728
import java.util.concurrent.ExecutorService;
2829
import java.util.concurrent.Executors;
@@ -100,6 +101,7 @@ public static boolean isCoordinatingJob(UUID jobId) {
100101
@Getter private SearchIndexJob currentJob;
101102
private DistributedJobStatsAggregator statsAggregator;
102103
private ExecutorService workerExecutor;
104+
private final Set<UUID> activePartitions = ConcurrentHashMap.newKeySet();
103105
private final List<PartitionWorker> activeWorkers = new ArrayList<>();
104106
private Thread lockRefreshThread;
105107
private Thread partitionHeartbeatThread;
@@ -620,29 +622,41 @@ private void runWorkerLoop(
620622
partition.getId(),
621623
partition.getEntityType());
622624

623-
PartitionWorker.PartitionResult result = worker.processPartition(partition);
624-
totalSuccess.addAndGet(result.successCount());
625-
totalFailed.addAndGet(result.failedCount());
625+
activePartitions.add(partition.getId());
626+
try {
627+
PartitionWorker.PartitionResult result = worker.processPartition(partition);
628+
totalSuccess.addAndGet(result.successCount());
629+
totalFailed.addAndGet(result.failedCount());
626630

627-
// Accumulate into coordinator stats for persistence
628-
// readerSuccess = entities successfully processed through reader
629-
// readerFailed = specifically reader failures (not sink failures)
630-
// readerWarnings = warnings from reader (e.g., stale references)
631-
coordinatorReaderSuccess.addAndGet(result.successCount());
632-
coordinatorReaderFailed.addAndGet(result.readerFailed());
633-
coordinatorReaderWarnings.addAndGet(result.readerWarnings());
634-
coordinatorPartitionsCompleted.incrementAndGet();
631+
coordinatorReaderSuccess.addAndGet(result.successCount());
632+
coordinatorReaderFailed.addAndGet(result.readerFailed());
633+
coordinatorReaderWarnings.addAndGet(result.readerWarnings());
634+
coordinatorPartitionsCompleted.incrementAndGet();
635635

636-
LOG.info(
637-
"Worker {} completed partition {} (success: {}, failed: {}, readerFailed: {}, readerWarnings: {})",
638-
workerId,
639-
partition.getId(),
640-
result.successCount(),
641-
result.failedCount(),
642-
result.readerFailed(),
643-
result.readerWarnings());
644-
645-
// Stats are tracked per-entityType by StageStatsTracker in PartitionWorker
636+
LOG.info(
637+
"Worker {} completed partition {} (success: {}, failed: {}, readerFailed: {}, readerWarnings: {})",
638+
workerId,
639+
partition.getId(),
640+
result.successCount(),
641+
result.failedCount(),
642+
result.readerFailed(),
643+
result.readerWarnings());
644+
} catch (Exception e) {
645+
LOG.error(
646+
"Worker {} failed partition {} for {}: {}",
647+
workerId,
648+
partition.getId(),
649+
partition.getEntityType(),
650+
e.getMessage(),
651+
e);
652+
try {
653+
coordinator.failPartition(partition.getId(), e.getMessage());
654+
} catch (Exception fe) {
655+
LOG.error("Failed to mark partition {} as failed", partition.getId(), fe);
656+
}
657+
} finally {
658+
activePartitions.remove(partition.getId());
659+
}
646660
}
647661
} finally {
648662
synchronized (activeWorkers) {
@@ -738,8 +752,8 @@ private void runPartitionHeartbeatLoop() {
738752
int updated = 0;
739753
long now = System.currentTimeMillis();
740754
for (SearchIndexPartition partition : processing) {
741-
if (serverId.equals(partition.getAssignedServer())) {
742-
// Update the heartbeat for this partition
755+
if (serverId.equals(partition.getAssignedServer())
756+
&& activePartitions.contains(partition.getId())) {
743757
collectionDAO
744758
.searchIndexPartitionDAO()
745759
.updateHeartbeat(partition.getId().toString(), now);

0 commit comments

Comments
 (0)