11use async_stream;
22use async_trait:: async_trait;
3- use bytes:: Bytes ;
43use chrono:: NaiveDateTime ;
54use futures_util:: stream:: BoxStream ;
65use futures_util:: TryStreamExt ;
@@ -33,6 +32,10 @@ impl PostgresStorage {
3332 }
3433}
3534
35+ fn map_sqlx_err ( error : sqlx:: Error ) -> StoreError {
36+ StoreError :: StorageError ( Box :: new ( error) )
37+ }
38+
3639#[ async_trait]
3740impl Store for PostgresStorage {
3841 async fn delete ( & self ) -> Result < ( ) , StoreError > {
@@ -44,22 +47,22 @@ impl Store for PostgresStorage {
4447 . execute ( & self . pool )
4548 . await
4649 . map ( |_| ( ) )
47- . map_err ( StoreError :: SqlxError )
50+ . map_err ( map_sqlx_err )
4851 }
4952
50- async fn write ( & self , update : & Bytes ) -> Result < ( ) , StoreError > {
53+ async fn write ( & self , update : & Vec < u8 > ) -> Result < ( ) , StoreError > {
5154 let document_id = self . document_id . clone ( ) ;
5255 let now = chrono:: Utc :: now ( ) . naive_utc ( ) ;
5356 let query_result = sqlx:: query ( & format ! (
5457 "INSERT INTO {} (document_id, payload, timestamp) VALUES ($1, $2, $3)" ,
5558 self . table_name
5659 ) )
5760 . bind ( document_id)
58- . bind ( update. as_ref ( ) )
61+ . bind ( update)
5962 . bind ( now)
6063 . execute ( & self . pool )
6164 . await
62- . map_err ( StoreError :: SqlxError ) ?;
65+ . map_err ( map_sqlx_err ) ?;
6366
6467 let rows_affected = query_result. rows_affected ( ) ;
6568
@@ -74,7 +77,7 @@ impl Store for PostgresStorage {
7477 Ok ( ( ) )
7578 }
7679
77- async fn read ( & self ) -> Result < BoxStream < Result < ( Bytes , i64 ) , StoreError > > , StoreError > {
80+ async fn read ( & self ) -> Result < BoxStream < Result < ( Vec < u8 > , i64 ) , StoreError > > , StoreError > {
7881 let document_id = self . document_id ;
7982 let table_name = self . table_name . clone ( ) ;
8083 let pool = self . pool . clone ( ) ;
@@ -89,9 +92,8 @@ impl Store for PostgresStorage {
8992 . bind( document_id)
9093 . fetch( & pool) ;
9194
92- while let Some ( row) = rows. try_next( ) . await . map_err( StoreError :: SqlxError ) ? {
93- let payload_vec: Vec <u8 > = row. get( "payload" ) ;
94- let payload = payload_vec. into( ) ;
95+ while let Some ( row) = rows. try_next( ) . await . map_err( map_sqlx_err) ? {
96+ let payload: Vec <u8 > = row. get( "payload" ) ;
9597 let timestamp_ndt: NaiveDateTime = row. get( "timestamp" ) ;
9698 let timestamp_ms = timestamp_ndt. and_utc( ) . timestamp_millis( ) ;
9799 yield Ok ( ( payload, timestamp_ms) ) ;
@@ -101,7 +103,7 @@ impl Store for PostgresStorage {
101103 Ok ( Box :: pin ( stream) )
102104 }
103105
104- async fn read_payloads ( & self ) -> Result < BoxStream < Result < Bytes , StoreError > > , StoreError > {
106+ async fn read_payloads ( & self ) -> Result < BoxStream < Result < Vec < u8 > , StoreError > > , StoreError > {
105107 let document_id = self . document_id ;
106108 let table_name = self . table_name . clone ( ) ;
107109 let pool = self . pool . clone ( ) ;
@@ -116,9 +118,8 @@ impl Store for PostgresStorage {
116118 . bind( document_id)
117119 . fetch( & pool) ;
118120
119- while let Some ( row) = rows. try_next( ) . await . map_err( StoreError :: SqlxError ) ? {
120- let payload_vec: Vec <u8 > = row. get( "payload" ) ;
121- let payload = payload_vec. into( ) ;
121+ while let Some ( row) = rows. try_next( ) . await . map_err( map_sqlx_err) ? {
122+ let payload: Vec <u8 > = row. get( "payload" ) ;
122123 yield Ok ( payload) ;
123124 }
124125 } ;
@@ -129,28 +130,28 @@ impl Store for PostgresStorage {
129130 async fn squash ( & self ) -> Result < ( ) , StoreError > {
130131 let doc = Doc :: new ( ) ;
131132 self . load ( & doc) . await ?;
132- let tx = self . pool . begin ( ) . await . map_err ( StoreError :: SqlxError ) ?;
133+ let tx = self . pool . begin ( ) . await . map_err ( map_sqlx_err ) ?;
133134 sqlx:: query ( & format ! (
134135 "DELETE FROM {} WHERE document_id = $1" ,
135136 self . table_name
136137 ) )
137138 . bind ( self . document_id )
138139 . execute ( & self . pool )
139140 . await
140- . map_err ( StoreError :: SqlxError ) ?;
141+ . map_err ( map_sqlx_err ) ?;
141142
142143 let squashed_update = doc. get_update ( ) ;
143144 self . write ( & squashed_update) . await ?;
144145
145146 // 如果在事务超出范围之前都未调用,rollback则自动调用
146- tx. commit ( ) . await . map_err ( StoreError :: SqlxError ) ?;
147+ tx. commit ( ) . await . map_err ( map_sqlx_err ) ?;
147148
148149 if self . run_vacuum {
149150 // 回收死行占据的存储空间
150151 sqlx:: query ( & format ! ( "VACUUM {}" , self . table_name) )
151152 . execute ( & self . pool )
152153 . await
153- . map_err ( StoreError :: SqlxError ) ?;
154+ . map_err ( map_sqlx_err ) ?;
154155 }
155156 Ok ( ( ) )
156157 }
0 commit comments