Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

- Add experimental playstation endpoint. ([#4555](https://github.com/getsentry/relay/pull/4555))

**Bug Fixes**:

- Separates profiles into backend and ui profiles. ([#4595](https://github.com/getsentry/relay/pull/4595))

**Internal**:

- Add ui chunk profiling data category. ([#4593](https://github.com/getsentry/relay/pull/4593))
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions relay-profiling/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ workspace = true

[dependencies]
android_trace_log = { workspace = true, features = ["serde"] }
bytes = { workspace = true }
chrono = { workspace = true }
data-encoding = { workspace = true }
itertools = { workspace = true }
Expand Down
99 changes: 69 additions & 30 deletions relay-profiling/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
use std::error::Error;
use std::net::IpAddr;
use std::time::Duration;

use bytes::Bytes;
use url::Url;

use relay_base_schema::project::ProjectId;
Expand Down Expand Up @@ -80,6 +82,12 @@ const MAX_PROFILE_CHUNK_DURATION: Duration = Duration::from_secs(66);
/// Same format as event IDs.
pub type ProfileId = EventId;

#[derive(Debug, Clone, Copy)]
pub enum ProfileType {
Backend,
Ui,
}

#[derive(Debug, Deserialize)]
struct MinimalProfile {
#[serde(alias = "profile_id", alias = "chunk_id")]
Expand Down Expand Up @@ -275,41 +283,72 @@ pub fn expand_profile(
}
}

pub fn expand_profile_chunk(
payload: &[u8],
client_ip: Option<IpAddr>,
filter_settings: &ProjectFiltersConfig,
global_config: &GlobalConfig,
) -> Result<Vec<u8>, ProfileError> {
let profile = match minimal_profile_from_json(payload) {
Ok(profile) => profile,
Err(err) => {
relay_log::warn!(
error = &err as &dyn Error,
from = "minimal",
"invalid profile chunk",
);
return Err(ProfileError::InvalidJson(err));
/// Intermediate type for all processing on a profile chunk.
pub struct ProfileChunk {
profile: MinimalProfile,
payload: Bytes,
}

impl ProfileChunk {
/// Parses a new [`Self`] from raw bytes.
pub fn new(payload: Bytes) -> Result<Self, ProfileError> {
match minimal_profile_from_json(&payload) {
Ok(profile) => Ok(Self { profile, payload }),
Err(err) => {
relay_log::debug!(
error = &err as &dyn Error,
from = "minimal",
"invalid profile chunk",
);
Err(ProfileError::InvalidJson(err))
}
}
};
}

if let Err(filter_stat_key) = relay_filter::should_filter(
&profile,
client_ip,
filter_settings,
global_config.filters(),
) {
return Err(ProfileError::Filtered(filter_stat_key));
/// Returns the [`ProfileType`] this chunk belongs to.
///
/// The profile type is currently determined based on the contained profile
/// platform. It determines the data category this profile chunk belongs to.
///
/// This needs to be synchronized with the implementation in Sentry:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we can add the profile_type in the payload before we send it, this would become the source of truth.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can use add it to the Kafka message https://github.com/getsentry/relay/blob/master/relay-server/src/services/store.rs#L1035-L1041 and the worker will handle it, no need to deserialize/serialize again.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that works, let's figure that out in a follow up where we wanna put it (payload / kafka header) and how it should look?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add it to the payload and not the header, it's easier to manipulate later on.

/// <https://github.com/getsentry/sentry/blob/ed2e1c8bcd0d633e6f828fcfbeefbbdd98ef3dba/src/sentry/profiles/task.py#L995>
pub fn profile_type(&self) -> ProfileType {
match self.profile.platform.as_str() {
"cocoa" | "android" | "javascript" => ProfileType::Ui,
_ => ProfileType::Backend,
}
}
Comment on lines +315 to 320
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


match (profile.platform.as_str(), profile.version) {
("android", _) => android::chunk::parse(payload),
(_, sample::Version::V2) => {
let mut profile = sample::v2::parse(payload)?;
profile.normalize()?;
serde_json::to_vec(&profile).map_err(|_| ProfileError::CannotSerializePayload)
/// Applies inbound filters to the profile chunk.
///
/// The profile needs to be filtered (rejected) when this returns an error.
pub fn filter(
&self,
client_ip: Option<IpAddr>,
filter_settings: &ProjectFiltersConfig,
global_config: &GlobalConfig,
) -> Result<(), ProfileError> {
relay_filter::should_filter(
&self.profile,
client_ip,
filter_settings,
global_config.filters(),
)
.map_err(ProfileError::Filtered)
}

/// Normalizes and 'expands' the profile chunk into its normalized form Sentry expects.
pub fn expand(&self) -> Result<Vec<u8>, ProfileError> {
match (self.profile.platform.as_str(), self.profile.version) {
("android", _) => android::chunk::parse(&self.payload),
(_, sample::Version::V2) => {
let mut profile = sample::v2::parse(&self.payload)?;
profile.normalize()?;
Ok(serde_json::to_vec(&profile)
.map_err(|_| ProfileError::CannotSerializePayload)?)
}
(_, _) => Err(ProfileError::PlatformNotSupported),
}
(_, _) => Err(ProfileError::PlatformNotSupported),
}
}

Expand Down
2 changes: 1 addition & 1 deletion relay-profiling/src/sample/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub mod v1;
pub mod v2;

/// Possible values for the version field of the Sample Format.
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
#[derive(Debug, Serialize, Deserialize, Copy, Clone, Default, PartialEq, Eq)]
pub enum Version {
#[default]
Unknown,
Expand Down
24 changes: 23 additions & 1 deletion relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
//! ```
use relay_base_schema::project::ProjectKey;
use relay_profiling::ProfileType;
use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::fmt;
Expand Down Expand Up @@ -596,6 +597,12 @@ pub struct ItemHeaders {
#[serde(default, skip)]
ingest_span_in_eap: bool,

/// Tracks whether the item is a backend or ui profile chunk.
///
/// NOTE: This is internal-only and not exposed into the Envelope.
#[serde(default, skip)]
profile_type: Option<ProfileType>,

/// Other attributes for forward compatibility.
#[serde(flatten)]
other: BTreeMap<String, Value>,
Expand Down Expand Up @@ -673,6 +680,7 @@ impl Item {
sampled: true,
fully_normalized: false,
ingest_span_in_eap: false,
profile_type: None,
},
payload: Bytes::new(),
}
Expand Down Expand Up @@ -725,7 +733,11 @@ impl Item {
ItemType::Span | ItemType::OtelSpan => smallvec![(DataCategory::Span, 1)],
// NOTE: semantically wrong, but too expensive to parse.
ItemType::OtelTracesData => smallvec![(DataCategory::Span, 1)],
ItemType::ProfileChunk => smallvec![(DataCategory::ProfileChunk, 1)], // TODO: should be seconds?
ItemType::ProfileChunk => match self.headers.profile_type {
Some(ProfileType::Backend) => smallvec![(DataCategory::ProfileChunk, 1)],
Some(ProfileType::Ui) => smallvec![(DataCategory::ProfileChunkUi, 1)],
None => smallvec![],
},
ItemType::Unknown(_) => smallvec![],
}
}
Expand Down Expand Up @@ -890,6 +902,16 @@ impl Item {
self.headers.ingest_span_in_eap = ingest_span_in_eap;
}

/// Returns the associated profile type of a profile chunk.
pub fn profile_type(&self) -> Option<ProfileType> {
self.headers.profile_type
}

/// Set the profile type of the profile chunk.
pub fn set_profile_type(&mut self, profile_type: ProfileType) {
self.headers.profile_type = Some(profile_type);
}

/// Gets the `sampled` flag.
pub fn sampled(&self) -> bool {
self.headers.sampled
Expand Down
22 changes: 17 additions & 5 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1922,15 +1922,25 @@ impl EnvelopeProcessorService {
&self,
managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
project_info: Arc<ProjectInfo>,
_rate_limits: Arc<RateLimits>,
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
profile_chunk::filter(managed_envelope, project_info.clone());
if_processing!(self.inner.config, {
profile_chunk::process(
managed_envelope,
project_info,
project_info.clone(),
&self.inner.global_config.current(),
&self.inner.config,
);

self.enforce_quotas(
managed_envelope,
Annotated::empty(),
&mut ProcessingExtractedMetrics::new(),
project_info,
_rate_limits,
)
.await?;
});

Ok(None)
Expand All @@ -1943,7 +1953,7 @@ impl EnvelopeProcessorService {
config: Arc<Config>,
project_id: ProjectId,
project_info: Arc<ProjectInfo>,
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
_rate_limits: Arc<RateLimits>,
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
#[allow(unused_mut)]
let mut extracted_metrics = ProcessingExtractedMetrics::new();
Expand All @@ -1964,7 +1974,7 @@ impl EnvelopeProcessorService {
Annotated::empty(),
&mut extracted_metrics,
project_info.clone(),
rate_limits,
_rate_limits,
)
.await?;
});
Expand Down Expand Up @@ -2297,7 +2307,9 @@ impl EnvelopeProcessorService {
rate_limits,
reservoir_counters
),
ProcessingGroup::ProfileChunk => run!(process_profile_chunks, project_info),
ProcessingGroup::ProfileChunk => {
run!(process_profile_chunks, project_info, rate_limits)
}
// Currently is not used.
ProcessingGroup::Metrics => {
// In proxy mode we simply forward the metrics.
Expand Down Expand Up @@ -3650,7 +3662,7 @@ impl UpstreamRequest for SendMetricsRequest {

/// Container for global and project level [`Quota`].
#[cfg(feature = "processing")]
#[derive(Copy, Clone)]
#[derive(Copy, Clone, Debug)]
struct CombinedQuotas<'a> {
global_quotas: &'a [Quota],
project_quotas: &'a [Quota],
Expand Down
60 changes: 36 additions & 24 deletions relay-server/src/services/processor/profile_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use {
crate::services::processor::ProfileChunkGroup,
relay_config::Config,
relay_dynamic_config::GlobalConfig,
relay_profiling::ProfileError,
};

/// Removes profile chunks from the envelope if the feature is not enabled.
Expand All @@ -40,44 +41,55 @@ pub fn process(
) {
let client_ip = managed_envelope.envelope().meta().client_addr();
let filter_settings = &project_info.config.filter_settings;

let continuous_profiling_enabled =
if project_info.has_feature(Feature::ContinuousProfilingBetaIngest) {
project_info.has_feature(Feature::ContinuousProfilingBeta)
} else {
project_info.has_feature(Feature::ContinuousProfiling)
};

managed_envelope.retain_items(|item| match item.ty() {
ItemType::ProfileChunk => {
if !continuous_profiling_enabled {
return ItemAction::DropSilently;
}

match relay_profiling::expand_profile_chunk(
&item.payload(),
client_ip,
filter_settings,
global_config,
) {
Ok(payload) => {
if payload.len() <= config.max_profile_size() {
item.set_payload(ContentType::Json, payload);
ItemAction::Keep
} else {
ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(
relay_profiling::ProfileError::ExceedSizeLimit,
),
)))
}
}
Err(relay_profiling::ProfileError::Filtered(filter_stat_key)) => {
ItemAction::Drop(Outcome::Filtered(filter_stat_key))
}
Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(err),
))),
let chunk = match relay_profiling::ProfileChunk::new(item.payload()) {
Ok(chunk) => chunk,
Err(err) => return error_to_action(err),
};
// Important: set the profile type to get outcomes in the correct category.
item.set_profile_type(chunk.profile_type());

if let Err(err) = chunk.filter(client_ip, filter_settings, global_config) {
return error_to_action(err);
}

let payload = match chunk.expand() {
Ok(expanded) => expanded,
Err(err) => return error_to_action(err),
};

if payload.len() > config.max_profile_size() {
return error_to_action(relay_profiling::ProfileError::ExceedSizeLimit);
}

item.set_payload(ContentType::Json, payload);
ItemAction::Keep
}
_ => ItemAction::Keep,
});
}

#[cfg(feature = "processing")]
fn error_to_action(err: ProfileError) -> ItemAction {
match err {
ProfileError::Filtered(filter_stat_key) => {
ItemAction::Drop(Outcome::Filtered(filter_stat_key))
}
err => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(err),
))),
}
}
8 changes: 8 additions & 0 deletions relay-server/src/utils/managed_envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,14 @@ impl ManagedEnvelope {
);
}

if self.context.summary.profile_chunk_ui_quantity > 0 {
self.track_outcome(
outcome.clone(),
DataCategory::ProfileChunkUi,
self.context.summary.profile_chunk_ui_quantity,
);
}

self.finish(RelayCounters::EnvelopeRejected, handling);
}

Expand Down
Loading