Skip to content

Commit 0a5690a

Browse files
committed
common: make monitorQueue sequential, promise-guard concurrent executeApprovedActions, add logging
1 parent 5be3fe6 commit 0a5690a

File tree

4 files changed

+55
-31
lines changed

4 files changed

+55
-31
lines changed

packages/indexer-common/src/allocations/tap-collector.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {
2121
allocationSigner,
2222
tapAllocationIdProof,
2323
parseGraphQLAllocation,
24+
sequentialTimerMap,
2425
} from '..'
2526
import { BigNumber } from 'ethers'
2627
import pReduce from 'p-reduce'
@@ -183,8 +184,12 @@ export class TapCollector {
183184
}
184185

185186
private getPendingRAVs(): Eventual<RavWithAllocation[]> {
186-
return this.allocations.throttle(RAV_CHECK_INTERVAL_MS).tryMap(
187-
async (allocations) => {
187+
return sequentialTimerMap(
188+
{
189+
logger: this.logger,
190+
milliseconds: RAV_CHECK_INTERVAL_MS,
191+
},
192+
async () => {
188193
let ravs = await this.pendingRAVs()
189194
if (ravs.length === 0) {
190195
this.logger.info(`No pending RAVs to process`)
@@ -193,9 +198,10 @@ export class TapCollector {
193198
if (ravs.length > 0) {
194199
ravs = await this.filterAndUpdateRavs(ravs)
195200
}
196-
this.logger.debug(`matching allocations for pending ravs`, {
197-
allocationCount: allocations.length,
198-
ravCount: ravs.length,
201+
const allocations: Allocation[] = await this.getAllocationsfromAllocationIds(ravs)
202+
this.logger.info(`Retrieved allocations for pending RAVs`, {
203+
ravs: ravs.length,
204+
allocations: allocations.length,
199205
})
200206
return ravs
201207
.map((rav) => {

packages/indexer-common/src/indexer-management/actions.ts

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ export class ActionManager {
3131
declare models: IndexerManagementModels
3232
declare allocationManagers: NetworkMapped<AllocationManager>
3333

34+
executeBatchActionsPromise: Promise<Action[]> | undefined
35+
3436
static async create(
3537
multiNetworks: MultiNetworks<Network>,
3638
logger: Logger,
@@ -228,19 +230,32 @@ export class ActionManager {
228230
return updatedActions
229231
}
230232

233+
// a promise guard to ensure that only one batch of actions is executed at a time
231234
async executeApprovedActions(network: Network): Promise<Action[]> {
235+
if (this.executeBatchActionsPromise) {
236+
this.logger.warn('Previous batch action execution is still in progress')
237+
return this.executeBatchActionsPromise
238+
}
239+
this.executeBatchActionsPromise = this.executeApprovedActionsInner(network)
240+
const updatedActions = await this.executeBatchActionsPromise
241+
this.executeBatchActionsPromise = undefined
242+
return updatedActions
243+
}
244+
245+
async executeApprovedActionsInner(network: Network): Promise<Action[]> {
232246
let updatedActions: Action[] = []
233247
const protocolNetwork = network.specification.networkIdentifier
234248
const logger = this.logger.child({
235249
function: 'executeApprovedActions',
236250
protocolNetwork,
237251
})
238252

239-
logger.trace('Begin database transaction for executing approved actions')
253+
logger.debug('Begin database transaction for executing approved actions')
240254
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
241255
await this.models.Action.sequelize!.transaction(
242256
{ isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE },
243257
async (transaction) => {
258+
const transactionOpenTime = Date.now()
244259
let approvedActions
245260
try {
246261
// Execute already approved actions in the order of type and priority.
@@ -276,22 +291,33 @@ export class ActionManager {
276291
return []
277292
}
278293
try {
294+
logger.debug('Executing batch action', {
295+
approvedActions,
296+
startTimeMs: Date.now() - transactionOpenTime,
297+
})
298+
279299
// This will return all results if successful, if failed it will return the failed actions
280300
const allocationManager =
281301
this.allocationManagers[network.specification.networkIdentifier]
282302
const results = await allocationManager.executeBatch(approvedActions)
283303

284304
logger.debug('Completed batch action execution', {
285305
results,
306+
endTimeMs: Date.now() - transactionOpenTime,
286307
})
287308
updatedActions = await this.updateActionStatuses(results, transaction)
309+
310+
logger.debug('Updated action statuses', {
311+
updatedActions,
312+
updatedTimeMs: Date.now() - transactionOpenTime,
313+
})
288314
} catch (error) {
289315
logger.error(`Failed to execute batch tx on staking contract: ${error}`)
290316
throw indexerError(IndexerErrorCode.IE072, error)
291317
}
292318
},
293319
)
294-
logger.trace('End database transaction for executing approved actions')
320+
logger.debug('End database transaction for executing approved actions')
295321
return updatedActions
296322
}
297323

packages/indexer-common/src/indexer-management/allocations.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ export class AllocationManager {
116116
return await this.confirmTransactions(result, actions)
117117
}
118118

119-
async executeTransactions(actions: Action[]): Promise<TransactionResult> {
119+
private async executeTransactions(actions: Action[]): Promise<TransactionResult> {
120120
const logger = this.logger.child({ function: 'executeTransactions' })
121121
logger.trace('Begin executing transactions', { actions })
122122
if (actions.length < 1) {

packages/indexer-common/src/sequential-timer.ts

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,17 @@ export interface TimerTaskContext {
2020
function logWorkTime(
2121
workStarted: number,
2222
logger: Logger,
23-
loopTime: number,
2423
caller: string | undefined,
2524
milliseconds: number,
2625
) {
2726
const workTimeWarningThreshold = 5000
2827
const workTime = Date.now() - workStarted
2928
if (workTime > milliseconds + workTimeWarningThreshold) {
3029
logger.warn(
31-
'timer work took longer than the sequential timer was configured for (>5s)',
30+
`timer work took ${
31+
(workTime - milliseconds) / 1000
32+
}s longer than expected, next execution in ${milliseconds / 1000}s`,
3233
{
33-
loopTime,
3434
workTime,
3535
milliseconds,
3636
caller,
@@ -57,7 +57,6 @@ export function sequentialTimerReduce<T, U>(
5757
// obtain the calling method name from the call stack
5858
const stack = new Error().stack
5959
const caller = stack?.split('\n')[2].trim()
60-
let lastWorkStarted = Date.now()
6160

6261
let acc: U = initial
6362
let previousT: T | undefined
@@ -74,26 +73,23 @@ export function sequentialTimerReduce<T, U>(
7473
function work() {
7574
const workStarted = Date.now()
7675
const promiseOrT = reducer(acc, workStarted)
77-
const loopTime = workStarted - lastWorkStarted
78-
79-
lastWorkStarted = workStarted
8076
if (isPromiseLike(promiseOrT)) {
8177
promiseOrT.then(
8278
function onfulfilled(value) {
8379
outputReduce(value)
84-
logWorkTime(workStarted, logger, loopTime, caller, milliseconds)
85-
setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted)))
80+
logWorkTime(workStarted, logger, caller, milliseconds)
81+
setTimeout(work, milliseconds)
8682
},
8783
function onrejected(err) {
8884
console.error(err)
89-
logWorkTime(workStarted, logger, loopTime, caller, milliseconds)
90-
setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted)))
85+
logWorkTime(workStarted, logger, caller, milliseconds)
86+
setTimeout(work, milliseconds)
9187
},
9288
)
9389
} else {
9490
outputReduce(promiseOrT)
95-
logWorkTime(workStarted, logger, loopTime, caller, milliseconds)
96-
setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted)))
91+
logWorkTime(workStarted, logger, caller, milliseconds)
92+
setTimeout(work, milliseconds)
9793
}
9894
}
9995
// initial call
@@ -118,8 +114,6 @@ export function sequentialTimerMap<U>(
118114
// obtain the calling method name from the call stack
119115
const stack = new Error().stack
120116
const caller = stack?.split('\n')[2].trim()
121-
let lastWorkStarted = Date.now()
122-
123117
const output = mutable<U>()
124118

125119
let latestU: U | undefined
@@ -135,27 +129,25 @@ export function sequentialTimerMap<U>(
135129
function work() {
136130
const workStarted = Date.now()
137131
const promiseOrU = mapper(workStarted)
138-
const loopTime = workStarted - lastWorkStarted
139-
lastWorkStarted = workStarted
140132

141133
if (isPromiseLike(promiseOrU)) {
142134
promiseOrU.then(
143135
function onfulfilled(value) {
144136
checkMappedValue(value)
145-
logWorkTime(workStarted, logger, loopTime, caller, milliseconds)
146-
setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted)))
137+
logWorkTime(workStarted, logger, caller, milliseconds)
138+
setTimeout(work, milliseconds)
147139
},
148140
function onrejected(err) {
149141
options?.onError(err)
150-
logWorkTime(workStarted, logger, loopTime, caller, milliseconds)
151-
setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted)))
142+
logWorkTime(workStarted, logger, caller, milliseconds)
143+
setTimeout(work, milliseconds)
152144
},
153145
)
154146
} else {
155147
// resolved value
156148
checkMappedValue(promiseOrU)
157-
logWorkTime(workStarted, logger, loopTime, caller, milliseconds)
158-
setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted)))
149+
logWorkTime(workStarted, logger, caller, milliseconds)
150+
setTimeout(work, milliseconds)
159151
}
160152
}
161153

0 commit comments

Comments
 (0)