Skip to content

Commit 6863ed2

Browse files
authored
IGNITE-23741 Partition storages are destroyed on zone partition stop (#5173)
1 parent 74fe719 commit 6863ed2

File tree

3 files changed

+26
-28
lines changed

3 files changed

+26
-28
lines changed

modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEvent.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public enum LocalPartitionReplicaEvent implements Event {
2929
AFTER_REPLICA_STARTED,
3030

3131
/**
32-
* Fired when partition replica has stopped.
32+
* Fired when partition replica has been destroyed.
3333
*/
34-
AFTER_REPLICA_STOPPED
34+
AFTER_REPLICA_DESTROYED
3535
}

modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -914,16 +914,12 @@ private CompletableFuture<Void> stopAndDestroyPartitionAndUpdateClients(
914914
.noneMatch(assignment -> assignment.consistentId().equals(localNode().name()));
915915

916916
if (shouldStopLocalServices) {
917-
return clientUpdateFuture.thenCompose(v -> stopAndDestroyPartition(zonePartitionId, revision));
917+
return clientUpdateFuture.thenCompose(v -> weakStopAndDestroyPartition(zonePartitionId, revision));
918918
} else {
919919
return clientUpdateFuture;
920920
}
921921
}
922922

923-
private CompletableFuture<Void> stopAndDestroyPartition(ZonePartitionId zonePartitionId, long revision) {
924-
return weakStopPartition(zonePartitionId, revision);
925-
}
926-
927923
private CompletableFuture<Void> handleChangePendingAssignmentEvent(Entry pendingAssignmentsEntry, long revision) {
928924
if (pendingAssignmentsEntry.value() == null || pendingAssignmentsEntry.empty()) {
929925
return nullCompletedFuture();
@@ -1249,40 +1245,45 @@ public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
12491245
return Assignments.fromBytes(entry.value());
12501246
}
12511247

1252-
private CompletableFuture<Void> weakStopPartition(ZonePartitionId zonePartitionId, long revision) {
1248+
private CompletableFuture<Void> weakStopAndDestroyPartition(ZonePartitionId zonePartitionId, long revision) {
12531249
return replicaMgr.weakStopReplica(
12541250
zonePartitionId,
12551251
WeakReplicaStopReason.EXCLUDED_FROM_ASSIGNMENTS,
1256-
() -> stopPartition(zonePartitionId, revision)
1252+
() -> stopPartition(zonePartitionId).thenCompose(replicaWasStopped -> {
1253+
if (!replicaWasStopped) {
1254+
return nullCompletedFuture();
1255+
}
1256+
1257+
return fireEvent(
1258+
LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED,
1259+
new LocalPartitionReplicaEventParameters(zonePartitionId, revision)
1260+
);
1261+
})
12571262
);
12581263
}
12591264

12601265
/**
12611266
* Stops all resources associated with a given partition, like replicas and partition trackers.
12621267
*
12631268
* @param zonePartitionId Partition ID.
1264-
* @return Future that will be completed after all resources have been closed.
1269+
* @return Future that will be completed after all resources have been closed and the future's result answers was replica was stopped
1270+
* correctly or not.
12651271
*/
1266-
private CompletableFuture<Void> stopPartition(ZonePartitionId zonePartitionId, long revision) {
1272+
private CompletableFuture<Boolean> stopPartition(ZonePartitionId zonePartitionId) {
12671273
return executeUnderZoneWriteLock(zonePartitionId.zoneId(), () -> {
12681274
try {
12691275
return replicaMgr.stopReplica(zonePartitionId)
1270-
.thenCompose((replicaWasStopped) -> {
1276+
.thenApply((replicaWasStopped) -> {
12711277
if (replicaWasStopped) {
12721278
zonePartitionRaftListeners.remove(zonePartitionId);
12731279
replicationGroupIds.remove(zonePartitionId);
1274-
1275-
return fireEvent(
1276-
LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED,
1277-
new LocalPartitionReplicaEventParameters(zonePartitionId, revision)
1278-
);
1279-
} else {
1280-
return nullCompletedFuture();
12811280
}
1281+
1282+
return replicaWasStopped;
12821283
});
12831284
} catch (NodeStoppingException e) {
12841285
// No-op.
1285-
return nullCompletedFuture();
1286+
return falseCompletedFuture();
12861287
}
12871288
});
12881289
}
@@ -1293,11 +1294,8 @@ private CompletableFuture<Void> stopPartition(ZonePartitionId zonePartitionId, l
12931294
* @param partitionIds Partitions to stop.
12941295
*/
12951296
private void cleanUpPartitionsResources(Set<ZonePartitionId> partitionIds) {
1296-
// TODO: Due to IGNITE-23741 we shouldn't destroy partitions on node stop thus the revision will be removed.
1297-
long revision = catalogMgr.latestCatalogVersion();
1298-
12991297
CompletableFuture<?>[] stopPartitionsFuture = partitionIds.stream()
1300-
.map(partId -> stopPartition(partId, revision))
1298+
.map(this::stopPartition)
13011299
.toArray(CompletableFuture[]::new);
13021300

13031301
try {
@@ -1354,7 +1352,7 @@ public void loadTableListenerToZoneReplica(
13541352
zonePartitionRaftListeners.get(zonePartitionId).addTablePartitionRaftListener(tablePartitionId, tablePartitionRaftListener);
13551353
}
13561354

1357-
private CompletableFuture<Void> executeUnderZoneWriteLock(int zoneId, Supplier<CompletableFuture<Void>> action) {
1355+
private <T> CompletableFuture<T> executeUnderZoneWriteLock(int zoneId, Supplier<CompletableFuture<T>> action) {
13581356
StampedLock lock = zonePartitionsLocks.computeIfAbsent(zoneId, id -> new StampedLock());
13591357

13601358
long stamp = lock.writeLock();

modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -606,8 +606,8 @@ public TableManager(
606606
);
607607

608608
partitionReplicaLifecycleManager.listen(
609-
LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED,
610-
this::onZoneReplicaStopped
609+
LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED,
610+
this::onZoneReplicaDestroyed
611611
);
612612

613613
rebalanceRetryDelayConfiguration = new SystemDistributedConfigurationPropertyHolder<>(
@@ -718,7 +718,7 @@ private CompletableFuture<Boolean> onZoneReplicaCreated(LocalPartitionReplicaEve
718718
);
719719
}
720720

721-
private CompletableFuture<Boolean> onZoneReplicaStopped(LocalPartitionReplicaEventParameters parameters) {
721+
private CompletableFuture<Boolean> onZoneReplicaDestroyed(LocalPartitionReplicaEventParameters parameters) {
722722
if (!enabledColocationFeature) {
723723
return falseCompletedFuture();
724724
}

0 commit comments

Comments
 (0)