Skip to content

Commit f8fa2a9

Browse files
authored
Implement proximity grouping for indexers based on block numbers and … (#101)
* Implement proximity grouping for indexers based on block numbers and dependencies * Enhance proximity grouping logging with detailed summary of groups and indexers * Refactor test for cross-group dependency extraction to clarify standalone group behavior
1 parent 724e27e commit f8fa2a9

File tree

4 files changed

+464
-4
lines changed

4 files changed

+464
-4
lines changed

src/main/kotlin/org.vechain.indexer/IndexerRunner.kt

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.vechain.indexer
22

3+
import kotlin.time.Duration
4+
import kotlin.time.Duration.Companion.minutes
35
import kotlin.time.TimeMark
46
import kotlin.time.TimeSource
57
import kotlinx.coroutines.CoroutineScope
@@ -17,6 +19,7 @@ import org.vechain.indexer.exception.ReorgException
1719
import org.vechain.indexer.thor.client.ThorClient
1820
import org.vechain.indexer.utils.ClauseIndexMapping
1921
import org.vechain.indexer.utils.ClauseUtils.buildClauseListWithMapping
22+
import org.vechain.indexer.utils.IndexerOrderUtils.proximityGroups
2023
import org.vechain.indexer.utils.IndexerOrderUtils.topologicalOrder
2124
import org.vechain.indexer.utils.retryOnFailure
2225

@@ -28,7 +31,9 @@ class IndexerRunner(private val timeSource: TimeSource = TimeSource.Monotonic) {
2831
scope: CoroutineScope,
2932
thorClient: ThorClient,
3033
indexers: List<Indexer>,
31-
blockBatchSize: Int = 1
34+
blockBatchSize: Int = 1,
35+
proximityThreshold: Long = 1_000_000L,
36+
reshuffleInterval: Duration = 5.minutes,
3237
): Job {
3338
require(indexers.isNotEmpty()) { "At least one indexer is required" }
3439

@@ -39,6 +44,8 @@ class IndexerRunner(private val timeSource: TimeSource = TimeSource.Monotonic) {
3944
indexers = indexers,
4045
batchSize = blockBatchSize,
4146
thorClient = thorClient,
47+
proximityThreshold = proximityThreshold,
48+
reshuffleInterval = reshuffleInterval,
4249
)
4350
}
4451
}
@@ -48,6 +55,8 @@ class IndexerRunner(private val timeSource: TimeSource = TimeSource.Monotonic) {
4855
indexers: List<Indexer>,
4956
batchSize: Int,
5057
thorClient: ThorClient,
58+
proximityThreshold: Long = 1_000_000L,
59+
reshuffleInterval: Duration = 5.minutes,
5160
): Unit = coroutineScope {
5261
require(indexers.isNotEmpty()) { "At least one indexer is required" }
5362

@@ -63,11 +72,19 @@ class IndexerRunner(private val timeSource: TimeSource = TimeSource.Monotonic) {
6372
nonFastSyncable,
6473
thorClient,
6574
batchSize,
75+
proximityThreshold,
76+
reshuffleInterval,
6677
)
6778
// Re-initialise non-fast-syncable indexers to recover from potential
6879
// mid-block cancellation during the intermediate run
6980
initialise(nonFastSyncable)
70-
runIndexers(indexers, thorClient, batchSize)
81+
runWithProximityGroups(
82+
indexers,
83+
thorClient,
84+
batchSize,
85+
proximityThreshold,
86+
reshuffleInterval,
87+
)
7188
} catch (e: ReorgException) {
7289
logger.error("Reorg detected, restarting all indexers", e)
7390
// Exception caught, job will complete normally and loop will restart
@@ -85,6 +102,8 @@ class IndexerRunner(private val timeSource: TimeSource = TimeSource.Monotonic) {
85102
nonFastSyncable: List<Indexer>,
86103
thorClient: ThorClient,
87104
batchSize: Int,
105+
proximityThreshold: Long = 1_000_000L,
106+
reshuffleInterval: Duration = 5.minutes,
88107
) {
89108
if (fastSyncable.isEmpty()) {
90109
initialise(nonFastSyncable)
@@ -100,7 +119,15 @@ class IndexerRunner(private val timeSource: TimeSource = TimeSource.Monotonic) {
100119
val nonFastJob: Job? =
101120
if (independentNonFast.isNotEmpty()) {
102121
initialise(independentNonFast)
103-
launch { runIndexers(independentNonFast, thorClient, batchSize) }
122+
launch {
123+
runWithProximityGroups(
124+
independentNonFast,
125+
thorClient,
126+
batchSize,
127+
proximityThreshold,
128+
reshuffleInterval,
129+
)
130+
}
104131
} else null
105132

106133
initialiseAndSync(fastSyncable)
@@ -149,6 +176,44 @@ class IndexerRunner(private val timeSource: TimeSource = TimeSource.Monotonic) {
149176
}
150177
}
151178

179+
suspend fun runWithProximityGroups(
180+
indexers: List<Indexer>,
181+
thorClient: ThorClient,
182+
batchSize: Int,
183+
proximityThreshold: Long,
184+
reshuffleInterval: Duration,
185+
) {
186+
while (true) {
187+
val groups = proximityGroups(indexers, proximityThreshold)
188+
if (groups.size <= 1) {
189+
// Steady state — single group, no deadline
190+
runIndexers(indexers, thorClient, batchSize)
191+
return
192+
}
193+
194+
val groupSummary = buildString {
195+
appendLine(
196+
"Proximity groups: ${groups.size} groups, ${indexers.size} indexers, threshold=$proximityThreshold"
197+
)
198+
groups.forEachIndexed { i, g ->
199+
val blockRange =
200+
"${g.minOf { it.getCurrentBlockNumber() }}..${g.maxOf { it.getCurrentBlockNumber() }}"
201+
appendLine(
202+
" Group ${i + 1} (${g.size} indexers, blocks $blockRange): ${g.map { it.name }}"
203+
)
204+
}
205+
}
206+
logger.info(groupSummary.trimEnd())
207+
val deadlineMark = timeSource.markNow() + reshuffleInterval
208+
coroutineScope {
209+
groups.forEach { group ->
210+
launch { runIndexers(group, thorClient, batchSize, deadlineMark) }
211+
}
212+
}
213+
// All groups completed naturally when deadline passed; loop to reshuffle
214+
}
215+
}
216+
152217
suspend fun runIndexers(
153218
indexers: List<Indexer>,
154219
thorClient: ThorClient,

src/main/kotlin/org.vechain.indexer/utils/IndexerOrderUtils.kt

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,99 @@ internal object IndexerOrderUtils {
5757
return listOf(ordered)
5858
}
5959

60+
/**
61+
* Groups indexers by proximity of their current block numbers, preserving dependency chains.
62+
*
63+
* Indexers within [threshold] blocks of each other share a group. Cross-group dependency chains
64+
* are extracted and re-merged into the closest group (or kept standalone). Each group is
65+
* topologically ordered internally.
66+
*/
67+
fun proximityGroups(indexers: List<Indexer>, threshold: Long): List<List<Indexer>> {
68+
if (indexers.isEmpty()) return emptyList()
69+
70+
// Phase A: Initial proximity grouping by current block number
71+
val sorted = indexers.sortedBy { it.getCurrentBlockNumber() }
72+
val groups = mutableListOf<MutableSet<Indexer>>()
73+
var currentGroup = mutableSetOf(sorted.first())
74+
for (i in 1 ..< sorted.size) {
75+
val gap = sorted[i].getCurrentBlockNumber() - sorted[i - 1].getCurrentBlockNumber()
76+
if (gap > threshold) {
77+
groups.add(currentGroup)
78+
currentGroup = mutableSetOf()
79+
}
80+
currentGroup.add(sorted[i])
81+
}
82+
groups.add(currentGroup)
83+
84+
// Phase B: Extract cross-group dependency chains
85+
val indexerSet = indexers.toSet()
86+
val indexerToGroup = mutableMapOf<Indexer, MutableSet<Indexer>>()
87+
for (group in groups) {
88+
for (indexer in group) {
89+
indexerToGroup[indexer] = group
90+
}
91+
}
92+
93+
val extracted = mutableListOf<MutableSet<Indexer>>()
94+
val alreadyExtracted = mutableSetOf<Indexer>()
95+
96+
for (indexer in indexers) {
97+
val dep = indexer.dependsOn ?: continue
98+
val indexerGroup = indexerToGroup[indexer] ?: continue
99+
val depGroup = indexerToGroup[dep] ?: continue
100+
if (indexerGroup === depGroup) continue
101+
if (indexer in alreadyExtracted) continue
102+
103+
// Collect full chain: walk up through dependsOn
104+
val chain = mutableSetOf<Indexer>()
105+
var current: Indexer? = indexer
106+
while (current != null && current in indexerSet) {
107+
chain.add(current)
108+
current = current.dependsOn
109+
}
110+
// Collect downstream dependents
111+
val dependentMap = indexers.filter { it.dependsOn != null }.groupBy { it.dependsOn!! }
112+
val queue = ArrayDeque(chain.toList())
113+
while (queue.isNotEmpty()) {
114+
val node = queue.removeFirst()
115+
dependentMap[node]?.forEach { dependent ->
116+
if (chain.add(dependent)) queue.addLast(dependent)
117+
}
118+
}
119+
120+
// Remove from original groups
121+
for (member in chain) {
122+
indexerToGroup[member]?.remove(member)
123+
alreadyExtracted.add(member)
124+
}
125+
extracted.add(chain)
126+
}
127+
128+
// Phase C: Re-merge extracted chains into closest group
129+
for (chain in extracted) {
130+
val chainMin = chain.minOf { it.getCurrentBlockNumber() }
131+
var closestGroup: MutableSet<Indexer>? = null
132+
var closestGap = Long.MAX_VALUE
133+
for (group in groups) {
134+
if (group.isEmpty()) continue
135+
val groupMin = group.minOf { it.getCurrentBlockNumber() }
136+
val gap = kotlin.math.abs(chainMin - groupMin)
137+
if (gap <= threshold && gap < closestGap) {
138+
closestGap = gap
139+
closestGroup = group
140+
}
141+
}
142+
if (closestGroup != null) {
143+
closestGroup.addAll(chain)
144+
} else {
145+
groups.add(chain)
146+
}
147+
}
148+
149+
// Filter empty groups and topologically order each
150+
return groups.filter { it.isNotEmpty() }.map { topologicalOrder(it.toList()).flatten() }
151+
}
152+
60153
private enum class VisitState {
61154
VISITING,
62155
VISITED,

0 commit comments

Comments
 (0)