Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions packages/executor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
"scripts": {
"build": "tsc",
"start": "node dist/index.js",
"dev": "ts-node src/index.ts"
"dev": "ts-node src/index.ts",
"test": "vitest",
"test:run": "vitest run"
},
"keywords": [],
"author": "",
Expand All @@ -15,7 +17,8 @@
"@types/node": "^20.12.11",
"@types/node-telegram-bot-api": "^0.64.7",
"ts-node": "^10.9.2",
"typescript": "^5.4.5"
"typescript": "^5.4.5",
"vitest": "^3.2.4"
},
"dependencies": {
"chalk": "4",
Expand Down
143 changes: 98 additions & 45 deletions packages/executor/src/executorHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,61 @@ async function initializeCounters() {
}
}

async function incrementRetryCount(position: Position): Promise<number> {
const retryKey = `retry:${position.network}:${position.marketAddress}:${position.positionId}`
const retries = await redis.incr(retryKey)
return retries
/**
* Check if an error is a market-specific error (OVLV1:!data)
* These errors indicate the market is not in a state to process liquidations
* but may become available later
*/
export async function isMarketError(error: any): Promise<boolean> {
if (error?.message?.includes('OVLV1:!data')) {
return true
}
if (error?.error?.message?.includes('OVLV1:!data')) {
return true
}
return false
}

/**
* Remove position from all Redis data structures
* Used when position is no longer liquidatable or successfully liquidated
*/
export async function removePositionFromRedis(position: Position) {
const { positionId, marketAddress, network } = position
await redis.hdel(`positions:${network}:${marketAddress}`, positionId)
await redis.zrem(`position_index:${network}:${marketAddress}`, positionId)
}

async function resetRetryCount(position: Position) {
const retryKey = `retry:${position.network}:${position.marketAddress}:${position.positionId}`
await redis.del(retryKey)
/**
* Check if a position is still liquidatable by querying the market state contract
* This prevents us from retrying positions that are no longer valid
*/
export async function checkIfPositionStillLiquidatable(position: Position): Promise<boolean> {
try {
const { marketAddress, network, positionId, owner } = position

const provider = new ethers.providers.JsonRpcProvider(networksConfig[network].rpc_url)

// Use the same market state contract that liquidation-checker uses
const ovlMarketStateContract = new ethers.Contract(
'0x10575a9C8F36F9F42D7DB71Ef179eD9BEf8Df238', // ovl_state_address from config
[
'function liquidatable(address market, address owner, uint256 positionId) view returns (bool)'
],
provider
)

// Check if position is still liquidatable
const isLiquidatable = await ovlMarketStateContract.liquidatable(marketAddress, owner, parseInt(positionId))
Comment on lines +75 to +91

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Avoid parsing positionId into a JavaScript number

The new liquidatability check converts position.positionId with parseInt before calling the contract. parseInt returns a JavaScript number, which loses precision once the value exceeds Number.MAX_SAFE_INTEGER. For large position IDs the contract is called with a truncated ID, so valid positions may be reported as non‑liquidatable and removed from Redis while still being liquidatable on chain. The call can take the string or a BigNumber directly—no conversion is needed.

Useful? React with 👍 / 👎.


log(chalk.blue(`Position ${positionId} liquidatability check: ${isLiquidatable}`))
return isLiquidatable

} catch (error) {
// If we can't check liquidatability, assume it's not liquidatable to be safe
log(chalk.red(`Failed to check liquidatability for position ${position.positionId}:`, error))
return false
}
}

async function sendTelegramMessage(message: string) {
Expand Down Expand Up @@ -81,7 +127,7 @@ async function reportLiquidationError(position: Position, retries: number, error
await sendTelegramMessage(message)
}

async function liquidatePosition(position: Position) {
export async function liquidatePosition(position: Position) {
const { positionId, owner, marketAddress, network } = position

try {
Expand Down Expand Up @@ -125,9 +171,7 @@ async function liquidatePosition(position: Position) {
)

// remove position from Redis
await redis.hdel(`positions:${network}:${marketAddress}`, positionId)
await redis.zrem(`position_index:${network}:${marketAddress}`, positionId)
await resetRetryCount(position)
await removePositionFromRedis(position)

// track on redis total liquidated positions by executor
const executorKey = `total_liquidated_positions_by_executor:${network}:${wallet.address}`
Expand Down Expand Up @@ -162,40 +206,29 @@ async function liquidatePosition(position: Position) {
log(`${chalk.bgRed('Transaction failed! =>')} ${chalk.yellow('network:')} ${network} ${chalk.yellow('positionId:')} ${positionId}`)
log(chalk.red(error))

// Store error type in local variable for final reporting
let errorType = 'Transaction Failed'
if (error instanceof Error) {
if (error.message.includes('insufficient funds')) {
errorType = 'Insufficient Funds'
} else if (error.message.includes('nonce')) {
errorType = 'Nonce Error'
} else if (error.message.includes('revert')) {
errorType = 'Transaction Reverted'
} else if (error.message.includes('gas')) {
errorType = 'Gas Error'
}
}
// Check if position is still liquidatable before deciding what to do
const isStillLiquidatable = await checkIfPositionStillLiquidatable(position)

// Store in a simple variable for the final report
position.lastErrorType = errorType
}

// increment retry count and check if it exceeds the max retries
const retries = await incrementRetryCount(position);

if (retries > MAX_RETRIES) {
log(chalk.bold.red(`Position ${position.positionId} exceeded max retries, removing from queue`));

// Report error to Telegram when max retries exceeded
const errorType = position.lastErrorType || 'Max Retries Exceeded'
await reportLiquidationError(position, retries, errorType)

await redis.hdel(`positions:${network}:${marketAddress}`, position.positionId)
await redis.zrem(`position_index:${network}:${marketAddress}`, position.positionId)
await resetRetryCount(position)
} else {
log(chalk.bgBlue(`Re-queuing position ${position.positionId} for retry ${retries} of ${MAX_RETRIES}`))
await redis.lpush('liquidatable_positions', JSON.stringify(position))
if (isStillLiquidatable) {
// Determine wait time based on error type
const waitMinutes = await isMarketError(error) ? 7 : 3
const waitTime = Date.now() + (waitMinutes * 60 * 1000)
log(chalk.yellow(`Position ${positionId} failed but still liquidatable, retrying in ${waitMinutes} minutes`))
// Use separate delayed queue to avoid infinite loop
await redis.zadd('delayed_positions', waitTime, JSON.stringify(position))
return // Exit without incrementing retry count
} else {
// Position is no longer liquidatable - notify and remove immediately
log(chalk.red(`Position ${positionId} no longer liquidatable, removing from queue`))
// Notify only when position is removed (no spam)
const errorType = 'Position No Longer Liquidatable'
await reportLiquidationError(position, 0, errorType)
await removePositionFromRedis(position)
return
}
}
}

Expand All @@ -213,7 +246,23 @@ export async function liquidablePositionsListener() {

while (true) {
try {
const result = await redis.brpop('liquidatable_positions', 0)
// 1. Process delayed positions that are ready to retry (immediate, non-blocking)
const now = Date.now()
const readyDelayed = await redis.zrangebyscore('delayed_positions', 0, now, 'LIMIT', 0, 1)

if (readyDelayed.length > 0) {
const positionData = readyDelayed[0]
const delayedPosition: Position = JSON.parse(positionData)

// Remove first, then process (atomic operation)
await redis.zrem('delayed_positions', positionData)

log(chalk.blue(`Retrying delayed position ${delayedPosition.positionId} after backoff period`))
await liquidatePosition(delayedPosition)
}

// 2. Check for new liquidatable positions (with 1 second timeout to avoid blocking)
const result = await redis.brpop('liquidatable_positions', 1)
if (result) {
const [key, positionData] = result
const position: Position = JSON.parse(positionData)
Expand All @@ -227,6 +276,10 @@ export async function liquidablePositionsListener() {

await liquidatePosition(position)
}

// 3. Wait 1 second before next iteration to process both queues independently
await new Promise((resolve) => setTimeout(resolve, 1000))

} catch (error) {
console.error('Error processing position:', error)
await new Promise((resolve) => setTimeout(resolve, 10000)) // wait 10 seconds before retrying
Expand Down
Loading