Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
The `nats` sink now supports message headers when publishing to JetStream.

It introduces a configurable, templated Nats-Msg-Id header that ensures a unique ID for each message. This enables broker-level deduplication, resulting in stronger delivery guarantees and exactly-once semantics when combined with idempotent consumers.

authors: benjamin-awd
7 changes: 7 additions & 0 deletions scripts/integration/nats/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,10 @@ services:
- /usr/share/nats/config/nats-jwt.conf
volumes:
- ../../../tests/data/nats:/usr/share/nats/config
nats-jetstream-test:
image: docker.io/library/nats:${CONFIG_VERSION}
command:
- --config
- /usr/share/nats/config/nats-jetstream.conf
volumes:
- ../../../tests/data/nats:/usr/share/nats/config
19 changes: 10 additions & 9 deletions scripts/integration/nats/test.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
features:
- nats-integration-tests
- nats-integration-tests

test_filter: '::nats::'
test_filter: "::nats::"

env:
NATS_ADDRESS: nats://nats:4222
Expand All @@ -11,17 +11,18 @@ env:
NATS_TLS_CLIENT_CERT_ADDRESS: nats://nats-tls-client-cert:4222
NATS_TOKEN_ADDRESS: nats://nats-token:4222
NATS_USERPASS_ADDRESS: nats://nats-userpass:4222
NATS_JETSTREAM_ADDRESS: nats://nats-jetstream-test:4222

matrix:
version: [latest]

# changes to these files/paths will invoke the integration test in CI
# expressions are evaluated using https://github.com/micromatch/picomatch
paths:
- "src/internal_events/nats.rs"
- "src/sources/nats.rs"
- "src/sources/util/**"
- "src/sinks/nats/**"
- "src/sinks/util/**"
- "src/nats.rs"
- "scripts/integration/nats/**"
- "src/internal_events/nats.rs"
- "src/sources/nats.rs"
- "src/sources/util/**"
- "src/sinks/nats/**"
- "src/sinks/util/**"
- "src/nats.rs"
- "scripts/integration/nats/**"
88 changes: 78 additions & 10 deletions src/sinks/nats/config.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use async_nats::header;
use bytes::Bytes;
use futures_util::TryFutureExt;
use snafu::ResultExt;
use vector_lib::codecs::JsonSerializerConfig;
use vector_lib::tls::TlsEnableableConfig;

use super::{sink::NatsSink, ConfigSnafu, ConnectSnafu, NatsError};
use crate::{
nats::{from_tls_auth_config, NatsAuthConfig, NatsConfigError},
sinks::{prelude::*, util::service::TowerRequestConfigDefaults},
};

use super::{sink::NatsSink, ConfigSnafu, ConnectSnafu, NatsError};
use async_nats::HeaderMap;

#[derive(Clone, Copy, Debug)]
pub struct NatsTowerRequestConfigDefaults;
Expand All @@ -18,6 +19,61 @@ impl TowerRequestConfigDefaults for NatsTowerRequestConfigDefaults {
const CONCURRENCY: Concurrency = Concurrency::None;
}

/// A set of NATS headers that can be added to each message.
#[configurable_component]
#[serde_with::serde_as]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct NatsHeaderConfig {
/// A unique identifier for the message. Useful for deduplication.
///
/// Can be a template that references fields in the event, e.g., `{{ event_id }}`.
#[configurable(metadata(docs::templateable))]
#[serde(skip_serializing_if = "Option::is_none")]
#[configurable(metadata(docs::examples = "{{ event_id }}"))]
pub(super) message_id: Option<Template>,
}

impl NatsHeaderConfig {
pub fn build_headers(&self, event: &Event) -> HeaderMap {
let mut headers = HeaderMap::new();

if let Some(template) = &self.message_id {
if let Ok(value) = template.render_string(event) {
headers.insert(header::NATS_MESSAGE_ID, value.as_str());
}
}

headers
}
}

/// Configuration for sending messages using NATS JetStream.
#[configurable_component]
#[serde_with::serde_as]
#[derive(Clone, Debug, Eq, PartialEq, Default)]
#[serde(deny_unknown_fields)]
pub struct JetStreamConfig {
// Whether to enable Jetstream.
#[configurable(derived)]
#[serde(default)]
pub enabled: bool,

/// A map of NATS headers to be included in each message.
#[configurable(metadata(docs::templateable))]
#[serde(default)]
pub(super) headers: Option<NatsHeaderConfig>,
}

impl From<bool> for JetStreamConfig {
fn from(enabled: bool) -> Self {
Self {
enabled,
..Default::default()
}
}
}

/// Configuration for the `nats` sink.
#[configurable_component(sink(
"nats",
Expand Down Expand Up @@ -86,8 +142,13 @@ pub struct NatsSinkConfig {
/// If set, the `subject` must belong to an existing JetStream stream.
///
/// [jetstream]: https://docs.nats.io/nats-concepts/jetstream
#[serde(default)]
pub(super) jetstream: bool,
#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
pub(super) jetstream: JetStreamConfig,
}

fn default_name() -> String {
Expand All @@ -105,7 +166,12 @@ impl GenerateConfig for NatsSinkConfig {
tls: None,
url: "nats://127.0.0.1:4222".into(),
request: Default::default(),
jetstream: Default::default(),
jetstream: JetStreamConfig {
enabled: true,
headers: Some(NatsHeaderConfig {
message_id: Some(Template::try_from("{{ event_id }}").unwrap()),
}),
},
})
.unwrap()
}
Expand Down Expand Up @@ -175,7 +241,7 @@ impl NatsSinkConfig {
let options = self.create_connect_options()?;
let connection = self.connect(options).await?;

if self.jetstream {
if self.jetstream.enabled {
Ok(NatsPublisher::JetStream(async_nats::jetstream::new(
connection,
)))
Expand Down Expand Up @@ -203,6 +269,7 @@ impl NatsPublisher {
pub(super) async fn publish<S: async_nats::subject::ToSubject>(
&self,
subject: S,
headers: HeaderMap,
payload: Bytes,
) -> Result<(), NatsError> {
match self {
Expand All @@ -222,11 +289,12 @@ impl NatsPublisher {
.await
}
NatsPublisher::JetStream(jetstream) => {
let ack = jetstream.publish(subject, payload).await.map_err(|e| {
NatsError::PublishError {
let ack = jetstream
.publish_with_headers(subject, headers, payload)
.await
.map_err(|e| NatsError::PublishError {
source: Box::new(e),
}
})?;
})?;
ack.await.map(|_| ()).map_err(|e| NatsError::PublishError {
source: Box::new(e),
})
Expand Down
Loading
Loading