Skip to content

Commit 2c334d2

Browse files
authored
enhancement(kafka sink): Add support for traces in kafka sink (#24639)
* add support for traces in kafka sink
* add changelog
* add suggestion (Made-with: Cursor)
* fix fmt (Made-with: Cursor)
1 parent 2097fc3 commit 2c334d2

File tree

4 files changed

+160
-12
lines changed

4 files changed

+160
-12
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+ The `kafka` sink now supports trace events.
2+
3+ authors: pstalmach

src/sinks/kafka/config.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,8 +288,7 @@ impl SinkConfig for KafkaSinkConfig {
288288
fn input(&self) -> Input {
289289
let requirements = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
290290

291-
Input::new(self.encoding.config().input_type() & (DataType::Log | DataType::Metric))
292-
.with_schema_requirement(requirements)
291+
Input::new(self.encoding.config().input_type()).with_schema_requirement(requirements)
293292
}
294293

295294
fn acknowledgements(&self) -> &AcknowledgementsConfig {

src/sinks/kafka/request_builder.rs

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,28 +67,36 @@ impl RequestBuilder<(String, Event)> for KafkaRequestBuilder {
6767
fn get_key(event: &Event, key_field: Option<&OwnedTargetPath>) -> Option<Bytes> {
6868
key_field.and_then(|key_field| match event {
6969
Event::Log(log) => log.get(key_field).map(|value| value.coerce_to_bytes()),
70+
Event::Trace(trace) => trace.get(key_field).map(|value| value.coerce_to_bytes()),
7071
Event::Metric(metric) => metric
7172
.tags()
7273
.and_then(|tags| tags.get(key_field.to_string().as_str()))
7374
.map(|value| value.to_owned().into()),
74-
_ => None,
7575
})
7676
}
7777

7878
fn get_timestamp_millis(event: &Event) -> Option<i64> {
7979
match &event {
8080
Event::Log(log) => log.get_timestamp().and_then(|v| v.as_timestamp()).copied(),
81+
Event::Trace(trace) => trace
82+
.as_ref()
83+
.get_timestamp()
84+
.and_then(|v| v.as_timestamp())
85+
.copied(),
8186
Event::Metric(metric) => metric.timestamp(),
82-
_ => None,
8387
}
8488
.map(|ts| ts.timestamp_millis())
8589
}
8690

8791
fn get_headers(event: &Event, headers_key: Option<&OwnedTargetPath>) -> Option<OwnedHeaders> {
8892
headers_key.and_then(|headers_key| {
89-
if let Event::Log(log) = event
90-
&& let Some(headers) = log.get(headers_key)
91-
{
93+
let headers = match event {
94+
Event::Log(log) => log.get(headers_key),
95+
Event::Trace(trace) => trace.get(headers_key),
96+
Event::Metric(_) => None,
97+
};
98+
99+
if let Some(headers) = headers {
92100
match headers {
93101
Value::Object(headers_map) => {
94102
let mut owned_headers = OwnedHeaders::new_with_capacity(headers_map.len());
@@ -123,7 +131,7 @@ mod tests {
123131
use rdkafka::message::Headers;
124132

125133
use super::*;
126-
use crate::event::{LogEvent, ObjectMap};
134+
use crate::event::{LogEvent, ObjectMap, TraceEvent};
127135

128136
#[test]
129137
fn kafka_get_headers() {
@@ -141,4 +149,21 @@ mod tests {
141149
assert_eq!(headers.get(1).key, "b-key");
142150
assert_eq!(headers.get(1).value.unwrap(), "b-value".as_bytes());
143151
}
152+
153+
#[test]
154+
fn kafka_get_headers_trace() {
155+
let headers_key = OwnedTargetPath::try_from("headers".to_string()).unwrap();
156+
let mut header_values = ObjectMap::new();
157+
header_values.insert("a-key".into(), Value::Bytes(Bytes::from("a-value")));
158+
header_values.insert("b-key".into(), Value::Bytes(Bytes::from("b-value")));
159+
160+
let mut event = Event::Trace(TraceEvent::from(LogEvent::from("hello")));
161+
event.as_mut_trace().insert(&headers_key, header_values);
162+
163+
let headers = get_headers(&event, Some(&headers_key)).unwrap();
164+
assert_eq!(headers.get(0).key, "a-key");
165+
assert_eq!(headers.get(0).value.unwrap(), "a-value".as_bytes());
166+
assert_eq!(headers.get(1).key, "b-key");
167+
assert_eq!(headers.get(1).value.unwrap(), "b-value".as_bytes());
168+
}
144169
}

src/sinks/kafka/tests.rs

Lines changed: 125 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,30 +6,30 @@ mod integration_test {
66
use std::{collections::HashMap, future::ready, thread, time::Duration};
77

88
use bytes::Bytes;
9-
use futures::StreamExt;
9+
use futures::{StreamExt, stream};
1010
use rdkafka::{
1111
Message, Offset, TopicPartitionList,
1212
consumer::{BaseConsumer, Consumer},
1313
message::Headers,
1414
};
1515
use vector_lib::{
16-
codecs::TextSerializerConfig,
16+
codecs::{JsonSerializerConfig, TextSerializerConfig},
1717
config::{Tags, Telemetry, init_telemetry},
1818
event::{BatchNotifier, BatchStatus},
1919
lookup::lookup_v2::ConfigTargetPath,
2020
};
2121

2222
use super::super::{config::KafkaSinkConfig, sink::KafkaSink, *};
2323
use crate::{
24-
event::{ObjectMap, Value},
24+
event::{ObjectMap, TraceEvent, Value},
2525
kafka::{KafkaAuthConfig, KafkaCompression, KafkaSaslConfig},
2626
sinks::prelude::*,
2727
test_util::{
2828
components::{
2929
DATA_VOLUME_SINK_TAGS, SINK_TAGS, assert_data_volume_sink_compliance,
3030
assert_sink_compliance,
3131
},
32-
random_lines_with_stream, random_string, wait_for,
32+
map_event_batch_stream, random_lines_with_stream, random_string, wait_for,
3333
},
3434
tls::{TEST_PEM_INTERMEDIATE_CA_PATH, TlsConfig, TlsEnableableConfig},
3535
};
@@ -311,6 +311,127 @@ mod integration_test {
311311
.await;
312312
}
313313

314+
#[tokio::test]
315+
async fn kafka_happy_path_trace_events() {
316+
crate::test_util::trace_init();
317+
318+
assert_sink_compliance(&SINK_TAGS, async move {
319+
let topic_prefix = format!("test-trace-{}", random_string(10));
320+
let topic = format!("{}-{}", topic_prefix, chrono::Utc::now().format("%Y%m%d"));
321+
let key_field = ConfigTargetPath::try_from("trace_key".to_string()).unwrap();
322+
let headers_key = ConfigTargetPath::try_from("trace_headers".to_string()).unwrap();
323+
let trace_key = "trace-partition-key";
324+
let header_key = "trace-header-key";
325+
let header_value = "trace-header-value";
326+
327+
let config = KafkaSinkConfig {
328+
bootstrap_servers: kafka_address(9091),
329+
topic: Template::try_from(format!("{topic_prefix}-%Y%m%d")).unwrap(),
330+
healthcheck_topic: None,
331+
key_field: Some(key_field.clone()),
332+
encoding: JsonSerializerConfig::default().into(),
333+
batch: BatchConfig::default(),
334+
compression: KafkaCompression::None,
335+
auth: KafkaAuthConfig::default(),
336+
socket_timeout_ms: Duration::from_millis(60000),
337+
message_timeout_ms: Duration::from_millis(300000),
338+
rate_limit_duration_secs: 1,
339+
rate_limit_num: i64::MAX as u64,
340+
librdkafka_options: HashMap::new(),
341+
headers_key: Some(headers_key.clone()),
342+
acknowledgements: Default::default(),
343+
};
344+
345+
let num_events = 100;
346+
let mut expected_messages = Vec::with_capacity(num_events);
347+
348+
let (batch, receiver) = BatchNotifier::new_with_receiver();
349+
let mut events = Vec::with_capacity(num_events);
350+
for i in 0..num_events {
351+
let message = format!("trace-message-{i}");
352+
expected_messages.push(message.clone());
353+
354+
let mut trace = TraceEvent::default();
355+
trace.insert("message", message);
356+
trace.insert("trace_key", trace_key);
357+
trace.insert("timestamp", chrono::Utc::now());
358+
359+
let mut trace_headers = ObjectMap::new();
360+
trace_headers.insert(header_key.into(), Value::Bytes(Bytes::from(header_value)));
361+
trace.insert(&headers_key, trace_headers);
362+
363+
events.push(Event::Trace(trace.with_batch_notifier(&batch)));
364+
}
365+
366+
let sink = KafkaSink::new(config).unwrap();
367+
let sink = VectorSink::from_event_streamsink(sink);
368+
let stream = map_event_batch_stream(stream::iter(events), Some(batch));
369+
sink.run(stream).await.unwrap();
370+
assert_eq!(receiver.await, BatchStatus::Delivered);
371+
372+
// Read back everything from the beginning.
373+
let mut client_config = rdkafka::ClientConfig::new();
374+
client_config.set("bootstrap.servers", kafka_address(9091).as_str());
375+
client_config.set("group.id", random_string(10));
376+
client_config.set("enable.partition.eof", "true");
377+
let mut tpl = TopicPartitionList::new();
378+
tpl.add_partition(&topic, 0)
379+
.set_offset(Offset::Beginning)
380+
.unwrap();
381+
let consumer: BaseConsumer = client_config.create().unwrap();
382+
consumer.assign(&tpl).unwrap();
383+
384+
wait_for(
385+
|| match consumer.fetch_watermarks(&topic, 0, Duration::from_secs(3)) {
386+
Ok((_low, high)) => ready(high >= num_events as i64),
387+
Err(err) => {
388+
println!("retrying due to error fetching watermarks: {err}");
389+
ready(false)
390+
}
391+
},
392+
)
393+
.await;
394+
395+
let (low, high) = consumer
396+
.fetch_watermarks(&topic, 0, Duration::from_secs(3))
397+
.unwrap();
398+
assert_eq!((0, num_events as i64), (low, high));
399+
400+
let mut failures = 0;
401+
let mut observed_messages = Vec::new();
402+
while failures < 100 {
403+
match consumer.poll(Duration::from_secs(3)) {
404+
Some(Ok(msg)) => {
405+
let payload: &str = msg.payload_view().unwrap().unwrap();
406+
let payload_json: serde_json::Value =
407+
serde_json::from_str(payload).unwrap();
408+
observed_messages
409+
.push(payload_json["message"].as_str().unwrap().to_owned());
410+
411+
let key = msg.key().unwrap();
412+
assert_eq!(key, trace_key.as_bytes());
413+
414+
let timestamp = msg.timestamp().to_millis();
415+
assert!(timestamp.is_some());
416+
417+
let header = msg.headers().unwrap().get(0);
418+
assert_eq!(header.key, header_key);
419+
assert_eq!(header.value.unwrap(), header_value.as_bytes());
420+
}
421+
None if observed_messages.len() >= num_events => break,
422+
_ => {
423+
failures += 1;
424+
thread::sleep(Duration::from_millis(50));
425+
}
426+
}
427+
}
428+
429+
assert_eq!(observed_messages.len(), num_events);
430+
assert_eq!(observed_messages, expected_messages);
431+
})
432+
.await;
433+
}
434+
314435
async fn kafka_happy_path(
315436
server: String,
316437
sasl: Option<KafkaSaslConfig>,

0 commit comments

Comments (0)