Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions relay-server/src/services/projects/cache/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::services::projects::cache::state::{
self, CompletedFetch, Eviction, Fetch, ProjectStore, Refresh,
};
use crate::services::projects::project::ProjectState;
use crate::services::projects::source::ProjectSource;
use crate::services::projects::source::{ProjectSource, ProjectSourceError, upstream};
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::FuturesScheduled;

Expand Down Expand Up @@ -119,7 +119,13 @@ impl ProjectCacheService {
.await
{
Ok(result) => result,
Err(err) => {
Err(ProjectSourceError::Upstream(upstream::Error::DeadlineExceeded)) => {
// Somewhat of an expected error which is already logged on the upstream side.
//
// -> we can just go into our usual pending handling.
ProjectState::Pending.into()
}
Err(err @ ProjectSourceError::FatalUpstream) => {
relay_log::error!(
tags.project_key = fetch.project_key().as_str(),
tags.has_revision = fetch.revision().as_str().is_some(),
Expand Down
18 changes: 11 additions & 7 deletions relay-server/src/services/projects/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ impl ProjectSource {
current_revision,
no_cache,
})
.await?;
.await
.map_err(|_| ProjectSourceError::FatalUpstream)??;

Ok(match state {
SourceProjectState::New(state) => {
Expand All @@ -115,12 +116,15 @@ impl ProjectSource {

#[derive(Debug, thiserror::Error)]
pub enum ProjectSourceError {
#[error("redis permit error {0}")]
RedisPermit(#[from] tokio::sync::AcquireError),
#[error("redis join error {0}")]
RedisJoin(#[from] tokio::task::JoinError),
#[error("upstream error {0}")]
Upstream(#[from] relay_system::SendError),
/// Error returned from the upstream.
#[error("upstream error: {0}")]
Upstream(#[from] upstream::Error),
/// Upstream did not return a result.
///
/// This happens when the upstream does not reply to the request.
/// This should never happen.
#[error("fatal upstream error")]
FatalUpstream,
}

impl From<Infallible> for ProjectSourceError {
Expand Down
96 changes: 59 additions & 37 deletions relay-server/src/services/projects/source/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,21 @@ impl UpstreamQuery for GetProjectStates {
}
}

/// Error returned to the requester.
///
/// Travels in the `Err` arm of the message that is broadcast to every
/// requester waiting on a project state fetch.
#[derive(Copy, Clone, Debug, thiserror::Error)]
pub enum Error {
/// Fetching the project state exceeded the configured deadline.
///
/// Emitted when a pending fetch is found to be expired while batches are
/// being prepared; the timeout is already logged at that point.
#[error("deadline exceeded while fetching project state")]
DeadlineExceeded,
}

/// The wrapper struct for the incoming external requests which also keeps additional information.
#[derive(Debug)]
struct ProjectStateChannel {
// Main broadcast channel.
channel: BroadcastChannel<SourceProjectState>,
channel: BroadcastChannel<Message>,
// Additional broadcast channels tracked from merge operations.
merged: Vec<BroadcastChannel<SourceProjectState>>,
merged: Vec<BroadcastChannel<Message>>,
revision: Revision,
deadline: Instant,
no_cache: bool,
Expand All @@ -113,7 +121,7 @@ struct ProjectStateChannel {

impl ProjectStateChannel {
pub fn new(
sender: BroadcastSender<SourceProjectState>,
sender: BroadcastSender<Message>,
revision: Revision,
timeout: Duration,
no_cache: bool,
Expand Down Expand Up @@ -141,22 +149,23 @@ impl ProjectStateChannel {
/// If the new revision is different from the contained revision this clears the revision.
/// To not have multiple fetches per revision per batch, we need to find a common denominator
/// for requests with different revisions, which is always to fetch the full project config.
pub fn attach(&mut self, sender: BroadcastSender<SourceProjectState>, revision: Revision) {
pub fn attach(&mut self, sender: BroadcastSender<Message>, revision: Revision) {
self.channel.attach(sender);
if self.revision != revision {
self.revision = Revision::default();
}
}

pub fn send(self, state: SourceProjectState) {
for channel in self.merged {
channel.send(state.clone());
}
self.channel.send(state)
self.do_send(Ok(state));
}

/// Consumes the channel and reports `err` to every attached requester.
///
/// The error is wrapped in `Err` and forwarded through `do_send`.
pub fn error(self, err: Error) {
self.do_send(Err(err));
}

pub fn expired(&self) -> bool {
Instant::now() > self.deadline
pub fn expired(&self, now: Instant) -> bool {
now > self.deadline
}

pub fn merge(&mut self, channel: ProjectStateChannel) {
Expand All @@ -182,28 +191,35 @@ impl ProjectStateChannel {
self.errors += errors;
self.pending += pending;
}

fn do_send(self, message: Message) {
for channel in self.merged {
channel.send(message.clone());
}
self.channel.send(message)
}
}

/// The map of project keys with their project state channels.
type ProjectStateChannels = HashMap<ProjectKey, ProjectStateChannel>;

/// The message used to communicate with the requester.
type Message = Result<SourceProjectState, Error>;

/// This is the [`UpstreamProjectSourceService`] interface.
///
/// The service is responsible for fetching the [`ParsedProjectState`] from the upstream.
/// Internally it maintains the buffer queue of the incoming requests, which got scheduled to fetch the
/// state and takes care of the backoff in case there is a problem with the requests.
#[derive(Debug)]
pub struct UpstreamProjectSource(FetchProjectState, BroadcastSender<SourceProjectState>);
pub struct UpstreamProjectSource(FetchProjectState, BroadcastSender<Message>);

impl Interface for UpstreamProjectSource {}

impl FromMessage<FetchProjectState> for UpstreamProjectSource {
type Response = BroadcastResponse<SourceProjectState>;
type Response = BroadcastResponse<Message>;

fn from_message(
message: FetchProjectState,
sender: BroadcastSender<SourceProjectState>,
) -> Self {
fn from_message(message: FetchProjectState, sender: BroadcastSender<Message>) -> Self {
Self(message, sender)
}
}
Expand Down Expand Up @@ -271,6 +287,7 @@ impl UpstreamProjectSourceService {
/// Prepares the batches of the cache and nocache channels which could be used to request the
/// project states.
fn prepare_batches(&mut self) -> ChannelsBatch {
let now = Instant::now();
let batch_size = self.config.query_batch_size();
let num_batches = self.config.max_concurrent_queries();

Expand All @@ -286,26 +303,31 @@ impl UpstreamProjectSourceService {

let fresh_channels = (projects.iter())
.filter_map(|id| Some((*id, self.state_channels.remove(id)?)))
.filter(|(id, channel)| {
if channel.expired() {
metric!(
distribution(RelayDistributions::ProjectStateAttempts) = channel.attempts,
result = "timeout",
);
metric!(
counter(RelayCounters::ProjectUpstreamCompleted) += 1,
result = "timeout",
);
relay_log::error!(
errors = channel.errors,
pending = channel.pending,
tags.did_error = channel.errors > 0,
tags.was_pending = channel.pending > 0,
tags.project_key = id.to_string(),
"error fetching project state {id}: deadline exceeded",
);
.filter_map(|(id, channel)| {
if !channel.expired(now) {
return Some((id, channel));
}
!channel.expired()

// Channel is expired, emit telemetry and notify the other end.
metric!(
distribution(RelayDistributions::ProjectStateAttempts) = channel.attempts,
result = "timeout",
);
metric!(
counter(RelayCounters::ProjectUpstreamCompleted) += 1,
result = "timeout",
);
relay_log::warn!(
errors = channel.errors,
pending = channel.pending,
tags.did_error = channel.errors > 0,
tags.was_pending = channel.pending > 0,
tags.project_key = %id,
"error fetching project state {id}: deadline exceeded",
);
channel.error(Error::DeadlineExceeded);

None
});

// Separate regular channels from those with the `nocache` flag. The latter go in separate
Expand Down Expand Up @@ -730,8 +752,8 @@ mod tests {
.await;

let (response1, response2) = futures::future::join(response1, response2).await;
assert!(matches!(response1, Ok(SourceProjectState::NotModified)));
assert!(matches!(response2, Ok(SourceProjectState::NotModified)));
assert!(matches!(response1, Ok(Ok(SourceProjectState::NotModified))));
assert!(matches!(response2, Ok(Ok(SourceProjectState::NotModified))));

// No more messages to upstream expected.
assert!(upstream_rx.try_recv().is_err());
Expand Down