From fa715cd3f8e2106adc1727e125de68ec9fcc7d95 Mon Sep 17 00:00:00 2001 From: Abhijeet Prasad Date: Wed, 26 Nov 2025 17:14:24 -0500 Subject: [PATCH] feat: Add Heroku Log Drain Endpoint --- Cargo.lock | 22 + Cargo.toml | 5 +- relay-dynamic-config/src/feature.rs | 5 + relay-ourlogs/Cargo.toml | 1 + relay-ourlogs/src/heroku_to_sentry.rs | 556 ++++++++++++++++++ relay-ourlogs/src/lib.rs | 4 + relay-protocol/src/value.rs | 9 + .../src/endpoints/integrations/heroku.rs | 69 +++ .../src/endpoints/integrations/mod.rs | 1 + relay-server/src/endpoints/mod.rs | 1 + relay-server/src/envelope/content_type.rs | 5 + relay-server/src/envelope/item.rs | 4 + .../src/extractors/integration_builder.rs | 20 +- relay-server/src/integrations/mod.rs | 5 + .../processing/logs/integrations/heroku.rs | 77 +++ .../src/processing/logs/integrations/mod.rs | 2 + tests/integration/fixtures/__init__.py | 24 + tests/integration/test_heroku_logs.py | 133 +++++ 18 files changed, 941 insertions(+), 2 deletions(-) create mode 100644 relay-ourlogs/src/heroku_to_sentry.rs create mode 100644 relay-server/src/endpoints/integrations/heroku.rs create mode 100644 relay-server/src/processing/logs/integrations/heroku.rs create mode 100644 tests/integration/test_heroku_logs.py diff --git a/Cargo.lock b/Cargo.lock index 85c45559fe0..24b9fc3db2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2782,6 +2782,16 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nom-regex" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72e5c7731c4c1370b61604ed52a2475e861aac9e08dec9f23903d4ddfdc91c18" +dependencies = [ + "nom", + "regex", +] + [[package]] name = "ntapi" version = "0.4.1" @@ -4213,6 +4223,7 @@ dependencies = [ "relay-event-schema", "relay-otel", "relay-protocol", + "rsyslog", "serde", "serde_json", ] @@ -4706,6 +4717,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rsyslog" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f5963755808e34aea5b05a720a0044e2f10f39f61d69bbed56ecbb3369a1541" +dependencies = [ + "chrono", + "nom", + "nom-regex", +] + [[package]] name = "rustc-demangle" version = "0.1.24" diff --git a/Cargo.toml b/Cargo.toml index a85457474bc..68184eb2a16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -169,6 +169,7 @@ regex = "1.11.3" regex-lite = "0.1.7" reqwest = "0.12.23" rmp-serde = "1.3.0" +rsyslog = { version = "0.1.5", features = ["chrono-timestamp"] } semver = "1.0.27" sentry = { version = "0.41.0", default-features = false, features = [ # default features, except `release-health` is disabled @@ -180,7 +181,9 @@ sentry = { version = "0.41.0", default-features = false, features = [ ] } sentry-core = "0.41.0" sentry-kafka-schemas = { version = "2.1.14", default-features = false } -sentry-release-parser = { version = "1.4.0", default-features = false, features = ["semver-1"] } +sentry-release-parser = { version = "1.4.0", default-features = false, features = [ + "semver-1", +] } sentry-types = "0.41.0" sentry_protos = "0.4.2" serde = { version = "=1.0.228", features = ["derive", "rc"] } diff --git a/relay-dynamic-config/src/feature.rs b/relay-dynamic-config/src/feature.rs index d2604008788..5b532c7ee13 100644 --- a/relay-dynamic-config/src/feature.rs +++ b/relay-dynamic-config/src/feature.rs @@ -61,6 +61,11 @@ pub enum Feature { /// Serialized as `organizations:relay-otel-logs-endpoint`. 
 #[serde(rename = "organizations:relay-otel-logs-endpoint")]
     OtelLogsEndpoint,
+    /// Enable Heroku log drain ingestion via the `/integration/heroku/logs` endpoint.
+    ///
+    /// Serialized as `organizations:relay-heroku-log-drain-endpoint`.
+    #[serde(rename = "organizations:relay-heroku-log-drain-endpoint")]
+    HerokuLogDrainEndpoint,
     /// Enable playstation crash dump ingestion via the `/playstation/` endpoint.
     ///
     /// Serialized as `organizations:relay-playstation-ingestion`.
diff --git a/relay-ourlogs/Cargo.toml b/relay-ourlogs/Cargo.toml
index bb510464032..1e51dcf7c99 100644
--- a/relay-ourlogs/Cargo.toml
+++ b/relay-ourlogs/Cargo.toml
@@ -25,6 +25,7 @@ relay-event-schema = { workspace = true }
 relay-otel = { workspace = true }
 relay-protocol = { workspace = true }
 relay-common = { workspace = true }
+rsyslog = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
diff --git a/relay-ourlogs/src/heroku_to_sentry.rs b/relay-ourlogs/src/heroku_to_sentry.rs
new file mode 100644
index 00000000000..f0eec4d4c3c
--- /dev/null
+++ b/relay-ourlogs/src/heroku_to_sentry.rs
@@ -0,0 +1,556 @@
+//! Transforms Heroku Logplex syslog messages to Sentry Logs.
+
+use chrono::Utc;
+use relay_conventions::ORIGIN;
+use relay_event_schema::protocol::{Attributes, OurLog, OurLogLevel, Timestamp};
+use relay_protocol::Annotated;
+use rsyslog::parser::{DateTime, Skip};
+use rsyslog::{Message, Originator, ParseMsg, ParsePart};
+
+/// Header names for Heroku Logplex integration.
+///
+/// These are the keys used to store Heroku-specific metadata on envelope items.
+/// The values come from HTTP headers sent by Logplex in HTTPS drain requests.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum HerokuHeader {
+    /// The unique request identifier.
+    ///
+    /// Logplex sets this to a unique ID for each request. If a request is retried
+    /// (e.g., due to a non-2xx response or network failure), this ID enables detection
+    /// of duplicate requests.
+    FrameId,
+    /// The drain token.
+    ///
+    /// This token identifies the specific log drain sending the request.
+    /// It can be used to associate logs with their source drain configuration.
+    DrainToken,
+    /// The Logplex user agent.
+    ///
+    /// Describes the version of Logplex (e.g., "Logplex/v72").
+    /// This changes with Logplex releases.
+    UserAgent,
+}
+
+impl HerokuHeader {
+    /// Returns the HTTP header name sent by Logplex.
+    ///
+    /// These are the original header names from the Logplex HTTPS drain request.
+    pub fn http_header_name(&self) -> &'static str {
+        match self {
+            Self::FrameId => "Logplex-Frame-Id",
+            Self::DrainToken => "Logplex-Drain-Token",
+            Self::UserAgent => "User-Agent",
+        }
+    }
+
+    /// Returns the string key used to store this header on envelope items.
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            Self::FrameId => "heroku-frame-id",
+            Self::DrainToken => "heroku-drain-token",
+            Self::UserAgent => "heroku-user-agent",
+        }
+    }
+}
+
+/// Parsed message body from a Logplex syslog message.
+///
+/// This struct stores the raw message body along with any logfmt key-value pairs
+/// that were parsed from it, using Sentry's message parameterization conventions.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct LogplexMsgBody<'a> {
+    /// The raw message body.
+    pub msg: &'a str,
+    /// The parameterized template string (if logfmt pairs were found).
+    /// Example: `"source={source} sample#db_size={sample#db_size}"`
+    pub template: Option<String>,
+    /// The parsed logfmt parameters as (key, value) pairs.
+    /// Keys are the original logfmt keys, values are the parsed values.
+    pub parameters: Vec<(&'a str, &'a str)>,
+}
+
+impl<'a> ParseMsg<'a> for LogplexMsgBody<'a> {
+    fn parse(msg: &'a str, _: &Originator) -> Result<(&'a str, Self), rsyslog::Error<'a>> {
+        let mut template_parts = Vec::new();
+        let mut parameters = Vec::new();
+
+        for pair in msg.split_whitespace() {
+            if let Some((key, value)) = pair.split_once('=')
+                && !key.is_empty()
+            {
+                template_parts.push(format!("{key}={{{key}}}"));
+                parameters.push((key, value));
+            }
+        }
+
+        Ok((
+            "",
+            LogplexMsgBody {
+                msg,
+                template: (!parameters.is_empty()).then_some(template_parts.join(" ")),
+                parameters,
+            },
+        ))
+    }
+}
+
+/// A required timestamp that errors during parsing if missing.
+///
+/// Unlike `Option<DateTime>`, this type rejects the syslog nil value "-"
+/// and requires a valid RFC3339 timestamp.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct RequiredDateTime(pub DateTime);
+
+impl<'a> ParsePart<'a> for RequiredDateTime {
+    fn parse(part: &'a str) -> Result<(&'a str, Self), rsyslog::Error<'a>> {
+        let (rem, timestamp) = <Option<DateTime>>::parse(part)?;
+        match timestamp {
+            Some(dt) => Ok((rem, RequiredDateTime(dt))),
+            None => Err(rsyslog::Error::Custom("timestamp is required".to_owned())),
+        }
+    }
+}
+
+impl From<RequiredDateTime> for Timestamp {
+    fn from(value: RequiredDateTime) -> Self {
+        Timestamp(value.0.with_timezone(&Utc))
+    }
+}
+
+/// A parsed Logplex syslog message.
+///
+/// This type uses rsyslog's generic Message type with:
+/// - `RequiredDateTime` for the timestamp (must be present, not "-")
+/// - `Skip` for the structured data (which Logplex omits per their docs)
+/// - `LogplexMsgBody<'a>` for the message body (with logfmt parsing)
+pub type LogplexMessage<'a> = Message<'a, RequiredDateTime, Skip, LogplexMsgBody<'a>>;
+
+/// Parse a Logplex-framed syslog message.
+///
+/// Logplex uses octet counting framing (RFC 6587 Section 3.4.1) where messages
+/// are prefixed with their length: `<length> <message>`. The rsyslog parser
+/// handles this by skipping any content before the first `<` when parsing the
+/// priority field.
+pub fn parse_logplex(msg: &str) -> Result<LogplexMessage<'_>, rsyslog::Error<'_>> {
+    LogplexMessage::parse(msg)
+}
+
+/// Maps syslog severity (0-7) to Sentry log level.
+///
+/// Mapping follows [OpenTelemetry syslog semantic conventions](https://opentelemetry.io/docs/specs/otel/logs/data-model-appendix/#appendix-b-severitynumber-example-mappings).
+fn map_syslog_severity_to_level(severity: u8) -> OurLogLevel {
+    match severity {
+        0 => OurLogLevel::Fatal, // Emergency
+        1 => OurLogLevel::Fatal, // Alert
+        2 => OurLogLevel::Fatal, // Critical
+        3 => OurLogLevel::Error, // Error
+        4 => OurLogLevel::Warn,  // Warning
+        5 => OurLogLevel::Info,  // Notice
+        6 => OurLogLevel::Info,  // Informational
+        7 => OurLogLevel::Debug, // Debug
+        _ => OurLogLevel::Info,
+    }
+}
+
+/// Converts a parsed Logplex message to a Sentry log.
+pub fn logplex_message_to_sentry_log(
+    logplex_message: LogplexMessage<'_>,
+    frame_id: Option<&str>,
+    drain_token: Option<&str>,
+    user_agent: Option<&str>,
+) -> OurLog {
+    let LogplexMessage {
+        facility,
+        severity,
+        version,
+        timestamp,
+        hostname,
+        app_name,
+        proc_id,
+        msg_id,
+        msg,
+        structured_data: _,
+    } = logplex_message;
+
+    let LogplexMsgBody {
+        msg,
+        template,
+        parameters,
+    } = msg;
+
+    let mut attributes = Attributes::default();
+
+    macro_rules! add_optional_attribute {
+        ($name:expr, $value:expr) => {{
+            if let Some(value) = $value {
+                attributes.insert($name.to_owned(), value.to_owned());
+            }
+        }};
+    }
+
+    macro_rules! add_attribute {
+        ($name:expr, $value:expr) => {{
+            let val = $value;
+            attributes.insert($name.to_owned(), val.to_owned());
+        }};
+    }
+
+    add_attribute!(ORIGIN, "auto.log_drain.heroku");
+
+    // Add syslog fields following OpenTelemetry syslog semantic conventions
+    // See: https://opentelemetry.io/docs/specs/otel/logs/data-model-appendix/#rfc5424-syslog
+    add_attribute!("syslog.facility", facility as i64);
+    add_attribute!("syslog.version", version as i64);
+    add_optional_attribute!("syslog.procid", proc_id);
+    add_optional_attribute!("syslog.msgid", msg_id);
+    add_optional_attribute!("resource.host.name", hostname);
+    add_optional_attribute!("resource.service.name", app_name);
+
+    // Add Heroku Logplex specific fields from log drain
+    add_optional_attribute!("heroku.logplex.frame_id", frame_id);
+    add_optional_attribute!("heroku.logplex.drain_token", drain_token);
+    add_optional_attribute!("heroku.logplex.user_agent", user_agent);
+
+    // Add pre-parsed logfmt parameters from the message body
+    add_optional_attribute!("sentry.message.template", template);
+    for (key, value) in parameters {
+        let param_key = format!("sentry.message.parameter.{key}");
+        add_attribute!(param_key, value);
+    }
+
+    OurLog {
+        timestamp: Annotated::new(timestamp.into()),
+        level: Annotated::new(map_syslog_severity_to_level(severity)),
+        body: Annotated::new(msg.to_owned()),
+        attributes: Annotated::new(attributes),
+        ..Default::default()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use relay_protocol::SerializableAnnotated;
+
+    #[test]
+    fn test_parse_logplex_basic_state_change() {
+        let input =
+            "83 <40>1 2012-11-30T06:45:29+00:00 host app web.3 - State changed from starting to up";
+        let msg = parse_logplex(input).unwrap();
+        insta::assert_debug_snapshot!(msg, @r#"
+        Message {
+            facility: 5,
+            severity: 0,
+            version: 1,
+            timestamp: RequiredDateTime(
+                2012-11-30T06:45:29+00:00,
+            ),
+            hostname: Some(
+                "host",
+            ),
+            app_name: Some(
+                "app",
+            ),
+            proc_id: Some(
+                "web.3",
+            ),
+            msg_id: None,
+            structured_data: Skip,
+            msg: LogplexMsgBody {
+                msg: "State changed from starting to up",
+                template: None,
+                parameters: [],
+            },
+        }
+        "#);
+    }
+
+    #[test]
+    fn test_parse_logplex_process_start_with_backticks() {
+        let input = "119 <40>1 2012-11-30T06:45:26+00:00 host app web.3 - Starting process with command `bundle exec rackup config.ru -p 24405`";
+        let msg = parse_logplex(input).unwrap();
+        insta::assert_debug_snapshot!(msg, @r#"
+        Message {
+            facility: 5,
+            severity: 0,
+            version: 1,
+            timestamp: RequiredDateTime(
+                2012-11-30T06:45:26+00:00,
+            ),
+            hostname: Some(
+                "host",
+            ),
+            app_name: Some(
+                "app",
+            ),
+            proc_id: Some(
+                "web.3",
+            ),
+            msg_id: None,
+            structured_data: Skip,
+            msg: LogplexMsgBody {
+                msg: "Starting process with command `bundle exec rackup config.ru -p 24405`",
+                template: None,
+                parameters: [],
+            },
+        }
+        "#);
+    }
+
+    #[test]
+    fn test_parse_logplex_heroku_dyno_with_drain_token() {
+        let input = "156 <40>1 2012-11-30T06:45:26+00:00 heroku web.3 d.73ea7440-270a-435a-a0ea-adf50b4e5f5a - Starting process with command `bundle exec rackup config.ru -p 24405`";
+        let msg = parse_logplex(input).unwrap();
+        insta::assert_debug_snapshot!(msg, @r#"
+        Message {
+            facility: 5,
+            severity: 0,
+            version: 1,
+            timestamp: RequiredDateTime(
+                2012-11-30T06:45:26+00:00,
+            ),
+            hostname: Some(
+                "heroku",
+            ),
app_name: Some( + "web.3", + ), + proc_id: Some( + "d.73ea7440-270a-435a-a0ea-adf50b4e5f5a", + ), + msg_id: None, + structured_data: Skip, + msg: LogplexMsgBody { + msg: "Starting process with command `bundle exec rackup config.ru -p 24405`", + template: None, + parameters: [], + }, + } + "#); + } + + #[test] + fn test_parse_logplex_heroku_postgres_metrics() { + let input = "530 <134>1 2016-02-13T21:20:25+00:00 host app heroku-postgres - source=DATABASE sample#current_transaction=15365 sample#db_size=4347350804bytes sample#tables=43 sample#active-connections=6 sample#waiting-connections=0 sample#index-cache-hit-rate=0.97116 sample#table-cache-hit-rate=0.73958 sample#load-avg-1m=0.05 sample#load-avg-5m=0.03 sample#load-avg-15m=0.035 sample#read-iops=0 sample#write-iops=112.73 sample#memory-total=15405636.0kB sample#memory-free=214004kB sample#memory-cached=14392920.0kB sample#memory-postgres=181644kB"; + let msg = parse_logplex(input).unwrap(); + insta::assert_debug_snapshot!(msg, @r#" + Message { + facility: 16, + severity: 6, + version: 1, + timestamp: RequiredDateTime( + 2016-02-13T21:20:25+00:00, + ), + hostname: Some( + "host", + ), + app_name: Some( + "app", + ), + proc_id: Some( + "heroku-postgres", + ), + msg_id: None, + structured_data: Skip, + msg: LogplexMsgBody { + msg: "source=DATABASE sample#current_transaction=15365 sample#db_size=4347350804bytes sample#tables=43 sample#active-connections=6 sample#waiting-connections=0 sample#index-cache-hit-rate=0.97116 sample#table-cache-hit-rate=0.73958 sample#load-avg-1m=0.05 sample#load-avg-5m=0.03 sample#load-avg-15m=0.035 sample#read-iops=0 sample#write-iops=112.73 sample#memory-total=15405636.0kB sample#memory-free=214004kB sample#memory-cached=14392920.0kB sample#memory-postgres=181644kB", + template: Some( + "source={source} sample#current_transaction={sample#current_transaction} sample#db_size={sample#db_size} sample#tables={sample#tables} sample#active-connections={sample#active-connections} sample#waiting-connections={sample#waiting-connections} sample#index-cache-hit-rate={sample#index-cache-hit-rate} sample#table-cache-hit-rate={sample#table-cache-hit-rate} sample#load-avg-1m={sample#load-avg-1m} sample#load-avg-5m={sample#load-avg-5m} sample#load-avg-15m={sample#load-avg-15m} sample#read-iops={sample#read-iops} sample#write-iops={sample#write-iops} sample#memory-total={sample#memory-total} sample#memory-free={sample#memory-free} sample#memory-cached={sample#memory-cached} sample#memory-postgres={sample#memory-postgres}", + ), + parameters: [ + ( + "source", + "DATABASE", + ), + ( + "sample#current_transaction", + "15365", + ), + ( + "sample#db_size", + "4347350804bytes", + ), + ( + "sample#tables", + "43", + ), + ( + "sample#active-connections", + "6", + ), + ( + "sample#waiting-connections", + "0", + ), + ( + "sample#index-cache-hit-rate", + "0.97116", + ), + ( + "sample#table-cache-hit-rate", + "0.73958", + ), + ( + "sample#load-avg-1m", + "0.05", + ), + ( + "sample#load-avg-5m", + "0.03", + ), + ( + "sample#load-avg-15m", + "0.035", + ), + ( + "sample#read-iops", + "0", + ), + ( + "sample#write-iops", + "112.73", + ), + ( + "sample#memory-total", + "15405636.0kB", + ), + ( + "sample#memory-free", + "214004kB", + ), + ( + "sample#memory-cached", + "14392920.0kB", + ), + ( + "sample#memory-postgres", + "181644kB", + ), + ], + }, + } + "#); + } + + #[test] + fn test_parse_logplex_invalid_format() { + let msg = "invalid"; + let result = parse_logplex(msg); + assert!(result.is_err()); + } + + #[test] + fn 
test_logplex_message_to_sentry_log() { + let msg = + "83 <40>1 2012-11-30T06:45:29+00:00 host app web.3 - State changed from starting to up"; + let parsed = parse_logplex(msg).unwrap(); + + let ourlog = logplex_message_to_sentry_log( + parsed, + Some("frame-123"), + Some("d.abc123"), + Some("Logplex/v72"), + ); + + let ourlog = Annotated::new(ourlog); + insta::assert_json_snapshot!(SerializableAnnotated(&ourlog), @r#" + { + "timestamp": 1354257929.0, + "level": "fatal", + "body": "State changed from starting to up", + "attributes": { + "heroku.logplex.drain_token": { + "type": "string", + "value": "d.abc123" + }, + "heroku.logplex.frame_id": { + "type": "string", + "value": "frame-123" + }, + "heroku.logplex.user_agent": { + "type": "string", + "value": "Logplex/v72" + }, + "resource.host.name": { + "type": "string", + "value": "host" + }, + "resource.service.name": { + "type": "string", + "value": "app" + }, + "sentry.origin": { + "type": "string", + "value": "auto.log_drain.heroku" + }, + "syslog.facility": { + "type": "integer", + "value": 5 + }, + "syslog.procid": { + "type": "string", + "value": "web.3" + }, + "syslog.version": { + "type": "integer", + "value": 1 + } + } + } + "#); + } + + #[test] + fn test_logplex_message_to_sentry_log_with_logfmt() { + let input = "100 <134>1 2016-02-13T21:20:25+00:00 host app heroku-postgres - source=DATABASE sample#db_size=1234bytes"; + let parsed = parse_logplex(input).unwrap(); + + let ourlog = logplex_message_to_sentry_log(parsed, None, None, None); + + let ourlog = Annotated::new(ourlog); + insta::assert_json_snapshot!(SerializableAnnotated(&ourlog), @r#" + { + "timestamp": 1455398425.0, + "level": "info", + "body": "source=DATABASE sample#db_size=1234bytes", + "attributes": { + "resource.host.name": { + "type": "string", + "value": "host" + }, + "resource.service.name": { + "type": "string", + "value": "app" + }, + "sentry.message.parameter.sample#db_size": { + "type": "string", + "value": "1234bytes" + }, + "sentry.message.parameter.source": { + "type": "string", + "value": "DATABASE" + }, + "sentry.message.template": { + "type": "string", + "value": "source={source} sample#db_size={sample#db_size}" + }, + "sentry.origin": { + "type": "string", + "value": "auto.log_drain.heroku" + }, + "syslog.facility": { + "type": "integer", + "value": 16 + }, + "syslog.procid": { + "type": "string", + "value": "heroku-postgres" + }, + "syslog.version": { + "type": "integer", + "value": 1 + } + } + } + "#); + } +} diff --git a/relay-ourlogs/src/lib.rs b/relay-ourlogs/src/lib.rs index aa48b3d0887..6b73e1512b1 100644 --- a/relay-ourlogs/src/lib.rs +++ b/relay-ourlogs/src/lib.rs @@ -6,10 +6,14 @@ html_favicon_url = "https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png" )] +mod heroku_to_sentry; mod otel_to_sentry; mod size; mod vercel_to_sentry; +pub use self::heroku_to_sentry::{ + HerokuHeader, LogplexMessage, LogplexMsgBody, logplex_message_to_sentry_log, parse_logplex, +}; pub use self::otel_to_sentry::otel_to_sentry_log; pub use self::size::calculate_size; pub use self::vercel_to_sentry::{VercelLog, vercel_log_to_sentry_log}; diff --git a/relay-protocol/src/value.rs b/relay-protocol/src/value.rs index a6b38cf36cc..ee678cbb1be 100644 --- a/relay-protocol/src/value.rs +++ b/relay-protocol/src/value.rs @@ -88,6 +88,15 @@ impl Value { } } + /// Returns an `i64` if the value is an integer type, otherwise `None`. 
+    pub fn as_i64(&self) -> Option<i64> {
+        match self {
+            Value::I64(v) => Some(*v),
+            Value::U64(v) => Some(*v as i64),
+            _ => None,
+        }
+    }
+
     /// Constructs a `Value` from a `serde_json::Value` object.
     fn from_json(value: serde_json::Value) -> Option<Self> {
         Some(match value {
diff --git a/relay-server/src/endpoints/integrations/heroku.rs b/relay-server/src/endpoints/integrations/heroku.rs
new file mode 100644
index 00000000000..fbf00ff8da9
--- /dev/null
+++ b/relay-server/src/endpoints/integrations/heroku.rs
@@ -0,0 +1,69 @@
+//! Heroku Log Drain integration endpoint.
+//!
+//! This module handles HTTPS log drains from Heroku's Logplex system.
+//! Logplex sends batches of syslog-formatted messages via POST requests.
+
+use axum::extract::DefaultBodyLimit;
+use axum::http::{HeaderMap, StatusCode};
+use axum::response::IntoResponse;
+use axum::routing::{MethodRouter, post};
+use relay_config::Config;
+use relay_dynamic_config::Feature;
+use relay_ourlogs::HerokuHeader;
+use relay_protocol::Value;
+
+use crate::endpoints::common;
+use crate::envelope::ContentType;
+use crate::extractors::{IntegrationBuilder, RawContentType};
+use crate::integrations::LogsIntegration;
+use crate::service::ServiceState;
+
+/// All routes configured for the Heroku integration.
+///
+/// The integration currently supports the following endpoints:
+/// - Heroku Log Drain (Logplex HTTPS drain)
+pub fn routes(config: &Config) -> axum::Router<ServiceState> {
+    axum::Router::new()
+        .route("/logs", logs::route(config))
+        .route("/logs/", logs::route(config))
+}
+
+mod logs {
+    use super::*;
+
+    async fn handle(
+        content_type: RawContentType,
+        headers: HeaderMap,
+        state: ServiceState,
+        builder: IntegrationBuilder,
+    ) -> axum::response::Result<impl IntoResponse> {
+        if ContentType::Logplex != content_type.as_ref() {
+            return Ok(StatusCode::UNSUPPORTED_MEDIA_TYPE);
+        }
+
+        let item_headers: Vec<_> = [
+            HerokuHeader::FrameId,
+            HerokuHeader::DrainToken,
+            HerokuHeader::UserAgent,
+        ]
+        .into_iter()
+        .filter_map(|header| {
+            let value = headers.get(header.http_header_name())?.to_str().ok()?;
+            Some((header.as_str().to_owned(), Value::String(value.to_owned())))
+        })
+        .collect();
+
+        let envelope = builder
+            .with_type_and_headers(LogsIntegration::HerokuLogDrain, item_headers)
+            .with_required_feature(Feature::HerokuLogDrainEndpoint)
+            .build();
+
+        common::handle_envelope(&state, envelope).await?;
+
+        Ok(StatusCode::ACCEPTED)
+    }
+
+    pub fn route(config: &Config) -> MethodRouter<ServiceState> {
+        post(handle).route_layer(DefaultBodyLimit::max(config.max_envelope_size()))
+    }
+}
diff --git a/relay-server/src/endpoints/integrations/mod.rs b/relay-server/src/endpoints/integrations/mod.rs
index f102f5ac487..058ecc3b9cb 100644
--- a/relay-server/src/endpoints/integrations/mod.rs
+++ b/relay-server/src/endpoints/integrations/mod.rs
@@ -1,2 +1,3 @@
+pub mod heroku;
 pub mod otlp;
 pub mod vercel;
diff --git a/relay-server/src/endpoints/mod.rs b/relay-server/src/endpoints/mod.rs
index b3204b223a3..dc4b3e81021 100644
--- a/relay-server/src/endpoints/mod.rs
+++ b/relay-server/src/endpoints/mod.rs
@@ -108,6 +108,7 @@ fn public_routes_raw(config: &Config) -> Router {
     let integration_routes = Router::new()
         .nest("/api/{project_id}/integration/otlp", integrations::otlp::routes(config))
         .nest("/api/{project_id}/integration/vercel", integrations::vercel::routes(config))
+        .nest("/api/{project_id}/integration/heroku", integrations::heroku::routes(config))
         .route_layer(middlewares::cors());
 
     // NOTE: If you add a new (non-experimental) route here, please also list it in
diff --git a/relay-server/src/envelope/content_type.rs b/relay-server/src/envelope/content_type.rs
index bd081fae019..d933dd4c840 100644
--- a/relay-server/src/envelope/content_type.rs
+++ b/relay-server/src/envelope/content_type.rs
@@ -29,6 +29,8 @@ pub enum ContentType {
     Envelope,
     /// `application/x-protobuf`
     Protobuf,
+    /// [`application/logplex-1`](https://devcenter.heroku.com/articles/log-drains#https-drains)
+    Logplex,
     /// `application/vnd.sentry.items.log+json`
     LogContainer,
     /// `application/vnd.sentry.items.span.v2+json`
@@ -54,6 +56,7 @@ impl ContentType {
             Self::Xml => "text/xml",
             Self::Envelope => CONTENT_TYPE,
             Self::Protobuf => "application/x-protobuf",
+            Self::Logplex => "application/logplex-1",
             Self::LogContainer => "application/vnd.sentry.items.log+json",
             Self::SpanV2Container => "application/vnd.sentry.items.span.v2+json",
             Self::TraceMetricContainer => "application/vnd.sentry.items.trace-metric+json",
@@ -93,6 +96,8 @@ impl ContentType {
             Some(Self::Envelope)
         } else if ct.eq_ignore_ascii_case(Self::Protobuf.as_str()) {
             Some(Self::Protobuf)
+        } else if ct.eq_ignore_ascii_case(Self::Logplex.as_str()) {
+            Some(Self::Logplex)
         } else if ct.eq_ignore_ascii_case(Self::LogContainer.as_str()) {
             Some(Self::LogContainer)
         } else if ct.eq_ignore_ascii_case(Self::SpanV2Container.as_str()) {
diff --git a/relay-server/src/envelope/item.rs b/relay-server/src/envelope/item.rs
index 4e318fe8433..c158a3a11e3 100644
--- a/relay-server/src/envelope/item.rs
+++ b/relay-server/src/envelope/item.rs
@@ -172,6 +172,10 @@ impl Item {
                 (DataCategory::LogByte, self.len().max(1)),
                 (DataCategory::LogItem, item_count),
             ],
+            Some(Integration::Logs(LogsIntegration::HerokuLogDrain)) => smallvec![
+                (DataCategory::LogByte, self.len().max(1)),
+                (DataCategory::LogItem, item_count),
+            ],
             Some(Integration::Spans(SpansIntegration::OtelV1 { .. })) => {
                 smallvec![
                     (DataCategory::Span, item_count),
diff --git a/relay-server/src/extractors/integration_builder.rs b/relay-server/src/extractors/integration_builder.rs
index a17761bef40..6caf398f550 100644
--- a/relay-server/src/extractors/integration_builder.rs
+++ b/relay-server/src/extractors/integration_builder.rs
@@ -30,12 +30,30 @@ impl IntegrationBuilder<()> {
     /// Configures the [`Integration`] type.
     ///
     /// Setting the type is required.
-    pub fn with_type(mut self, integration: impl Into<Integration>) -> IntegrationBuilder<Integration> {
+    pub fn with_type(self, integration: impl Into<Integration>) -> IntegrationBuilder<Integration> {
+        self.with_type_and_headers(integration, [])
+    }
+
+    /// Configures the [`Integration`] type with additional headers.
+    ///
+    /// Headers are stored in the item's `other` field and can be retrieved
+    /// during processing via `item.get_header()`.
+    pub fn with_type_and_headers<I>(
+        mut self,
+        integration: impl Into<Integration>,
+        headers: I,
+    ) -> IntegrationBuilder<Integration>
+    where
+        I: IntoIterator<Item = (String, Value)>,
+    {
         let integration = integration.into();
 
         self.envelope.add_item({
             let mut item = Item::new(ItemType::Integration);
             item.set_payload(integration.into(), self.payload.clone());
+            for (name, value) in headers {
+                item.set_header(name, value);
+            }
             item
         });
diff --git a/relay-server/src/integrations/mod.rs b/relay-server/src/integrations/mod.rs
index e83d88ac394..bb0112901a8 100644
--- a/relay-server/src/integrations/mod.rs
+++ b/relay-server/src/integrations/mod.rs
@@ -44,6 +44,7 @@ define_integrations!(
     "application/vnd.sentry.integration.otel.spans+protobuf" => Integration::Spans(SpansIntegration::OtelV1 { format: OtelFormat::Protobuf }),
     "application/vnd.sentry.integration.vercel.logs+json" => Integration::Logs(LogsIntegration::VercelDrainLog { format: VercelLogDrainFormat::Json }),
     "application/vnd.sentry.integration.vercel.logs+ndjson" => Integration::Logs(LogsIntegration::VercelDrainLog { format: VercelLogDrainFormat::NdJson }),
+    "application/vnd.sentry.integration.heroku.logs+logplex" => Integration::Logs(LogsIntegration::HerokuLogDrain),
 );
 
 /// An exhaustive list of all integrations supported by Relay.
@@ -80,6 +81,10 @@ pub enum LogsIntegration {
     ///
     /// Supports the [`relay_ourlogs::VercelLog`] format.
     VercelDrainLog { format: VercelLogDrainFormat },
+    /// The Heroku Log Drain integration.
+    ///
+    /// Supports Heroku's Logplex syslog format via HTTPS drains.
+    HerokuLogDrain,
 }
 
 /// All span integrations supported by Relay.
diff --git a/relay-server/src/processing/logs/integrations/heroku.rs b/relay-server/src/processing/logs/integrations/heroku.rs
new file mode 100644
index 00000000000..5269305ce4c
--- /dev/null
+++ b/relay-server/src/processing/logs/integrations/heroku.rs
@@ -0,0 +1,77 @@
+//! Heroku Logplex log expansion.
+
+use relay_event_schema::protocol::OurLog;
+use relay_ourlogs::HerokuHeader;
+
+use crate::envelope::Item;
+use crate::processing::logs::{Error, Result};
+use crate::services::outcome::DiscardReason;
+
+/// Expands Heroku Logplex logs into the [`OurLog`] format.
+///
+/// # Arguments
+///
+/// * `item` - The envelope item containing the Logplex payload and headers
+/// * `produce` - A callback function that receives each parsed log entry
+pub fn expand<F>(item: &Item, mut produce: F) -> Result<()>
+where
+    F: FnMut(OurLog),
+{
+    let frame_id = item
+        .get_header(HerokuHeader::FrameId.as_str())
+        .and_then(|v| v.as_str());
+
+    let drain_token = item
+        .get_header(HerokuHeader::DrainToken.as_str())
+        .and_then(|v| v.as_str());
+
+    let user_agent = item
+        .get_header(HerokuHeader::UserAgent.as_str())
+        .and_then(|v| v.as_str());
+
+    let payload = item.payload();
+
+    let payload_str = std::str::from_utf8(&payload).map_err(|e| {
+        relay_log::debug!(
+            error = &e as &dyn std::error::Error,
+            "Failed to parse Logplex payload as UTF-8"
+        );
+        Error::Invalid(DiscardReason::InvalidLog)
+    })?;
+
+    let mut count: u32 = 0;
+
+    // Logplex sends multiple syslog messages, each on its own line.
+    // Each line is prefixed with a length: "<length> <message>"
+    for line in payload_str.lines() {
+        if line.is_empty() {
+            continue;
+        }
+
+        match relay_ourlogs::parse_logplex(line) {
+            Ok(msg) => {
+                count += 1;
+                let ourlog = relay_ourlogs::logplex_message_to_sentry_log(
+                    msg,
+                    frame_id,
+                    drain_token,
+                    user_agent,
+                );
+                produce(ourlog);
+            }
+            Err(e) => {
+                relay_log::debug!(
+                    error = %e,
+                    "Failed to parse Logplex message, skipping"
+                );
+            }
+        }
+    }
+
+    if count == 0 {
+        relay_log::debug!("Failed to parse any logs from Heroku Logplex payload");
+        return Err(Error::Invalid(DiscardReason::InvalidLog));
+    }
+
+    Ok(())
+}
diff --git a/relay-server/src/processing/logs/integrations/mod.rs b/relay-server/src/processing/logs/integrations/mod.rs
index 6ff3712608b..d61e5e5a2d0 100644
--- a/relay-server/src/processing/logs/integrations/mod.rs
+++ b/relay-server/src/processing/logs/integrations/mod.rs
@@ -5,6 +5,7 @@ use crate::envelope::{ContainerItems, Item, WithHeader};
 use crate::integrations::{Integration, LogsIntegration};
 use crate::managed::RecordKeeper;
 
+mod heroku;
 mod otel;
 mod vercel;
 
@@ -45,6 +46,7 @@ pub fn expand_into(
     let result = match integration {
         LogsIntegration::OtelV1 { format } => otel::expand(format, &payload, produce),
         LogsIntegration::VercelDrainLog { format } => vercel::expand(format, &payload, produce),
+        LogsIntegration::HerokuLogDrain => heroku::expand(&item, produce),
     };
 
     match result {
diff --git a/tests/integration/fixtures/__init__.py b/tests/integration/fixtures/__init__.py
index e98d678ef7c..283b6ae14cc 100644
--- a/tests/integration/fixtures/__init__.py
+++ b/tests/integration/fixtures/__init__.py
@@ -288,6 +288,30 @@ def send_vercel_logs(
         response.raise_for_status()
         return response
 
+    def send_heroku_logs(
+        self,
+        project_id,
+        data=None,
+        headers=None,
+        dsn_key_idx=0,
+        dsn_key=None,
+    ):
+
+        if dsn_key is None:
+            dsn_key = self.get_dsn_public_key(project_id, dsn_key_idx)
+
+        url = f"/api/{project_id}/integration/heroku/logs?sentry_key={dsn_key}"
+
+        headers = {
+            "Content-Type": "application/logplex-1",
+            **(headers or {}),
+        }
+
+        response = self.post(url, headers=headers, data=data)
+
+        response.raise_for_status()
+        return response
+
     def send_options(self, project_id, headers=None, dsn_key_idx=0):
         headers = {
             "X-Sentry-Auth": self.get_auth_header(project_id, dsn_key_idx),
diff --git a/tests/integration/test_heroku_logs.py b/tests/integration/test_heroku_logs.py
new file mode 100644
index 00000000000..cfa2654e066
--- /dev/null
+++ b/tests/integration/test_heroku_logs.py
@@ -0,0 +1,133 @@
+from unittest import mock
+
+from sentry_relay.consts import \
DataCategory + +# Example Logplex syslog messages from Heroku documentation +# https://devcenter.heroku.com/articles/log-drains#https-drains +HEROKU_LOGS_PAYLOAD = """\ +83 <40>1 2012-11-30T06:45:29+00:00 host app web.3 - State changed from starting to up +119 <40>1 2012-11-30T06:45:26+00:00 host app web.3 - Starting process with command `bundle exec rackup config.ru -p 24405`""" + + +EXPECTED_ITEMS = [ + { + "organizationId": "1", + "projectId": "42", + "traceId": mock.ANY, + "itemId": mock.ANY, + "itemType": "TRACE_ITEM_TYPE_LOG", + "timestamp": mock.ANY, + "attributes": { + "sentry.origin": {"stringValue": "auto.log_drain.heroku"}, + "syslog.facility": {"intValue": "5"}, + "syslog.version": {"intValue": "1"}, + "syslog.procid": {"stringValue": "web.3"}, + "resource.host.name": {"stringValue": "host"}, + "resource.service.name": {"stringValue": "app"}, + "sentry.body": {"stringValue": "State changed from starting to up"}, + "sentry.severity_text": {"stringValue": "fatal"}, + "sentry.observed_timestamp_nanos": {"stringValue": mock.ANY}, + "sentry.timestamp_precise": {"intValue": "1354257929000000000"}, + "sentry.payload_size_bytes": {"intValue": mock.ANY}, + "sentry._meta.fields.trace_id": { + "stringValue": '{"meta":{"":{"rem":[["trace_id.missing","s"]]}}}' + }, + "heroku.logplex.frame_id": {"stringValue": "abc123"}, + "heroku.logplex.drain_token": { + "stringValue": "d.73ea7440-270a-435a-a0ea-adf50b4e5f5a" + }, + "heroku.logplex.user_agent": {"stringValue": "Logplex/v72"}, + }, + "clientSampleRate": 1.0, + "serverSampleRate": 1.0, + "retentionDays": 90, + "received": mock.ANY, + "downsampledRetentionDays": 90, + }, + { + "organizationId": "1", + "projectId": "42", + "traceId": mock.ANY, + "itemId": mock.ANY, + "itemType": "TRACE_ITEM_TYPE_LOG", + "timestamp": mock.ANY, + "attributes": { + "sentry.origin": {"stringValue": "auto.log_drain.heroku"}, + "syslog.facility": {"intValue": "5"}, + "syslog.version": {"intValue": "1"}, + "syslog.procid": {"stringValue": "web.3"}, + "resource.host.name": {"stringValue": "host"}, + "resource.service.name": {"stringValue": "app"}, + "sentry.body": { + "stringValue": "Starting process with command `bundle exec rackup config.ru -p 24405`" + }, + "sentry.severity_text": {"stringValue": "fatal"}, + "sentry.observed_timestamp_nanos": {"stringValue": mock.ANY}, + "sentry.timestamp_precise": {"intValue": "1354257926000000000"}, + "sentry.payload_size_bytes": {"intValue": mock.ANY}, + "sentry._meta.fields.trace_id": { + "stringValue": '{"meta":{"":{"rem":[["trace_id.missing","s"]]}}}' + }, + "heroku.logplex.frame_id": {"stringValue": "abc123"}, + "heroku.logplex.drain_token": { + "stringValue": "d.73ea7440-270a-435a-a0ea-adf50b4e5f5a" + }, + "heroku.logplex.user_agent": {"stringValue": "Logplex/v72"}, + }, + "clientSampleRate": 1.0, + "serverSampleRate": 1.0, + "retentionDays": 90, + "received": mock.ANY, + "downsampledRetentionDays": 90, + }, +] + + +def test_heroku_logs( + mini_sentry, relay, relay_with_processing, outcomes_consumer, items_consumer +): + """Test Heroku Logplex log drain ingestion.""" + items_consumer = items_consumer() + outcomes_consumer = outcomes_consumer() + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:ourlogs-ingestion", + "organizations:relay-heroku-log-drain-endpoint", + ] + + relay = relay(relay_with_processing()) + + relay.send_heroku_logs( + project_id, + data=HEROKU_LOGS_PAYLOAD, + headers={ + "Logplex-Msg-Count": "2", + 
"Logplex-Frame-Id": "abc123", + "Logplex-Drain-Token": "d.73ea7440-270a-435a-a0ea-adf50b4e5f5a", + "User-Agent": "Logplex/v72", + }, + ) + + items = items_consumer.get_items(n=2) + assert items == EXPECTED_ITEMS + + outcomes = outcomes_consumer.get_aggregated_outcomes(n=4) + assert outcomes == [ + { + "category": DataCategory.LOG_ITEM.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 2, + }, + { + "category": DataCategory.LOG_BYTE.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": mock.ANY, + }, + ]