Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 71 additions & 27 deletions gha-cache/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use sha2::{Digest, Sha256};
use thiserror::Error;
use tokio::{io::AsyncRead, sync::Semaphore};
use twirp::client::Client as TwirpClient;
use twirp::{ClientError, TwirpErrorCode, TwirpErrorResponse};
use unicode_bom::Bom;
use url::Url;

Expand Down Expand Up @@ -549,7 +550,11 @@ impl Api {
.send()
.await?
.check()
.await?;
.await
.inspect_err(|e| {
self.circuit_breaker_429_tripped
.check_err(&e, &self.circuit_breaker_429_tripped_callback);
})?;

offset += chunk_len;
}
Expand All @@ -565,7 +570,11 @@ impl Api {
.send()
.await?
.check()
.await?;
.await
.inspect_err(|e| {
self.circuit_breaker_429_tripped
.check_err(&e, &self.circuit_breaker_429_tripped_callback);
})?;

let request = FinalizeCacheEntryUploadRequest {
metadata: None,
Expand All @@ -574,18 +583,21 @@ impl Api {
version: self.version.clone(),
};

let response = self.twirp_client.finalize_cache_entry_upload(request).await;

match response {
Ok(response) => {
self.twirp_client
.finalize_cache_entry_upload(request)
.await
.map_err(|e| e.into())
.and_then(|response| {
if response.ok {
Ok(offset)
} else {
Err(Error::ApiErrorNotOk)
}
}
Err(e) => Err(e.into()),
}
})
.inspect_err(|e| {
self.circuit_breaker_429_tripped
.check_err(&e, &self.circuit_breaker_429_tripped_callback);
})
}
}
}
Expand Down Expand Up @@ -639,26 +651,26 @@ impl Api {
Err(e) => Err(e),
}
} else {
let res = self
.twirp_client
self.twirp_client
.get_cache_entry_download_url(GetCacheEntryDownloadUrlRequest {
version: self.version.clone(),
key: keys[0].to_string(),
restore_keys: keys.iter().map(|k| k.to_string()).collect(),
metadata: None,
})
.await;

match res {
Ok(entry) => {
.await
.map_err(|e| e.into())
.and_then(|entry| {
if entry.ok {
Ok(Some(entry.signed_download_url))
} else {
Ok(None)
}
}
Err(e) => Err(e.into()),
}
})
.inspect_err(|e| {
self.circuit_breaker_429_tripped
.check_err(&e, &self.circuit_breaker_429_tripped_callback);
})
}
}

Expand Down Expand Up @@ -702,7 +714,22 @@ impl Api {
version: self.version.clone(),
};

let res = self.twirp_client.create_cache_entry(req).await?;
let res = self
.twirp_client
.create_cache_entry(req)
.await
.map_err(|e| e.into())
.and_then(|response| {
if response.ok {
Ok(response)
} else {
Err(Error::ApiErrorNotOk)
}
})
.inspect_err(|e| {
self.circuit_breaker_429_tripped
.check_err(&e, &self.circuit_breaker_429_tripped_callback);
})?;

Ok(FileAllocation::V2(SignedUrl {
signed_url: res.signed_upload_url,
Expand Down Expand Up @@ -799,14 +826,31 @@ impl AtomicCircuitBreaker for AtomicBool {
}

fn check_err(&self, e: &Error, callback: &CircuitBreakerTrippedCallback) {
if let Error::ApiError {
status: reqwest::StatusCode::TOO_MANY_REQUESTS,
..
} = e
{
tracing::info!("Disabling GitHub Actions Cache due to 429: Too Many Requests");
self.store(true, Ordering::Relaxed);
callback();
match e {
Error::ApiError {
status: reqwest::StatusCode::TOO_MANY_REQUESTS,
..
}
| Error::TwirpError(ClientError::TwirpError(TwirpErrorResponse {
code: TwirpErrorCode::ResourceExhausted,
..
}))
| Error::TwirpError(ClientError::HttpError {
// The cache backend seems to give out this error for overload:
// Twirp error: http error, status code: 502 Bad Gateway, msg:unknown error
status: StatusCode::BAD_GATEWAY,
..
}) => {
// meat is below
}
otherwise => {
tracing::error!(%otherwise, "Checked error for resource exhaustion, but it appears to be a different cause");
return;
}
}

tracing::info!(%e, "Disabling GitHub Actions Cache due to rate limiting");
self.store(true, Ordering::Relaxed);
callback();
}
}
Loading