Skip to content

Commit c104a01

Browse files
authored
Stability (#92)
* Add rate limit handling for event logs, VET transfers, and clause inspection * Refactor IndexerRunnerTest to improve block processing stubs and remove disabled tests * Bump project version to 7.1.1
1 parent fab461e commit c104a01

File tree

5 files changed

+153
-23
lines changed

5 files changed

+153
-23
lines changed

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jacoco {
2020

2121
group = "org.vechain"
2222

23-
val projectVersion = System.getenv("PROJECT_VERSION") ?: "7.1.0"
23+
val projectVersion = System.getenv("PROJECT_VERSION") ?: "7.1.1"
2424
version = projectVersion
2525

2626
val isSnapshot = version.toString().endsWith("SNAPSHOT")

src/main/kotlin/org.vechain.indexer/IndexerRunner.kt

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import kotlinx.coroutines.launch
1515
import kotlinx.coroutines.withTimeoutOrNull
1616
import org.slf4j.Logger
1717
import org.slf4j.LoggerFactory
18+
import org.vechain.indexer.exception.RateLimitException
1819
import org.vechain.indexer.exception.ReorgException
1920
import org.vechain.indexer.thor.client.ThorClient
2021
import org.vechain.indexer.thor.model.Block
@@ -42,6 +43,9 @@ open class IndexerRunner {
4243
companion object {
4344
private const val BLOCK_INTERVAL_SECONDS = 10L
4445
private const val DEFAULT_RESHUFFLE_INTERVAL_MS = 300_000L
46+
private const val INITIAL_RETRY_DELAY_MS = 1_000L
47+
private const val MAX_RETRY_DELAY_MS = 60_000L
48+
private const val RATE_LIMIT_DELAY_MS = 30_000L
4549

4650
@Suppress("unused")
4751
fun launch(
@@ -409,6 +413,7 @@ open class IndexerRunner {
409413
}
410414

411415
private suspend fun <T> retryUntilSuccess(operation: suspend () -> T): T {
416+
var delayMs = INITIAL_RETRY_DELAY_MS
412417
while (true) {
413418
try {
414419
return operation()
@@ -417,9 +422,14 @@ open class IndexerRunner {
417422
} catch (e: ReorgException) {
418423
logger.error("Reorg detected, propagating to restart indexers", e)
419424
throw e
425+
} catch (e: RateLimitException) {
426+
logger.warn("Rate limited, backing off {}ms", RATE_LIMIT_DELAY_MS, e)
427+
delay(RATE_LIMIT_DELAY_MS)
428+
delayMs = INITIAL_RETRY_DELAY_MS
420429
} catch (e: Exception) {
421-
logger.error("Operation failed, retrying...", e)
422-
delay(1000L)
430+
logger.error("Operation failed, retrying in {}ms...", delayMs, e)
431+
delay(delayMs)
432+
delayMs = (delayMs * 2).coerceAtMost(MAX_RETRY_DELAY_MS)
423433
}
424434
}
425435
}

src/main/kotlin/org.vechain.indexer/thor/client/DefaultThorClient.kt

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,16 @@ open class DefaultThorClient(
127127

128128
override suspend fun getEventLogs(req: EventLogsRequest): List<EventLog> =
129129
withContext(Dispatchers.IO) {
130-
val (_, _, result) =
130+
val (_, response, result) =
131131
Fuel.post("$baseUrl/logs/event")
132132
.body(JsonUtils.mapper.writeValueAsBytes(req))
133133
.appendHeader(*headers)
134134
.response()
135135

136+
if (response.statusCode == HTTP_TOO_MANY_REQUESTS) {
137+
throw RateLimitException("Rate limited fetching event logs")
138+
}
139+
136140
val responseBody =
137141
when (result) {
138142
is Result.Success -> result.get().toString(Charsets.UTF_8)
@@ -147,12 +151,16 @@ open class DefaultThorClient(
147151

148152
override suspend fun getVetTransfers(req: TransferLogsRequest): List<TransferLog> =
149153
withContext(Dispatchers.IO) {
150-
val (_, _, result) =
154+
val (_, response, result) =
151155
Fuel.post("$baseUrl/logs/transfer")
152156
.body(JsonUtils.mapper.writeValueAsBytes(req))
153157
.appendHeader(*headers)
154158
.response()
155159

160+
if (response.statusCode == HTTP_TOO_MANY_REQUESTS) {
161+
throw RateLimitException("Rate limited fetching VET transfers")
162+
}
163+
156164
val responseBody =
157165
when (result) {
158166
is Result.Success -> result.get().toString(Charsets.UTF_8)
@@ -173,12 +181,16 @@ open class DefaultThorClient(
173181
val req = InspectionRequest(clauses)
174182
val body = JsonUtils.mapper.writeValueAsBytes(req)
175183
val params = if (revision != null) listOf("revision" to revision.value) else emptyList()
176-
val (_, _, result) =
184+
val (_, response, result) =
177185
Fuel.post(path = "$baseUrl/accounts/*", parameters = params)
178186
.body(body)
179187
.appendHeader(*headers)
180188
.response()
181189

190+
if (response.statusCode == HTTP_TOO_MANY_REQUESTS) {
191+
throw RateLimitException("Rate limited inspecting clauses")
192+
}
193+
182194
val responseBody =
183195
when (result) {
184196
is Result.Success -> result.get().toString(Charsets.UTF_8)

src/test/kotlin/org/vechain/indexer/IndexerRunnerTest.kt

Lines changed: 79 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
1111
import kotlinx.coroutines.cancelAndJoin
1212
import kotlinx.coroutines.delay
1313
import kotlinx.coroutines.launch
14+
import kotlinx.coroutines.test.currentTime
1415
import kotlinx.coroutines.test.runTest
15-
import org.junit.jupiter.api.Disabled
1616
import org.junit.jupiter.api.Nested
1717
import org.junit.jupiter.api.Test
1818
import org.junit.jupiter.api.assertThrows
1919
import org.vechain.indexer.BlockTestBuilder.Companion.buildBlock
20+
import org.vechain.indexer.exception.RateLimitException
2021
import org.vechain.indexer.exception.ReorgException
2122
import org.vechain.indexer.thor.client.ThorClient
2223
import org.vechain.indexer.thor.model.Block
@@ -174,6 +175,50 @@ internal class IndexerRunnerTest {
174175
coVerify(exactly = 1) { indexer.initialise() }
175176
}
176177

178+
@Test
179+
fun `should use exponential backoff on repeated failures`() = runTest {
180+
var initAttempts = 0
181+
val indexer =
182+
createMockIndexer(
183+
name = "indexer1",
184+
initializeBlock = {
185+
initAttempts++
186+
if (initAttempts < 4) {
187+
throw RuntimeException("Init failed")
188+
}
189+
}
190+
)
191+
192+
val runner = IndexerRunner()
193+
runner.initialiseAll(listOf(indexer))
194+
195+
expectThat(initAttempts).isEqualTo(4)
196+
// Delays: 1s + 2s + 4s = 7s total virtual time
197+
expectThat(currentTime).isEqualTo(7_000L)
198+
}
199+
200+
@Test
201+
fun `should use longer delay for RateLimitException`() = runTest {
202+
var initAttempts = 0
203+
val indexer =
204+
createMockIndexer(
205+
name = "indexer1",
206+
initializeBlock = {
207+
initAttempts++
208+
if (initAttempts < 2) {
209+
throw RateLimitException("Rate limited")
210+
}
211+
}
212+
)
213+
214+
val runner = IndexerRunner()
215+
runner.initialiseAll(listOf(indexer))
216+
217+
expectThat(initAttempts).isEqualTo(2)
218+
// Rate limit delay is 30s
219+
expectThat(currentTime).isEqualTo(30_000L)
220+
}
221+
177222
@Test
178223
fun `should complete even if one indexer is slow`() = runTest {
179224
val fastIndexer = createMockIndexer("fast")
@@ -280,7 +325,6 @@ internal class IndexerRunnerTest {
280325
}
281326

282327
@Test
283-
@Disabled("Causes JVM instrumentation crash with byte-buddy agent")
284328
fun `BlockIndexer starts processing while LogsIndexer fast syncs`() = runTest {
285329
val processingStarted = mutableListOf<String>()
286330
var fastSyncCompleted = false
@@ -307,7 +351,14 @@ internal class IndexerRunnerTest {
307351
)
308352

309353
val thorClient = mockk<ThorClient>()
310-
coEvery { thorClient.waitForBlock(any<BlockRevision>()) } returns buildBlock(num = 0L)
354+
// Register catch-all first (with delay), then specific stub (MockK LIFO)
355+
coEvery { thorClient.waitForBlock(any<BlockRevision>()) } coAnswers
356+
{
357+
delay(5000)
358+
buildBlock(num = (firstArg<BlockRevision>() as BlockRevision.Number).number)
359+
}
360+
coEvery { thorClient.waitForBlock(BlockRevision.Number(0L)) } returns
361+
buildBlock(num = 0L)
311362

312363
val runner = IndexerRunner()
313364
runner.fastSyncWithEarlyProcessing(
@@ -326,7 +377,6 @@ internal class IndexerRunnerTest {
326377
inner class RunWithDynamicGroups {
327378

328379
@Test
329-
@Disabled("Causes JVM instrumentation crash with byte-buddy agent")
330380
fun `single group delegates to runAllIndexers`() = runTest {
331381
val thorClient = mockk<ThorClient>()
332382
val block0 = buildBlock(num = 0L)
@@ -335,7 +385,13 @@ internal class IndexerRunnerTest {
335385
val indexer1 = createMockIndexer("indexer1", currentBlock = 0L)
336386
val indexer2 = createMockIndexer("indexer2", currentBlock = 0L)
337387

338-
coEvery { thorClient.waitForBlock(any<BlockRevision>()) } returns block0
388+
// Register catch-all with delay first, then specific (MockK LIFO)
389+
coEvery { thorClient.waitForBlock(any<BlockRevision>()) } coAnswers
390+
{
391+
delay(5000)
392+
buildBlock(num = (firstArg<BlockRevision>() as BlockRevision.Number).number)
393+
}
394+
coEvery { thorClient.waitForBlock(BlockRevision.Number(0L)) } returns block0
339395

340396
val runner = IndexerRunner()
341397
val job = launch {
@@ -350,23 +406,23 @@ internal class IndexerRunnerTest {
350406
}
351407

352408
@Test
353-
@Disabled("Causes JVM instrumentation crash with byte-buddy agent")
354409
fun `multiple groups process independently`() = runTest {
355410
val thorClient = mockk<ThorClient>()
356411

357412
// Two indexers far apart -> two groups
358413
val indexer1 = createMockIndexer("indexer1", currentBlock = 0L)
359414
val indexer2 = createMockIndexer("indexer2", currentBlock = 200_000L)
360415

361-
coEvery { thorClient.waitForBlock(BlockRevision.Number(0L)) } returns
362-
buildBlock(num = 0L)
363-
coEvery { thorClient.waitForBlock(BlockRevision.Number(200_000L)) } returns
364-
buildBlock(num = 200_000L)
416+
// Register catch-all with delay first, then specific stubs (MockK LIFO)
365417
coEvery { thorClient.waitForBlock(any<BlockRevision>()) } coAnswers
366418
{
367419
delay(5000)
368420
buildBlock(num = (firstArg<BlockRevision>() as BlockRevision.Number).number)
369421
}
422+
coEvery { thorClient.waitForBlock(BlockRevision.Number(0L)) } returns
423+
buildBlock(num = 0L)
424+
coEvery { thorClient.waitForBlock(BlockRevision.Number(200_000L)) } returns
425+
buildBlock(num = 200_000L)
370426

371427
val runner = IndexerRunner()
372428
val job = launch {
@@ -420,7 +476,6 @@ internal class IndexerRunnerTest {
420476
}
421477

422478
@Test
423-
@Disabled("Test timing issue - blocks not processed before cancellation")
424479
fun `should process blocks through all indexers in same group concurrently`() = runTest {
425480
val thorClient = mockk<ThorClient>()
426481
val block0 = buildBlock(num = 0L)
@@ -456,12 +511,13 @@ internal class IndexerRunnerTest {
456511
}
457512
}
458513

459-
coEvery { thorClient.waitForBlock(BlockRevision.Number(0L)) } returns block0
514+
// Register catch-all with delay first, then specific (MockK LIFO)
460515
coEvery { thorClient.waitForBlock(any<BlockRevision>()) } coAnswers
461516
{
462517
delay(5000)
463518
buildBlock(num = (firstArg<BlockRevision>() as BlockRevision.Number).number)
464519
}
520+
coEvery { thorClient.waitForBlock(BlockRevision.Number(0L)) } returns block0
465521

466522
val runner = IndexerRunner()
467523
val job = launch { runner.runAllIndexers(listOf(indexer1, indexer2), thorClient, 1) }
@@ -542,7 +598,6 @@ internal class IndexerRunnerTest {
542598
}
543599

544600
@Test
545-
@Disabled("Causes OutOfMemoryError during test execution")
546601
fun `should retry block processing on failure`() = runTest {
547602
val thorClient = mockk<ThorClient>()
548603
val block0 = buildBlock(num = 0L)
@@ -566,7 +621,14 @@ internal class IndexerRunnerTest {
566621
}
567622
}
568623

569-
coEvery { thorClient.waitForBlock(any<BlockRevision>()) } returns block0
624+
// Return correct block numbers; delay after block 0 to prevent infinite loop
625+
coEvery { thorClient.waitForBlock(any<BlockRevision>()) } coAnswers
626+
{
627+
val blockNum = (firstArg<BlockRevision>() as BlockRevision.Number).number
628+
if (blockNum > 0L) delay(5000)
629+
buildBlock(num = blockNum)
630+
}
631+
coEvery { thorClient.waitForBlock(BlockRevision.Number(0L)) } returns block0
570632

571633
val runner = IndexerRunner()
572634
val job = launch { runner.runAllIndexers(listOf(indexer), thorClient, 1) }
@@ -974,21 +1036,21 @@ internal class IndexerRunnerTest {
9741036
}
9751037

9761038
@Test
977-
@Disabled("Test timing issue - processBlock not called before cancellation")
9781039
fun `should handle single indexer in multiple groups scenario`() = runTest {
9791040
val thorClient = mockk<ThorClient>()
9801041
val block0 = buildBlock(num = 0L)
9811042
val block1 = buildBlock(num = 1L)
9821043

9831044
val indexer = createMockIndexer("indexer1", currentBlock = 0L)
9841045

985-
coEvery { thorClient.waitForBlock(BlockRevision.Number(0L)) } returns block0
986-
coEvery { thorClient.waitForBlock(BlockRevision.Number(1L)) } returns block1
1046+
// Register catch-all with delay first, then specific stubs (MockK LIFO)
9871047
coEvery { thorClient.waitForBlock(any<BlockRevision>()) } coAnswers
9881048
{
9891049
delay(5000) // Block future fetches to prevent OOM
9901050
buildBlock(num = (firstArg<BlockRevision>() as BlockRevision.Number).number)
9911051
}
1052+
coEvery { thorClient.waitForBlock(BlockRevision.Number(0L)) } returns block0
1053+
coEvery { thorClient.waitForBlock(BlockRevision.Number(1L)) } returns block1
9921054

9931055
val runner = IndexerRunner()
9941056
val job = launch { runner.runAllIndexers(listOf(indexer), thorClient, 1) }

0 commit comments

Comments
 (0)