1- // src/main.rs
21mod database;
32mod ingest;
43mod persistent_queue;
@@ -18,7 +17,7 @@ use std::sync::Arc;
1817use std:: env;
1918use tokio:: time:: { sleep, Duration } ;
2019use tokio_util:: sync:: CancellationToken ;
21- use tracing:: { info, error} ;
20+ use tracing:: { info, error, debug } ;
2221use tracing_subscriber:: EnvFilter ;
2322use dotenv:: dotenv;
2423use tokio:: task:: spawn_blocking;
@@ -188,7 +187,7 @@ async fn export_records(
188187
189188 HttpResponse :: Ok ( )
190189 . content_type ( "text/csv" )
191- . append_header ( ( "Content-Disposition" , "attachment; filename=\" records.csv\" " ) ) // Updated to append_header
190+ . append_header ( ( "Content-Disposition" , "attachment; filename=\" records.csv\" " ) )
192191 . body ( csv)
193192}
194193
@@ -203,20 +202,46 @@ async fn landing() -> impl Responder {
203202async fn main ( ) -> anyhow:: Result < ( ) > {
204203 dotenv ( ) . ok ( ) ;
205204 tracing_subscriber:: fmt ( )
206- . with_env_filter ( EnvFilter :: from_default_env ( ) )
205+ . with_env_filter ( EnvFilter :: try_from_default_env ( ) . unwrap_or_else ( |_| EnvFilter :: new ( "debug" ) ) )
207206 . init ( ) ;
208207
209208 info ! ( "Starting TimeFusion application" ) ;
210209
211- let bucket = env:: var ( "S3_BUCKET_NAME" ) . context ( "S3_BUCKET_NAME environment variable not set" ) ?;
212- let http_port = env:: var ( "PORT" ) . unwrap_or_else ( |_| "80" . to_string ( ) ) ;
213- let pgwire_port = env:: var ( "PGWIRE_PORT" ) . unwrap_or_else ( |_| "5432" . to_string ( ) ) ;
210+ let bucket = match env:: var ( "S3_BUCKET_NAME" ) {
211+ Ok ( b) => b,
212+ Err ( e) => {
213+ error ! ( "S3_BUCKET_NAME not set: {:?}" , e) ;
214+ return Err ( anyhow:: anyhow!( "S3_BUCKET_NAME environment variable not set" ) ) ;
215+ }
216+ } ;
217+ let http_port = env:: var ( "PORT" ) . unwrap_or_else ( |_| {
218+ info ! ( "PORT not set, defaulting to 80" ) ;
219+ "80" . to_string ( )
220+ } ) ;
221+ let pgwire_port = env:: var ( "PGWIRE_PORT" ) . unwrap_or_else ( |_| {
222+ info ! ( "PGWIRE_PORT not set, defaulting to 5432" ) ;
223+ "5432" . to_string ( )
224+ } ) ;
214225 let s3_uri = format ! ( "s3://{}/delta_table" , bucket) ;
226+ info ! ( "S3 URI configured: {}" , s3_uri) ;
215227
216228 deltalake:: aws:: register_handlers ( None ) ;
229+ info ! ( "AWS S3 handlers registered" ) ;
217230
218- let db = Arc :: new ( Database :: new ( ) . await . context ( "Failed to initialize Database" ) ?) ;
219- db. add_project ( "events" , & s3_uri) . await . context ( "Failed to add project 'events'" ) ?;
231+ let db = match Database :: new ( ) . await {
232+ Ok ( db) => {
233+ info ! ( "Database initialized successfully" ) ;
234+ Arc :: new ( db)
235+ } ,
236+ Err ( e) => {
237+ error ! ( "Failed to initialize Database: {:?}" , e) ;
238+ return Err ( e) ;
239+ }
240+ } ;
241+ if let Err ( e) = db. add_project ( "events" , & s3_uri) . await {
242+ error ! ( "Failed to add project 'events': {:?}" , e) ;
243+ return Err ( e) ;
244+ }
220245 match db. create_events_table ( "events" , & s3_uri) . await {
221246 Ok ( _) => info ! ( "Events table created successfully" ) ,
222247 Err ( e) => {
@@ -229,7 +254,16 @@ async fn main() -> anyhow::Result<()> {
229254 }
230255 }
231256
232- let queue = Arc :: new ( PersistentQueue :: new ( "/app/queue_db" ) . context ( "Failed to initialize PersistentQueue" ) ?) ;
257+ let queue = match PersistentQueue :: new ( "/app/queue_db" ) {
258+ Ok ( q) => {
259+ info ! ( "PersistentQueue initialized successfully" ) ;
260+ Arc :: new ( q)
261+ } ,
262+ Err ( e) => {
263+ error ! ( "Failed to initialize PersistentQueue: {:?}" , e) ;
264+ return Err ( e) ;
265+ }
266+ } ;
233267 let status_store = Arc :: new ( IngestStatusStore :: new ( ) ) ;
234268 let app_info = web:: Data :: new ( AppInfo {
235269 start_time : Utc :: now ( ) ,
@@ -265,6 +299,8 @@ async fn main() -> anyhow::Result<()> {
265299 async move {
266300 if let Err ( e) = run_pgwire_server ( handler_factory, & pg_addr, pgwire_shutdown) . await {
267301 error ! ( "PGWire server error: {:?}" , e) ;
302+ } else {
303+ info ! ( "PGWire server shut down gracefully" ) ;
268304 }
269305 }
270306 } ) ;
@@ -281,8 +317,12 @@ async fn main() -> anyhow::Result<()> {
281317 break ;
282318 }
283319 _ = sleep( Duration :: from_secs( 5 ) ) => {
320+ debug!( "Checking queue for records to flush" ) ;
284321 let records = match queue_clone. dequeue_all( ) . await {
285- Ok ( r) => r,
322+ Ok ( r) => {
323+ debug!( "Dequeued {} records" , r. len( ) ) ;
324+ r
325+ } ,
286326 Err ( e) => {
287327 error!( "Error during dequeue_all: {:?}" , e) ;
288328 Vec :: new( )
@@ -302,7 +342,7 @@ async fn main() -> anyhow::Result<()> {
302342
303343 let http_addr = format ! ( "0.0.0.0:{}" , http_port) ;
304344 info ! ( "Binding HTTP server to {}" , http_addr) ;
305- let server = HttpServer :: new ( move || {
345+ let server = match HttpServer :: new ( move || {
306346 App :: new ( )
307347 . wrap ( Logger :: default ( ) )
308348 . wrap ( metrics_middleware:: MetricsMiddleware )
@@ -318,8 +358,13 @@ async fn main() -> anyhow::Result<()> {
318358 . service ( get_all_data)
319359 . service ( get_data_by_id)
320360 } )
321- . bind ( & http_addr)
322- . context ( format ! ( "Failed to bind HTTP server to {}" , http_addr) ) ?
361+ . bind ( & http_addr) {
362+ Ok ( s) => s,
363+ Err ( e) => {
364+ error ! ( "Failed to bind HTTP server to {}: {:?}" , http_addr, e) ;
365+ return Err ( anyhow:: anyhow!( "Failed to bind HTTP server: {:?}" , e) ) ;
366+ }
367+ }
323368 . run ( ) ;
324369
325370 let http_server_handle = server. handle ( ) ;
@@ -331,6 +376,8 @@ async fn main() -> anyhow::Result<()> {
331376 result = server => {
332377 if let Err ( e) = result {
333378 error!( "HTTP server failed: {:?}" , e) ;
379+ } else {
380+ info!( "HTTP server shut down gracefully" ) ;
334381 }
335382 }
336383 }
@@ -339,9 +386,21 @@ async fn main() -> anyhow::Result<()> {
339386 info ! ( "HTTP server running on http://{}" , http_addr) ;
340387
341388 tokio:: select! {
342- res = pg_server => res. context( "PGWire server task failed" ) ?,
343- res = http_task => res. context( "HTTP server task failed" ) ?,
344- res = flush_task => res. context( "Queue flush task failed" ) ?,
389+ res = pg_server => {
390+ if let Err ( e) = res {
391+ error!( "PGWire server task failed: {:?}" , e) ;
392+ }
393+ } ,
394+ res = http_task => {
395+ if let Err ( e) = res {
396+ error!( "HTTP server task failed: {:?}" , e) ;
397+ }
398+ } ,
399+ res = flush_task => {
400+ if let Err ( e) = res {
401+ error!( "Queue flush task failed: {:?}" , e) ;
402+ }
403+ } ,
345404 _ = tokio:: signal:: ctrl_c( ) => {
346405 info!( "Received Ctrl+C, initiating shutdown." ) ;
347406 shutdown_token. cancel( ) ;
0 commit comments