@@ -29,10 +29,10 @@ pub(crate) struct EventQueue<K: Deref + Clone>
29
29
where
30
30
K :: Target : KVStore ,
31
31
{
32
- state : Arc < Mutex < QueueState > > ,
33
- waker : Arc < Mutex < Option < Waker > > > ,
32
+ state : Mutex < QueueState > ,
33
+ waker : Mutex < Option < Waker > > ,
34
34
#[ cfg( feature = "std" ) ]
35
- condvar : Arc < crate :: sync:: Condvar > ,
35
+ condvar : crate :: sync:: Condvar ,
36
36
kv_store : K ,
37
37
persist_notifier : Arc < Notifier > ,
38
38
}
@@ -44,13 +44,13 @@ where
44
44
pub fn new (
45
45
queue : VecDeque < LiquidityEvent > , kv_store : K , persist_notifier : Arc < Notifier > ,
46
46
) -> Self {
47
- let state = Arc :: new ( Mutex :: new ( QueueState { queue, needs_persist : false } ) ) ;
48
- let waker = Arc :: new ( Mutex :: new ( None ) ) ;
47
+ let state = Mutex :: new ( QueueState { queue, needs_persist : false } ) ;
48
+ let waker = Mutex :: new ( None ) ;
49
49
Self {
50
50
state,
51
51
waker,
52
52
#[ cfg( feature = "std" ) ]
53
- condvar : Arc :: new ( crate :: sync:: Condvar :: new ( ) ) ,
53
+ condvar : crate :: sync:: Condvar :: new ( ) ,
54
54
kv_store,
55
55
persist_notifier,
56
56
}
74
74
}
75
75
76
76
pub async fn next_event_async ( & self ) -> LiquidityEvent {
77
- EventFuture {
78
- queue_state : Arc :: clone ( & self . state ) ,
79
- waker : Arc :: clone ( & self . waker ) ,
80
- persist_notifier : Arc :: clone ( & self . persist_notifier ) ,
81
- }
82
- . await
77
+ EventFuture ( self ) . await
83
78
}
84
79
85
80
#[ cfg( feature = "std" ) ]
@@ -213,31 +208,32 @@ where
213
208
}
214
209
}
215
210
216
- struct EventFuture {
217
- queue_state : Arc < Mutex < QueueState > > ,
218
- waker : Arc < Mutex < Option < Waker > > > ,
219
- persist_notifier : Arc < Notifier > ,
220
- }
211
+ struct EventFuture < ' a , K : Deref + Clone > ( & ' a EventQueue < K > )
212
+ where
213
+ K :: Target : KVStore ;
221
214
222
- impl Future for EventFuture {
215
+ impl < K : Deref + Clone > Future for EventFuture < ' _ , K >
216
+ where
217
+ K :: Target : KVStore ,
218
+ {
223
219
type Output = LiquidityEvent ;
224
220
225
221
fn poll (
226
222
self : core:: pin:: Pin < & mut Self > , cx : & mut core:: task:: Context < ' _ > ,
227
223
) -> core:: task:: Poll < Self :: Output > {
228
224
let ( res, should_persist_notify) = {
229
- let mut state_lock = self . queue_state . lock ( ) . unwrap ( ) ;
225
+ let mut state_lock = self . 0 . state . lock ( ) . unwrap ( ) ;
230
226
if let Some ( event) = state_lock. queue . pop_front ( ) {
231
227
state_lock. needs_persist = true ;
232
228
( Poll :: Ready ( event) , true )
233
229
} else {
234
- * self . waker . lock ( ) . unwrap ( ) = Some ( cx. waker ( ) . clone ( ) ) ;
230
+ * self . 0 . waker . lock ( ) . unwrap ( ) = Some ( cx. waker ( ) . clone ( ) ) ;
235
231
( Poll :: Pending , false )
236
232
}
237
233
} ;
238
234
239
235
if should_persist_notify {
240
- self . persist_notifier . notify ( ) ;
236
+ self . 0 . persist_notifier . notify ( ) ;
241
237
}
242
238
243
239
res
0 commit comments