-
Notifications
You must be signed in to change notification settings - Fork 110
Expand file tree
/
Copy pathmod.rs
More file actions
147 lines (120 loc) · 4.18 KB
/
mod.rs
File metadata and controls
147 lines (120 loc) · 4.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
use std::sync::Arc;
use relay_quotas::{DataCategory, RateLimits};
use crate::Envelope;
use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected};
use crate::processing::{self, Context, CountRateLimited, Forward, Output, QuotaRateLimiter};
use crate::services::outcome::{DiscardReason, Outcome};
mod process;
type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// The check-ins are rate limited.
#[error("rate limited")]
RateLimited(RateLimits),
/// Failed to process the check-in.
#[error("failed to process checkin: {0}")]
Processing(#[from] relay_monitors::ProcessCheckInError),
}
impl OutcomeError for Error {
type Error = Self;
fn consume(self) -> (Option<Outcome>, Self::Error) {
let outcome = match &self {
Self::RateLimited(limits) => {
let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
Some(Outcome::RateLimited(reason_code))
}
Self::Processing(relay_monitors::ProcessCheckInError::Json(_)) => {
Some(Outcome::Invalid(DiscardReason::InvalidJson))
}
Self::Processing(_) => Some(Outcome::Invalid(DiscardReason::InvalidCheckIn)),
};
(outcome, self)
}
}
impl From<RateLimits> for Error {
fn from(value: RateLimits) -> Self {
Self::RateLimited(value)
}
}
/// A processor for Check-Ins.
pub struct CheckInsProcessor {
limiter: Arc<QuotaRateLimiter>,
}
impl CheckInsProcessor {
/// Creates a new [`Self`].
pub fn new(limiter: Arc<QuotaRateLimiter>) -> Self {
Self { limiter }
}
}
impl processing::Processor for CheckInsProcessor {
type UnitOfWork = SerializedCheckIns;
type Output = CheckInsOutput;
type Error = Error;
fn prepare_envelope(
&self,
envelope: &mut ManagedEnvelope,
) -> Option<Managed<Self::UnitOfWork>> {
let headers = envelope.envelope().headers().clone();
let check_ins = envelope
.envelope_mut()
.take_items_by(|item| matches!(*item.ty(), ItemType::CheckIn))
.into_vec();
let work = SerializedCheckIns { headers, check_ins };
Some(Managed::with_meta_from(envelope, work))
}
async fn process(
&self,
mut check_ins: Managed<Self::UnitOfWork>,
ctx: Context<'_>,
) -> Result<Output<Self::Output>, Rejected<Self::Error>> {
if ctx.is_processing() {
process::normalize(&mut check_ins);
}
let check_ins = self.limiter.enforce_quotas(check_ins, ctx).await?;
Ok(Output::just(CheckInsOutput(check_ins)))
}
}
/// Output produced by the [`CheckInsProcessor`].
#[derive(Debug)]
pub struct CheckInsOutput(Managed<SerializedCheckIns>);
impl Forward for CheckInsOutput {
fn serialize_envelope(
self,
_: processing::ForwardContext<'_>,
) -> Result<Managed<Box<Envelope>>, Rejected<()>> {
let envelope = self.0.map(|SerializedCheckIns { headers, check_ins }, _| {
Envelope::from_parts(headers, Items::from_vec(check_ins))
});
Ok(envelope)
}
#[cfg(feature = "processing")]
fn forward_store(
self,
s: processing::StoreHandle<'_>,
ctx: processing::ForwardContext<'_>,
) -> Result<(), Rejected<()>> {
let envelope = self.serialize_envelope(ctx)?;
let envelope = ManagedEnvelope::from(envelope);
s.send_to_store(crate::services::store::StoreEnvelope { envelope });
Ok(())
}
}
/// Check-Ins in their serialized state, as transported in an envelope.
#[derive(Debug)]
pub struct SerializedCheckIns {
/// Original envelope headers.
headers: EnvelopeHeaders,
/// A list of check-ins waiting to be processed.
///
/// All items contained here must be check-ins.
check_ins: Vec<Item>,
}
impl Counted for SerializedCheckIns {
fn quantities(&self) -> Quantities {
smallvec::smallvec![(DataCategory::Monitor, self.check_ins.len())]
}
}
impl CountRateLimited for Managed<SerializedCheckIns> {
type Error = Error;
}