Skip to content

Commit 2071457

Browse files
committed
Save incremental progress on issue / PR fetch failures
This commit implements logic for saving incremental progress when encountering an error fetching issues or PRs (such as an unsupported event type). When encountering such an error, keep fetching other issues and store the failed issue / PR IDs in the state file. Subsequent runs will then retry fetching these failed resources.
1 parent a6a0947 commit 2071457

File tree

2 files changed

+200
-43
lines changed

2 files changed

+200
-43
lines changed

src/main.rs

Lines changed: 178 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use log::{debug, error, info, warn};
66
use octocrab::models::{issues, pulls};
77
use octocrab::Page;
88
use octocrab::{models, params};
9+
use std::borrow::Cow;
910
use std::fs;
1011
use std::fs::File;
1112
use std::io::prelude::*;
@@ -22,13 +23,14 @@ const STATE_FILE: &str = "state.json";
2223

2324
const MAX_PER_PAGE: u8 = 100;
2425
const START_PAGE: u32 = 1; // GitHub starts indexing at page 1
25-
const STATE_VERSION: u32 = 1;
26+
const STATE_VERSION: u32 = 2;
2627

2728
const EXIT_CREATING_DIRS: u8 = 1;
2829
const EXIT_CREATING_OCTOCRAB_INSTANCE: u8 = 2;
2930
const EXIT_API_ERROR: u8 = 3;
3031
const EXIT_WRITING: u8 = 3;
3132
const EXIT_NO_PAT: u8 = 4;
33+
const EXIT_INTERNAL_ERROR: u8 = 5;
3234

3335
mod types;
3436

@@ -267,6 +269,35 @@ async fn get_issue_page(
267269
}
268270
}
269271

272+
#[async_recursion]
273+
async fn fetch_issue(
274+
issue_number: u64,
275+
owner: String,
276+
repo: String,
277+
attempt: u8,
278+
) -> octocrab::Result<octocrab::models::issues::Issue> {
279+
match octocrab::instance()
280+
.issues(&owner, &repo)
281+
.get(issue_number)
282+
.await
283+
{
284+
Ok(p) => Ok(p),
285+
Err(e) => {
286+
match e {
287+
octocrab::Error::GitHub { .. } => {
288+
if attempt > 0 {
289+
return Err(e);
290+
}
291+
// retry once in case we hit the rate limit
292+
wait_on_ratelimit().await;
293+
fetch_issue(issue_number, owner, repo, attempt + 1).await
294+
}
295+
_ => Err(e),
296+
}
297+
}
298+
}
299+
}
300+
270301
async fn get_pull(
271302
number: u64,
272303
owner: String,
@@ -304,11 +335,25 @@ async fn get_pull(
304335
}
305336

306337
async fn get_issue(
307-
issue: issues::Issue,
338+
issue: Option<issues::Issue>,
308339
number: u64,
309340
owner: String,
310341
repo: String,
311342
) -> Result<EntryWithMetadata, octocrab::Error> {
343+
let issue = if let Some(issue) = issue {
344+
// Issue has already been fetched as part of the pagination:
345+
issue
346+
} else {
347+
// Issue has not been fetched yet, need to get it:
348+
match fetch_issue(number, owner.clone(), repo.clone(), 0).await {
349+
Ok(issue) => issue,
350+
Err(e) => {
351+
error!("Error in get_issue_body() for issue={}: {}", number, e);
352+
return Err(e);
353+
}
354+
}
355+
};
356+
312357
let events_future = get_timeline(number, owner.clone(), repo.clone());
313358

314359
let events = match events_future.await {
@@ -326,10 +371,10 @@ async fn get_issue(
326371

327372
async fn get_issues_and_pulls(
328373
sender: mpsc::Sender<EntryWithMetadata>,
329-
since: Option<DateTime<Utc>>,
374+
last_backup_state: Option<BackupState<'_>>,
330375
owner: String,
331376
repo: String,
332-
) -> Result<(), octocrab::Error> {
377+
) -> Result<(Vec<u64>, Vec<u64>), octocrab::Error> {
333378
let mut loaded_issues: usize = 0;
334379
let mut loaded_pulls: usize = 0;
335380
let mut failed_issues: Vec<u64> = Vec::new();
@@ -339,7 +384,15 @@ async fn get_issues_and_pulls(
339384
owner, repo
340385
);
341386
for page_num in START_PAGE..u32::MAX {
342-
let page = match get_issue_page(page_num, since, owner.clone(), repo.clone(), 0).await {
387+
let page = match get_issue_page(
388+
page_num,
389+
last_backup_state.as_ref().map(|s| s.last_backup),
390+
owner.clone(),
391+
repo.clone(),
392+
0,
393+
)
394+
.await
395+
{
343396
Ok(page) => page,
344397
Err(e) => {
345398
error!(
@@ -350,27 +403,59 @@ async fn get_issues_and_pulls(
350403
}
351404
};
352405

353-
for entry in page.items {
354-
if entry.pull_request.is_none() {
355-
match get_issue(entry.clone(), entry.number, owner.clone(), repo.clone()).await {
356-
Ok(issue) => {
357-
sender.send(issue).await.unwrap();
358-
loaded_issues += 1;
359-
}
360-
Err(e) => {
361-
error!("Could not get issue #{}: {}", entry.number, e);
362-
failed_issues.push(entry.number);
363-
}
406+
enum EntryType {
407+
Issue(u64, Option<issues::Issue>),
408+
Pr(u64),
409+
}
410+
411+
for entry in page
412+
.items
413+
.into_iter()
414+
.map(|entry| {
415+
if entry.pull_request.is_none() {
416+
EntryType::Issue(entry.number, Some(entry))
417+
} else {
418+
EntryType::Pr(entry.number)
364419
}
365-
} else {
366-
match get_pull(entry.number, owner.clone(), repo.clone()).await {
367-
Ok(pull) => {
368-
sender.send(pull).await.unwrap();
369-
loaded_pulls += 1;
420+
})
421+
.chain(
422+
last_backup_state
423+
.as_ref()
424+
.map_or(&[][..], |s| &s.failed_issues)
425+
.iter()
426+
.map(|issue_number| EntryType::Issue(*issue_number, None)),
427+
)
428+
.chain(
429+
last_backup_state
430+
.as_ref()
431+
.map_or(&[][..], |s| &s.failed_pulls)
432+
.iter()
433+
.map(|pr_number| EntryType::Pr(*pr_number)),
434+
)
435+
{
436+
match entry {
437+
EntryType::Issue(issue_number, issue_opt) => {
438+
match get_issue(issue_opt, issue_number, owner.clone(), repo.clone()).await {
439+
Ok(issue) => {
440+
sender.send(issue).await.unwrap();
441+
loaded_issues += 1;
442+
}
443+
Err(e) => {
444+
error!("Could not get issue #{}: {}", issue_number, e);
445+
failed_issues.push(issue_number);
446+
}
370447
}
371-
Err(e) => {
372-
error!("Could not get pull-request #{}: {}", entry.number, e);
373-
failed_pulls.push(entry.number);
448+
}
449+
EntryType::Pr(pr_number) => {
450+
match get_pull(pr_number, owner.clone(), repo.clone()).await {
451+
Ok(pull) => {
452+
sender.send(pull).await.unwrap();
453+
loaded_pulls += 1;
454+
}
455+
Err(e) => {
456+
error!("Could not get pull-request #{}: {}", pr_number, e);
457+
failed_pulls.push(pr_number);
458+
}
374459
}
375460
}
376461
}
@@ -399,7 +484,8 @@ async fn get_issues_and_pulls(
399484
);
400485
}
401486

402-
Ok(())
487+
// TODO: this should be a named struct to avoid confusion
488+
Ok((failed_issues, failed_pulls))
403489
}
404490

405491
fn write(x: EntryWithMetadata, destination: PathBuf) -> Result<(), WriteError> {
@@ -424,11 +510,15 @@ fn write(x: EntryWithMetadata, destination: PathBuf) -> Result<(), WriteError> {
424510

425511
fn write_backup_state(
426512
start_time: DateTime<Utc>,
513+
failed_issues: &[u64],
514+
failed_pulls: &[u64],
427515
mut destination: PathBuf,
428516
) -> Result<(), WriteError> {
429517
let state = BackupState {
430518
version: STATE_VERSION,
431519
last_backup: start_time,
520+
failed_issues: Cow::Borrowed(failed_issues),
521+
failed_pulls: Cow::Borrowed(failed_pulls),
432522
};
433523
destination.push(STATE_FILE);
434524
let json = serde_json::to_string_pretty(&state)?;
@@ -438,7 +528,7 @@ fn write_backup_state(
438528
Ok(())
439529
}
440530

441-
fn get_last_backup_time(destination: PathBuf) -> Option<DateTime<Utc>> {
531+
fn load_backup_state(destination: PathBuf) -> Option<BackupState<'static>> {
442532
let mut path = destination;
443533
path.push(STATE_FILE);
444534
info!("Trying to read {} file", path.display());
@@ -447,12 +537,21 @@ fn get_last_backup_time(destination: PathBuf) -> Option<DateTime<Utc>> {
447537
info!("Trying deserialize {} file", path.display());
448538
match serde_json::from_str::<BackupState>(&contents) {
449539
Ok(state) => match state.version {
450-
STATE_VERSION => {
540+
// We can load both `STATE_VERSION` (2) and version
541+
// 1. Version 2 simply adds `failed_issues` and
542+
// `failed_pulls` fields, which we can default-populate.
543+
STATE_VERSION | 1 => {
451544
info!(
452545
"Doing an incremental GitHub backup starting from {}.",
453546
state.last_backup
454547
);
455-
Some(state.last_backup)
548+
if !state.failed_issues.is_empty() {
549+
info!("Retrying to fetch failed issues: {:?}", state.failed_issues,);
550+
}
551+
if !state.failed_pulls.is_empty() {
552+
info!("Retrying to fetch failed PRs: {:?}", state.failed_pulls,);
553+
}
554+
Some(state)
456555
}
457556
_ => {
458557
warn!("BackupState version {} is unknown.", state.version);
@@ -551,7 +650,7 @@ async fn main() -> ExitCode {
551650
}
552651

553652
let start_time = chrono::Utc::now();
554-
let last_backup_time: Option<DateTime<Utc>> = get_last_backup_time(args.destination.clone());
653+
let last_backup_state: Option<BackupState> = load_backup_state(args.destination.clone());
555654

556655
let instance = match octocrab::OctocrabBuilder::default()
557656
.personal_token(pat)
@@ -573,10 +672,7 @@ async fn main() -> ExitCode {
573672
let (sender, mut receiver) = mpsc::channel(100);
574673

575674
let task = task::spawn(async move {
576-
if let Err(e) = get_issues_and_pulls(sender, last_backup_time, args.owner, args.repo).await
577-
{
578-
panic!("Error loading issues and pulls: {}", e);
579-
}
675+
get_issues_and_pulls(sender, last_backup_state, args.owner, args.repo).await
580676
});
581677

582678
let mut written_anything = false;
@@ -594,22 +690,62 @@ async fn main() -> ExitCode {
594690
}
595691
}
596692

597-
if task.await.is_ok() {
598-
// only write state file if we actually wrote anything
599-
if written_anything {
600-
if let Err(e) = write_backup_state(start_time, args.destination.clone()) {
693+
match (task.await, written_anything) {
694+
// There was an error preventing us from loading any issues or
695+
// PRs, exit with `API_ERROR`:
696+
(Ok(Err(e)), _) => {
697+
error!("Error loading issues and pulls: {}", e);
698+
ExitCode::from(EXIT_API_ERROR)
699+
}
700+
701+
// Some state was written:
702+
(Ok(Ok((failed_issues, failed_pulls))), true) => {
703+
if let Err(e) = write_backup_state(
704+
start_time,
705+
&failed_issues,
706+
&failed_pulls,
707+
args.destination.clone(),
708+
) {
601709
error!(
602710
"Failed to write {} to {}: {}",
603711
STATE_FILE,
604712
args.destination.clone().display(),
605713
e
606714
);
607-
return ExitCode::from(EXIT_WRITING);
715+
716+
ExitCode::from(EXIT_WRITING)
717+
} else if !failed_issues.is_empty() || !failed_pulls.is_empty() {
718+
// There were errors fetching at least some issues or
719+
// PRs, exit with `API_ERROR`:
720+
warn!(
721+
"Failed to fetch the following issues and PRs: {:?}",
722+
failed_issues.iter().chain(failed_pulls.iter())
723+
);
724+
ExitCode::from(EXIT_API_ERROR)
725+
} else {
726+
ExitCode::SUCCESS
608727
}
609728
}
610-
} else {
611-
return ExitCode::from(EXIT_API_ERROR);
612-
}
613729

614-
ExitCode::SUCCESS
730+
// No updated issues or PRs were written:
731+
(Ok(Ok((failed_issues, failed_pulls))), false) => {
732+
if !failed_issues.is_empty() || !failed_pulls.is_empty() {
733+
// There were errors fetching at least some issues or
734+
// PRs, exit with `API_ERROR`:
735+
warn!(
736+
"Failed to fetch the following issues and PRs: {:?}",
737+
failed_issues.iter().chain(failed_pulls.iter())
738+
);
739+
ExitCode::from(EXIT_API_ERROR)
740+
} else {
741+
info!("No updated issues or pull requests to save.");
742+
ExitCode::SUCCESS
743+
}
744+
}
745+
746+
(Err(join_error), _) => {
747+
error!("Failed to join task: {:?}", join_error);
748+
ExitCode::from(EXIT_INTERNAL_ERROR)
749+
}
750+
}
615751
}

src/types.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use chrono::{DateTime, Utc};
22
use clap::Parser;
33
use octocrab::models::{issues, pulls, timelines};
44
use serde::{Deserialize, Serialize};
5+
use std::borrow::Cow;
56
use std::error;
67
use std::fmt;
78
use std::io;
@@ -114,9 +115,29 @@ impl PullWithMetadata {
114115
}
115116

116117
#[derive(Serialize, Deserialize, Debug)]
117-
pub struct BackupState {
118+
pub struct BackupState<'a> {
118119
/// Version of the BackupState
119120
pub version: u32,
120121
/// UTC Unix timestamp when the last backup was completed.
121122
pub last_backup: DateTime<Utc>,
123+
/// Failed issues on last run, to be retried.
124+
#[serde(default = "failed_issues_default")]
125+
pub failed_issues: Cow<'a, [u64]>,
126+
/// Failed pull requests on last run, to be retried.
127+
// Serde default value provided for compatibility / upgrade path
128+
// from state version 1.
129+
#[serde(default = "failed_pulls_default")]
130+
pub failed_pulls: Cow<'a, [u64]>,
131+
}
132+
133+
// Serde default value generator provided for compatibility / upgrade
134+
// path from state version 1.
135+
fn failed_issues_default() -> Cow<'static, [u64]> {
136+
Cow::Borrowed(&[])
137+
}
138+
139+
// Serde default value generator provided for compatibility / upgrade
140+
// path from state version 1.
141+
fn failed_pulls_default() -> Cow<'static, [u64]> {
142+
Cow::Borrowed(&[])
122143
}

0 commit comments

Comments
 (0)