11import assert from "node:assert" ;
2- import { type Job , type Processor , Worker } from "bullmq" ;
2+ import { DelayedError , type Job , type Processor , Worker } from "bullmq" ;
33import superjson from "superjson" ;
44import {
55 type Address ,
@@ -70,7 +70,10 @@ type VersionedUserOp = Awaited<ReturnType<typeof prepareUserOp>>;
7070 *
7171 * This worker also handles retried EOA transactions.
7272 */
73- const handler : Processor < string , void , string > = async ( job : Job < string > ) => {
73+ const handler : Processor < string , void , string > = async (
74+ job : Job < string > ,
75+ token ?: string ,
76+ ) => {
7477 const { queueId, resendCount } = superjson . parse < SendTransactionData > (
7578 job . data ,
7679 ) ;
@@ -89,9 +92,9 @@ const handler: Processor<string, void, string> = async (job: Job<string>) => {
8992
9093 if ( transaction . status === "queued" ) {
9194 if ( transaction . isUserOp ) {
92- resultTransaction = await _sendUserOp ( job , transaction ) ;
95+ resultTransaction = await _sendUserOp ( job , transaction , token ) ;
9396 } else {
94- resultTransaction = await _sendTransaction ( job , transaction ) ;
97+ resultTransaction = await _sendTransaction ( job , transaction , token ) ;
9598 }
9699 } else if ( transaction . status === "sent" ) {
97100 resultTransaction = await _resendTransaction ( job , transaction , resendCount ) ;
@@ -121,6 +124,7 @@ const handler: Processor<string, void, string> = async (job: Job<string>) => {
121124const _sendUserOp = async (
122125 job : Job ,
123126 queuedTransaction : QueuedTransaction ,
127+ token ?: string ,
124128) : Promise < SentTransaction | ErroredTransaction | null > => {
125129 assert ( queuedTransaction . isUserOp ) ;
126130
@@ -218,7 +222,7 @@ const _sendUserOp = async (
218222 }
219223
220224 // Step 2: Get entrypoint address
221- let entrypointAddress : string | undefined ;
225+ let entrypointAddress : Address | undefined ;
222226 if ( userProvidedEntrypointAddress ) {
223227 entrypointAddress = queuedTransaction . entrypointAddress ;
224228 } else {
@@ -298,16 +302,21 @@ const _sendUserOp = async (
298302
299303 // Handle if `maxFeePerGas` is overridden.
300304 // Set it if the transaction will be sent, otherwise delay the job.
301- if ( overrides ?. maxFeePerGas && unsignedUserOp . maxFeePerGas ) {
305+ if (
306+ typeof overrides ?. maxFeePerGas !== "undefined" &&
307+ unsignedUserOp . maxFeePerGas
308+ ) {
302309 if ( overrides . maxFeePerGas > unsignedUserOp . maxFeePerGas ) {
303310 unsignedUserOp . maxFeePerGas = overrides . maxFeePerGas ;
304311 } else {
305312 const retryAt = _minutesFromNow ( 5 ) ;
306313 job . log (
307314 `Override gas fee (${ overrides . maxFeePerGas } ) is lower than onchain fee (${ unsignedUserOp . maxFeePerGas } ). Delaying job until ${ retryAt } .` ,
308315 ) ;
309- await job . moveToDelayed ( retryAt . getTime ( ) ) ;
310- return null ;
316+ // token is required to acquire lock for delaying currently processing job: https://docs.bullmq.io/patterns/process-step-jobs#delaying
317+ await job . moveToDelayed ( retryAt . getTime ( ) , token ) ;
318+ // throwing delayed error is required to notify bullmq worker not to complete or fail the job
319+ throw new DelayedError ( "Delaying job due to gas fee override" ) ;
311320 }
312321 }
313322
@@ -374,6 +383,7 @@ const _sendUserOp = async (
374383const _sendTransaction = async (
375384 job : Job ,
376385 queuedTransaction : QueuedTransaction ,
386+ token ?: string ,
377387) : Promise < SentTransaction | ErroredTransaction | null > => {
378388 assert ( ! queuedTransaction . isUserOp ) ;
379389
@@ -463,8 +473,8 @@ const _sendTransaction = async (
463473 job . log (
464474 `Override gas fee (${ overrides . maxFeePerGas } ) is lower than onchain fee (${ populatedTransaction . maxFeePerGas } ). Delaying job until ${ retryAt } .` ,
465475 ) ;
466- await job . moveToDelayed ( retryAt . getTime ( ) ) ;
467- return null ;
476+ await job . moveToDelayed ( retryAt . getTime ( ) , token ) ;
477+ throw new DelayedError ( "Delaying job due to gas fee override" ) ;
468478 }
469479 }
470480
0 commit comments