Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
0aa91e4
feat(eap): Produce logs to the items topic
phacops May 6, 2025
3e2b852
Vendor sentry_protos
phacops May 6, 2025
5332d1c
Gate sentry_protos dependency behind the processing feature
phacops May 6, 2025
d5ddc89
Use the proper enum value
phacops May 6, 2025
a401c2f
Do not use the macro
phacops May 6, 2025
697763a
Serialize in the function and return the raw bytes in serialize()
phacops May 6, 2025
74e809d
Read the items portion of the config
phacops May 6, 2025
1be1453
Generate proper types for Protobuf schema validation
phacops May 6, 2025
9a779f8
Remove vendored in favor of installing protoc
phacops May 6, 2025
8d21e2b
Enable type_generation for sentry-kafka-schemas
phacops May 6, 2025
d57223f
Install protoc to all workflows needing it
phacops May 6, 2025
a4dbeb8
Add missing workflows
phacops May 6, 2025
ad27999
Add a CHANGELOG entry
phacops May 6, 2025
095c26f
Fix some tests
phacops May 6, 2025
90462cc
Revert "Add missing workflows"
phacops May 7, 2025
f53c73a
Revert "Install protoc to all workflows needing it"
phacops May 7, 2025
dd26cab
Upgrade sentry_protos and sentry-kafka-schemas so we don't need protoc
phacops May 7, 2025
08c7886
Fix tests
phacops May 7, 2025
d78d272
Move some logic to the normalization
phacops May 7, 2025
57761af
Fix normalization tests
phacops May 7, 2025
86ca4aa
Fix one more test
phacops May 8, 2025
c9fe378
Remove another installation of protoc in CI
phacops May 8, 2025
1e3e6ac
Order crates alphabetically and remove types_generation feature
phacops May 8, 2025
6dc69a6
Use to_owned() instead of to_string()
phacops May 8, 2025
387b59f
Disable default features again now the crate is fixed
phacops May 8, 2025
02f7c1a
Really disable default features for sentry-kafka-schemas
phacops May 8, 2025
cf2475b
Can't make it work with fewer default features than this
phacops May 8, 2025
abc0f44
Remove Serialization trait for logs
phacops May 8, 2025
79ed892
Move Protobuf encoding to serialize
phacops May 8, 2025
b7eb2b6
Use to_owned() instead of to_string() where we can
phacops May 8, 2025
b00ab99
Remove nanoseconds from this timestamp
phacops May 8, 2025
ba2fdeb
Remove other and add back sentry.timestamp_precise
phacops May 8, 2025
0d25ca7
Fix tests
phacops May 8, 2025
33ae11f
Fix integration tests
phacops May 9, 2025
5633637
Update snapshots
phacops May 9, 2025
b90d31b
Add a better comment on why we replace the item_id
phacops May 9, 2025
1cda6e9
Fix linting
phacops May 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ sentry-core = "0.36.0"
sentry-kafka-schemas = { version = "0.1.129", default-features = false }
sentry-release-parser = { version = "1.3.2", default-features = false }
sentry-types = "0.36.0"
sentry_protos = "0.1.74"
prost-types = "0.13.3"
semver = "1.0.23"
serde = { version = "1.0.215", features = ["derive", "rc"] }
serde-transcode = "1.1.1"
Expand Down Expand Up @@ -201,6 +203,6 @@ unescaper = "0.1.5"
unicase = "2.8.0"
url = "2.5.4"
utf16string = "0.2.0"
uuid = { version = "1.11.0", features = ["serde", "v4"] }
uuid = { version = "1.11.0", features = ["serde", "v4", "v7"] }
walkdir = "2.5.0"
zstd = { version = "0.13.2", features = ["experimental"] }
2 changes: 2 additions & 0 deletions relay-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ reqwest = { workspace = true, features = [
] }
rmp-serde = { workspace = true }
sentry = { workspace = true, features = ["tower-http"] }
sentry_protos = { workspace = true }
prost-types = { workspace = true }
serde = { workspace = true }
serde_bytes = { workspace = true }
serde_json = { workspace = true }
Expand Down
114 changes: 101 additions & 13 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use relay_config::Config;
use relay_event_schema::protocol::{EventId, VALID_PLATFORMS};

use crate::envelope::{AttachmentType, Envelope, Item, ItemType};
use prost::Message as _;
use prost_types::Timestamp;
use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message};
use relay_metrics::{
Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, FiniteF64, GaugeValue,
Expand All @@ -28,6 +30,8 @@ use relay_quotas::Scoping;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use relay_threading::AsyncPool;
use sentry_protos::snuba::v1::any_value::Value;
use sentry_protos::snuba::v1::{AnyValue, TraceItem, TraceItemType};
use serde::{Deserialize, Serialize};
use serde_json::Deserializer;
use serde_json::value::RawValue;
Expand Down Expand Up @@ -959,7 +963,6 @@ impl StoreService {
let payload_len = payload.len();

let d = &mut Deserializer::from_slice(&payload);

let logs: LogKafkaMessages = match serde_path_to_error::deserialize(d) {
Ok(logs) => logs,
Err(error) => {
Expand Down Expand Up @@ -989,18 +992,104 @@ impl StoreService {
}
};

for mut log in logs.items {
log.organization_id = scoping.organization_id.value();
log.project_id = scoping.project_id.value();
log.retention_days = retention_days;
log.received = safe_timestamp(received_at);
for log in logs.items {
let timestamp_seconds = log.timestamp_nanos / 1_000_000_000;
let timestamp_nanos = log.timestamp_nanos % 1_000_000_000;
let item_id = u128::from_be_bytes(
*Uuid::new_v7(uuid::Timestamp::from_unix(
uuid::NoContext,
timestamp_seconds,
timestamp_nanos as u32,
))
.as_bytes(),
)
.to_le_bytes()
.to_vec();
let mut trace_item = TraceItem {
item_type: TraceItemType::Log.into(),
organization_id: scoping.organization_id.value(),
project_id: scoping.project_id.value(),
received: Some(Timestamp {
seconds: safe_timestamp(received_at) as i64,
nanos: 0,
}),
retention_days: retention_days.into(),
timestamp: Some(Timestamp {
seconds: timestamp_seconds as i64,
nanos: timestamp_nanos as i32,
}),
trace_id: log.trace_id.to_string(),
item_id,
attributes: Default::default(),
client_sample_rate: 1.0,
server_sample_rate: 1.0,
};

trace_item.attributes.insert(
"sentry.timestamp_precise".to_string(),
AnyValue {
value: Some(Value::IntValue(log.timestamp_nanos as i64)),
},
);
trace_item.attributes.insert(
"sentry.severity_text".to_string(),
AnyValue {
value: Some(Value::StringValue(
log.severity_text.unwrap_or_else(|| "INFO".into()).into(),
)),
},
);
trace_item.attributes.insert(
"sentry.severity_number".to_string(),
AnyValue {
value: Some(Value::IntValue(
log.severity_number.unwrap_or_default().into(),
)),
},
);
trace_item.attributes.insert(
"sentry.body".to_string(),
AnyValue {
value: Some(Value::StringValue(log.body.to_string())),
},
);

for (name, attribute) in log.attributes.unwrap_or_default() {
if let Some(attribute_value) = attribute {
if let Some(v) = attribute_value.value {
let any_value = match v {
LogAttributeValue::String(value) => AnyValue {
value: Some(Value::StringValue(value)),
},
LogAttributeValue::Int(value) => AnyValue {
value: Some(Value::IntValue(value)),
},
Comment on lines +1034 to +1036
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that was already a bug before: this will drop logs with an int value greater than i64::MAX.

I'll take care of that.

LogAttributeValue::Bool(value) => AnyValue {
value: Some(Value::BoolValue(value)),
},
LogAttributeValue::Double(value) => AnyValue {
value: Some(Value::DoubleValue(value)),
},
LogAttributeValue::Unknown(_) => continue,
};

trace_item.attributes.insert(name.into(), any_value);
}
}
}

let mut message = vec![];

if trace_item.encode(&mut message).is_err() {
return Ok(());
};

let message = KafkaMessage::Log {
headers: BTreeMap::from([(
"project_id".to_string(),
scoping.project_id.to_string(),
)]),
message: log,
headers: BTreeMap::from([
("project_id".to_string(), scoping.project_id.to_string()),
("item_type".to_string(), "3".to_string()),
]),
message,
};

self.produce(KafkaTopic::OurLogs, message)?;
Expand Down Expand Up @@ -1592,8 +1681,7 @@ enum KafkaMessage<'a> {
Log {
#[serde(skip)]
headers: BTreeMap<String, String>,
#[serde(flatten)]
message: LogKafkaMessage<'a>,
message: Vec<u8>,
},
ProfileChunk(ProfileChunkKafkaMessage),
}
Expand Down
Loading