Skip to content

Commit 2f8ff95

Browse files
committed
Save incremental progress on issue / PR fetch failures
This commit implements logic for saving incremental progress when encountering an error fetching issues or PRs (such as an unsupported even type). When encountering such an error, keep fetching other issues and store the failed issue / PR IDs in the state file. The current run exit with an error code of `EXIT_API_ERROR` (3). Subsequent runs will then retry fetching these failed resources.
1 parent a6a0947 commit 2f8ff95

File tree

2 files changed

+219
-43
lines changed

2 files changed

+219
-43
lines changed

src/main.rs

Lines changed: 197 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use log::{debug, error, info, warn};
66
use octocrab::models::{issues, pulls};
77
use octocrab::Page;
88
use octocrab::{models, params};
9+
use std::borrow::Cow;
910
use std::fs;
1011
use std::fs::File;
1112
use std::io::prelude::*;
@@ -22,13 +23,14 @@ const STATE_FILE: &str = "state.json";
2223

2324
const MAX_PER_PAGE: u8 = 100;
2425
const START_PAGE: u32 = 1; // GitHub starts indexing at page 1
25-
const STATE_VERSION: u32 = 1;
26+
const STATE_VERSION: u32 = 2;
2627

2728
const EXIT_CREATING_DIRS: u8 = 1;
2829
const EXIT_CREATING_OCTOCRAB_INSTANCE: u8 = 2;
2930
const EXIT_API_ERROR: u8 = 3;
3031
const EXIT_WRITING: u8 = 3;
3132
const EXIT_NO_PAT: u8 = 4;
33+
const EXIT_INTERNAL_ERROR: u8 = 5;
3234

3335
mod types;
3436

@@ -267,6 +269,35 @@ async fn get_issue_page(
267269
}
268270
}
269271

272+
#[async_recursion]
273+
async fn fetch_issue(
274+
issue_number: u64,
275+
owner: String,
276+
repo: String,
277+
attempt: u8,
278+
) -> octocrab::Result<octocrab::models::issues::Issue> {
279+
match octocrab::instance()
280+
.issues(&owner, &repo)
281+
.get(issue_number)
282+
.await
283+
{
284+
Ok(p) => Ok(p),
285+
Err(e) => {
286+
match e {
287+
octocrab::Error::GitHub { .. } => {
288+
if attempt > 0 {
289+
return Err(e);
290+
}
291+
// retry once incase we hit the rate-limiting
292+
wait_on_ratelimit().await;
293+
fetch_issue(issue_number, owner, repo, attempt + 1).await
294+
}
295+
_ => Err(e),
296+
}
297+
}
298+
}
299+
}
300+
270301
async fn get_pull(
271302
number: u64,
272303
owner: String,
@@ -304,11 +335,25 @@ async fn get_pull(
304335
}
305336

306337
async fn get_issue(
307-
issue: issues::Issue,
338+
issue: Option<issues::Issue>,
308339
number: u64,
309340
owner: String,
310341
repo: String,
311342
) -> Result<EntryWithMetadata, octocrab::Error> {
343+
let issue = if let Some(issue) = issue {
344+
// Issue has already been fetched as part of the pagination:
345+
issue
346+
} else {
347+
// Issue has not been fetched yet, need to get it:
348+
match fetch_issue(number, owner.clone(), repo.clone(), 0).await {
349+
Ok(issue) => issue,
350+
Err(e) => {
351+
error!("Error in get_issue_body() for issue={}: {}", number, e);
352+
return Err(e);
353+
}
354+
}
355+
};
356+
312357
let events_future = get_timeline(number, owner.clone(), repo.clone());
313358

314359
let events = match events_future.await {
@@ -324,12 +369,17 @@ async fn get_issue(
324369
)))
325370
}
326371

372+
struct FetchResult {
373+
failed_issues: Vec<u64>,
374+
failed_pulls: Vec<u64>,
375+
}
376+
327377
async fn get_issues_and_pulls(
328378
sender: mpsc::Sender<EntryWithMetadata>,
329-
since: Option<DateTime<Utc>>,
379+
last_backup_state: Option<BackupState<'_>>,
330380
owner: String,
331381
repo: String,
332-
) -> Result<(), octocrab::Error> {
382+
) -> Result<FetchResult, octocrab::Error> {
333383
let mut loaded_issues: usize = 0;
334384
let mut loaded_pulls: usize = 0;
335385
let mut failed_issues: Vec<u64> = Vec::new();
@@ -339,7 +389,15 @@ async fn get_issues_and_pulls(
339389
owner, repo
340390
);
341391
for page_num in START_PAGE..u32::MAX {
342-
let page = match get_issue_page(page_num, since, owner.clone(), repo.clone(), 0).await {
392+
let page = match get_issue_page(
393+
page_num,
394+
last_backup_state.as_ref().map(|s| s.last_backup),
395+
owner.clone(),
396+
repo.clone(),
397+
0,
398+
)
399+
.await
400+
{
343401
Ok(page) => page,
344402
Err(e) => {
345403
error!(
@@ -350,27 +408,59 @@ async fn get_issues_and_pulls(
350408
}
351409
};
352410

353-
for entry in page.items {
354-
if entry.pull_request.is_none() {
355-
match get_issue(entry.clone(), entry.number, owner.clone(), repo.clone()).await {
356-
Ok(issue) => {
357-
sender.send(issue).await.unwrap();
358-
loaded_issues += 1;
359-
}
360-
Err(e) => {
361-
error!("Could not get issue #{}: {}", entry.number, e);
362-
failed_issues.push(entry.number);
363-
}
411+
enum EntryType {
412+
Issue(u64, Option<issues::Issue>),
413+
Pr(u64),
414+
}
415+
416+
for entry in page
417+
.items
418+
.into_iter()
419+
.map(|entry| {
420+
if entry.pull_request.is_none() {
421+
EntryType::Issue(entry.number, Some(entry))
422+
} else {
423+
EntryType::Pr(entry.number)
364424
}
365-
} else {
366-
match get_pull(entry.number, owner.clone(), repo.clone()).await {
367-
Ok(pull) => {
368-
sender.send(pull).await.unwrap();
369-
loaded_pulls += 1;
425+
})
426+
.chain(
427+
last_backup_state
428+
.as_ref()
429+
.map_or(&[][..], |s| &s.failed_issues)
430+
.iter()
431+
.map(|issue_number| EntryType::Issue(*issue_number, None)),
432+
)
433+
.chain(
434+
last_backup_state
435+
.as_ref()
436+
.map_or(&[][..], |s| &s.failed_pulls)
437+
.iter()
438+
.map(|pr_number| EntryType::Pr(*pr_number)),
439+
)
440+
{
441+
match entry {
442+
EntryType::Issue(issue_number, issue_opt) => {
443+
match get_issue(issue_opt, issue_number, owner.clone(), repo.clone()).await {
444+
Ok(issue) => {
445+
sender.send(issue).await.unwrap();
446+
loaded_issues += 1;
447+
}
448+
Err(e) => {
449+
error!("Could not get issue #{}: {}", issue_number, e);
450+
failed_issues.push(issue_number);
451+
}
370452
}
371-
Err(e) => {
372-
error!("Could not get pull-request #{}: {}", entry.number, e);
373-
failed_pulls.push(entry.number);
453+
}
454+
EntryType::Pr(pr_number) => {
455+
match get_pull(pr_number, owner.clone(), repo.clone()).await {
456+
Ok(pull) => {
457+
sender.send(pull).await.unwrap();
458+
loaded_pulls += 1;
459+
}
460+
Err(e) => {
461+
error!("Could not get pull-request #{}: {}", pr_number, e);
462+
failed_pulls.push(pr_number);
463+
}
374464
}
375465
}
376466
}
@@ -399,7 +489,10 @@ async fn get_issues_and_pulls(
399489
);
400490
}
401491

402-
Ok(())
492+
Ok(FetchResult {
493+
failed_issues,
494+
failed_pulls,
495+
})
403496
}
404497

405498
fn write(x: EntryWithMetadata, destination: PathBuf) -> Result<(), WriteError> {
@@ -424,11 +517,15 @@ fn write(x: EntryWithMetadata, destination: PathBuf) -> Result<(), WriteError> {
424517

425518
fn write_backup_state(
426519
start_time: DateTime<Utc>,
520+
failed_issues: &[u64],
521+
failed_pulls: &[u64],
427522
mut destination: PathBuf,
428523
) -> Result<(), WriteError> {
429524
let state = BackupState {
430525
version: STATE_VERSION,
431526
last_backup: start_time,
527+
failed_issues: Cow::Borrowed(failed_issues),
528+
failed_pulls: Cow::Borrowed(failed_pulls),
432529
};
433530
destination.push(STATE_FILE);
434531
let json = serde_json::to_string_pretty(&state)?;
@@ -438,7 +535,7 @@ fn write_backup_state(
438535
Ok(())
439536
}
440537

441-
fn get_last_backup_time(destination: PathBuf) -> Option<DateTime<Utc>> {
538+
fn load_backup_state(destination: PathBuf) -> Option<BackupState<'static>> {
442539
let mut path = destination;
443540
path.push(STATE_FILE);
444541
info!("Trying to read {} file", path.display());
@@ -447,12 +544,21 @@ fn get_last_backup_time(destination: PathBuf) -> Option<DateTime<Utc>> {
447544
info!("Trying deserialize {} file", path.display());
448545
match serde_json::from_str::<BackupState>(&contents) {
449546
Ok(state) => match state.version {
450-
STATE_VERSION => {
547+
// We can load both `STATE_VERSION` (2) and version
548+
// 1. Version 2 simply adds `failed_issues` and
549+
// `failed_pulls` fields, which we can default-populate.
550+
STATE_VERSION | 1 => {
451551
info!(
452552
"Doing an incremental GitHub backup starting from {}.",
453553
state.last_backup
454554
);
455-
Some(state.last_backup)
555+
if !state.failed_issues.is_empty() {
556+
info!("Retrying to fetch failed issues: {:?}", state.failed_issues,);
557+
}
558+
if !state.failed_pulls.is_empty() {
559+
info!("Retrying to fetch failed PRs: {:?}", state.failed_pulls,);
560+
}
561+
Some(state)
456562
}
457563
_ => {
458564
warn!("BackupState version {} is unknown.", state.version);
@@ -551,7 +657,7 @@ async fn main() -> ExitCode {
551657
}
552658

553659
let start_time = chrono::Utc::now();
554-
let last_backup_time: Option<DateTime<Utc>> = get_last_backup_time(args.destination.clone());
660+
let last_backup_state: Option<BackupState> = load_backup_state(args.destination.clone());
555661

556662
let instance = match octocrab::OctocrabBuilder::default()
557663
.personal_token(pat)
@@ -573,10 +679,7 @@ async fn main() -> ExitCode {
573679
let (sender, mut receiver) = mpsc::channel(100);
574680

575681
let task = task::spawn(async move {
576-
if let Err(e) = get_issues_and_pulls(sender, last_backup_time, args.owner, args.repo).await
577-
{
578-
panic!("Error loading issues and pulls: {}", e);
579-
}
682+
get_issues_and_pulls(sender, last_backup_state, args.owner, args.repo).await
580683
});
581684

582685
let mut written_anything = false;
@@ -594,22 +697,74 @@ async fn main() -> ExitCode {
594697
}
595698
}
596699

597-
if task.await.is_ok() {
598-
// only write state file if we actually wrote anything
599-
if written_anything {
600-
if let Err(e) = write_backup_state(start_time, args.destination.clone()) {
700+
match (task.await, written_anything) {
701+
// There was an error preventing us from loading any issues or
702+
// PRs, exit with `API_ERROR`:
703+
(Ok(Err(e)), _) => {
704+
error!("Error loading issues and pulls: {}", e);
705+
ExitCode::from(EXIT_API_ERROR)
706+
}
707+
708+
// Some state was written:
709+
(
710+
Ok(Ok(FetchResult {
711+
failed_issues,
712+
failed_pulls,
713+
})),
714+
true,
715+
) => {
716+
if let Err(e) = write_backup_state(
717+
start_time,
718+
&failed_issues,
719+
&failed_pulls,
720+
args.destination.clone(),
721+
) {
601722
error!(
602723
"Failed to write {} to {}: {}",
603724
STATE_FILE,
604725
args.destination.clone().display(),
605726
e
606727
);
607-
return ExitCode::from(EXIT_WRITING);
728+
729+
ExitCode::from(EXIT_WRITING)
730+
} else if !failed_issues.is_empty() || !failed_pulls.is_empty() {
731+
// There were errors fetching at least some issues or
732+
// PRs, exit with `API_ERROR`:
733+
warn!(
734+
"Failed to fetch the following issues and PRs: {:?}",
735+
failed_issues.iter().chain(failed_pulls.iter())
736+
);
737+
ExitCode::from(EXIT_API_ERROR)
738+
} else {
739+
ExitCode::SUCCESS
608740
}
609741
}
610-
} else {
611-
return ExitCode::from(EXIT_API_ERROR);
612-
}
613742

614-
ExitCode::SUCCESS
743+
// No updated issues or PRs were written:
744+
(
745+
Ok(Ok(FetchResult {
746+
failed_issues,
747+
failed_pulls,
748+
})),
749+
false,
750+
) => {
751+
if !failed_issues.is_empty() || !failed_pulls.is_empty() {
752+
// There were errors fetching at least some issues or
753+
// PRs, exit with `API_ERROR`:
754+
warn!(
755+
"Failed to fetch the following issues and PRs: {:?}",
756+
failed_issues.iter().chain(failed_pulls.iter())
757+
);
758+
ExitCode::from(EXIT_API_ERROR)
759+
} else {
760+
info!("No updated issues or pull requests to save.");
761+
ExitCode::SUCCESS
762+
}
763+
}
764+
765+
(Err(join_error), _) => {
766+
error!("Failed to join task: {:?}", join_error);
767+
ExitCode::from(EXIT_INTERNAL_ERROR)
768+
}
769+
}
615770
}

0 commit comments

Comments
 (0)