@@ -33,8 +33,7 @@ use serde::{Deserialize, Serialize, Serializer};
33
33
use serde_json:: { json, Value } ;
34
34
use tokio:: net:: { TcpListener , TcpStream } ;
35
35
use tokio:: sync:: oneshot:: Sender ;
36
- use tokio:: sync:: { oneshot, Mutex , Notify , OnceCell } ;
37
- use tokio:: task:: JoinHandle ;
36
+ use tokio:: sync:: { oneshot, Mutex , Notify } ;
38
37
use tokio:: time;
39
38
use uuid:: Uuid ;
40
39
@@ -173,12 +172,12 @@ struct InnerBrowserSignerProxy {
173
172
options : BrowserSignerProxyOptions ,
174
173
/// Internal state of the proxy including request queues
175
174
state : Arc < ProxyState > ,
176
- /// Handle to the running proxy server (initialized on demand)
177
- handle : OnceCell < Arc < JoinHandle < ( ) > > > ,
178
175
/// Notification trigger for graceful shutdown
179
176
shutdown : Arc < Notify > ,
180
177
/// Flag to indicate if the server is shutdown
181
178
is_shutdown : Arc < AtomicBool > ,
179
+ /// Flat indicating if the server is started
180
+ is_started : Arc < AtomicBool > ,
182
181
}
183
182
184
183
impl AtomicDestroyer for InnerBrowserSignerProxy {
@@ -253,9 +252,9 @@ impl BrowserSignerProxy {
253
252
inner : AtomicDestructor :: new ( InnerBrowserSignerProxy {
254
253
options,
255
254
state : Arc :: new ( state) ,
256
- handle : OnceCell :: new ( ) ,
257
255
shutdown : Arc :: new ( Notify :: new ( ) ) ,
258
256
is_shutdown : Arc :: new ( AtomicBool :: new ( false ) ) ,
257
+ is_started : Arc :: new ( AtomicBool :: new ( false ) ) ,
259
258
} ) ,
260
259
}
261
260
}
@@ -270,57 +269,78 @@ impl BrowserSignerProxy {
270
269
///
271
270
/// If this is not called, will be automatically started on the first interaction with the signer.
272
271
pub async fn start ( & self ) -> Result < ( ) , Error > {
273
- let _handle: & Arc < JoinHandle < ( ) > > = self
274
- . inner
275
- . handle
276
- . get_or_try_init ( || async {
277
- let listener = TcpListener :: bind ( self . inner . options . addr ) . await ?;
278
-
279
- tracing:: info!( "Starting proxy server on {}" , self . inner. options. addr) ;
280
-
281
- let state = self . inner . state . clone ( ) ;
282
- let shutdown = self . inner . shutdown . clone ( ) ;
283
-
284
- let handle: JoinHandle < ( ) > = tokio:: spawn ( async move {
285
- loop {
286
- tokio:: select! {
287
- res = listener. accept( ) => {
288
- let stream: TcpStream = match res {
289
- Ok ( ( stream, ..) ) => stream,
290
- Err ( e) => {
291
- tracing:: error!( "Failed to accept connection: {}" , e) ;
292
- continue ;
293
- }
294
- } ;
272
+ // Ensure is not shutdown
273
+ if self . inner . is_shutdown ( ) {
274
+ return Err ( Error :: Shutdown ) ;
275
+ }
276
+
277
+ // Mark the proxy as started and check if was already started
278
+ let is_started: bool = self . inner . is_started . swap ( true , Ordering :: SeqCst ) ;
279
+
280
+ // Immediately return if already started
281
+ if is_started {
282
+ return Ok ( ( ) ) ;
283
+ }
284
+
285
+ let listener: TcpListener = match TcpListener :: bind ( self . inner . options . addr ) . await {
286
+ Ok ( listener) => listener,
287
+ Err ( e) => {
288
+ // Undo the started flag if binding fails
289
+ self . inner . is_started . store ( false , Ordering :: SeqCst ) ;
290
+
291
+ // Propagate error
292
+ return Err ( Error :: from ( e) ) ;
293
+ }
294
+ } ;
295
295
296
- let io: TokioIo <TcpStream > = TokioIo :: new( stream) ;
297
- let state: Arc <ProxyState > = state. clone( ) ;
296
+ let addr: SocketAddr = self . inner . options . addr ;
297
+ let state: Arc < ProxyState > = self . inner . state . clone ( ) ;
298
+ let shutdown: Arc < Notify > = self . inner . shutdown . clone ( ) ;
299
+
300
+ tokio:: spawn ( async move {
301
+ tracing:: info!( "Starting proxy server on {addr}" ) ;
302
+
303
+ loop {
304
+ tokio:: select! {
305
+ res = listener. accept( ) => {
306
+ let stream: TcpStream = match res {
307
+ Ok ( ( stream, ..) ) => stream,
308
+ Err ( e) => {
309
+ tracing:: error!( "Failed to accept connection: {}" , e) ;
310
+ continue ;
311
+ }
312
+ } ;
313
+
314
+ let io: TokioIo <TcpStream > = TokioIo :: new( stream) ;
315
+ let state: Arc <ProxyState > = state. clone( ) ;
316
+ let shutdown: Arc <Notify > = shutdown. clone( ) ;
298
317
299
- tokio:: spawn( async move {
300
- let service = service_fn( move |req| {
301
- handle_request( req, state. clone( ) )
302
- } ) ;
318
+ tokio:: spawn( async move {
319
+ let service = service_fn( move |req| {
320
+ handle_request( req, state. clone( ) )
321
+ } ) ;
303
322
304
- if let Err ( e) = http1:: Builder :: new( )
305
- . serve_connection( io, service)
306
- . await
307
- {
323
+ tokio:: select! {
324
+ res = http1:: Builder :: new( ) . serve_connection( io, service) => {
325
+ if let Err ( e) = res {
308
326
tracing:: error!( "Error serving connection: {e}" ) ;
309
327
}
310
- } ) ;
311
- } ,
312
- _ = shutdown. notified( ) => {
313
- break ;
314
- }
315
- }
328
+ }
329
+ _ = shutdown. notified( ) => {
330
+ tracing:: debug!( "Closing connection, proxy server is shutting down." ) ;
331
+ }
332
+ }
333
+ } ) ;
334
+ } ,
335
+ _ = shutdown. notified( ) => {
336
+ break ;
316
337
}
338
+ }
339
+ }
317
340
318
- tracing:: info!( "Shutting down proxy server." ) ;
319
- } ) ;
341
+ tracing:: info!( "Shutting down proxy server." ) ;
342
+ } ) ;
320
343
321
- Ok :: < _ , Error > ( Arc :: new ( handle) )
322
- } )
323
- . await ?;
324
344
Ok ( ( ) )
325
345
}
326
346
@@ -340,11 +360,6 @@ impl BrowserSignerProxy {
340
360
where
341
361
T : DeserializeOwned ,
342
362
{
343
- // Ensure is not shutdown
344
- if self . inner . is_shutdown ( ) {
345
- return Err ( Error :: Shutdown ) ;
346
- }
347
-
348
363
// Start the proxy if not already started
349
364
self . start ( ) . await ?;
350
365
0 commit comments