11use std:: sync:: {
22 atomic:: { AtomicBool , Ordering } ,
3- Arc , Mutex ,
3+ Arc ,
44} ;
55
66use redis:: Commands ;
77
88const QUEUE_NAME : & str = "queue" ;
99
1010pub struct WaffleQueue {
11- redis : Arc < Mutex < redis:: Client > > ,
11+ redis : Arc < redis:: Client > ,
1212 is_open : AtomicBool ,
1313}
1414
1515impl WaffleQueue {
16- pub fn new ( redis : Arc < Mutex < redis:: Client > > ) -> Self {
16+ pub fn new ( redis : Arc < redis:: Client > ) -> Self {
1717 Self {
1818 redis,
1919 is_open : AtomicBool :: new ( false ) ,
@@ -33,7 +33,7 @@ impl WaffleQueue {
3333 }
3434
3535 pub fn index_of ( & self , target : String ) -> Option < usize > {
36- let mut con = match self . redis . lock ( ) . unwrap ( ) . get_connection ( ) {
36+ let mut con = match self . redis . get_connection ( ) {
3737 Ok ( c) => c,
3838 Err ( _) => return None ,
3939 } ;
@@ -47,24 +47,24 @@ impl WaffleQueue {
4747 }
4848
4949 pub fn size ( & self ) -> usize {
50- let mut con = self . redis . lock ( ) . unwrap ( ) . get_connection ( ) . unwrap ( ) ;
50+ let mut con = self . redis . get_connection ( ) . unwrap ( ) ;
5151 con. llen ( QUEUE_NAME ) . unwrap_or ( 0 ) as usize
5252 }
5353
5454 pub fn push ( & self , value : String ) -> usize {
55- let mut con = self . redis . lock ( ) . unwrap ( ) . get_connection ( ) . unwrap ( ) ;
55+ let mut con = self . redis . get_connection ( ) . unwrap ( ) ;
5656
5757 con. rpush ( QUEUE_NAME , value) . unwrap_or_default ( )
5858 }
5959
6060 pub fn pop ( & self ) -> Option < String > {
61- let mut con = self . redis . lock ( ) . unwrap ( ) . get_connection ( ) . unwrap ( ) ;
61+ let mut con = self . redis . get_connection ( ) . unwrap ( ) ;
6262 let result: redis:: RedisResult < Vec < String > > = con. lpop ( QUEUE_NAME , None ) ;
6363 result. ok ( ) . and_then ( |mut vec| vec. pop ( ) )
6464 }
6565
6666 pub fn list ( & self ) -> Vec < String > {
67- let mut con = self . redis . lock ( ) . unwrap ( ) . get_connection ( ) . unwrap ( ) ;
67+ let mut con = self . redis . get_connection ( ) . unwrap ( ) ;
6868 con. lrange ( QUEUE_NAME , 0 , -1 ) . unwrap_or_else ( |_| vec ! [ ] )
6969 }
7070}
@@ -82,7 +82,7 @@ mod tests {
8282 let host_ip = node. get_host ( ) . unwrap ( ) ;
8383 let host_port = node. get_host_port_ipv4 ( 6379 ) . unwrap ( ) ;
8484 let url = format ! ( "redis://{host_ip}:{host_port}" ) ;
85- let client = Arc :: new ( Mutex :: new ( redis:: Client :: open ( url) . unwrap ( ) ) ) ;
85+ let client = Arc :: new ( redis:: Client :: open ( url) . unwrap ( ) ) ;
8686 let queue = WaffleQueue :: new ( client) ;
8787
8888 queue. push ( "foo" . to_string ( ) ) ;
@@ -105,7 +105,7 @@ mod tests {
105105 let host_ip = node. get_host ( ) . unwrap ( ) ;
106106 let host_port = node. get_host_port_ipv4 ( 6379 ) . unwrap ( ) ;
107107 let url = format ! ( "redis://{host_ip}:{host_port}" ) ;
108- let client = Arc :: new ( Mutex :: new ( redis:: Client :: open ( url) . unwrap ( ) ) ) ;
108+ let client = Arc :: new ( redis:: Client :: open ( url) . unwrap ( ) ) ;
109109 let queue = WaffleQueue :: new ( client) ;
110110
111111 queue. push ( "foo" . to_string ( ) ) ;
@@ -121,7 +121,7 @@ mod tests {
121121 let host_ip = node. get_host ( ) . unwrap ( ) ;
122122 let host_port = node. get_host_port_ipv4 ( 6379 ) . unwrap ( ) ;
123123 let url = format ! ( "redis://{host_ip}:{host_port}" ) ;
124- let client = Arc :: new ( Mutex :: new ( redis:: Client :: open ( url) . unwrap ( ) ) ) ;
124+ let client = Arc :: new ( redis:: Client :: open ( url) . unwrap ( ) ) ;
125125 let queue = WaffleQueue :: new ( client) ;
126126
127127 queue. push ( "foo" . to_string ( ) ) ;
@@ -138,7 +138,7 @@ mod tests {
138138 let host_ip = node. get_host ( ) . unwrap ( ) ;
139139 let host_port = node. get_host_port_ipv4 ( 6379 ) . unwrap ( ) ;
140140 let url = format ! ( "redis://{host_ip}:{host_port}" ) ;
141- let client = Arc :: new ( Mutex :: new ( redis:: Client :: open ( url) . unwrap ( ) ) ) ;
141+ let client = Arc :: new ( redis:: Client :: open ( url) . unwrap ( ) ) ;
142142 let queue = WaffleQueue :: new ( client) ;
143143
144144 assert_eq ! ( queue. size( ) , 0 ) ;
0 commit comments