Skip to content

Commit dbf0297

Browse files
authored
Merge branch 'master' into greylilac09/add-incremental-to-absolute
2 parents 00b3714 + a9c4c16 commit dbf0297

File tree

23 files changed

+537
-301
lines changed

23 files changed

+537
-301
lines changed

.github/workflows/preview_site_trigger.yml

Lines changed: 24 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,62 +7,44 @@ jobs:
77
runs-on: ubuntu-24.04
88
timeout-minutes: 5
99
# Only run for PRs with 'website' in the branch name
10-
if: ${{ contains(github.head_ref, 'website') && contains(github.head_ref, '-') }}
10+
if: ${{ contains(github.head_ref, 'website') }}
1111
steps:
12-
- name: Echo approval
12+
# Validate branch name
13+
- name: Validate branch name and set output
14+
id: validate
1315
run: |
14-
echo "Workflow has been allowed to run for PR ${{ github.event.number }}. Setting artifacts and then continuing workflow runs"
15-
16-
# Use GitHub Action to safely validate and store PR information
16+
BRANCH="${{ github.head_ref }}"
17+
if [[ ! "$BRANCH" =~ ^[a-zA-Z0-9_-]+$ ]]; then
18+
echo "valid=false" >> $GITHUB_OUTPUT
19+
else
20+
echo "valid=true" >> $GITHUB_OUTPUT
21+
fi
22+
23+
# Save PR information (only if branch is valid)
1724
- name: Validate and save PR information
25+
if: steps.validate.outputs.valid == 'true'
1826
uses: actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea # v7.0.1
1927
with:
2028
script: |
2129
const fs = require('fs').promises;
2230
const path = require('path');
2331
const crypto = require('crypto');
32+
const prNumber = context.payload.number;
33+
const branchName = context.payload.pull_request.head.ref;
2434
25-
async function createAndValidateArtifact() {
26-
try {
27-
// Create directory for artifact
28-
await fs.mkdir('./pr', { recursive: true });
29-
30-
// Get PR number and validate
31-
const prNumber = context.payload.number;
32-
if (typeof prNumber !== 'number' || !Number.isInteger(prNumber) || prNumber <= 0) {
33-
core.setFailed(`Invalid PR number: ${prNumber}`);
34-
return;
35-
}
36-
37-
// Get branch name and validate
38-
const branchName = context.payload.pull_request.head.ref;
39-
// Validate branch name (only allow alphanumeric, dash, and underscore)
40-
const branchNameRegex = /^[a-zA-Z0-9_\-]+$/;
41-
if (!branchNameRegex.test(branchName)) {
42-
core.setFailed(`Invalid branch name detected: ${branchName}`);
43-
return;
44-
}
45-
46-
// Write validated information to files
47-
await fs.writeFile('./pr/number', prNumber.toString());
48-
await fs.writeFile('./pr/branch', branchName);
49-
50-
// Log success
51-
core.info(`Successfully validated and saved PR #${prNumber} with branch ${branchName}`);
35+
await fs.mkdir('./pr', { recursive: true });
36+
await fs.writeFile('./pr/number', prNumber.toString());
37+
await fs.writeFile('./pr/branch', branchName);
5238
53-
// Create hash signature of the data
54-
const numberHash = crypto.createHash('sha256').update(prNumber.toString()).digest('hex');
55-
const branchHash = crypto.createHash('sha256').update(branchName).digest('hex');
56-
await fs.writeFile('./pr/integrity', `${numberHash}:${branchHash}`);
57-
} catch (error) {
58-
core.setFailed(`An error occurred: ${error.message}`);
59-
}
60-
}
39+
const numberHash = crypto.createHash('sha256').update(prNumber.toString()).digest('hex');
40+
const branchHash = crypto.createHash('sha256').update(branchName).digest('hex');
41+
await fs.writeFile('./pr/integrity', `${numberHash}:${branchHash}`);
6142
62-
createAndValidateArtifact();
43+
core.info(`Saved PR #${prNumber} and branch ${branchName}`);
6344
64-
# Upload the artifact using latest version
45+
# Upload the artifact using latest version (only if branch is valid)
6546
- name: Upload PR information artifact
47+
if: steps.validate.outputs.valid == 'true'
6648
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
6749
with:
6850
name: pr
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
The `nats` sink now supports message headers when publishing to JetStream.
2+
3+
It introduces a configurable, templated Nats-Msg-Id header that ensures a unique ID for each message. This enables broker-level deduplication, resulting in stronger delivery guarantees and exactly-once semantics when combined with idempotent consumers.
4+
5+
authors: benjamin-awd
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Added an option to set max packet size for MQTT source and sink.
2+
3+
authors: simplepad
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
The `log_to_metric` transforms now emits a pair expansion error. This error was previously silently ignored.
2+
3+
authors: pront

scripts/integration/nats/compose.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,10 @@ services:
4343
- /usr/share/nats/config/nats-jwt.conf
4444
volumes:
4545
- ../../../tests/data/nats:/usr/share/nats/config
46+
nats-jetstream-test:
47+
image: docker.io/library/nats:${CONFIG_VERSION}
48+
command:
49+
- --config
50+
- /usr/share/nats/config/nats-jetstream.conf
51+
volumes:
52+
- ../../../tests/data/nats:/usr/share/nats/config

scripts/integration/nats/test.yaml

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
features:
2-
- nats-integration-tests
2+
- nats-integration-tests
33

4-
test_filter: '::nats::'
4+
test_filter: "::nats::"
55

66
env:
77
NATS_ADDRESS: nats://nats:4222
@@ -11,17 +11,18 @@ env:
1111
NATS_TLS_CLIENT_CERT_ADDRESS: nats://nats-tls-client-cert:4222
1212
NATS_TOKEN_ADDRESS: nats://nats-token:4222
1313
NATS_USERPASS_ADDRESS: nats://nats-userpass:4222
14+
NATS_JETSTREAM_ADDRESS: nats://nats-jetstream-test:4222
1415

1516
matrix:
1617
version: [latest]
1718

1819
# changes to these files/paths will invoke the integration test in CI
1920
# expressions are evaluated using https://github.com/micromatch/picomatch
2021
paths:
21-
- "src/internal_events/nats.rs"
22-
- "src/sources/nats.rs"
23-
- "src/sources/util/**"
24-
- "src/sinks/nats/**"
25-
- "src/sinks/util/**"
26-
- "src/nats.rs"
27-
- "scripts/integration/nats/**"
22+
- "src/internal_events/nats.rs"
23+
- "src/sources/nats.rs"
24+
- "src/sources/util/**"
25+
- "src/sinks/nats/**"
26+
- "src/sinks/util/**"
27+
- "src/nats.rs"
28+
- "scripts/integration/nats/**"

src/common/mqtt.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ pub struct MqttCommonConfig {
4141
#[derivative(Default(value = "default_keep_alive()"))]
4242
pub keep_alive: u16,
4343

44+
/// Maximum packet size
45+
#[serde(default = "default_max_packet_size")]
46+
#[derivative(Default(value = "default_max_packet_size()"))]
47+
pub max_packet_size: usize,
48+
4449
/// TLS configuration.
4550
#[configurable(derived)]
4651
pub tls: Option<TlsEnableableConfig>,
@@ -54,6 +59,10 @@ const fn default_keep_alive() -> u16 {
5459
60
5560
}
5661

62+
const fn default_max_packet_size() -> usize {
63+
10 * 1024
64+
}
65+
5766
/// MQTT Error Types
5867
#[derive(Debug, Snafu)]
5968
#[snafu(visibility(pub))]

src/internal_events/expansion.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
use metrics::counter;
2+
use vector_lib::internal_event::{error_stage, error_type};
3+
use vector_lib::internal_event::{ComponentEventsDropped, InternalEvent, UNINTENTIONAL};
4+
5+
pub struct PairExpansionError<'a> {
6+
pub key: &'a str,
7+
pub value: &'a str,
8+
pub drop_event: bool,
9+
pub error: serde_json::Error,
10+
}
11+
12+
impl InternalEvent for PairExpansionError<'_> {
13+
fn emit(self) {
14+
let message = format!("Failed to expand key: `{}`:`{}`", self.key, self.value);
15+
16+
if self.drop_event {
17+
error!(
18+
message = %message,
19+
error = %self.error,
20+
error_type = error_type::PARSER_FAILED,
21+
stage = error_stage::PROCESSING,
22+
23+
);
24+
25+
counter!(
26+
"component_errors_total",
27+
"error_type" => error_type::PARSER_FAILED,
28+
"stage" => error_stage::PROCESSING,
29+
)
30+
.increment(1);
31+
32+
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
33+
count: 1,
34+
reason: &message,
35+
});
36+
} else {
37+
warn!(
38+
message = %message,
39+
error = %self.error,
40+
error_type = error_type::PARSER_FAILED,
41+
stage = error_stage::PROCESSING,
42+
43+
);
44+
}
45+
}
46+
}

src/internal_events/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,11 @@ mod window;
140140
mod file;
141141
mod windows;
142142

143+
#[cfg(any(feature = "transforms-log_to_metric", feature = "sinks-loki"))]
144+
mod expansion;
145+
#[cfg(any(feature = "transforms-log_to_metric", feature = "sinks-loki"))]
146+
pub use self::expansion::*;
147+
143148
#[cfg(feature = "sources-mongodb_metrics")]
144149
pub(crate) use mongodb_metrics::*;
145150

src/sinks/mqtt/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ impl MqttSinkConfig {
148148
MaybeTlsSettings::from_config(self.common.tls.as_ref(), false).context(TlsSnafu)?;
149149
let mut options = MqttOptions::new(&client_id, &self.common.host, self.common.port);
150150
options.set_keep_alive(Duration::from_secs(self.common.keep_alive.into()));
151+
options.set_max_packet_size(self.common.max_packet_size, self.common.max_packet_size);
151152
options.set_clean_session(self.clean_session);
152153
match (&self.common.user, &self.common.password) {
153154
(Some(user), Some(password)) => {

0 commit comments

Comments
 (0)