1- use std:: sync:: atomic:: { AtomicBool , Ordering } ;
1+ use std:: sync:: {
2+ atomic:: { AtomicBool , Ordering } ,
3+ Arc , Mutex ,
4+ } ;
25
36use redis:: Commands ;
47
58const QUEUE_NAME : & str = "queue" ;
69
710pub struct WaffleQueue {
8- redis : redis:: Client ,
11+ redis : Arc < Mutex < redis:: Client > > ,
912 is_open : AtomicBool ,
1013}
1114
1215impl WaffleQueue {
13- pub fn new ( redis : redis:: Client ) -> Self {
16+ pub fn new ( redis : Arc < Mutex < redis:: Client > > ) -> Self {
1417 Self {
1518 redis,
1619 is_open : AtomicBool :: new ( false ) ,
@@ -30,7 +33,7 @@ impl WaffleQueue {
3033 }
3134
3235 pub fn index_of ( & self , target : String ) -> Option < usize > {
33- let mut con = match self . redis . get_connection ( ) {
36+ let mut con = match self . redis . lock ( ) . unwrap ( ) . get_connection ( ) {
3437 Ok ( c) => c,
3538 Err ( _) => return None ,
3639 } ;
@@ -44,24 +47,24 @@ impl WaffleQueue {
4447 }
4548
4649 pub fn size ( & self ) -> usize {
47- let mut con = self . redis . get_connection ( ) . unwrap ( ) ;
50+ let mut con = self . redis . lock ( ) . unwrap ( ) . get_connection ( ) . unwrap ( ) ;
4851 con. llen ( QUEUE_NAME ) . unwrap_or ( 0 ) as usize
4952 }
5053
5154 pub fn push ( & self , value : String ) -> usize {
52- let mut con = self . redis . get_connection ( ) . unwrap ( ) ;
55+ let mut con = self . redis . lock ( ) . unwrap ( ) . get_connection ( ) . unwrap ( ) ;
5356
5457 con. rpush ( QUEUE_NAME , value) . unwrap_or_default ( )
5558 }
5659
5760 pub fn pop ( & self ) -> Option < String > {
58- let mut con = self . redis . get_connection ( ) . unwrap ( ) ;
61+ let mut con = self . redis . lock ( ) . unwrap ( ) . get_connection ( ) . unwrap ( ) ;
5962 let result: redis:: RedisResult < Vec < String > > = con. lpop ( QUEUE_NAME , None ) ;
6063 result. ok ( ) . and_then ( |mut vec| vec. pop ( ) )
6164 }
6265
6366 pub fn list ( & self ) -> Vec < String > {
64- let mut con = self . redis . get_connection ( ) . unwrap ( ) ;
67+ let mut con = self . redis . lock ( ) . unwrap ( ) . get_connection ( ) . unwrap ( ) ;
6568 con. lrange ( QUEUE_NAME , 0 , -1 ) . unwrap_or_else ( |_| vec ! [ ] )
6669 }
6770}
@@ -79,7 +82,7 @@ mod tests {
7982 let host_ip = node. get_host ( ) . unwrap ( ) ;
8083 let host_port = node. get_host_port_ipv4 ( 6379 ) . unwrap ( ) ;
8184 let url = format ! ( "redis://{host_ip}:{host_port}" ) ;
82- let client = redis:: Client :: open ( url) . unwrap ( ) ;
85+ let client = Arc :: new ( Mutex :: new ( redis:: Client :: open ( url) . unwrap ( ) ) ) ;
8386 let queue = WaffleQueue :: new ( client) ;
8487
8588 queue. push ( "foo" . to_string ( ) ) ;
@@ -102,7 +105,7 @@ mod tests {
102105 let host_ip = node. get_host ( ) . unwrap ( ) ;
103106 let host_port = node. get_host_port_ipv4 ( 6379 ) . unwrap ( ) ;
104107 let url = format ! ( "redis://{host_ip}:{host_port}" ) ;
105- let client = redis:: Client :: open ( url) . unwrap ( ) ;
108+ let client = Arc :: new ( Mutex :: new ( redis:: Client :: open ( url) . unwrap ( ) ) ) ;
106109 let queue = WaffleQueue :: new ( client) ;
107110
108111 queue. push ( "foo" . to_string ( ) ) ;
@@ -118,7 +121,7 @@ mod tests {
118121 let host_ip = node. get_host ( ) . unwrap ( ) ;
119122 let host_port = node. get_host_port_ipv4 ( 6379 ) . unwrap ( ) ;
120123 let url = format ! ( "redis://{host_ip}:{host_port}" ) ;
121- let client = redis:: Client :: open ( url) . unwrap ( ) ;
124+ let client = Arc :: new ( Mutex :: new ( redis:: Client :: open ( url) . unwrap ( ) ) ) ;
122125 let queue = WaffleQueue :: new ( client) ;
123126
124127 queue. push ( "foo" . to_string ( ) ) ;
@@ -135,7 +138,7 @@ mod tests {
135138 let host_ip = node. get_host ( ) . unwrap ( ) ;
136139 let host_port = node. get_host_port_ipv4 ( 6379 ) . unwrap ( ) ;
137140 let url = format ! ( "redis://{host_ip}:{host_port}" ) ;
138- let client = redis:: Client :: open ( url) . unwrap ( ) ;
141+ let client = Arc :: new ( Mutex :: new ( redis:: Client :: open ( url) . unwrap ( ) ) ) ;
139142 let queue = WaffleQueue :: new ( client) ;
140143
141144 assert_eq ! ( queue. size( ) , 0 ) ;
0 commit comments