diff --git a/src/main.rs b/src/main.rs
index 2ad1e5f..c013d0b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -6,6 +6,7 @@ use log::{debug, error, info, warn};
 use octocrab::models::{issues, pulls};
 use octocrab::Page;
 use octocrab::{models, params};
+use std::borrow::Cow;
 use std::fs;
 use std::fs::File;
 use std::io::prelude::*;
@@ -22,13 +23,14 @@ const STATE_FILE: &str = "state.json";
 const MAX_PER_PAGE: u8 = 100;
 const START_PAGE: u32 = 1; // GitHub starts indexing at page 1

-const STATE_VERSION: u32 = 1;
+const STATE_VERSION: u32 = 2;

 const EXIT_CREATING_DIRS: u8 = 1;
 const EXIT_CREATING_OCTOCRAB_INSTANCE: u8 = 2;
 const EXIT_API_ERROR: u8 = 3;
 const EXIT_WRITING: u8 = 3;
 const EXIT_NO_PAT: u8 = 4;
+const EXIT_INTERNAL_ERROR: u8 = 5;

 mod types;

@@ -267,6 +269,35 @@ async fn get_issue_page(
     }
 }

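+/// Fetch a single issue by number. If the GitHub API reports an error
+/// (typically rate-limiting), wait for the rate limit to reset and
+/// retry the request once before giving up.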
+#[async_recursion]
+async fn fetch_issue(
+    issue_number: u64,
+    owner: String,
+    repo: String,
+    attempt: u8,
+) -> octocrab::Result<issues::Issue> {
+    match octocrab::instance()
+        .issues(&owner, &repo)
+        .get(issue_number)
+        .await
+    {
+        Ok(p) => Ok(p),
+        Err(e) => {
+            match e {
+                octocrab::Error::GitHub { .. } => {
+                    if attempt > 0 {
+                        return Err(e);
+                    }
+                    // Retry once in case we hit the rate limit:
+                    wait_on_ratelimit().await;
+                    fetch_issue(issue_number, owner, repo, attempt + 1).await
+                }
+                _ => Err(e),
+            }
+        }
+    }
+}
+
 async fn get_pull(
     number: u64,
     owner: String,
@@ -304,11 +335,25 @@
 }

 async fn get_issue(
-    issue: issues::Issue,
+    issue: Option<issues::Issue>,
     number: u64,
     owner: String,
     repo: String,
 ) -> Result<EntryWithMetadata, octocrab::Error> {
+    let issue = if let Some(issue) = issue {
+        // Issue has already been fetched as part of the pagination:
+        issue
+    } else {
+        // Issue has not been fetched yet, need to get it:
+        match fetch_issue(number, owner.clone(), repo.clone(), 0).await {
+            Ok(issue) => issue,
+            Err(e) => {
+                error!("Error in get_issue() for issue={}: {}", number, e);
+                return Err(e);
+            }
+        }
+    };
+
     let events_future = get_timeline(number, owner.clone(), repo.clone());

     let events = match events_future.await {
@@ -324,12 +369,17 @@
     )))
 }

+struct FetchResult {
+    failed_issues: Vec<u64>,
+    failed_pulls: Vec<u64>,
+}
+
 async fn get_issues_and_pulls(
     sender: mpsc::Sender<EntryWithMetadata>,
-    since: Option<DateTime<Utc>>,
+    last_backup_state: Option<BackupState<'static>>,
     owner: String,
     repo: String,
-) -> Result<(), octocrab::Error> {
+) -> Result<FetchResult, octocrab::Error> {
     let mut loaded_issues: usize = 0;
     let mut loaded_pulls: usize = 0;
     let mut failed_issues: Vec<u64> = Vec::new();
@@ -339,7 +389,15 @@
         owner, repo
     );
     for page_num in START_PAGE..u32::MAX {
-        let page = match get_issue_page(page_num, since, owner.clone(), repo.clone(), 0).await {
+        let page = match get_issue_page(
+            page_num,
+            last_backup_state.as_ref().map(|s| s.last_backup),
+            owner.clone(),
+            repo.clone(),
+            0,
+        )
+        .await
+        {
             Ok(page) => page,
             Err(e) => {
                 error!(
@@ -350,27 +408,59 @@
             }
         };

-        for entry in page.items {
-            if entry.pull_request.is_none() {
-                match get_issue(entry.clone(), entry.number, owner.clone(), repo.clone()).await {
-                    Ok(issue) => {
-                        sender.send(issue).await.unwrap();
-                        loaded_issues += 1;
-                    }
-                    Err(e) => {
-                        error!("Could not get issue #{}: {}", entry.number, e);
-                        failed_issues.push(entry.number);
-                    }
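+        // Unify the different sources of work into one stream of
+        // `EntryType` values: the entries on this page, plus (on the
+        // first page only) the issues and PRs that failed last run: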
+        enum EntryType {
+            Issue(u64, Option<issues::Issue>),
+            Pr(u64),
+        }
+
+        for entry in page
+            .items
+            .into_iter()
+            .map(|entry| {
+                if entry.pull_request.is_none() {
+                    EntryType::Issue(entry.number, Some(entry))
+                } else {
+                    EntryType::Pr(entry.number)
                 }
-            } else {
-                match get_pull(entry.number, owner.clone(), repo.clone()).await {
-                    Ok(pull) => {
-                        sender.send(pull).await.unwrap();
-                        loaded_pulls += 1;
+            })
+            .chain(
+                last_backup_state
+                    .as_ref()
+                    // Retry previously failed entries once, on the
+                    // first page, rather than once per page:
+                    .filter(|_| page_num == START_PAGE)
+                    .map_or(&[][..], |s| &s.failed_issues)
+                    .iter()
+                    .map(|issue_number| EntryType::Issue(*issue_number, None)),
+            )
+            .chain(
+                last_backup_state
+                    .as_ref()
+                    .filter(|_| page_num == START_PAGE)
+                    .map_or(&[][..], |s| &s.failed_pulls)
+                    .iter()
+                    .map(|pr_number| EntryType::Pr(*pr_number)),
+            )
+        {
+            match entry {
+                EntryType::Issue(issue_number, issue_opt) => {
+                    match get_issue(issue_opt, issue_number, owner.clone(), repo.clone()).await {
+                        Ok(issue) => {
+                            sender.send(issue).await.unwrap();
+                            loaded_issues += 1;
+                        }
+                        Err(e) => {
+                            error!("Could not get issue #{}: {}", issue_number, e);
+                            failed_issues.push(issue_number);
+                        }
                     }
-                    Err(e) => {
-                        error!("Could not get pull-request #{}: {}", entry.number, e);
-                        failed_pulls.push(entry.number);
+                }
+                EntryType::Pr(pr_number) => {
+                    match get_pull(pr_number, owner.clone(), repo.clone()).await {
+                        Ok(pull) => {
+                            sender.send(pull).await.unwrap();
+                            loaded_pulls += 1;
+                        }
+                        Err(e) => {
+                            error!("Could not get pull-request #{}: {}", pr_number, e);
+                            failed_pulls.push(pr_number);
+                        }
                     }
                 }
             }
@@ -384,22 +474,11 @@
         "Loaded {} issues and {} pulls from {}:{}",
         loaded_issues, loaded_pulls, owner, repo
     );
-    if !failed_issues.is_empty() {
-        warn!(
-            "The following {} issues failed to load: {:?}",
-            failed_issues.len(),
-            failed_issues
-        );
-    }
-    if !failed_pulls.is_empty() {
-        warn!(
-            "The following {} pulls failed to load: {:?}",
-            failed_pulls.len(),
-            failed_pulls
-        );
-    }
-    Ok(())
+    Ok(FetchResult {
+        failed_issues,
+        failed_pulls,
+    })
 }

 fn write(x: EntryWithMetadata, destination: PathBuf) -> Result<(), WriteError> {
@@ -424,11 +503,15 @@

 fn write_backup_state(
     start_time: DateTime<Utc>,
+    failed_issues: &[u64],
+    failed_pulls: &[u64],
     mut destination: PathBuf,
 ) -> Result<(), WriteError> {
     let state = BackupState {
         version: STATE_VERSION,
         last_backup: start_time,
+        failed_issues: Cow::Borrowed(failed_issues),
+        failed_pulls: Cow::Borrowed(failed_pulls),
     };
     destination.push(STATE_FILE);
     let json = serde_json::to_string_pretty(&state)?;
@@ -438,7 +521,7 @@
     Ok(())
 }

-fn get_last_backup_time(destination: PathBuf) -> Option<DateTime<Utc>> {
+fn load_backup_state(destination: PathBuf) -> Option<BackupState<'static>> {
     let mut path = destination;
     path.push(STATE_FILE);
     info!("Trying to read {} file", path.display());
@@ -447,12 +530,21 @@
     info!("Trying deserialize {} file", path.display());
     match serde_json::from_str::<BackupState>(&contents) {
         Ok(state) => match state.version {
-            STATE_VERSION => {
+            // We can load both `STATE_VERSION` (2) and version
+            // 1. Version 2 simply adds `failed_issues` and
+            // `failed_pulls` fields, which we can default-populate.
+            STATE_VERSION | 1 => {
                 info!(
                     "Doing an incremental GitHub backup starting from {}.",
                     state.last_backup
                 );
-                Some(state.last_backup)
+                if !state.failed_issues.is_empty() {
+                    info!("Retrying to fetch failed issues: {:?}", state.failed_issues);
+                }
+                if !state.failed_pulls.is_empty() {
+                    info!("Retrying to fetch failed PRs: {:?}", state.failed_pulls);
+                }
+                Some(state)
             }
             _ => {
                 warn!("BackupState version {} is unknown.", state.version);
@@ -506,6 +598,34 @@ fn personal_access_token(args: Args) -> Option<String> {
     None
 }

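+/// Log one combined warning for everything that failed to fetch.
+/// For example (hypothetical numbers), `failed_issues = [3, 7]` and
+/// `failed_pulls = [12]` produce:
+/// `Failed to fetch issues 3, 7 and PRs 12`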
+fn print_failed_issues_pulls_warning(failed_issues: &[u64], failed_pulls: &[u64]) {
+    warn!(
+        "Failed to fetch {failed_issues_label}{failed_issues_list}\
+        {conjunction}{failed_pulls_label}{failed_pulls_list}",
+        failed_issues_label = if !failed_issues.is_empty() {
+            "issues "
+        } else {
+            ""
+        },
+        failed_issues_list = failed_issues
+            .iter()
+            .map(|issue_id| issue_id.to_string())
+            .collect::<Vec<_>>()
+            .join(", "),
+        conjunction = if !failed_issues.is_empty() && !failed_pulls.is_empty() {
+            " and "
+        } else {
+            ""
+        },
+        failed_pulls_label = if !failed_pulls.is_empty() { "PRs " } else { "" },
+        failed_pulls_list = failed_pulls
+            .iter()
+            .map(|pull_id| pull_id.to_string())
+            .collect::<Vec<_>>()
+            .join(", "),
+    );
+}
+
 #[tokio::main]
 async fn main() -> ExitCode {
     env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
@@ -551,7 +671,7 @@
     }

     let start_time = chrono::Utc::now();
-    let last_backup_time: Option<DateTime<Utc>> = get_last_backup_time(args.destination.clone());
+    let last_backup_state: Option<BackupState> = load_backup_state(args.destination.clone());

     let instance = match octocrab::OctocrabBuilder::default()
         .personal_token(pat)
@@ -573,10 +693,7 @@
     let (sender, mut receiver) = mpsc::channel(100);

     let task = task::spawn(async move {
-        if let Err(e) = get_issues_and_pulls(sender, last_backup_time, args.owner, args.repo).await
-        {
-            panic!("Error loading issues and pulls: {}", e);
-        }
+        get_issues_and_pulls(sender, last_backup_state, args.owner, args.repo).await
     });

     let mut written_anything = false;
@@ -594,22 +711,68 @@
         }
     }

-    if task.await.is_ok() {
-        // only write state file if we actually wrote anything
-        if written_anything {
-            if let Err(e) = write_backup_state(start_time, args.destination.clone()) {
+    match (task.await, written_anything) {
+        // There was an error preventing us from loading any issues or
+        // PRs, exit with `API_ERROR`:
+        (Ok(Err(e)), _) => {
+            error!("Error loading issues and pulls: {}", e);
+            ExitCode::from(EXIT_API_ERROR)
+        }
+
+        // Some state was written:
+        (
+            Ok(Ok(FetchResult {
+                failed_issues,
+                failed_pulls,
+            })),
+            true,
+        ) => {
+            if let Err(e) = write_backup_state(
+                start_time,
+                &failed_issues,
+                &failed_pulls,
+                args.destination.clone(),
+            ) {
                 error!(
                     "Failed to write {} to {}: {}",
                     STATE_FILE,
                     args.destination.clone().display(),
                     e
                 );
-                return ExitCode::from(EXIT_WRITING);
+
+                ExitCode::from(EXIT_WRITING)
+            } else if !failed_issues.is_empty() || !failed_pulls.is_empty() {
+                // There were errors fetching at least some issues or
+                // PRs, exit with `API_ERROR`:
+                print_failed_issues_pulls_warning(&failed_issues, &failed_pulls);
+                ExitCode::from(EXIT_API_ERROR)
+            } else {
+                ExitCode::SUCCESS
+            }
+        }
+
+        // No updated issues or PRs were written:
+        (
+            Ok(Ok(FetchResult {
+                failed_issues,
+                failed_pulls,
+            })),
+            false,
+        ) => {
+            if !failed_issues.is_empty() || !failed_pulls.is_empty() {
+                // There were errors fetching at least some issues or
+                // PRs, exit with `API_ERROR`:
+                print_failed_issues_pulls_warning(&failed_issues, &failed_pulls);
+                ExitCode::from(EXIT_API_ERROR)
+            } else {
+                info!("No updated issues or pull requests to save.");
+                ExitCode::SUCCESS
             }
         }
-    } else {
-        return ExitCode::from(EXIT_API_ERROR);
-    }

-    ExitCode::SUCCESS
+        (Err(join_error), _) => {
+            error!("Failed to join task: {:?}", join_error);
+            ExitCode::from(EXIT_INTERNAL_ERROR)
+        }
+    }
 }
diff --git a/src/types.rs b/src/types.rs
index 43843db..17ff0f7 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -2,6 +2,7 @@ use chrono::{DateTime, Utc};
 use clap::Parser;
 use octocrab::models::{issues, pulls, timelines};
 use serde::{Deserialize, Serialize};
+use std::borrow::Cow;
 use std::error;
 use std::fmt;
 use std::io;
@@ -114,9 +115,29 @@ impl PullWithMetadata {
 }

 #[derive(Serialize, Deserialize, Debug)]
-pub struct BackupState {
+pub struct BackupState<'a> {
     /// Version of the BackupState
     pub version: u32,
     /// UTC Unix timestamp when the last backup was completed.
     pub last_backup: DateTime<Utc>,
+    /// Failed issues on last run, to be retried.
+    #[serde(default = "failed_issues_default")]
+    pub failed_issues: Cow<'a, [u64]>,
+    /// Failed pull requests on last run, to be retried.
+    // Serde default value provided for compatibility / upgrade path
+    // from state version 1.
+    #[serde(default = "failed_pulls_default")]
+    pub failed_pulls: Cow<'a, [u64]>,
+}
+
+// Serde default value generator provided for compatibility / upgrade
+// path from state version 1.
+fn failed_issues_default() -> Cow<'static, [u64]> {
+    Cow::Borrowed(&[])
+}
+
+// Serde default value generator provided for compatibility / upgrade
+// path from state version 1.
+fn failed_pulls_default() -> Cow<'static, [u64]> {
+    Cow::Borrowed(&[])
+}
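+
+// A minimal sketch of the upgrade path described above: a version-1
+// state file, which lacks the two new fields, must still deserialize,
+// with `failed_issues` and `failed_pulls` defaulting to empty slices.
+// Test name and JSON values are illustrative; this assumes
+// `serde_json` is available here, as it already is for the binary.
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn v1_state_deserializes_with_empty_failed_lists() {
+        let v1_json = r#"{"version":1,"last_backup":"2023-01-01T00:00:00Z"}"#;
+        let state: BackupState = serde_json::from_str(v1_json).expect("v1 state should parse");
+        assert_eq!(state.version, 1);
+        assert!(state.failed_issues.is_empty());
+        assert!(state.failed_pulls.is_empty());
+    }
+}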