@@ -355,6 +355,42 @@ impl EventObserver {
355
355
Ok ( ( ) )
356
356
}
357
357
358
+ /// Insert a payload into the database, retrying on failure.
359
+ fn insert_payload_with_retry (
360
+ conn : & Connection ,
361
+ url : & str ,
362
+ payload : & serde_json:: Value ,
363
+ timeout : Duration ,
364
+ ) {
365
+ let mut attempts = 0i64 ;
366
+ let mut backoff = Duration :: from_millis ( 100 ) ; // Initial backoff duration
367
+ let max_backoff = Duration :: from_secs ( 5 ) ; // Cap the backoff duration
368
+
369
+ loop {
370
+ match Self :: insert_payload ( conn, url, payload, timeout) {
371
+ Ok ( _) => {
372
+ // Successful insert, break the loop
373
+ return ;
374
+ }
375
+ Err ( err) => {
376
+ // Log the error, then retry after a delay
377
+ warn ! ( "Failed to insert payload into event observer database: {:?}" , err;
378
+ "backoff" => ?backoff,
379
+ "attempts" => attempts
380
+ ) ;
381
+
382
+ // Wait for the backoff duration
383
+ sleep ( backoff) ;
384
+
385
+ // Increase the backoff duration (with exponential backoff)
386
+ backoff = std:: cmp:: min ( backoff. saturating_mul ( 2 ) , max_backoff) ;
387
+
388
+ attempts = attempts. saturating_add ( 1 ) ;
389
+ }
390
+ }
391
+ }
392
+ }
393
+
358
394
fn get_pending_payloads (
359
395
conn : & Connection ,
360
396
) -> Result < Vec < ( i64 , String , serde_json:: Value , u64 ) > , db_error > {
@@ -524,8 +560,7 @@ impl EventObserver {
524
560
Connection :: open ( db_path) . expect ( "Failed to open database for event observer" ) ;
525
561
526
562
// Insert the new payload into the database
527
- Self :: insert_payload ( & conn, & full_url, payload, self . timeout )
528
- . expect ( "Failed to insert payload into event observer database" ) ;
563
+ Self :: insert_payload_with_retry ( & conn, & full_url, payload, self . timeout ) ;
529
564
530
565
// Process all pending payloads
531
566
Self :: process_pending_payloads ( & conn) ;
0 commit comments