-
Notifications
You must be signed in to change notification settings - Fork 111
Expand file tree
/
Copy pathmod.rs
More file actions
135 lines (113 loc) · 3.89 KB
/
mod.rs
File metadata and controls
135 lines (113 loc) · 3.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
use std::sync::Arc;
use relay_quotas::RateLimits;
use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items};
use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected};
use crate::processing::{self, CountRateLimited, Output, QuotaRateLimiter};
#[cfg(feature = "processing")]
use crate::services::outcome::DiscardReason;
use crate::services::outcome::Outcome;
use crate::statsd::RelayCounters;
mod forward;
mod process;
/// Errors reported by the [`AttachmentProcessor`].
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Rate limits were applied to the attachments.
    ///
    /// Carries the applicable [`RateLimits`].
    #[error("rate limited")]
    RateLimited(RateLimits),
    /// No event ID was present on the envelope.
    ///
    /// This variant only exists when the `processing` feature is enabled.
    #[cfg(feature = "processing")]
    #[error("missing event ID")]
    NoEventId,
}
impl OutcomeError for Error {
type Error = Self;
fn consume(self) -> (Option<crate::services::outcome::Outcome>, Self::Error) {
let outcome = match &self {
Self::RateLimited(limits) => {
let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
Some(Outcome::RateLimited(reason_code))
}
#[cfg(feature = "processing")]
Self::NoEventId => Some(Outcome::Invalid(DiscardReason::Internal)),
};
(outcome, self)
}
}
impl From<RateLimits> for Error {
fn from(value: RateLimits) -> Self {
Self::RateLimited(value)
}
}
/// Processor implementation which handles attachment items.
pub struct AttachmentProcessor {
    /// Rate limiter used to enforce quotas on the attachments.
    limiter: Arc<QuotaRateLimiter>,
}
impl AttachmentProcessor {
/// Creates a new [`Self`].
pub fn new(limiter: Arc<QuotaRateLimiter>) -> Self {
Self { limiter }
}
}
impl processing::Processor for AttachmentProcessor {
    type Input = SerializedAttachments;
    type Output = AttachmentsOutput;
    type Error = Error;

    /// Extracts the attachments this processor is responsible for from `envelope`.
    ///
    /// All items of type [`ItemType::Attachment`] for which `requires_event()` holds are
    /// taken out of the envelope and bundled with a clone of the envelope headers.
    ///
    /// Returns `None` when the envelope contains no such item.
    fn prepare_envelope(&self, envelope: &mut ManagedEnvelope) -> Option<Managed<Self::Input>> {
        // In debug builds, verify that no event-creating item was routed to this processor.
        debug_assert!(
            !envelope.envelope().items().any(Item::creates_event),
            "AttachmentProcessor should not receive items that create events"
        );

        let attachments = envelope.envelope_mut().take_items_by(|item| {
            item.requires_event() && matches!(item.ty(), ItemType::Attachment)
        });

        if attachments.is_empty() {
            None
        } else {
            let work = SerializedAttachments {
                headers: envelope.envelope().headers().clone(),
                attachments,
            };
            Some(Managed::with_meta_from(envelope, work))
        }
    }

    /// Processes the extracted attachments.
    ///
    /// Increments the [`RelayCounters::StandaloneItem`] counter once per attachment, then
    /// enforces quotas through the configured limiter and finally runs `process::scrub`
    /// on the result.
    ///
    /// # Errors
    ///
    /// Propagates the rejection from quota enforcement or from `process::scrub`.
    async fn process(
        &self,
        attachments: Managed<Self::Input>,
        ctx: processing::Context<'_>,
    ) -> Result<processing::Output<Self::Output>, Rejected<Self::Error>> {
        for item in &attachments.attachments {
            // The tag is the attachment type's string form, or empty if there is none.
            let attachment_type = item.attachment_type().map(|ty| ty.to_string());
            let attachment_type_tag = attachment_type.as_deref().unwrap_or("");

            relay_statsd::metric!(
                counter(RelayCounters::StandaloneItem) += 1,
                processor = "new",
                item_type = item.ty().name(),
                attachment_type = attachment_type_tag,
            );
        }

        let mut attachments = self.limiter.enforce_quotas(attachments, ctx).await?;
        process::scrub(&mut attachments, ctx)?;

        Ok(Output::just(AttachmentsOutput(attachments)))
    }
}
/// Attachments taken from an envelope, together with that envelope's headers.
#[derive(Debug)]
pub struct SerializedAttachments {
    /// Headers cloned from the envelope the attachments were taken from.
    headers: EnvelopeHeaders,
    /// The attachment items.
    attachments: Items,
}
impl Counted for SerializedAttachments {
    /// Reports the quantities of the contained attachment items.
    ///
    /// The envelope headers do not contribute to the result.
    fn quantities(&self) -> Quantities {
        let Self { attachments, .. } = self;
        attachments.quantities()
    }
}
impl CountRateLimited for Managed<SerializedAttachments> {
    // Rate limits convert into this module's `Error` via its `From<RateLimits>` implementation.
    type Error = Error;
}
/// The result of the [`AttachmentProcessor`].
///
/// Wraps the managed attachments returned from processing.
#[derive(Debug)]
pub struct AttachmentsOutput(Managed<SerializedAttachments>);