Skip to content

Commit f2a6e55

Browse files
IGNITE-26605 Refactor SnapshotHandlerRestoreTask (#12419)
1 parent 357535e commit f2a6e55

File tree

3 files changed

+46
-107
lines changed

3 files changed

+46
-107
lines changed

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,11 +232,37 @@ private void reduceCustomHandlersResults(
232232
});
233233
}
234234

235-
snpChecker.checkCustomHandlersResults(ctx.req.snapshotName(), reduced);
235+
Map<String, List<SnapshotHandlerResult<?>>> clusterResults = new HashMap<>();
236+
Collection<UUID> execNodes = new ArrayList<>(reduced.size());
237+
238+
// Checking node -> Map by consistend id.
239+
for (Map.Entry<ClusterNode, Map<Object, Map<String, SnapshotHandlerResult<?>>>> nodeRes : reduced.entrySet()) {
240+
// Consistent id -> Map by handler name.
241+
for (Map.Entry<Object, Map<String, SnapshotHandlerResult<?>>> res : nodeRes.getValue().entrySet()) {
242+
// Depending on the job mapping, we can get several different results from one node.
243+
execNodes.add(nodeRes.getKey().id());
244+
245+
Map<String, SnapshotHandlerResult<?>> nodeDataMap = res.getValue();
246+
247+
assert nodeDataMap != null : "At least the default snapshot restore handler should have been executed ";
248+
249+
for (Map.Entry<String, SnapshotHandlerResult<?>> entry : nodeDataMap.entrySet()) {
250+
String hndName = entry.getKey();
251+
252+
clusterResults.computeIfAbsent(hndName, v -> new ArrayList<>()).add(entry.getValue());
253+
}
254+
}
255+
}
256+
257+
kctx.cache().context().snapshotMgr().handlers().completeAll(
258+
SnapshotHandlerType.RESTORE, ctx.req.snapshotName(), clusterResults, execNodes, wrns -> {});
236259

237260
fut.onDone(new SnapshotPartitionsVerifyResult(ctx.clusterMetas, null));
238261
}
239262
catch (Throwable err) {
263+
log.warning("The snapshot operation will be aborted due to a handler error " +
264+
"[snapshot=" + ctx.req.snapshotName() + "].", err);
265+
240266
fut.onDone(err);
241267
}
242268
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,8 @@ public CompletableFuture<Map<String, SnapshotHandlerResult<Object>>> invokeCusto
164164
) {
165165
// The handlers use or may use the same snapshot pool. If it is configured with 1 thread, launching waiting task in
166166
// the same pool might block it.
167-
return CompletableFuture.supplyAsync(() ->
168-
new SnapshotHandlerRestoreTask(kctx.grid(), log, sft, grps, check).execute()
167+
return CompletableFuture.supplyAsync(
168+
new SnapshotHandlerRestoreTask(kctx.grid(), log, sft, grps, check)
169169
);
170170
}
171171

@@ -190,17 +190,4 @@ public CompletableFuture<Map<PartitionKey, PartitionHashRecord>> checkPartitions
190190
}
191191
}, executor);
192192
}
193-
194-
/**
195-
* Checks results of all the snapshot validation handlres.
196-
* @param snpName Snapshot name.
197-
* @param results Results: checking node -> snapshot part's consistend id -> custom handler name -> handler result.
198-
* @see #invokeCustomHandlers(SnapshotMetadata, SnapshotFileTree, Collection, boolean)
199-
*/
200-
public void checkCustomHandlersResults(
201-
String snpName,
202-
Map<ClusterNode, Map<Object, Map<String, SnapshotHandlerResult<?>>>> results
203-
) {
204-
new SnapshotHandlerRestoreTask(kctx.grid(), log, null, null, true).reduce(snpName, results);
205-
}
206193
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java

Lines changed: 17 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -18,31 +18,30 @@
1818
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
1919

2020
import java.io.IOException;
21-
import java.util.ArrayList;
2221
import java.util.Collection;
23-
import java.util.HashMap;
24-
import java.util.List;
2522
import java.util.Map;
26-
import java.util.UUID;
23+
import java.util.function.Supplier;
2724
import org.apache.ignite.IgniteCheckedException;
2825
import org.apache.ignite.IgniteException;
2926
import org.apache.ignite.IgniteLogger;
30-
import org.apache.ignite.cluster.ClusterNode;
3127
import org.apache.ignite.internal.IgniteEx;
3228
import org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
3329

3430
/**
3531
* Snapshot restore operation handling task.
3632
*/
37-
public class SnapshotHandlerRestoreTask {
33+
public class SnapshotHandlerRestoreTask implements Supplier<Map<String, SnapshotHandlerResult<Object>>> {
3834
/** */
3935
private final IgniteEx ignite;
4036

4137
/** */
42-
private final IgniteLogger log;
38+
private final SnapshotFileTree sft;
4339

4440
/** */
45-
private final SnapshotHandlerRestoreJob job;
41+
private final Collection<String> rqGrps;
42+
43+
/** */
44+
private final boolean check;
4645

4746
/** */
4847
SnapshotHandlerRestoreTask(
@@ -52,96 +51,23 @@ public class SnapshotHandlerRestoreTask {
5251
Collection<String> grps,
5352
boolean check
5453
) {
55-
job = new SnapshotHandlerRestoreJob(ignite, sft, grps, check);
5654
this.ignite = ignite;
57-
this.log = log;
55+
this.sft = sft;
56+
this.rqGrps = grps;
57+
this.check = check;
5858
}
5959

6060
/** */
61-
public Map<String, SnapshotHandlerResult<Object>> execute() {
62-
return job.execute0();
63-
}
64-
65-
/** */
66-
public void reduce(
67-
String snapshotName,
68-
Map<ClusterNode, Map<Object, Map<String, SnapshotHandlerResult<?>>>> results
69-
) {
70-
Map<String, List<SnapshotHandlerResult<?>>> clusterResults = new HashMap<>();
71-
Collection<UUID> execNodes = new ArrayList<>(results.size());
72-
73-
// Checking node -> Map by consistend id.
74-
for (Map.Entry<ClusterNode, Map<Object, Map<String, SnapshotHandlerResult<?>>>> nodeRes : results.entrySet()) {
75-
// Consistent id -> Map by handler name.
76-
for (Map.Entry<Object, Map<String, SnapshotHandlerResult<?>>> res : nodeRes.getValue().entrySet()) {
77-
// Depending on the job mapping, we can get several different results from one node.
78-
execNodes.add(nodeRes.getKey().id());
79-
80-
Map<String, SnapshotHandlerResult<?>> nodeDataMap = res.getValue();
81-
82-
assert nodeDataMap != null : "At least the default snapshot restore handler should have been executed ";
83-
84-
for (Map.Entry<String, SnapshotHandlerResult<?>> entry : nodeDataMap.entrySet()) {
85-
String hndName = entry.getKey();
86-
87-
clusterResults.computeIfAbsent(hndName, v -> new ArrayList<>()).add(entry.getValue());
88-
}
89-
}
90-
}
91-
61+
@Override public Map<String, SnapshotHandlerResult<Object>> get() {
9262
try {
93-
ignite.context().cache().context().snapshotMgr().handlers().completeAll(
94-
SnapshotHandlerType.RESTORE, snapshotName, clusterResults, execNodes, wrns -> {});
95-
}
96-
catch (Exception e) {
97-
log.warning("The snapshot operation will be aborted due to a handler error [snapshot=" + snapshotName + "].", e);
98-
99-
throw new IgniteException(e);
100-
}
101-
}
102-
103-
/** Invokes all {@link SnapshotHandlerType#RESTORE} handlers locally. */
104-
private static class SnapshotHandlerRestoreJob {
105-
/** */
106-
private final IgniteEx ignite;
63+
IgniteSnapshotManager snpMgr = ignite.context().cache().context().snapshotMgr();
64+
SnapshotMetadata meta = snpMgr.readSnapshotMetadata(sft.meta());
10765

108-
/** */
109-
private final SnapshotFileTree sft;
110-
111-
/** */
112-
private final Collection<String> rqGrps;
113-
114-
/** */
115-
private final boolean check;
116-
117-
/**
118-
* @param grps Cache group names.
119-
* @param check If {@code true} check snapshot before restore.
120-
*/
121-
SnapshotHandlerRestoreJob(
122-
IgniteEx ignite,
123-
SnapshotFileTree sft,
124-
Collection<String> grps,
125-
boolean check
126-
) {
127-
this.ignite = ignite;
128-
this.sft = sft;
129-
this.rqGrps = grps;
130-
this.check = check;
66+
return snpMgr.handlers().invokeAll(SnapshotHandlerType.RESTORE,
67+
new SnapshotHandlerContext(meta, rqGrps, ignite.localNode(), sft, false, check));
13168
}
132-
133-
/** */
134-
public Map<String, SnapshotHandlerResult<Object>> execute0() {
135-
try {
136-
IgniteSnapshotManager snpMgr = ignite.context().cache().context().snapshotMgr();
137-
SnapshotMetadata meta = snpMgr.readSnapshotMetadata(sft.meta());
138-
139-
return snpMgr.handlers().invokeAll(SnapshotHandlerType.RESTORE,
140-
new SnapshotHandlerContext(meta, rqGrps, ignite.localNode(), sft, false, check));
141-
}
142-
catch (IgniteCheckedException | IOException e) {
143-
throw new IgniteException(e);
144-
}
69+
catch (IgniteCheckedException | IOException e) {
70+
throw new IgniteException(e);
14571
}
14672
}
14773
}

0 commit comments

Comments
 (0)