@@ -4,6 +4,8 @@ use futures::StreamExt;
44use redis:: { AsyncCommands , Client , ErrorKind , RedisError , RedisResult } ;
55use tracing:: debug;
66
7+ use crate :: error:: DbPoolError ;
8+
79const DEFAULT_COLUMN : & str = "" ;
810const PJ_V1_COLUMN : & str = "pjv1" ;
911
@@ -19,19 +21,21 @@ impl DbPool {
1921 Ok ( Self { client, timeout } )
2022 }
2123
24+ /// Peek using [`DEFAULT_COLUMN`] as the channel type.
2225 pub async fn push_default ( & self , subdirectory_id : & str , data : Vec < u8 > ) -> RedisResult < ( ) > {
2326 self . push ( subdirectory_id, DEFAULT_COLUMN , data) . await
2427 }
2528
26- pub async fn peek_default ( & self , subdirectory_id : & str ) -> Option < RedisResult < Vec < u8 > > > {
29+ pub async fn peek_default ( & self , subdirectory_id : & str ) -> Result < Vec < u8 > , DbPoolError > {
2730 self . peek_with_timeout ( subdirectory_id, DEFAULT_COLUMN ) . await
2831 }
2932
3033 pub async fn push_v1 ( & self , subdirectory_id : & str , data : Vec < u8 > ) -> RedisResult < ( ) > {
3134 self . push ( subdirectory_id, PJ_V1_COLUMN , data) . await
3235 }
3336
34- pub async fn peek_v1 ( & self , subdirectory_id : & str ) -> Option < RedisResult < Vec < u8 > > > {
37+ /// Peek using [`PJ_V1_COLUMN`] as the channel type.
38+ pub async fn peek_v1 ( & self , subdirectory_id : & str ) -> Result < Vec < u8 > , DbPoolError > {
3539 self . peek_with_timeout ( subdirectory_id, PJ_V1_COLUMN ) . await
3640 }
3741
@@ -52,8 +56,14 @@ impl DbPool {
5256 & self ,
5357 subdirectory_id : & str ,
5458 channel_type : & str ,
55- ) -> Option < RedisResult < Vec < u8 > > > {
56- tokio:: time:: timeout ( self . timeout , self . peek ( subdirectory_id, channel_type) ) . await . ok ( )
59+ ) -> Result < Vec < u8 > , DbPoolError > {
60+ match tokio:: time:: timeout ( self . timeout , self . peek ( subdirectory_id, channel_type) ) . await {
61+ Ok ( redis_result) => match redis_result {
62+ Ok ( result) => Ok ( result) ,
63+ Err ( redis_err) => Err ( DbPoolError :: Redis ( redis_err) ) ,
64+ } ,
65+ Err ( elapsed) => Err ( DbPoolError :: Timeout ( elapsed) ) ,
66+ }
5767 }
5868
5969 async fn peek ( & self , subdirectory_id : & str , channel_type : & str ) -> RedisResult < Vec < u8 > > {
0 commit comments