From b54628e84fdc64f3dfb216eacc635be3e3301b6b Mon Sep 17 00:00:00 2001
From: Leon Schuermann
Date: Mon, 3 Nov 2025 18:46:24 -0500
Subject: [PATCH] Save incremental progress on issue / PR fetch failures

This commit implements logic for saving incremental progress when
encountering an error fetching issues or PRs (such as an unsupported
event type). When such an error occurs, keep fetching the remaining
issues and store the failed issue / PR IDs in the state file.
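
For illustration, a state file written after a run in which two
issues and one PR failed to fetch would look roughly as follows (the
issue / PR numbers here are made up; `last_backup` is serialized by
chrono as an RFC 3339 timestamp):

    {
      "version": 2,
      "last_backup": "2025-11-03T23:46:24Z",
      "failed_issues": [
        17,
        42
      ],
      "failed_pulls": [
        23
      ]
    }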

The current run then exits with an exit code of `EXIT_API_ERROR` (3).
Subsequent runs will retry fetching these failed resources.
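
This bumps the state file format version from 1 to 2. Version 1
state files remain loadable: the new `failed_issues` and
`failed_pulls` fields are default-populated with empty lists through
serde defaults, so a hypothetical version 1 state file such as

    {
      "version": 1,
      "last_backup": "2025-10-01T12:00:00Z"
    }

deserializes into the new `BackupState` with both failure lists
empty.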
---
 src/main.rs  | 275 ++++++++++++++++++++++++++++++++++++++++-----------
 src/types.rs |  23 ++++-
 2 files changed, 241 insertions(+), 57 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index 2ad1e5f..c013d0b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -6,6 +6,7 @@ use log::{debug, error, info, warn};
 use octocrab::models::{issues, pulls};
 use octocrab::Page;
 use octocrab::{models, params};
+use std::borrow::Cow;
 use std::fs;
 use std::fs::File;
 use std::io::prelude::*;
@@ -22,13 +23,14 @@ const STATE_FILE: &str = "state.json";
 const MAX_PER_PAGE: u8 = 100;
 const START_PAGE: u32 = 1; // GitHub starts indexing at page 1
 
-const STATE_VERSION: u32 = 1;
+const STATE_VERSION: u32 = 2;
 
 const EXIT_CREATING_DIRS: u8 = 1;
 const EXIT_CREATING_OCTOCRAB_INSTANCE: u8 = 2;
 const EXIT_API_ERROR: u8 = 3;
 const EXIT_WRITING: u8 = 3;
 const EXIT_NO_PAT: u8 = 4;
+const EXIT_INTERNAL_ERROR: u8 = 5;
 
 mod types;
 
@@ -267,6 +269,35 @@ async fn get_issue_page(
     }
 }
 
+#[async_recursion]
+async fn fetch_issue(
+    issue_number: u64,
+    owner: String,
+    repo: String,
+    attempt: u8,
+) -> octocrab::Result<issues::Issue> {
+    match octocrab::instance()
+        .issues(&owner, &repo)
+        .get(issue_number)
+        .await
+    {
+        Ok(p) => Ok(p),
+        Err(e) => {
+            match e {
+                octocrab::Error::GitHub { .. } => {
+                    if attempt > 0 {
+                        return Err(e);
+                    }
+                    // retry once in case we hit the rate limit
+                    wait_on_ratelimit().await;
+                    fetch_issue(issue_number, owner, repo, attempt + 1).await
+                }
+                _ => Err(e),
+            }
+        }
+    }
+}
+
 async fn get_pull(
     number: u64,
     owner: String,
@@ -304,11 +335,25 @@ async fn get_pull(
 }
 
 async fn get_issue(
-    issue: issues::Issue,
+    issue: Option<issues::Issue>,
     number: u64,
     owner: String,
     repo: String,
 ) -> Result<EntryWithMetadata, octocrab::Error> {
+    let issue = if let Some(issue) = issue {
+        // Issue has already been fetched as part of the pagination:
+        issue
+    } else {
+        // Issue has not been fetched yet, need to get it:
+        match fetch_issue(number, owner.clone(), repo.clone(), 0).await {
+            Ok(issue) => issue,
+            Err(e) => {
+                error!("Error in get_issue() for issue={}: {}", number, e);
+                return Err(e);
+            }
+        }
+    };
+
     let events_future = get_timeline(number, owner.clone(), repo.clone());
 
     let events = match events_future.await {
@@ -324,12 +369,17 @@ async fn get_issue(
     )))
 }
 
+struct FetchResult {
+    failed_issues: Vec<u64>,
+    failed_pulls: Vec<u64>,
+}
+
 async fn get_issues_and_pulls(
     sender: mpsc::Sender<EntryWithMetadata>,
-    since: Option<DateTime<Utc>>,
+    last_backup_state: Option<BackupState<'static>>,
     owner: String,
     repo: String,
-) -> Result<(), octocrab::Error> {
+) -> Result<FetchResult, octocrab::Error> {
     let mut loaded_issues: usize = 0;
     let mut loaded_pulls: usize = 0;
     let mut failed_issues: Vec<u64> = Vec::new();
@@ -339,7 +389,15 @@
         owner, repo
     );
     for page_num in START_PAGE..u32::MAX {
-        let page = match get_issue_page(page_num, since, owner.clone(), repo.clone(), 0).await {
+        let page = match get_issue_page(
+            page_num,
+            last_backup_state.as_ref().map(|s| s.last_backup),
+            owner.clone(),
+            repo.clone(),
+            0,
+        )
+        .await
+        {
             Ok(page) => page,
             Err(e) => {
                 error!(
@@ -350,27 +408,59 @@
             }
         };
 
-        for entry in page.items {
-            if entry.pull_request.is_none() {
-                match get_issue(entry.clone(), entry.number, owner.clone(), repo.clone()).await {
-                    Ok(issue) => {
-                        sender.send(issue).await.unwrap();
-                        loaded_issues += 1;
-                    }
-                    Err(e) => {
-                        error!("Could not get issue #{}: {}", entry.number, e);
-                        failed_issues.push(entry.number);
-                    }
-                }
-            } else {
-                match get_pull(entry.number, owner.clone(), repo.clone()).await {
-                    Ok(pull) => {
-                        sender.send(pull).await.unwrap();
-                        loaded_pulls += 1;
-                    }
-                    Err(e) => {
-                        error!("Could not get pull-request #{}: {}", entry.number, e);
-                        failed_pulls.push(entry.number);
+        enum EntryType {
+            Issue(u64, Option<issues::Issue>),
+            Pr(u64),
+        }
+
+        for entry in page
+            .items
+            .into_iter()
+            .map(|entry| {
+                if entry.pull_request.is_none() {
+                    EntryType::Issue(entry.number, Some(entry))
+                } else {
+                    EntryType::Pr(entry.number)
+                }
+            })
+            .chain(
+                last_backup_state
+                    .as_ref()
+                    .map_or(&[][..], |s| &s.failed_issues)
+                    .iter()
+                    .map(|issue_number| EntryType::Issue(*issue_number, None)),
+            )
+            .chain(
+                last_backup_state
+                    .as_ref()
+                    .map_or(&[][..], |s| &s.failed_pulls)
+                    .iter()
+                    .map(|pr_number| EntryType::Pr(*pr_number)),
+            )
+        {
+            match entry {
+                EntryType::Issue(issue_number, issue_opt) => {
+                    match get_issue(issue_opt, issue_number, owner.clone(), repo.clone()).await {
+                        Ok(issue) => {
+                            sender.send(issue).await.unwrap();
+                            loaded_issues += 1;
+                        }
+                        Err(e) => {
+                            error!("Could not get issue #{}: {}", issue_number, e);
+                            failed_issues.push(issue_number);
+                        }
+                    }
+                }
+                EntryType::Pr(pr_number) => {
+                    match get_pull(pr_number, owner.clone(), repo.clone()).await {
+                        Ok(pull) => {
+                            sender.send(pull).await.unwrap();
+                            loaded_pulls += 1;
+                        }
+                        Err(e) => {
+                            error!("Could not get pull-request #{}: {}", pr_number, e);
+                            failed_pulls.push(pr_number);
+                        }
                     }
                 }
             }
         }
@@ -384,22 +474,11 @@
     info!(
         "Loaded {} issues and {} pulls from {}:{}",
         loaded_issues, loaded_pulls, owner, repo
     );
-    if !failed_issues.is_empty() {
-        warn!(
-            "The following {} issues failed to load: {:?}",
-            failed_issues.len(),
-            failed_issues
-        );
-    }
-    if !failed_issues.is_empty() {
-        warn!(
-            "The following {} pulls failed to load: {:?}",
-            failed_pulls.len(),
-            failed_pulls
-        );
-    }
-    Ok(())
+    Ok(FetchResult {
+        failed_issues,
+        failed_pulls,
+    })
 }
 
 fn write(x: EntryWithMetadata, destination: PathBuf) -> Result<(), WriteError> {
@@ -424,11 +503,15 @@
 
 fn write_backup_state(
     start_time: DateTime<Utc>,
+    failed_issues: &[u64],
+    failed_pulls: &[u64],
     mut destination: PathBuf,
 ) -> Result<(), WriteError> {
     let state = BackupState {
         version: STATE_VERSION,
         last_backup: start_time,
+        failed_issues: Cow::Borrowed(failed_issues),
+        failed_pulls: Cow::Borrowed(failed_pulls),
     };
     destination.push(STATE_FILE);
     let json = serde_json::to_string_pretty(&state)?;
@@ -438,7 +521,7 @@
     Ok(())
 }
 
-fn get_last_backup_time(destination: PathBuf) -> Option<DateTime<Utc>> {
+fn load_backup_state(destination: PathBuf) -> Option<BackupState<'static>> {
     let mut path = destination;
     path.push(STATE_FILE);
     info!("Trying to read {} file", path.display());
@@ -447,12 +530,21 @@
     info!("Trying deserialize {} file", path.display());
     match serde_json::from_str::<BackupState>(&contents) {
         Ok(state) => match state.version {
-            STATE_VERSION => {
+            // We can load both `STATE_VERSION` (2) and version
+            // 1. Version 2 simply adds `failed_issues` and
+            // `failed_pulls` fields, which we can default-populate.
+            STATE_VERSION | 1 => {
                 info!(
                     "Doing an incremental GitHub backup starting from {}.",
                     state.last_backup
                 );
-                Some(state.last_backup)
+                if !state.failed_issues.is_empty() {
+                    info!("Retrying to fetch failed issues: {:?}", state.failed_issues);
+                }
+                if !state.failed_pulls.is_empty() {
+                    info!("Retrying to fetch failed PRs: {:?}", state.failed_pulls);
+                }
+                Some(state)
             }
             _ => {
                 warn!("BackupState version {} is unknown.", state.version);
@@ -506,6 +598,34 @@ fn personal_access_token(args: Args) -> Option<String> {
     None
 }
 
+fn print_failed_issues_pulls_warning(failed_issues: &[u64], failed_pulls: &[u64]) {
+    warn!(
+        "Failed to fetch {failed_issues_label}{failed_issues_list}\
+         {conjunction}{failed_pulls_label}{failed_pulls_list}",
+        failed_issues_label = if !failed_issues.is_empty() {
+            "issues "
+        } else {
+            ""
+        },
+        failed_issues_list = failed_issues
+            .iter()
+            .map(|issue_id| issue_id.to_string())
+            .collect::<Vec<_>>()
+            .join(", "),
+        conjunction = if !failed_issues.is_empty() && !failed_pulls.is_empty() {
+            " and "
+        } else {
+            ""
+        },
+        failed_pulls_label = if !failed_pulls.is_empty() { "PRs " } else { "" },
+        failed_pulls_list = failed_pulls
+            .iter()
+            .map(|pull_id| pull_id.to_string())
+            .collect::<Vec<_>>()
+            .join(", "),
+    );
+}
+
 #[tokio::main]
 async fn main() -> ExitCode {
     env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
@@ -551,7 +671,7 @@
     }
 
     let start_time = chrono::Utc::now();
-    let last_backup_time: Option<DateTime<Utc>> = get_last_backup_time(args.destination.clone());
+    let last_backup_state: Option<BackupState> = load_backup_state(args.destination.clone());
 
     let instance = match octocrab::OctocrabBuilder::default()
         .personal_token(pat)
@@ -573,10 +693,7 @@
     let (sender, mut receiver) = mpsc::channel(100);
 
     let task = task::spawn(async move {
-        if let Err(e) = get_issues_and_pulls(sender, last_backup_time, args.owner, args.repo).await
-        {
-            panic!("Error loading issues and pulls: {}", e);
-        }
+        get_issues_and_pulls(sender, last_backup_state, args.owner, args.repo).await
     });
 
     let mut written_anything = false;
@@ -594,22 +711,68 @@
         }
     }
 
-    if task.await.is_ok() {
-        // only write state file if we actually wrote anything
-        if written_anything {
-            if let Err(e) = write_backup_state(start_time, args.destination.clone()) {
+    match (task.await, written_anything) {
+        // There was an error preventing us from loading any issues or
+        // PRs, exit with `API_ERROR`:
+        (Ok(Err(e)), _) => {
+            error!("Error loading issues and pulls: {}", e);
+            ExitCode::from(EXIT_API_ERROR)
+        }
+
+        // Some state was written:
+        (
+            Ok(Ok(FetchResult {
+                failed_issues,
+                failed_pulls,
+            })),
+            true,
+        ) => {
+            if let Err(e) = write_backup_state(
+                start_time,
+                &failed_issues,
+                &failed_pulls,
+                args.destination.clone(),
+            ) {
                 error!(
                     "Failed to write {} to {}: {}",
                     STATE_FILE,
                     args.destination.clone().display(),
                     e
                 );
-                return ExitCode::from(EXIT_WRITING);
+
+                ExitCode::from(EXIT_WRITING)
+            } else if !failed_issues.is_empty() || !failed_pulls.is_empty() {
+                // There were errors fetching at least some issues or
+                // PRs, exit with `API_ERROR`:
+                print_failed_issues_pulls_warning(&failed_issues, &failed_pulls);
+                ExitCode::from(EXIT_API_ERROR)
+            } else {
+                ExitCode::SUCCESS
             }
         }
-    } else {
-        return ExitCode::from(EXIT_API_ERROR);
-    }
-    ExitCode::SUCCESS
+
+        // No updated issues or PRs were written:
+        (
+            Ok(Ok(FetchResult {
+                failed_issues,
+                failed_pulls,
+            })),
+            false,
+        ) => {
+            if !failed_issues.is_empty() || !failed_pulls.is_empty() {
+                // There were errors fetching at least some issues or
+                // PRs, exit with `API_ERROR`:
+                print_failed_issues_pulls_warning(&failed_issues, &failed_pulls);
+                ExitCode::from(EXIT_API_ERROR)
+            } else {
+                info!("No updated issues or pull requests to save.");
+                ExitCode::SUCCESS
+            }
+        }
+
+        (Err(join_error), _) => {
+            error!("Failed to join task: {:?}", join_error);
+            ExitCode::from(EXIT_INTERNAL_ERROR)
+        }
+    }
 }
diff --git a/src/types.rs b/src/types.rs
index 43843db..17ff0f7 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -2,6 +2,7 @@ use chrono::{DateTime, Utc};
 use clap::Parser;
 use octocrab::models::{issues, pulls, timelines};
 use serde::{Deserialize, Serialize};
+use std::borrow::Cow;
 use std::error;
 use std::fmt;
 use std::io;
@@ -114,9 +115,29 @@ impl PullWithMetadata {
 }
 
 #[derive(Serialize, Deserialize, Debug)]
-pub struct BackupState {
+pub struct BackupState<'a> {
     /// Version of the BackupState
     pub version: u32,
     /// UTC Unix timestamp when the last backup was completed.
     pub last_backup: DateTime<Utc>,
+    /// Failed issues on last run, to be retried.
+    #[serde(default = "failed_issues_default")]
+    pub failed_issues: Cow<'a, [u64]>,
+    /// Failed pull requests on last run, to be retried.
+    // Serde default value provided for compatibility / upgrade path
+    // from state version 1.
+    #[serde(default = "failed_pulls_default")]
+    pub failed_pulls: Cow<'a, [u64]>,
+}
+
+// Serde default value generator provided for compatibility / upgrade
+// path from state version 1.
+fn failed_issues_default() -> Cow<'static, [u64]> {
+    Cow::Borrowed(&[])
+}
+
+// Serde default value generator provided for compatibility / upgrade
+// path from state version 1.
+fn failed_pulls_default() -> Cow<'static, [u64]> {
+    Cow::Borrowed(&[])
 }