@@ -11,7 +11,7 @@ use engine_executors::{
1111 } ,
1212 webhook:: { WebhookJobHandler , WebhookRetryConfig } ,
1313} ;
14- use twmq:: { Queue , queue:: QueueOptions } ;
14+ use twmq:: { Queue , queue:: QueueOptions , shutdown :: ShutdownHandle } ;
1515
1616use crate :: {
1717 chains:: ThirdwebChainService ,
@@ -20,10 +20,21 @@ use crate::{
2020
2121pub struct QueueManager {
2222 pub webhook_queue : Arc < Queue < WebhookJobHandler > > ,
23- pub erc4337_send_queue : Arc < Queue < ExternalBundlerSendHandler < ThirdwebChainService > > > ,
24- pub erc4337_confirm_queue : Arc < Queue < UserOpConfirmationHandler < ThirdwebChainService > > > ,
23+ pub external_bundler_send_queue : Arc < Queue < ExternalBundlerSendHandler < ThirdwebChainService > > > ,
24+ pub userop_confirm_queue : Arc < Queue < UserOpConfirmationHandler < ThirdwebChainService > > > ,
2525}
2626
27+ fn get_queue_name_for_namespace ( namespace : & Option < String > , name : & str ) -> String {
28+ match namespace {
29+ Some ( namespace) => format ! ( "{}_{}" , namespace, name) ,
30+ None => name. to_owned ( ) ,
31+ }
32+ }
33+
34+ const EXTERNAL_BUNDLER_SEND_QUEUE_NAME : & str = "external_bundler_send" ;
35+ const USEROP_CONFIRM_QUEUE_NAME : & str = "userop_confirm" ;
36+ const WEBHOOK_QUEUE_NAME : & str = "webhook" ;
37+
2738impl QueueManager {
2839 pub async fn new (
2940 redis_config : & RedisConfig ,
@@ -49,12 +60,13 @@ impl QueueManager {
4960 } ;
5061
5162 let mut external_bundler_send_queue_opts = base_queue_opts. clone ( ) ;
52- let mut external_bundler_confirm_queue_opts = base_queue_opts. clone ( ) ;
53- let mut webhook_queue_opts = base_queue_opts. clone ( ) ;
63+ external_bundler_send_queue_opts. local_concurrency =
64+ queue_config. external_bundler_send_workers ;
65+
66+ let mut userop_confirm_queue_opts = base_queue_opts. clone ( ) ;
67+ userop_confirm_queue_opts. local_concurrency = queue_config. userop_confirm_workers ;
5468
55- external_bundler_send_queue_opts. local_concurrency = queue_config. erc4337_send_workers ;
56- external_bundler_confirm_queue_opts. local_concurrency =
57- queue_config. erc4337_confirm_workers ;
69+ let mut webhook_queue_opts = base_queue_opts. clone ( ) ;
5870 webhook_queue_opts. local_concurrency = queue_config. webhook_workers ;
5971
6072 // Create webhook queue
@@ -63,32 +75,43 @@ impl QueueManager {
6375 retry_config : Arc :: new ( WebhookRetryConfig :: default ( ) ) ,
6476 } ;
6577
66- let webhook_queue = Arc :: new (
67- Queue :: new (
68- & redis_config. url ,
69- "webhook" ,
70- Some ( webhook_queue_opts) ,
71- webhook_handler,
72- )
73- . await ?,
78+ let webhook_queue_name =
79+ get_queue_name_for_namespace ( & queue_config. execution_namespace , WEBHOOK_QUEUE_NAME ) ;
80+
81+ let external_bundler_send_queue_name = get_queue_name_for_namespace (
82+ & queue_config. execution_namespace ,
83+ EXTERNAL_BUNDLER_SEND_QUEUE_NAME ,
84+ ) ;
85+
86+ let userop_confirm_queue_name = get_queue_name_for_namespace (
87+ & queue_config. execution_namespace ,
88+ USEROP_CONFIRM_QUEUE_NAME ,
7489 ) ;
7590
91+ let webhook_queue = Queue :: builder ( )
92+ . name ( webhook_queue_name)
93+ . options ( webhook_queue_opts)
94+ . handler ( webhook_handler)
95+ . redis_client ( redis_client. clone ( ) )
96+ . build ( )
97+ . await ?
98+ . arc ( ) ;
99+
76100 // Create confirmation queue first (needed by send queue)
77101 let confirm_handler = UserOpConfirmationHandler :: new (
78102 chain_service. clone ( ) ,
79103 deployment_lock. clone ( ) ,
80104 webhook_queue. clone ( ) ,
81105 ) ;
82106
83- let erc4337_confirm_queue = Arc :: new (
84- Queue :: new (
85- & redis_config. url ,
86- "erc4337_confirm" ,
87- Some ( external_bundler_confirm_queue_opts) ,
88- confirm_handler,
89- )
90- . await ?,
91- ) ;
107+ let userop_confirm_queue = Queue :: builder ( )
108+ . name ( userop_confirm_queue_name)
109+ . options ( userop_confirm_queue_opts)
110+ . handler ( confirm_handler)
111+ . redis_client ( redis_client. clone ( ) )
112+ . build ( )
113+ . await ?
114+ . arc ( ) ;
92115
93116 // Create send queue
94117 let send_handler = ExternalBundlerSendHandler {
@@ -97,56 +120,51 @@ impl QueueManager {
97120 deployment_cache,
98121 deployment_lock,
99122 webhook_queue : webhook_queue. clone ( ) ,
100- confirm_queue : erc4337_confirm_queue . clone ( ) ,
123+ confirm_queue : userop_confirm_queue . clone ( ) ,
101124 } ;
102125
103- let erc4337_send_queue = Arc :: new (
104- Queue :: new (
105- & redis_config. url ,
106- "erc4337_send" ,
107- Some ( external_bundler_send_queue_opts) ,
108- send_handler,
109- )
110- . await ?,
111- ) ;
126+ let external_bundler_send_queue = Queue :: builder ( )
127+ . name ( external_bundler_send_queue_name)
128+ . options ( external_bundler_send_queue_opts)
129+ . handler ( send_handler)
130+ . redis_client ( redis_client. clone ( ) )
131+ . build ( )
132+ . await ?
133+ . arc ( ) ;
112134
113135 Ok ( Self {
114136 webhook_queue,
115- erc4337_send_queue ,
116- erc4337_confirm_queue ,
137+ external_bundler_send_queue ,
138+ userop_confirm_queue ,
117139 } )
118140 }
119141
120142 /// Start all workers
121- pub async fn start_workers ( & self , queue_config : & QueueConfig ) -> Result < ( ) , EngineError > {
143+ pub fn start_workers ( & self , queue_config : & QueueConfig ) -> ShutdownHandle {
122144 tracing:: info!( "Starting queue workers..." ) ;
123145
124146 // Start webhook workers
125147 tracing:: info!( "Starting webhook worker" ) ;
126- if let Err ( e) = self . webhook_queue . clone ( ) . work ( ) . await {
127- tracing:: error!( "Webhook worker failed: {}" , e) ;
128- }
148+ let webhook_worker = self . webhook_queue . work ( ) ;
129149
130150 // Start ERC-4337 send workers
131151 tracing:: info!( "Starting external bundler send worker" ) ;
132- if let Err ( e) = self . erc4337_send_queue . clone ( ) . work ( ) . await {
133- tracing:: error!( "Webhook worker failed: {}" , e) ;
134- }
152+ let external_bundler_send_worker = self . external_bundler_send_queue . work ( ) ;
135153
136154 // Start ERC-4337 confirmation workers
137155 tracing:: info!( "Starting external bundler confirmation worker" ) ;
138- if let Err ( e) = self . erc4337_confirm_queue . clone ( ) . work ( ) . await {
139- tracing:: error!( "Webhook worker failed: {}" , e) ;
140- }
156+ let userop_confirm_worker = self . userop_confirm_queue . work ( ) ;
141157
142158 tracing:: info!(
143159 "Started {} webhook workers, {} send workers, {} confirm workers" ,
144160 queue_config. webhook_workers,
145- queue_config. erc4337_send_workers ,
146- queue_config. erc4337_confirm_workers
161+ queue_config. external_bundler_send_workers ,
162+ queue_config. userop_confirm_workers
147163 ) ;
148164
149- Ok ( ( ) )
165+ ShutdownHandle :: with_worker ( webhook_worker)
166+ . and_worker ( external_bundler_send_worker)
167+ . and_worker ( userop_confirm_worker)
150168 }
151169
152170 /// Get queue statistics for monitoring
@@ -162,34 +180,49 @@ impl QueueManager {
162180 } ;
163181
164182 let send_stats = QueueStatistics {
165- pending : self . erc4337_send_queue . count ( JobStatus :: Pending ) . await ?,
166- active : self . erc4337_send_queue . count ( JobStatus :: Active ) . await ?,
167- delayed : self . erc4337_send_queue . count ( JobStatus :: Delayed ) . await ?,
168- success : self . erc4337_send_queue . count ( JobStatus :: Success ) . await ?,
169- failed : self . erc4337_send_queue . count ( JobStatus :: Failed ) . await ?,
183+ pending : self
184+ . external_bundler_send_queue
185+ . count ( JobStatus :: Pending )
186+ . await ?,
187+ active : self
188+ . external_bundler_send_queue
189+ . count ( JobStatus :: Active )
190+ . await ?,
191+ delayed : self
192+ . external_bundler_send_queue
193+ . count ( JobStatus :: Delayed )
194+ . await ?,
195+ success : self
196+ . external_bundler_send_queue
197+ . count ( JobStatus :: Success )
198+ . await ?,
199+ failed : self
200+ . external_bundler_send_queue
201+ . count ( JobStatus :: Failed )
202+ . await ?,
170203 } ;
171204
172205 let confirm_stats = QueueStatistics {
173- pending : self . erc4337_confirm_queue . count ( JobStatus :: Pending ) . await ?,
174- active : self . erc4337_confirm_queue . count ( JobStatus :: Active ) . await ?,
175- delayed : self . erc4337_confirm_queue . count ( JobStatus :: Delayed ) . await ?,
176- success : self . erc4337_confirm_queue . count ( JobStatus :: Success ) . await ?,
177- failed : self . erc4337_confirm_queue . count ( JobStatus :: Failed ) . await ?,
206+ pending : self . userop_confirm_queue . count ( JobStatus :: Pending ) . await ?,
207+ active : self . userop_confirm_queue . count ( JobStatus :: Active ) . await ?,
208+ delayed : self . userop_confirm_queue . count ( JobStatus :: Delayed ) . await ?,
209+ success : self . userop_confirm_queue . count ( JobStatus :: Success ) . await ?,
210+ failed : self . userop_confirm_queue . count ( JobStatus :: Failed ) . await ?,
178211 } ;
179212
180213 Ok ( QueueStats {
181214 webhook : webhook_stats,
182- erc4337_send : send_stats,
183- erc4337_confirm : confirm_stats,
215+ external_bundler_send : send_stats,
216+ userop_confirm : confirm_stats,
184217 } )
185218 }
186219}
187220
188221#[ derive( Debug , serde:: Serialize ) ]
189222pub struct QueueStats {
190223 pub webhook : QueueStatistics ,
191- pub erc4337_send : QueueStatistics ,
192- pub erc4337_confirm : QueueStatistics ,
224+ pub external_bundler_send : QueueStatistics ,
225+ pub userop_confirm : QueueStatistics ,
193226}
194227
195228#[ derive( Debug , serde:: Serialize ) ]
0 commit comments