diff --git a/gha-cache/src/api.rs b/gha-cache/src/api.rs index 2a81050..cc3a0e3 100644 --- a/gha-cache/src/api.rs +++ b/gha-cache/src/api.rs @@ -27,6 +27,7 @@ use sha2::{Digest, Sha256}; use thiserror::Error; use tokio::{io::AsyncRead, sync::Semaphore}; use twirp::client::Client as TwirpClient; +use twirp::{ClientError, TwirpErrorCode, TwirpErrorResponse}; use unicode_bom::Bom; use url::Url; @@ -549,7 +550,11 @@ impl Api { .send() .await? .check() - .await?; + .await + .inspect_err(|e| { + self.circuit_breaker_429_tripped + .check_err(&e, &self.circuit_breaker_429_tripped_callback); + })?; offset += chunk_len; } @@ -565,7 +570,11 @@ impl Api { .send() .await? .check() - .await?; + .await + .inspect_err(|e| { + self.circuit_breaker_429_tripped + .check_err(&e, &self.circuit_breaker_429_tripped_callback); + })?; let request = FinalizeCacheEntryUploadRequest { metadata: None, @@ -574,18 +583,21 @@ impl Api { version: self.version.clone(), }; - let response = self.twirp_client.finalize_cache_entry_upload(request).await; - - match response { - Ok(response) => { + self.twirp_client + .finalize_cache_entry_upload(request) + .await + .map_err(|e| e.into()) + .and_then(|response| { if response.ok { Ok(offset) } else { Err(Error::ApiErrorNotOk) } - } - Err(e) => Err(e.into()), - } + }) + .inspect_err(|e| { + self.circuit_breaker_429_tripped + .check_err(&e, &self.circuit_breaker_429_tripped_callback); + }) } } } @@ -639,26 +651,26 @@ impl Api { Err(e) => Err(e), } } else { - let res = self - .twirp_client + self.twirp_client .get_cache_entry_download_url(GetCacheEntryDownloadUrlRequest { version: self.version.clone(), key: keys[0].to_string(), restore_keys: keys.iter().map(|k| k.to_string()).collect(), metadata: None, }) - .await; - - match res { - Ok(entry) => { + .await + .map_err(|e| e.into()) + .and_then(|entry| { if entry.ok { Ok(Some(entry.signed_download_url)) } else { Ok(None) } - } - Err(e) => Err(e.into()), - } + }) + .inspect_err(|e| { + self.circuit_breaker_429_tripped + .check_err(&e, &self.circuit_breaker_429_tripped_callback); + }) } } @@ -702,7 +714,22 @@ impl Api { version: self.version.clone(), }; - let res = self.twirp_client.create_cache_entry(req).await?; + let res = self + .twirp_client + .create_cache_entry(req) + .await + .map_err(|e| e.into()) + .and_then(|response| { + if response.ok { + Ok(response) + } else { + Err(Error::ApiErrorNotOk) + } + }) + .inspect_err(|e| { + self.circuit_breaker_429_tripped + .check_err(&e, &self.circuit_breaker_429_tripped_callback); + })?; Ok(FileAllocation::V2(SignedUrl { signed_url: res.signed_upload_url, @@ -799,14 +826,31 @@ impl AtomicCircuitBreaker for AtomicBool { } fn check_err(&self, e: &Error, callback: &CircuitBreakerTrippedCallback) { - if let Error::ApiError { - status: reqwest::StatusCode::TOO_MANY_REQUESTS, - .. - } = e - { - tracing::info!("Disabling GitHub Actions Cache due to 429: Too Many Requests"); - self.store(true, Ordering::Relaxed); - callback(); + match e { + Error::ApiError { + status: reqwest::StatusCode::TOO_MANY_REQUESTS, + .. + } + | Error::TwirpError(ClientError::TwirpError(TwirpErrorResponse { + code: TwirpErrorCode::ResourceExhausted, + .. + })) + | Error::TwirpError(ClientError::HttpError { + // The cache backend seems to give out this error for overload: + // Twirp error: http error, status code: 502 Bad Gateway, msg:unknown error + status: StatusCode::BAD_GATEWAY, + .. + }) => { + // meat is below + } + otherwise => { + tracing::error!(%otherwise, "Checked error for resource exhaustion, but it appears to be a different cause"); + return; + } } + + tracing::info!(%e, "Disabling GitHub Actions Cache due to rate limiting"); + self.store(true, Ordering::Relaxed); + callback(); } }