1- use axum:: { extract:: State , routing:: get, serve, Router } ;
2- use hyper:: header:: HeaderMap ;
1+ use axum:: {
2+ extract:: { Query , State } ,
3+ routing:: { get, post} ,
4+ serve, Router ,
5+ } ;
6+ use hyper:: { body:: Bytes , header:: HeaderMap , StatusCode } ;
37use redb:: { Database , ReadableTable , TableDefinition } ;
48use serde:: { Deserialize , Serialize } ;
5- use std:: { error:: Error , fs, net:: SocketAddr , path:: Path } ;
9+ use std:: { error:: Error , fs, net:: SocketAddr , path:: Path , time :: SystemTime } ;
610use tokio:: signal:: unix:: { signal, SignalKind } ;
711use tokio:: { net:: TcpListener , sync:: mpsc} ;
812
@@ -15,6 +19,8 @@ use crate::{
1519 lib:: http,
1620} ;
1721
22+ const DEFAULT_REQUEST_SIZE : u64 = 10 ;
23+ const MAX_REQUEST_SIZE : u64 = 100 ;
1824#[ repr( transparent) ]
1925struct CounterType ( bool ) ;
2026
@@ -25,6 +31,9 @@ impl CounterType {
2531
2632static COUNTERS : TableDefinition < bool , u64 > = TableDefinition :: new ( "counters" ) ;
2733static STRINGS : TableDefinition < u64 , & str > = TableDefinition :: new ( "strings" ) ;
34+ static PROCESSED_IDS : TableDefinition < & str , bool > = TableDefinition :: new ( "processed_ids" ) ;
35+ static SUMS_JOURNAL : TableDefinition < u64 , & [ u8 ] > = TableDefinition :: new ( "sums_journal" ) ;
36+ static JOURNAL_META : TableDefinition < & str , u64 > = TableDefinition :: new ( "journal_meta" ) ;
2837
2938struct IncCounterRequest {
3039 counter_type : CounterType ,
@@ -66,6 +75,98 @@ impl DbRequestHandler for IncStringRequest {
6675 }
6776}
6877
78+ struct CheckAndStoreIdRequest ( String ) ;
79+
80+ enum CheckAndStoreIdResponse {
81+ Unique ,
82+ Duplicate ,
83+ }
84+
85+ impl DbRequestHandler for CheckAndStoreIdRequest {
86+ type Response = CheckAndStoreIdResponse ;
87+
88+ fn handle ( & self , db : & Database ) -> AsyncDbResult < Self :: Response > {
89+ let write_txn = db. begin_write ( ) ?;
90+ {
91+ let mut table = write_txn. open_table ( PROCESSED_IDS ) ?;
92+ if table. get ( self . 0 . as_str ( ) ) ?. is_some ( ) {
93+ return Ok ( CheckAndStoreIdResponse :: Duplicate ) ;
94+ }
95+ table. insert ( self . 0 . as_str ( ) , true ) ?;
96+ }
97+ write_txn. commit ( ) ?;
98+ Ok ( CheckAndStoreIdResponse :: Unique )
99+ }
100+ }
101+
102+ #[ derive( Deserialize , Debug ) ]
103+ struct SumRequest {
104+ id : String ,
105+ a : i32 ,
106+ b : i32 ,
107+ c : i32 ,
108+ }
109+
110+ #[ derive( Serialize , Deserialize , Debug ) ]
111+ struct JournalEntry {
112+ id : String ,
113+ a : i32 ,
114+ b : i32 ,
115+ c : i32 ,
116+ timestamp_us : u64 ,
117+ }
118+
119+ struct AddToJournalRequest ( JournalEntry ) ;
120+
121+ impl DbRequestHandler for AddToJournalRequest {
122+ type Response = ( ) ;
123+
124+ fn handle ( & self , db : & Database ) -> AsyncDbResult < Self :: Response > {
125+ let serialized = serde_json:: to_vec ( & self . 0 ) ?;
126+
127+ let write_txn = db. begin_write ( ) ?;
128+ {
129+ let mut journal_meta = write_txn. open_table ( JOURNAL_META ) ?;
130+ let mut journal = write_txn. open_table ( SUMS_JOURNAL ) ?;
131+
132+ let idx = journal_meta. get ( "idx" ) ?. map ( |v| v. value ( ) ) . unwrap_or ( 0 ) ;
133+ journal. insert ( idx, serialized. as_slice ( ) ) ?;
134+ journal_meta. insert ( "idx" , idx + 1 ) ?;
135+ }
136+ write_txn. commit ( ) ?;
137+ Ok ( ( ) )
138+ }
139+ }
140+
141+ struct GetJournalRequest ( u64 ) ;
142+
143+ impl DbRequestHandler for GetJournalRequest {
144+ type Response = Vec < JournalEntry > ;
145+
146+ fn handle ( & self , db : & Database ) -> AsyncDbResult < Self :: Response > {
147+ let read_txn = db. begin_read ( ) ?;
148+ let journal_meta = read_txn. open_table ( JOURNAL_META ) ?;
149+ let journal = read_txn. open_table ( SUMS_JOURNAL ) ?;
150+
151+ let idx = journal_meta. get ( "idx" ) ?. map ( |v| v. value ( ) ) . unwrap_or ( 0 ) ;
152+
153+ let mut result = Vec :: new ( ) ;
154+ let n = std:: cmp:: min ( std:: cmp:: min ( self . 0 , idx) , MAX_REQUEST_SIZE ) ;
155+
156+ let i0 = idx - n;
157+
158+ for i in 0 ..n {
159+ if let Some ( entry_bytes) = journal. get ( i0 + i) ? {
160+ if let Ok ( entry) = serde_json:: from_slice :: < JournalEntry > ( entry_bytes. value ( ) ) {
161+ result. push ( entry) ;
162+ }
163+ }
164+ }
165+
166+ Ok ( result)
167+ }
168+ }
169+
69170#[ derive( Clone ) ]
70171struct AppState {
71172 redb : AsyncRedb ,
@@ -80,6 +181,7 @@ enum JSONResponse {
80181 Message { text : String } ,
81182 Counters { counter_runs : u64 , counter_requests : u64 } ,
82183 StringCounter { value : String } ,
184+ Journal { entries : Vec < JournalEntry > } ,
83185}
84186
85187async fn health_handler ( ) -> & ' static str {
@@ -109,6 +211,63 @@ async fn string_handler(State(state): State<AppState>, headers: HeaderMap) -> im
109211 http:: json_or_html ( headers, & json_string) . await
110212}
111213
214+ #[ derive( Deserialize ) ]
215+ struct JournalQuery {
216+ #[ serde( default = "default_request_size" ) ]
217+ n : u64 ,
218+ }
219+
220+ fn default_request_size ( ) -> u64 {
221+ DEFAULT_REQUEST_SIZE
222+ }
223+
224+ async fn journal_handler (
225+ State ( state) : State < AppState > , headers : HeaderMap , Query ( query) : Query < JournalQuery > ,
226+ ) -> impl axum:: response:: IntoResponse {
227+ let entries = state. redb . run ( GetJournalRequest ( query. n ) ) . await . unwrap_or_default ( ) ;
228+ let response = JSONResponse :: Journal { entries } ;
229+ let json_string = serde_json:: to_string ( & response) . unwrap ( ) ;
230+ http:: json_or_html ( headers, & json_string) . await
231+ }
232+
233+ async fn sums_handler ( State ( state) : State < AppState > , body : Bytes ) -> Result < & ' static str , ( StatusCode , String ) > {
234+ match serde_json:: from_slice :: < SumRequest > ( & body) {
235+ Ok ( req) => {
236+ if req. id . is_empty ( ) {
237+ Err ( ( StatusCode :: BAD_REQUEST , "Error: id cannot be empty.\n " . to_string ( ) ) )
238+ } else {
239+ match state. redb . run ( CheckAndStoreIdRequest ( req. id . clone ( ) ) ) . await {
240+ Ok ( inner_result) => match inner_result {
241+ CheckAndStoreIdResponse :: Unique => {
242+ let result = req. c == req. a + req. b ;
243+ if result {
244+ let _ = state
245+ . redb
246+ . run ( AddToJournalRequest ( JournalEntry {
247+ id : req. id . clone ( ) ,
248+ a : req. a ,
249+ b : req. b ,
250+ c : req. c ,
251+ timestamp_us : SystemTime :: now ( ) . duration_since ( SystemTime :: UNIX_EPOCH ) . unwrap ( ) . as_micros ( ) as u64 ,
252+ } ) )
253+ . await ;
254+ Ok ( "OK\n " )
255+ } else {
256+ Err ( ( StatusCode :: BAD_REQUEST , "Error: c must equal a + b.\n " . to_string ( ) ) )
257+ }
258+ }
259+ CheckAndStoreIdResponse :: Duplicate => {
260+ Err ( ( StatusCode :: CONFLICT , "Error: Duplicate request ID.\n " . to_string ( ) ) )
261+ }
262+ } ,
263+ Err ( e) => Err ( ( StatusCode :: INTERNAL_SERVER_ERROR , e. to_string ( ) ) ) ,
264+ }
265+ }
266+ }
267+ Err ( e) => Err ( ( StatusCode :: BAD_REQUEST , format ! ( "Invalid JSON: {}.\n " , e) ) ) ,
268+ }
269+ }
270+
112271#[ tokio:: main]
113272async fn main ( ) -> Result < ( ) , Box < dyn Error + Send + Sync > > {
114273 fs:: create_dir_all ( & Path :: new ( "./.db" ) ) ?;
@@ -125,6 +284,8 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
125284 . route ( "/quit" , get ( quit_handler) )
126285 . route ( "/json" , get ( json_handler) )
127286 . route ( "/string" , get ( string_handler) )
287+ . route ( "/journal" , get ( journal_handler) )
288+ . route ( "/sums" , post ( sums_handler) )
128289 . with_state ( state) ;
129290
130291 let addr = SocketAddr :: from ( ( [ 0 , 0 , 0 , 0 ] , 3000 ) ) ;
0 commit comments