
Commit 9310686

Revert "ref(kafka): Make routing key optional, instead of randomizing it" (#4763)
Reverts #4738
1 parent e314c24 commit 9310686

File tree

5 files changed: 44 additions & 49 deletions

  Cargo.lock
  relay-kafka/Cargo.toml
  relay-kafka/src/producer/mod.rs
  relay-server/src/services/outcome.rs
  relay-server/src/services/store.rs

Cargo.lock

Lines changed: 1 addition & 0 deletions
(Diff not rendered: Cargo.lock is a generated file.)

relay-kafka/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ serde_json = { workspace = true, optional = true }
 thiserror = { workspace = true }
 sentry-kafka-schemas = { workspace = true, default-features = false, optional = true }
 parking_lot = { workspace = true }
+uuid = { workspace = true }
 hashbrown = { workspace = true }
 
 [dev-dependencies]
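
The uuid dependency returns to relay-kafka because, with this revert, the producer itself draws a random v4 UUID whenever it reshuffles a rate-limited partition key (see the relay-kafka/src/producer/mod.rs diff below).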

relay-kafka/src/producer/mod.rs

Lines changed: 21 additions & 29 deletions
@@ -11,6 +11,7 @@ use rdkafka::message::Header;
 use rdkafka::producer::{BaseRecord, Producer as _};
 use relay_statsd::metric;
 use thiserror::Error;
+use uuid::Uuid;
 
 use crate::config::{KafkaParams, KafkaTopic};
 use crate::debounced::Debounced;
@@ -76,7 +77,7 @@ pub enum ClientError {
 /// Describes the type which can be sent using kafka producer provided by this crate.
 pub trait Message {
     /// Returns the partitioning key for this kafka message determining.
-    fn key(&self) -> Option<[u8; 16]>;
+    fn key(&self) -> [u8; 16];
 
     /// Returns the type of the message.
     fn variant(&self) -> &'static str;
@@ -187,7 +188,7 @@ impl KafkaClient {
     pub fn send(
         &self,
         topic: KafkaTopic,
-        key: Option<[u8; 16]>,
+        key: [u8; 16],
         headers: Option<&BTreeMap<String, String>>,
         variant: &str,
         payload: &[u8],
@@ -304,7 +305,7 @@ impl Producer {
     /// Sends the payload to the correct producer for the current topic.
     fn send(
         &self,
-        key: Option<[u8; 16]>,
+        key: [u8; 16],
         headers: Option<&BTreeMap<String, String>>,
         variant: &str,
         payload: &[u8],
@@ -327,37 +328,28 @@ impl Producer {
             })
             .collect::<KafkaHeaders>();
 
-        let key = match (key, self.rate_limiter.as_ref()) {
-            (Some(key), Some(limiter)) => {
-                let is_limited = limiter.try_increment(now, key, 1) < 1;
-
-                if is_limited {
-                    metric!(
-                        counter(KafkaCounters::ProducerPartitionKeyRateLimit) += 1,
-                        variant = variant,
-                        topic = topic_name,
-                    );
-
-                    headers.insert(Header {
-                        key: "sentry-reshuffled",
-                        value: Some("1"),
-                    });
-
-                    None
-                } else {
-                    Some(key)
-                }
+        let mut key = key;
+        if let Some(ref limiter) = self.rate_limiter {
+            if limiter.try_increment(now, key, 1) < 1 {
+                metric!(
+                    counter(KafkaCounters::ProducerPartitionKeyRateLimit) += 1,
+                    variant = variant,
+                    topic = topic_name,
+                );
+
+                key = Uuid::new_v4().into_bytes();
+                headers.insert(Header {
+                    key: "sentry-reshuffled",
+                    value: Some("1"),
+                });
             }
-            (key, _) => key,
-        };
+        }
+
+        let mut record = BaseRecord::to(topic_name).key(&key).payload(payload);
 
-        let mut record = BaseRecord::to(topic_name).payload(payload);
         if let Some(headers) = headers.into_inner() {
            record = record.headers(headers);
         }
-        if let Some(key) = key.as_ref() {
-            record = record.key(key);
-        }
 
         self.metrics.debounce(now, || {
             metric!(
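
Read together, the restored producer logic always works with a concrete 16-byte key: if the per-key rate limiter reports the key as over its limit, the key is swapped for a random v4 UUID and the message is tagged with a sentry-reshuffled header. Below is a minimal, self-contained sketch of that pattern, not the crate's actual implementation; it assumes only the uuid crate (v4 feature), and the is_over_limit closure and RoutingDecision struct are illustrative stand-ins.

```rust
use uuid::Uuid;

/// Outcome of choosing the routing key for one message.
/// `reshuffled` mirrors the "sentry-reshuffled" header set in the diff above.
struct RoutingDecision {
    key: [u8; 16],
    reshuffled: bool,
}

/// Keep the caller's key unless it is over its rate limit; in that case,
/// replace it with a random UUID so the message lands on an arbitrary partition.
fn decide_routing_key(
    key: [u8; 16],
    is_over_limit: impl Fn(&[u8; 16]) -> bool,
) -> RoutingDecision {
    if is_over_limit(&key) {
        RoutingDecision {
            key: Uuid::new_v4().into_bytes(),
            reshuffled: true,
        }
    } else {
        RoutingDecision { key, reshuffled: false }
    }
}

fn main() {
    let hot_key = *b"0123456789abcdef";
    // Pretend the key is over its limit to force a reshuffle.
    let decision = decide_routing_key(hot_key, |_| true);
    assert!(decision.reshuffled);
    assert_ne!(decision.key, hot_key);
}
```

The real code additionally counts key usage through limiter.try_increment and bumps the ProducerPartitionKeyRateLimit counter, as the hunk above shows.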

relay-server/src/services/outcome.rs

Lines changed: 4 additions & 7 deletions
@@ -1151,13 +1151,10 @@ impl OutcomeBroker {
             KafkaTopic::Outcomes
         };
 
-        let result = producer.client.send(
-            topic,
-            Some(key.into_bytes()),
-            None,
-            "outcome",
-            payload.as_bytes(),
-        );
+        let result =
+            producer
+                .client
+                .send(topic, key.into_bytes(), None, "outcome", payload.as_bytes());
 
         match result {
             Ok(_) => Ok(()),

relay-server/src/services/store.rs

Lines changed: 17 additions & 13 deletions
@@ -1595,30 +1595,34 @@ impl Message for KafkaMessage<'_> {
     }
 
     /// Returns the partitioning key for this kafka message determining.
-    fn key(&self) -> Option<[u8; 16]> {
-        match self {
-            Self::Event(message) => Some(message.event_id.0),
-            Self::Attachment(message) => Some(message.event_id.0),
-            Self::AttachmentChunk(message) => Some(message.event_id.0),
-            Self::UserReport(message) => Some(message.event_id.0),
-            Self::ReplayEvent(message) => Some(message.replay_id.0),
-            Self::Span { message, .. } => Some(message.trace_id.0),
+    fn key(&self) -> [u8; 16] {
+        let mut uuid = match self {
+            Self::Event(message) => message.event_id.0,
+            Self::Attachment(message) => message.event_id.0,
+            Self::AttachmentChunk(message) => message.event_id.0,
+            Self::UserReport(message) => message.event_id.0,
+            Self::ReplayEvent(message) => message.replay_id.0,
+            Self::Span { message, .. } => message.trace_id.0,
 
             // Monitor check-ins use the hinted UUID passed through from the Envelope.
             //
             // XXX(epurkhiser): In the future it would be better if all KafkaMessage's would
             // recieve the routing_key_hint form their envelopes.
-            Self::CheckIn(message) => message.routing_key_hint,
+            Self::CheckIn(message) => message.routing_key_hint.unwrap_or_else(Uuid::nil),
 
             // Random partitioning
             Self::Profile(_)
             | Self::Log { .. }
             | Self::ReplayRecordingNotChunked(_)
             | Self::ProfileChunk(_)
-            | Self::Metric { .. } => None,
+            | Self::Metric { .. } => Uuid::nil(),
+        };
+
+        if uuid.is_nil() {
+            uuid = Uuid::new_v4();
         }
-        .filter(|uuid| !uuid.is_nil())
-        .map(|uuid| uuid.into_bytes())
+
+        *uuid.as_bytes()
     }
 
     fn headers(&self) -> Option<&BTreeMap<String, String>> {
@@ -1715,7 +1719,7 @@ mod tests {
         for topic in [KafkaTopic::Outcomes, KafkaTopic::OutcomesBilling] {
             let res = producer
                 .client
-                .send(topic, Some(*b"0123456789abcdef"), None, "foo", b"");
+                .send(topic, *b"0123456789abcdef", None, "foo", b"");
 
             assert!(matches!(res, Err(ClientError::InvalidTopicName)));
         }
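
The key() change in the first hunk boils down to a simple rule: use the message's natural UUID when it has one, fall back to a nil UUID otherwise, and replace any nil value with a fresh random UUID so such messages are partitioned randomly. Here is a standalone sketch of that rule under the same assumption (only the uuid crate); the RoutingSource enum is an illustrative stand-in for the real KafkaMessage variants:

```rust
use uuid::Uuid;

/// Illustrative stand-in for the message types above: some carry a natural
/// routing UUID (event ID, replay ID, trace ID), others have none.
enum RoutingSource {
    Keyed(Uuid),
    Unkeyed,
}

/// Mirrors the restored key() behavior: a nil or missing UUID is replaced by
/// a random v4 UUID, so the result is always a concrete 16-byte key.
fn routing_key(source: &RoutingSource) -> [u8; 16] {
    let mut uuid = match source {
        RoutingSource::Keyed(uuid) => *uuid,
        RoutingSource::Unkeyed => Uuid::nil(),
    };

    if uuid.is_nil() {
        uuid = Uuid::new_v4();
    }

    *uuid.as_bytes()
}

fn main() {
    let event_id = Uuid::new_v4();
    // A keyed message keeps its own UUID as the routing key.
    assert_eq!(routing_key(&RoutingSource::Keyed(event_id)), *event_id.as_bytes());
    // An unkeyed message gets a random, non-nil key.
    assert_ne!(routing_key(&RoutingSource::Unkeyed), *Uuid::nil().as_bytes());
}
```

Monitor check-ins follow the same rule: a missing routing_key_hint becomes a nil UUID and is therefore randomized.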
