diff --git a/changelog.d/23509_allow_headers_in_nats_jetstream_sink.enhancement.md b/changelog.d/23509_allow_headers_in_nats_jetstream_sink.enhancement.md new file mode 100644 index 0000000000000..a709c4c9f5c2a --- /dev/null +++ b/changelog.d/23509_allow_headers_in_nats_jetstream_sink.enhancement.md @@ -0,0 +1,5 @@ +The `nats` sink now supports message headers when publishing to JetStream. + +It introduces a configurable, templated Nats-Msg-Id header that ensures a unique ID for each message. This enables broker-level deduplication, resulting in stronger delivery guarantees and exactly-once semantics when combined with idempotent consumers. + +authors: benjamin-awd diff --git a/scripts/integration/nats/compose.yaml b/scripts/integration/nats/compose.yaml index fb36e78d819d4..9be71dc529380 100644 --- a/scripts/integration/nats/compose.yaml +++ b/scripts/integration/nats/compose.yaml @@ -43,3 +43,10 @@ services: - /usr/share/nats/config/nats-jwt.conf volumes: - ../../../tests/data/nats:/usr/share/nats/config + nats-jetstream-test: + image: docker.io/library/nats:${CONFIG_VERSION} + command: + - --config + - /usr/share/nats/config/nats-jetstream.conf + volumes: + - ../../../tests/data/nats:/usr/share/nats/config diff --git a/scripts/integration/nats/test.yaml b/scripts/integration/nats/test.yaml index 553c9efa25257..171a1c4ae6a3e 100644 --- a/scripts/integration/nats/test.yaml +++ b/scripts/integration/nats/test.yaml @@ -1,7 +1,7 @@ features: -- nats-integration-tests + - nats-integration-tests -test_filter: '::nats::' +test_filter: "::nats::" env: NATS_ADDRESS: nats://nats:4222 @@ -11,6 +11,7 @@ env: NATS_TLS_CLIENT_CERT_ADDRESS: nats://nats-tls-client-cert:4222 NATS_TOKEN_ADDRESS: nats://nats-token:4222 NATS_USERPASS_ADDRESS: nats://nats-userpass:4222 + NATS_JETSTREAM_ADDRESS: nats://nats-jetstream-test:4222 matrix: version: [latest] @@ -18,10 +19,10 @@ matrix: # changes to these files/paths will invoke the integration test in CI # expressions are evaluated using https://github.com/micromatch/picomatch paths: -- "src/internal_events/nats.rs" -- "src/sources/nats.rs" -- "src/sources/util/**" -- "src/sinks/nats/**" -- "src/sinks/util/**" -- "src/nats.rs" -- "scripts/integration/nats/**" + - "src/internal_events/nats.rs" + - "src/sources/nats.rs" + - "src/sources/util/**" + - "src/sinks/nats/**" + - "src/sinks/util/**" + - "src/nats.rs" + - "scripts/integration/nats/**" diff --git a/src/sinks/nats/config.rs b/src/sinks/nats/config.rs index c5209c54041bf..9082e5123714d 100644 --- a/src/sinks/nats/config.rs +++ b/src/sinks/nats/config.rs @@ -1,15 +1,16 @@ +use async_nats::header; use bytes::Bytes; use futures_util::TryFutureExt; use snafu::ResultExt; use vector_lib::codecs::JsonSerializerConfig; use vector_lib::tls::TlsEnableableConfig; +use super::{sink::NatsSink, ConfigSnafu, ConnectSnafu, NatsError}; use crate::{ nats::{from_tls_auth_config, NatsAuthConfig, NatsConfigError}, sinks::{prelude::*, util::service::TowerRequestConfigDefaults}, }; - -use super::{sink::NatsSink, ConfigSnafu, ConnectSnafu, NatsError}; +use async_nats::HeaderMap; #[derive(Clone, Copy, Debug)] pub struct NatsTowerRequestConfigDefaults; @@ -18,6 +19,61 @@ impl TowerRequestConfigDefaults for NatsTowerRequestConfigDefaults { const CONCURRENCY: Concurrency = Concurrency::None; } +/// A set of NATS headers that can be added to each message. +#[configurable_component] +#[serde_with::serde_as] +#[derive(Clone, Debug, Default, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +pub struct NatsHeaderConfig { + /// A unique identifier for the message. Useful for deduplication. + /// + /// Can be a template that references fields in the event, e.g., `{{ event_id }}`. + #[configurable(metadata(docs::templateable))] + #[serde(skip_serializing_if = "Option::is_none")] + #[configurable(metadata(docs::examples = "{{ event_id }}"))] + pub(super) message_id: Option