@@ -94,7 +94,7 @@ async fn main() -> Result<(), anyhow::Error> {
9494 process_file ( p, & bucket) . await ;
9595 } else {
9696 info ! ( "Uploading all content in {}" , core_dir_command) ;
97- run_polling_agent ( ) . await ;
97+ run_polling_agent ( false ) . await ;
9898 }
9999 process:: exit ( 0 ) ;
100100 }
@@ -149,7 +149,7 @@ async fn main() -> Result<(), anyhow::Error> {
149149 std:: thread:: sleep ( Duration :: from_millis ( 1000 ) ) ;
150150 }
151151 } else {
152- run_polling_agent ( ) . await ;
152+ run_polling_agent ( use_inotify == "true" ) . await ;
153153 }
154154
155155 if !interval. is_empty ( ) && !schedule. is_empty ( ) {
@@ -190,7 +190,7 @@ async fn main() -> Result<(), anyhow::Error> {
190190 match next_tick {
191191 Ok ( Some ( ts) ) => {
192192 info ! ( "Next scheduled run {:?}" , ts) ;
193- run_polling_agent ( ) . await ;
193+ run_polling_agent ( false ) . await ;
194194 }
195195 _ => warn ! ( "Could not get next tick for job" ) ,
196196 }
@@ -255,22 +255,14 @@ async fn main() -> Result<(), anyhow::Error> {
255255 if event. mask . contains ( EventMask :: ISDIR ) {
256256 warn ! ( "Unknown Directory created: {:?}" , event. name) ;
257257 } else {
258- let bucket = match get_bucket ( ) {
259- Ok ( v) => v,
260- Err ( e) => {
261- error ! ( "Bucket creation failed in event: {}" , e) ;
262- continue ;
263- }
264- } ;
265258 match event. name {
266259 Some ( s) => {
267260 let file = format ! (
268261 "{}/{}" ,
269262 core_dir_command,
270263 s. to_str( ) . unwrap_or_default( )
271264 ) ;
272- let p = Path :: new ( & file) ;
273- process_file ( p, & bucket) . await
265+ tokio:: spawn ( process_file_or_retry ( PathBuf :: from ( file) , 0 ) ) ;
274266 }
275267 None => {
276268 continue ;
@@ -287,7 +279,30 @@ async fn main() -> Result<(), anyhow::Error> {
287279 Ok ( ( ) )
288280}
289281
290- async fn process_file ( zip_path : & Path , bucket : & Bucket ) {
282+ async fn process_file_or_retry ( file : PathBuf , iteration : usize ) {
283+ let bucket = match get_bucket ( ) {
284+ Ok ( v) => v,
285+ Err ( e) => {
286+ error ! ( "Bucket creation failed in event: {}" , e) ;
287+ return ;
288+ }
289+ } ;
290+
291+ match process_file ( & file, & bucket) . await {
292+ Ok ( ( ) ) => ( ) ,
293+ Err ( e) => {
294+ let backoff = Duration :: from_secs ( 60 ) . mul_f32 ( ( iteration as f32 + 1.0 ) . powf ( 1.5 ) ) ;
295+
296+ error ! ( "Core dump file processing failed: {e}. Retrying in {} s." , backoff. as_secs( ) ) ;
297+ tokio:: time:: sleep ( backoff) . await ;
298+
299+ Box :: pin ( process_file_or_retry ( file, iteration + 1 ) ) . await ;
300+ }
301+ }
302+
303+ }
304+
305+ async fn process_file ( zip_path : & Path , bucket : & Bucket ) -> Result < ( ) , String > {
291306 info ! ( "Uploading: {}" , zip_path. display( ) ) ;
292307
293308 let f = File :: open ( zip_path) . expect ( "no file found" ) ;
@@ -395,7 +410,7 @@ fn get_bucket() -> Result<Box<Bucket>, anyhow::Error> {
395410 Ok ( Bucket :: new ( & s3. bucket , s3. region , s3. credentials ) ?. with_path_style ( ) )
396411}
397412
398- async fn run_polling_agent ( ) {
413+ async fn run_polling_agent ( retry : bool ) {
399414 let core_location = env:: var ( "CORE_DIR" ) . unwrap_or_else ( |_| DEFAULT_CORE_DIR . to_string ( ) ) ;
400415 info ! ( "Executing Agent with location : {}" , core_location) ;
401416
@@ -418,7 +433,16 @@ async fn run_polling_agent() {
418433
419434 info ! ( "Dir Content {:?}" , paths) ;
420435 for zip_path in paths {
421- process_file ( & zip_path, & bucket) . await ;
436+ if retry {
437+ process_file_or_retry ( zip_path, 0 ) . await ;
438+ } else {
439+ match process_file ( & zip_path, & bucket) . await {
440+ Ok ( ( ) ) => ( ) ,
441+ Err ( e) => {
442+ error ! ( "File processing failed: {e}" ) ;
443+ }
444+ } ;
445+ }
422446 }
423447}
424448
0 commit comments