@@ -74,7 +74,7 @@ impl<M: TryFromBytes + Debug> DmqConsumerClientPallas<M> {
74
74
"socket" => ?self . socket,
75
75
"network" => ?self . network
76
76
) ;
77
- let mut client_lock = self . client . lock ( ) . await ;
77
+ let mut client_lock = self . client . try_lock ( ) ? ;
78
78
if let Some ( client) = client_lock. take ( ) {
79
79
client. abort ( ) . await ;
80
80
}
@@ -249,11 +249,14 @@ mod tests {
249
249
}
250
250
251
251
#[ tokio:: test( flavor = "multi_thread" ) ]
252
- async fn pallas_dmq_consumer_publisher_succeeds_when_messages_are_available ( ) {
252
+ async fn pallas_dmq_consumer_client_succeeds_when_messages_are_available ( ) {
253
253
let socket_path = create_temp_dir ( current_function ! ( ) ) . join ( "node.socket" ) ;
254
254
let reply_messages = fake_msgs ( ) ;
255
255
let server = setup_dmq_server ( socket_path. clone ( ) , reply_messages) ;
256
256
let client = tokio:: spawn ( async move {
257
+ // sleep to avoid refused connection from the server
258
+ tokio:: time:: sleep ( Duration :: from_millis ( 10 ) ) . await ;
259
+
257
260
let consumer = DmqConsumerClientPallas :: new (
258
261
socket_path,
259
262
CardanoNetwork :: TestNet ( 0 ) ,
@@ -282,11 +285,14 @@ mod tests {
282
285
}
283
286
284
287
#[ tokio:: test]
285
- async fn pallas_dmq_consumer_publisher_blocks_when_no_message_available ( ) {
288
+ async fn pallas_dmq_consumer_client_blocks_when_no_message_available ( ) {
286
289
let socket_path = create_temp_dir ( current_function ! ( ) ) . join ( "node.socket" ) ;
287
290
let reply_messages = vec ! [ ] ;
288
291
let server = setup_dmq_server ( socket_path. clone ( ) , reply_messages) ;
289
292
let client = tokio:: spawn ( async move {
293
+ // sleep to avoid refused connection from the server
294
+ tokio:: time:: sleep ( Duration :: from_millis ( 10 ) ) . await ;
295
+
290
296
let consumer = DmqConsumerClientPallas :: < DmqMessageTestPayload > :: new (
291
297
socket_path,
292
298
CardanoNetwork :: TestNet ( 0 ) ,
@@ -311,6 +317,9 @@ mod tests {
311
317
let reply_messages = fake_msgs ( ) ;
312
318
let server = setup_dmq_server ( socket_path. clone ( ) , reply_messages) ;
313
319
let client = tokio:: spawn ( async move {
320
+ // sleep to avoid refused connection from the server
321
+ tokio:: time:: sleep ( Duration :: from_millis ( 10 ) ) . await ;
322
+
314
323
let consumer = DmqConsumerClientPallas :: < DmqMessageTestPayload > :: new (
315
324
socket_path,
316
325
CardanoNetwork :: TestNet ( 0 ) ,
0 commit comments