Skip to content

Commit 384f5db

Browse files
authored
feat: 24213 - Improve caching for hash chunk preload during reconnects (#24466)
Signed-off-by: Artur Kugal <artur.kugal@swirldslabs.com>
1 parent 4558182 commit 384f5db

File tree

10 files changed

+230
-112
lines changed

10 files changed

+230
-112
lines changed

platform-sdk/swirlds-common/src/main/java/com/swirlds/common/merkle/synchronization/TeachingSynchronizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public void synchronize() throws InterruptedException {
111111

112112
InterruptedException interruptException = null;
113113
try (view) {
114-
view.startTeacherTasks(this, time, workGroup, in, out);
114+
view.startTeacherTasks(time, workGroup, in, out);
115115
workGroup.waitForTermination();
116116
} catch (final InterruptedException e) { // NOSONAR: Exception is rethrown below after cleanup.
117117
interruptException = e;

platform-sdk/swirlds-common/src/main/java/com/swirlds/common/merkle/synchronization/views/TeacherTreeView.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
package com.swirlds.common.merkle.synchronization.views;
33

44
import com.swirlds.base.time.Time;
5-
import com.swirlds.common.merkle.synchronization.TeachingSynchronizer;
65
import com.swirlds.common.merkle.synchronization.streams.AsyncInputStream;
76
import com.swirlds.common.merkle.synchronization.streams.AsyncOutputStream;
87
import com.swirlds.common.merkle.synchronization.utility.MerkleSynchronizationException;
@@ -23,17 +22,12 @@ public interface TeacherTreeView extends TeacherHandleQueue, TeacherResponseQueu
2322
* new custom tree views are encountered, they must be added to {@code subtrees}, although it isn't
2423
* currently supported by virtual tree views, as nested virtual maps are not supported.
2524
*
26-
* @param teachingSynchronizer the teacher synchronizer
2725
* @param workGroup the work group to run teaching task(s) in
2826
* @param in the input stream to read data from learner
2927
* @param out the output stream to write data to learner
3028
*/
3129
void startTeacherTasks(
32-
final TeachingSynchronizer teachingSynchronizer,
33-
final Time time,
34-
final StandardWorkGroup workGroup,
35-
final AsyncInputStream in,
36-
final AsyncOutputStream out);
30+
final Time time, final StandardWorkGroup workGroup, final AsyncInputStream in, final AsyncOutputStream out);
3731

3832
/**
3933
* Write data for a merkle leaf to the stream.

platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/VirtualMap.java

Lines changed: 28 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.swirlds.metrics.api.Metrics;
3535
import com.swirlds.virtualmap.config.VirtualMapConfig;
3636
import com.swirlds.virtualmap.config.VirtualMapReconnectMode;
37+
import com.swirlds.virtualmap.datasource.DataSourceHashChunkPreloader;
3738
import com.swirlds.virtualmap.datasource.VirtualDataSource;
3839
import com.swirlds.virtualmap.datasource.VirtualDataSourceBuilder;
3940
import com.swirlds.virtualmap.datasource.VirtualHashChunk;
@@ -51,7 +52,6 @@
5152
import com.swirlds.virtualmap.internal.reconnect.ConcurrentBlockingIterator;
5253
import com.swirlds.virtualmap.internal.reconnect.LearnerPullVirtualTreeView;
5354
import com.swirlds.virtualmap.internal.reconnect.LearnerPushVirtualTreeView;
54-
import com.swirlds.virtualmap.internal.reconnect.NodeTraversalOrder;
5555
import com.swirlds.virtualmap.internal.reconnect.ParallelSyncTraversalOrder;
5656
import com.swirlds.virtualmap.internal.reconnect.ReconnectHashLeafFlusher;
5757
import com.swirlds.virtualmap.internal.reconnect.ReconnectHashListener;
@@ -321,22 +321,13 @@ public static class ClassVersion {
321321
*/
322322
private VirtualMapMetadata reconnectState;
323323

324-
private VirtualNodeCache reconnectCache;
325-
326-
/**
327-
* The {@link RecordAccessor} for the state, cache, and data source needed during reconnect.
328-
*/
329-
private RecordAccessor reconnectRecords;
330-
331324
/**
332325
* During reconnect as a learner, this is the root node in the old learner merkle tree.
333326
*/
334327
private VirtualMap originalMap;
335328

336329
private ReconnectHashLeafFlusher reconnectFlusher;
337330

338-
private ReconnectNodeRemover nodeRemover;
339-
340331
private final long fastCopyVersion;
341332

342333
private VirtualMapStatistics statistics;
@@ -423,8 +414,6 @@ private VirtualMap(final VirtualMap source) {
423414
reconnectHashingFuture = null;
424415
reconnectHashingStarted = null;
425416
reconnectIterator = null;
426-
reconnectCache = null;
427-
reconnectRecords = null;
428417
pipeline = source.pipeline;
429418
flushCandidateThreshold.set(source.flushCandidateThreshold.get());
430419
statistics = source.statistics;
@@ -869,10 +858,6 @@ public void onShutdown(final boolean immediately) {
869858
hasher.shutdown();
870859
}
871860
cache.shutdown();
872-
if (reconnectCache != null) {
873-
reconnectCache.shutdown();
874-
reconnectCache = null;
875-
}
876861
closeDataSource();
877862
}
878863

@@ -1214,6 +1199,19 @@ public void onHashChunkHashed(@NonNull VirtualHashChunk chunk) {
12141199
statistics.recordHash(end - start);
12151200
}
12161201

1202+
/**
1203+
* @return copy of underlying datasource with cache copy flushed into it, and running compaction
1204+
*/
1205+
public VirtualDataSource detachAsDataSourceCopy() {
1206+
return pipeline.pausePipelineAndRun("detach", () -> {
1207+
final Path snapshotPath = dataSourceBuilder.snapshot(null, dataSource);
1208+
VirtualDataSource dataSourceCopy = dataSourceBuilder.build(getLabel(), snapshotPath, true, false);
1209+
1210+
flush(cache.snapshot(), metadata, dataSourceCopy);
1211+
return dataSourceCopy;
1212+
});
1213+
}
1214+
12171215
/**
12181216
* Prepares a read-only copy so that it may be used even when removed from the pipeline.
12191217
* Can be called only on immutable hashed copy.
@@ -1291,31 +1289,7 @@ private void setupWithOriginalNode(@NonNull final VirtualMap originalMap) {
12911289

12921290
// Start with empty state, it will be updated from the teacher during reconnect
12931291
reconnectState = new VirtualMapMetadata();
1294-
reconnectRecords = originalMap.pipeline.pausePipelineAndRun("copy", () -> {
1295-
// shutdown background compaction on original data source as it is no longer needed to be running as all
1296-
// data
1297-
// in that data source is only there as a starting point for reconnect now. So compacting it further is not
1298-
// helpful and will just burn resources.
1299-
originalMap.dataSource.stopAndDisableBackgroundCompaction();
1300-
1301-
// Take a snapshot, and use the snapshot database as my data source
1302-
final Path snapshotPath = dataSourceBuilder.snapshot(null, originalMap.dataSource);
1303-
this.dataSource = dataSourceBuilder.build(originalMap.getLabel(), snapshotPath, true, false);
1304-
1305-
// The old map's cache is going to become immutable, but that's OK, because the old map
1306-
// will NEVER be updated again.
1307-
assert originalMap.isHashed() : "The system should have made sure this was hashed by this point!";
1308-
final VirtualNodeCache snapshotCache = originalMap.cache.snapshot();
1309-
flush(snapshotCache, originalMap.metadata, this.dataSource);
1310-
1311-
final int hashChunkHeight = dataSource.getHashChunkHeight();
1312-
reconnectCache = new VirtualNodeCache(
1313-
virtualMapConfig,
1314-
hashChunkHeight,
1315-
dataSource::loadHashChunk,
1316-
originalMap.cache.getFastCopyVersion());
1317-
return new RecordAccessor(reconnectState, hashChunkHeight, reconnectCache, dataSource);
1318-
});
1292+
this.dataSource = originalMap.detachAsDataSourceCopy();
13191293

13201294
// Set up the VirtualHasher which we will use during reconnect.
13211295
// Initial timeout is intentionally very long, timeout is reduced once we receive the first leaf in the tree.
@@ -1358,7 +1332,7 @@ public LearnerTreeView buildLearnerView(
13581332
final VirtualMapMetadata originalState = originalMap.getMetadata();
13591333
reconnectFlusher =
13601334
new ReconnectHashLeafFlusher(dataSource, virtualMapConfig.reconnectFlushInterval(), statistics);
1361-
nodeRemover = new ReconnectNodeRemover(
1335+
final ReconnectNodeRemover nodeRemover = new ReconnectNodeRemover(
13621336
originalMap.getRecords(),
13631337
originalState.getFirstLeafPath(),
13641338
originalState.getLastLeafPath(),
@@ -1367,42 +1341,36 @@ public LearnerTreeView buildLearnerView(
13671341
case VirtualMapReconnectMode.PUSH ->
13681342
new LearnerPushVirtualTreeView(
13691343
this, originalMap.records, originalState, reconnectState, nodeRemover, mapStats);
1370-
case VirtualMapReconnectMode.PULL_TOP_TO_BOTTOM -> {
1371-
final NodeTraversalOrder topToBottom = new TopToBottomTraversalOrder();
1372-
yield new LearnerPullVirtualTreeView(
1344+
case VirtualMapReconnectMode.PULL_TOP_TO_BOTTOM ->
1345+
new LearnerPullVirtualTreeView(
13731346
reconnectConfig,
13741347
this,
13751348
originalMap.records,
13761349
originalState,
13771350
reconnectState,
13781351
nodeRemover,
1379-
topToBottom,
1352+
new TopToBottomTraversalOrder(),
13801353
mapStats);
1381-
}
1382-
case VirtualMapReconnectMode.PULL_TWO_PHASE_PESSIMISTIC -> {
1383-
final NodeTraversalOrder twoPhasePessimistic = new TwoPhasePessimisticTraversalOrder();
1384-
yield new LearnerPullVirtualTreeView(
1354+
case VirtualMapReconnectMode.PULL_TWO_PHASE_PESSIMISTIC ->
1355+
new LearnerPullVirtualTreeView(
13851356
reconnectConfig,
13861357
this,
13871358
originalMap.records,
13881359
originalState,
13891360
reconnectState,
13901361
nodeRemover,
1391-
twoPhasePessimistic,
1362+
new TwoPhasePessimisticTraversalOrder(),
13921363
mapStats);
1393-
}
1394-
case VirtualMapReconnectMode.PULL_PARALLEL_SYNC -> {
1395-
final NodeTraversalOrder parallelSync = new ParallelSyncTraversalOrder();
1396-
yield new LearnerPullVirtualTreeView(
1364+
case VirtualMapReconnectMode.PULL_PARALLEL_SYNC ->
1365+
new LearnerPullVirtualTreeView(
13971366
reconnectConfig,
13981367
this,
13991368
originalMap.records,
14001369
originalState,
14011370
reconnectState,
14021371
nodeRemover,
1403-
parallelSync,
1372+
new ParallelSyncTraversalOrder(),
14041373
mapStats);
1405-
}
14061374
default ->
14071375
throw new UnsupportedOperationException("Unknown reconnect mode: " + virtualMapConfig.reconnectMode());
14081376
};
@@ -1442,8 +1410,9 @@ public void handleReconnectLeaf(@NonNull final VirtualLeafBytes<?> leafRecord) {
14421410

14431411
public void prepareReconnectHashing(final long firstLeafPath, final long lastLeafPath) {
14441412
assert reconnectFlusher != null : "Cannot prepare reconnect hashing, since reconnect is not started";
1413+
final DataSourceHashChunkPreloader hashChunkPreloader = new DataSourceHashChunkPreloader(dataSource);
14451414
// The hash listener will be responsible for flushing stuff to the reconnect data source
1446-
final ReconnectHashListener hashListener = new ReconnectHashListener(reconnectFlusher);
1415+
final ReconnectHashListener hashListener = new ReconnectHashListener(reconnectFlusher, hashChunkPreloader);
14471416

14481417
// This background thread will be responsible for hashing the tree and sending the
14491418
// data to the hash listener to flush.
@@ -1452,7 +1421,7 @@ public void prepareReconnectHashing(final long firstLeafPath, final long lastLea
14521421
.setThreadName("hasher")
14531422
.setRunnable(() -> reconnectHashingFuture.complete(hasher.hash(
14541423
dataSource.getHashChunkHeight(),
1455-
reconnectCache::preloadHashChunk,
1424+
hashChunkPreloader,
14561425
reconnectIterator,
14571426
firstLeafPath,
14581427
lastLeafPath,
@@ -1483,14 +1452,9 @@ public void endLearnerReconnect() {
14831452
logger.warn(RECONNECT.getMarker(), "virtual map hashing thread was never started");
14841453
}
14851454
logger.info(RECONNECT.getMarker(), "call postInit()");
1486-
nodeRemover = null;
14871455
originalMap = null;
14881456
metadata = new VirtualMapMetadata(reconnectState.getSize());
14891457
postInit();
1490-
if (reconnectCache != null) {
1491-
reconnectCache.shutdown();
1492-
reconnectCache = null;
1493-
}
14941458
} catch (ExecutionException e) {
14951459
throw new MerkleSynchronizationException(e);
14961460
} catch (InterruptedException e) {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
// SPDX-License-Identifier: Apache-2.0
package com.swirlds.virtualmap.datasource;

import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;

/**
 * A {@link LongFunction} from path to {@link VirtualHashChunk} that loads hash chunks from a
 * {@link VirtualDataSource} and caches them in memory, so repeated lookups for paths in the same
 * chunk hit the data source only once. When a chunk has been fully processed,
 * {@link #clearCache(long)} should be called to release the cached entry and keep memory bounded.
 *
 * <p>This class is safe for concurrent use: the cache is a {@link ConcurrentHashMap} and every
 * chunk is loaded at most once per cache lifetime via
 * {@link ConcurrentHashMap#computeIfAbsent(Object, java.util.function.Function)}.
 */
public class DataSourceHashChunkPreloader implements LongFunction<VirtualHashChunk> {

    /** Data source to load hash chunks from. */
    private final VirtualDataSource dataSource;

    /** Chunk height, cached from the data source; used for path &lt;-&gt; chunk id conversions. */
    private final int hashChunkHeight;

    /**
     * Hash chunk id to chunk map.
     */
    private final Map<Long, VirtualHashChunk> hashChunkMap = new ConcurrentHashMap<>();

    /**
     * Create a new preloader backed by the given data source.
     *
     * @param dataSource data source to load hash chunks from, must not be null
     * @throws NullPointerException if {@code dataSource} is null
     */
    public DataSourceHashChunkPreloader(@NonNull final VirtualDataSource dataSource) {
        this.dataSource = Objects.requireNonNull(dataSource, "dataSource must not be null");
        hashChunkHeight = dataSource.getHashChunkHeight();
    }

    /**
     * Clear the cached hash chunk with the given id.
     * @param hashChunkId hash chunk id
     */
    public void clearCache(final long hashChunkId) {
        hashChunkMap.remove(hashChunkId);
    }

    /**
     * Return the hash chunk containing the given path, loading it from the data source on the
     * first request and serving it from the in-memory cache afterwards. If the data source has no
     * chunk for the id, a new empty {@link VirtualHashChunk} is created (and cached) instead, so
     * this method never returns null.
     *
     * @param path virtual path whose enclosing hash chunk to return
     * @return the cached or freshly loaded (possibly empty) hash chunk, never null
     * @throws UncheckedIOException if loading the chunk from the data source fails
     */
    @Override
    public VirtualHashChunk apply(final long path) {
        final long hashChunkId = VirtualHashChunk.chunkPathToChunkId(path, hashChunkHeight);

        return hashChunkMap.computeIfAbsent(hashChunkId, id -> {
            VirtualHashChunk chunk;
            try {
                chunk = dataSource.loadHashChunk(id);
            } catch (final IOException e) {
                // Callers use this as a LongFunction, which cannot throw checked exceptions
                throw new UncheckedIOException(e);
            }
            if (chunk == null) {
                // No chunk on disk yet: substitute an empty chunk at the right path
                final long hashChunkPath = VirtualHashChunk.chunkIdToChunkPath(hashChunkId, hashChunkHeight);
                chunk = new VirtualHashChunk(hashChunkPath, hashChunkHeight);
            }
            return chunk;
        });
    }
}

platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/ReconnectHashListener.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import static java.util.Objects.requireNonNull;
55

6+
import com.swirlds.virtualmap.datasource.DataSourceHashChunkPreloader;
67
import com.swirlds.virtualmap.datasource.VirtualHashChunk;
78
import com.swirlds.virtualmap.datasource.VirtualLeafBytes;
89
import com.swirlds.virtualmap.internal.hash.VirtualHashListener;
@@ -23,14 +24,18 @@
2324
public class ReconnectHashListener implements VirtualHashListener {
2425

2526
private final ReconnectHashLeafFlusher flusher;
27+
private final DataSourceHashChunkPreloader hashChunkPreloader;
2628

2729
/**
2830
* Create a new {@link ReconnectHashListener}.
2931
*
3032
* @param flusher Hash / leaf flusher to use to flush data to disk
3133
*/
32-
public ReconnectHashListener(@NonNull final ReconnectHashLeafFlusher flusher) {
34+
public ReconnectHashListener(
35+
@NonNull final ReconnectHashLeafFlusher flusher,
36+
@NonNull final DataSourceHashChunkPreloader hashChunkPreloader) {
3337
this.flusher = requireNonNull(flusher);
38+
this.hashChunkPreloader = requireNonNull(hashChunkPreloader);
3439
}
3540

3641
/**
@@ -47,6 +52,7 @@ public void onHashingStarted(long firstLeafPath, long lastLeafPath) {
4752
@Override
4853
public void onHashChunkHashed(@NonNull final VirtualHashChunk chunk) {
4954
flusher.updateHashChunk(chunk);
55+
hashChunkPreloader.clearCache(chunk.getChunkId());
5056
}
5157

5258
/**

platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPullVirtualTreeView.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import static com.swirlds.logging.legacy.LogMarker.EXCEPTION;
55

66
import com.swirlds.base.time.Time;
7-
import com.swirlds.common.merkle.synchronization.TeachingSynchronizer;
87
import com.swirlds.common.merkle.synchronization.streams.AsyncInputStream;
98
import com.swirlds.common.merkle.synchronization.streams.AsyncOutputStream;
109
import com.swirlds.common.merkle.synchronization.views.TeacherTreeView;
@@ -57,7 +56,6 @@ public TeacherPullVirtualTreeView(final ReconnectConfig reconnectConfig, final V
5756

5857
@Override
5958
public void startTeacherTasks(
60-
final TeachingSynchronizer teachingSynchronizer,
6159
final Time time,
6260
final StandardWorkGroup workGroup,
6361
final AsyncInputStream in,

platform-sdk/swirlds-virtualmap/src/main/java/com/swirlds/virtualmap/internal/reconnect/TeacherPushVirtualTreeView.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import static com.swirlds.virtualmap.internal.Path.getRightChildPath;
99

1010
import com.swirlds.base.time.Time;
11-
import com.swirlds.common.merkle.synchronization.TeachingSynchronizer;
1211
import com.swirlds.common.merkle.synchronization.streams.AsyncInputStream;
1312
import com.swirlds.common.merkle.synchronization.streams.AsyncOutputStream;
1413
import com.swirlds.common.merkle.synchronization.task.TeacherPushReceiveTask;
@@ -120,7 +119,6 @@ public TeacherPushVirtualTreeView(final ReconnectConfig reconnectConfig, final V
120119

121120
@Override
122121
public void startTeacherTasks(
123-
final TeachingSynchronizer teachingSynchronizer,
124122
final Time time,
125123
final StandardWorkGroup workGroup,
126124
final AsyncInputStream in,

0 commit comments

Comments
 (0)