@@ -3,6 +3,7 @@ use std::time::Duration;
33use futures:: StreamExt ;
44use redis:: { AsyncCommands , Client , ErrorKind , RedisError , RedisResult } ;
55use tracing:: debug;
6+ use crate :: error:: DbPoolError ;
67
78const DEFAULT_COLUMN : & str = "" ;
89const PJ_V1_COLUMN : & str = "pjv1" ;
@@ -19,19 +20,24 @@ impl DbPool {
1920 Ok ( Self { client, timeout } )
2021 }
2122
23+ /// Peek using [`DEFAULT_COLUMN`] as the channel type.
2224 pub async fn push_default ( & self , subdirectory_id : & str , data : Vec < u8 > ) -> RedisResult < ( ) > {
2325 self . push ( subdirectory_id, DEFAULT_COLUMN , data) . await
2426 }
2527
26- pub async fn peek_default ( & self , subdirectory_id : & str ) -> Option < RedisResult < Vec < u8 > > > {
28+ pub async fn peek_default (
29+ & self ,
30+ subdirectory_id : & str ,
31+ ) -> Result < Vec < u8 > , DbPoolError > {
2732 self . peek_with_timeout ( subdirectory_id, DEFAULT_COLUMN ) . await
2833 }
2934
3035 pub async fn push_v1 ( & self , subdirectory_id : & str , data : Vec < u8 > ) -> RedisResult < ( ) > {
3136 self . push ( subdirectory_id, PJ_V1_COLUMN , data) . await
3237 }
3338
34- pub async fn peek_v1 ( & self , subdirectory_id : & str ) -> Option < RedisResult < Vec < u8 > > > {
39+ /// Peek using [`PJ_V1_COLUMN`] as the channel type.
40+ pub async fn peek_v1 ( & self , subdirectory_id : & str ) -> Result < Vec < u8 > , DbPoolError > {
3541 self . peek_with_timeout ( subdirectory_id, PJ_V1_COLUMN ) . await
3642 }
3743
@@ -52,8 +58,16 @@ impl DbPool {
5258 & self ,
5359 subdirectory_id : & str ,
5460 channel_type : & str ,
55- ) -> Option < RedisResult < Vec < u8 > > > {
56- tokio:: time:: timeout ( self . timeout , self . peek ( subdirectory_id, channel_type) ) . await . ok ( )
61+ ) -> Result < Vec < u8 > , DbPoolError > {
62+ match tokio:: time:: timeout ( self . timeout , self . peek ( subdirectory_id, channel_type) ) . await {
63+ Ok ( redis_result) => {
64+ match redis_result {
65+ Ok ( result) => { Ok ( result) }
66+ Err ( redis_err) => { Err ( DbPoolError :: Redis ( redis_err) ) }
67+ }
68+ }
69+ Err ( elapsed) => { Err ( DbPoolError :: Timeout ( elapsed) ) }
70+ }
5771 }
5872
5973 async fn peek ( & self , subdirectory_id : & str , channel_type : & str ) -> RedisResult < Vec < u8 > > {
0 commit comments