Skip to content

Commit 724e27e

Browse files
authored
sync during fast sync phase (#96)
* Refactor retry logic to use a new utility function with exponential backoff and jitter * Bump project version to 7.1.5 * Implement BlockFetcher for parallel block fetching and inspection result prefetching * Add ClauseUtils for mapping indexers to clause indices and deduplicating clauses * Refactor indexer initialization and syncing to improve concurrency and error handling * Refactor indexer methods for clarity and consistency in naming * Add deadline handling to BlockFetcher and IndexerRunner for improved control over block fetching duration * Refactor IndexerRunner to remove duration handling and simplify run logic * Implement FastSyncable interface for indexers and update IndexerRunner to conditionally call fastSync * Implement FastSyncable interface for indexers and update IndexerRunner to conditionally call fastSync * Exclude non-fast-syncable indexers depending on fast-syncable from intermediate run
1 parent 28c2ee5 commit 724e27e

File tree

10 files changed

+1484
-383
lines changed

10 files changed

+1484
-383
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package org.vechain.indexer
2+
3+
import kotlin.time.TimeMark
4+
import kotlinx.coroutines.async
5+
import kotlinx.coroutines.channels.Channel
6+
import kotlinx.coroutines.coroutineScope
7+
import kotlinx.coroutines.isActive
8+
import org.slf4j.Logger
9+
import org.slf4j.LoggerFactory
10+
import org.vechain.indexer.thor.client.ThorClient
11+
import org.vechain.indexer.thor.model.BlockRevision
12+
import org.vechain.indexer.thor.model.Clause
13+
import org.vechain.indexer.thor.model.InspectionResult
14+
import org.vechain.indexer.utils.retryOnFailure
15+
16+
/**
17+
* Data class holding a block with its pre-fetched inspection results. Used to pipeline block
18+
* fetching and contract calls ahead of processing.
19+
*/
20+
data class PreparedBlock(
21+
val block: org.vechain.indexer.thor.model.Block,
22+
val inspectionResults: List<InspectionResult>,
23+
)
24+
25+
class BlockFetcher(
26+
private val thorClient: ThorClient,
27+
private val allClauses: List<Clause>,
28+
) {
29+
private val logger: Logger = LoggerFactory.getLogger(this::class.java)
30+
31+
companion object {
32+
internal const val BLOCK_INTERVAL_SECONDS = 10L
33+
}
34+
35+
/**
36+
* Prefetches blocks and their inspection results in parallel while maintaining order. Uses a
37+
* sliding window of concurrent fetches to hide network latency.
38+
*/
39+
suspend fun prefetchBlocksInOrder(
40+
startBlock: Long,
41+
maxBatchSize: Int,
42+
groupChannels: List<Channel<PreparedBlock>>,
43+
deadlineMark: TimeMark? = null,
44+
) = coroutineScope {
45+
require(startBlock >= 0) { "startBlock must be >= 0" }
46+
require(maxBatchSize >= 1) { "maxBatchSize must be >= 1" }
47+
48+
var nextBlockNumber = startBlock
49+
var lastBlockTimestamp: Long? = null
50+
51+
while (isActive && (deadlineMark == null || deadlineMark.hasNotPassedNow())) {
52+
val currentBlock = nextBlockNumber
53+
val windowSize = calculateWindowSize(lastBlockTimestamp, maxBatchSize)
54+
logger.debug("Block fetch window size: $windowSize")
55+
// Launch prefetch for next batch of blocks in parallel
56+
val deferredBlocks =
57+
(0 ..< windowSize).map { offset ->
58+
val blockNum = currentBlock + offset
59+
async { fetchAndPrepareBlock(blockNum) }
60+
}
61+
62+
// Await and send in order
63+
deferredBlocks.forEach { deferred ->
64+
val preparedBlock = deferred.await()
65+
groupChannels.forEach { channel -> channel.send(preparedBlock) }
66+
nextBlockNumber++
67+
lastBlockTimestamp = preparedBlock.block.timestamp
68+
}
69+
}
70+
}
71+
72+
/**
73+
* Fetches a block and performs inspection calls, returning a PreparedBlock. This combines the
74+
* network calls so they can be pipelined.
75+
*/
76+
internal suspend fun fetchAndPrepareBlock(blockNumber: Long): PreparedBlock {
77+
return retryOnFailure {
78+
val block = thorClient.waitForBlock(BlockRevision.Number(blockNumber))
79+
val inspectionResults =
80+
if (allClauses.isNotEmpty()) {
81+
thorClient.inspectClauses(allClauses, BlockRevision.Id(block.id))
82+
} else {
83+
emptyList()
84+
}
85+
PreparedBlock(block, inspectionResults)
86+
}
87+
}
88+
89+
internal fun calculateWindowSize(
90+
lastBlockTimestampSeconds: Long?,
91+
maxPrefetchSize: Int,
92+
): Int {
93+
if (lastBlockTimestampSeconds == null) {
94+
return maxPrefetchSize
95+
}
96+
val nowSeconds = System.currentTimeMillis() / 1000
97+
val secondsBehind = (nowSeconds - lastBlockTimestampSeconds).coerceAtLeast(0)
98+
val estimatedBlocksBehind =
99+
(secondsBehind / BLOCK_INTERVAL_SECONDS).toInt().coerceAtLeast(0) + 1
100+
return minOf(maxPrefetchSize, estimatedBlocksBehind)
101+
}
102+
}

src/main/kotlin/org.vechain.indexer/Indexer.kt

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,13 @@ interface Indexer : IndexerProcessor {
6565
// Optional inspection clauses for contract calls during block processing
6666
fun getInspectionClauses(): List<Clause>? = null
6767

68-
// Optional fast sync phase
69-
suspend fun fastSync() {
70-
// Default implementation does nothing
71-
}
72-
7368
fun shutDown()
7469
}
7570

71+
interface FastSyncableIndexer : Indexer {
72+
suspend fun fastSync()
73+
}
74+
7675
sealed class IndexingResult {
7776
abstract val status: Status
7877

0 commit comments

Comments
 (0)