@@ -2,10 +2,10 @@ use crate::api::ChainId;
22use chrono:: { DateTime , NaiveDateTime } ;
33use ethers:: abi:: AbiEncode ;
44use ethers:: prelude:: TxHash ;
5- use ethers:: types:: BlockNumber ;
65use serde:: Serialize ;
7- use sqlx:: { Pool , Sqlite , SqlitePool } ;
8- use std:: collections:: { BTreeMap , HashMap } ;
6+ use sqlx:: { migrate, Pool , Sqlite , SqlitePool } ;
7+ use tokio:: spawn;
8+ use tokio:: sync:: mpsc;
99use utoipa:: ToSchema ;
1010
1111#[ derive( Clone , Debug , Serialize , PartialEq ) ]
@@ -105,16 +105,45 @@ impl From<LogRow> for RequestLog {
105105
106106pub struct History {
107107 pool : Pool < Sqlite > ,
108+ write_queue : mpsc:: Sender < RequestLog > ,
109+ writer_thread : tokio:: task:: JoinHandle < ( ) > ,
108110}
109111
110112impl History {
111113 const MAX_HISTORY : usize = 1_000_000 ;
114+ const MAX_WRITE_QUEUE : usize = 1_000 ;
112115 pub async fn new ( ) -> Self {
113- let pool = SqlitePool :: connect ( "sqlite:fortuna.db" ) . await . unwrap ( ) ;
114- Self { pool }
116+ Self :: new_with_url ( "sqlite:fortuna.db" ) . await
115117 }
116118
117- pub async fn add_to_db ( & self , log : RequestLog ) {
119+ pub async fn new_in_memory ( ) -> Self {
120+ Self :: new_with_url ( "sqlite::memory:" ) . await
121+ }
122+
123+ pub async fn new_with_url ( url : & str ) -> Self {
124+ let pool = SqlitePool :: connect ( url) . await . unwrap ( ) ;
125+ let migrator = migrate ! ( "./migrations" ) ;
126+ migrator. run ( & pool) . await . unwrap ( ) ;
127+ Self :: new_with_pool ( pool) . await
128+ }
129+ pub async fn new_with_pool ( pool : Pool < Sqlite > ) -> Self {
130+ let ( sender, mut receiver) = mpsc:: channel ( Self :: MAX_WRITE_QUEUE ) ;
131+ let pool_write_connection = pool. clone ( ) ;
132+ let writer_thread = spawn (
133+ async move {
134+ while let Some ( log) = receiver. recv ( ) . await {
135+ Self :: add_to_db ( & pool_write_connection, log) . await ;
136+ }
137+ } ,
138+ ) ;
139+ Self {
140+ pool,
141+ write_queue : sender,
142+ writer_thread,
143+ }
144+ }
145+
146+ async fn add_to_db ( pool : & Pool < Sqlite > , log : RequestLog ) {
118147 let sequence = log. sequence as i64 ;
119148 let log_type = log. log . get_type ( ) ;
120149 let block_number = log
@@ -131,7 +160,7 @@ impl History {
131160 block_number,
132161 info,
133162 tx_hash)
134- . execute ( & self . pool )
163+ . execute ( pool)
135164 . await
136165 . unwrap ( ) ;
137166 }
@@ -151,7 +180,9 @@ impl History {
151180 }
152181
153182 pub fn add ( & mut self , log : RequestLog ) {
154- self . add_to_db ( log) ;
183+ if let Err ( e) = self . write_queue . try_send ( log) {
184+ tracing:: warn!( "Failed to send log to write queue: {}" , e) ;
185+ }
155186 }
156187
157188 pub async fn get_request_logs ( & self , request_key : & RequestKey ) -> Vec < RequestLog > {
@@ -200,11 +231,28 @@ impl History {
200231}
201232
202233mod tests {
234+ use tokio:: time:: sleep;
203235 use super :: * ;
204236
205- #[ sqlx:: test]
206- async fn test_history ( pool : Pool < Sqlite > ) {
207- let history = History { pool } ;
237+ #[ tokio:: test]
238+ async fn test_history ( ) {
239+ let history = History :: new_in_memory ( ) . await ;
240+ let log = RequestLog {
241+ chain_id : "ethereum" . to_string ( ) ,
242+ sequence : 1 ,
243+ timestamp : chrono:: Utc :: now ( ) ,
244+ log : RequestLogType :: Observed {
245+ tx_hash : TxHash :: zero ( ) ,
246+ } ,
247+ } ;
248+ History :: add_to_db ( & history. pool , log. clone ( ) ) . await ;
249+ let logs = history. get_request_logs ( & ( "ethereum" . to_string ( ) , 1 ) ) . await ;
250+ assert_eq ! ( logs, vec![ log. clone( ) ] ) ;
251+ }
252+
253+ #[ tokio:: test( flavor = "multi_thread" ) ]
254+ async fn test_writer_thread ( ) {
255+ let mut history = History :: new_in_memory ( ) . await ;
208256 let log = RequestLog {
209257 chain_id : "ethereum" . to_string ( ) ,
210258 sequence : 1 ,
@@ -213,7 +261,9 @@ mod tests {
213261 tx_hash : TxHash :: zero ( ) ,
214262 } ,
215263 } ;
216- history. add_to_db ( log. clone ( ) ) . await ;
264+ history. add ( log. clone ( ) ) ;
265+ // wait for the writer thread to write to the db
266+ sleep ( std:: time:: Duration :: from_secs ( 1 ) ) . await ;
217267 let logs = history. get_request_logs ( & ( "ethereum" . to_string ( ) , 1 ) ) . await ;
218268 assert_eq ! ( logs, vec![ log. clone( ) ] ) ;
219269 }
0 commit comments