Skip to content

Commit 064eb5d

Browse files
committed
common: mark actions as PENDING while executing
1 parent 2d83025 commit 064eb5d

File tree

1 file changed

+109
-33
lines changed
  • packages/indexer-common/src/indexer-management

1 file changed

+109
-33
lines changed

packages/indexer-common/src/indexer-management/actions.ts

Lines changed: 109 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,41 @@ export class ActionManager {
206206
})
207207
}
208208

209-
private async updateActionStatuses(
209+
/**
210+
* Mark actions with the given status.
211+
* @param actions
212+
* @param transaction
213+
* @param status
214+
* @returns updated actions
215+
* @throws error if the update fails
216+
*/
217+
private async markActions(
218+
actions: Action[],
219+
transaction: Transaction,
220+
status: ActionStatus,
221+
): Promise<Action[]> {
222+
const ids = actions.map((action) => action.id)
223+
const [, updatedActions] = await this.models.Action.update(
224+
{
225+
status,
226+
},
227+
{
228+
where: { id: ids },
229+
returning: true,
230+
transaction,
231+
},
232+
)
233+
return updatedActions
234+
}
235+
236+
/**
237+
* Update the action statuses from the results provided by execution.
238+
*
239+
* @param results
240+
* @param transaction
241+
* @returns updated actions
242+
*/
243+
private async updateActionStatusesWithResults(
210244
results: AllocationResult[],
211245
transaction: Transaction,
212246
): Promise<Action[]> {
@@ -255,12 +289,14 @@ export class ActionManager {
255289
protocolNetwork,
256290
})
257291

258-
logger.debug('Begin database transaction for executing approved actions')
292+
logger.debug('Begin executing approved actions')
293+
let batchStartTime
294+
259295
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
260-
await this.models.Action.sequelize!.transaction(
296+
const prioritizedActions: Action[] = await this.models.Action.sequelize!.transaction(
261297
{ isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE },
262298
async (transaction) => {
263-
const transactionOpenTime = Date.now()
299+
batchStartTime = Date.now()
264300
let approvedActions
265301
try {
266302
// Execute already approved actions in the order of type and priority.
@@ -271,10 +307,7 @@ export class ActionManager {
271307
const actionTypePriority = ['unallocate', 'reallocate', 'allocate']
272308
approvedActions = (
273309
await this.models.Action.findAll({
274-
where: {
275-
status: ActionStatus.APPROVED,
276-
protocolNetwork,
277-
},
310+
where: { status: ActionStatus.APPROVED, protocolNetwork },
278311
order: [['priority', 'ASC']],
279312
transaction,
280313
lock: transaction.LOCK.UPDATE,
@@ -283,6 +316,16 @@ export class ActionManager {
283316
return actionTypePriority.indexOf(a.type) - actionTypePriority.indexOf(b.type)
284317
})
285318

319+
const pendingActions = await this.models.Action.findAll({
320+
where: { status: ActionStatus.PENDING, protocolNetwork },
321+
order: [['priority', 'ASC']],
322+
transaction,
323+
})
324+
if (pendingActions.length > 0) {
325+
logger.warn(`${pendingActions} Actions found in PENDING state when execution began. Was there a crash? \
326+
These indicate that execution was interrupted and will need to be cleared manually.`)
327+
}
328+
286329
if (approvedActions.length === 0) {
287330
logger.debug('No approved actions were found for this network')
288331
return []
@@ -295,34 +338,67 @@ export class ActionManager {
295338
logger.error('Failed to query approved actions for network', { error })
296339
return []
297340
}
298-
try {
299-
logger.debug('Executing batch action', {
300-
approvedActions,
301-
startTimeMs: Date.now() - transactionOpenTime,
302-
})
341+
// mark all approved actions as PENDING, this serves as a lock on other processing of them
342+
await this.markActions(approvedActions, transaction, ActionStatus.PENDING)
343+
return prioritizedActions
344+
},
345+
)
303346

304-
// This will return all results if successful, if failed it will return the failed actions
305-
const allocationManager =
306-
this.allocationManagers[network.specification.networkIdentifier]
307-
const results = await allocationManager.executeBatch(approvedActions)
347+
try {
348+
logger.debug('Executing batch action', {
349+
prioritizedActions,
350+
startTimeMs: Date.now() - batchStartTime,
351+
})
352+
353+
const allocationManager =
354+
this.allocationManagers[network.specification.networkIdentifier]
355+
356+
let results
357+
try {
358+
// This will return all results if successful, if failed it will return the failed actions
359+
results = await allocationManager.executeBatch(prioritizedActions)
360+
logger.debug('Completed batch action execution', {
361+
results,
362+
endTimeMs: Date.now() - batchStartTime,
363+
})
364+
} catch (error) {
365+
// Release the actions from the PENDING state. This means they will be retried again on the next batch execution.
366+
logger.error(
367+
`Error raised during executeBatch, releasing ${prioritizedActions.length} actions from PENDING state. \
368+
These will be attempted again on the next batch.`,
369+
error,
370+
)
371+
await this.models.Action.sequelize!.transaction(
372+
{ isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE },
373+
async (transaction) => {
374+
return await this.markActions(
375+
prioritizedActions,
376+
transaction,
377+
ActionStatus.APPROVED,
378+
)
379+
},
380+
)
381+
return []
382+
}
308383

309-
logger.debug('Completed batch action execution', {
310-
results,
311-
endTimeMs: Date.now() - transactionOpenTime,
312-
})
313-
updatedActions = await this.updateActionStatuses(results, transaction)
384+
// Happy path: execution went well (success or failure but no exceptions). Update the actions with the results.
385+
updatedActions = await this.models.Action.sequelize!.transaction(
386+
{ isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE },
387+
async (transaction) => {
388+
return await this.updateActionStatusesWithResults(results, transaction)
389+
},
390+
)
314391

315-
logger.debug('Updated action statuses', {
316-
updatedActions,
317-
updatedTimeMs: Date.now() - transactionOpenTime,
318-
})
319-
} catch (error) {
320-
logger.error(`Failed to execute batch tx on staking contract: ${error}`)
321-
throw indexerError(IndexerErrorCode.IE072, error)
322-
}
323-
},
324-
)
325-
logger.debug('End database transaction for executing approved actions')
392+
logger.debug('Updated action statuses', {
393+
updatedActions,
394+
updatedTimeMs: Date.now() - batchStartTime,
395+
})
396+
} catch (error) {
397+
logger.error(`Failed to execute batch tx on staking contract: ${error}`)
398+
throw indexerError(IndexerErrorCode.IE072, error)
399+
}
400+
401+
logger.debug('End executing approved actions')
326402
return updatedActions
327403
}
328404

0 commit comments

Comments
 (0)