Skip to content

Commit 00b3714

Browse files
authored
Merge branch 'master' into greylilac09/add-incremental-to-absolute
2 parents 024ae54 + 7c30315 commit 00b3714

File tree

31 files changed

+817
-236
lines changed

31 files changed

+817
-236
lines changed

.github/actions/spelling/allow.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,7 @@ Softbank
461461
Sogou
462462
solarwinds
463463
Soref
464+
sortedset
464465
splunk
465466
ssh
466467
staticuser
@@ -524,6 +525,7 @@ XSALSA
524525
yandex
525526
Yarvik
526527
Yifang
528+
zadd
527529
zeek
528530
zookeeper
529531
Zopo

.github/workflows/semantic.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ jobs:
200200
blackhole sink
201201
clickhouse sink
202202
console sink
203+
datadog_common sink
203204
datadog_archives sink
204205
datadog_events sink
205206
datadog_logs sink

benches/transform/dedupe.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use criterion::{
66
criterion_group, measurement::WallTime, BatchSize, BenchmarkGroup, BenchmarkId, Criterion,
77
SamplingMode, Throughput,
88
};
9-
use vector::transforms::dedupe::common::{CacheConfig, FieldMatchConfig};
9+
use vector::transforms::dedupe::common::{CacheConfig, FieldMatchConfig, TimedCacheConfig};
1010
use vector::transforms::dedupe::config::DedupeConfig;
1111
use vector::transforms::dedupe::transform::Dedupe;
1212
use vector_lib::transform::Transform;
@@ -45,6 +45,7 @@ fn dedupe(c: &mut Criterion) {
4545
dedupe_config: DedupeConfig {
4646
fields: Some(FieldMatchConfig::IgnoreFields(vec!["message".into()])),
4747
cache: cache.clone(),
48+
time_settings: None,
4849
},
4950
},
5051
// Modification of previous where field "message" is matched.
@@ -54,6 +55,33 @@ fn dedupe(c: &mut Criterion) {
5455
dedupe_config: DedupeConfig {
5556
fields: Some(FieldMatchConfig::MatchFields(vec!["message".into()])),
5657
cache: cache.clone(),
58+
time_settings: None,
59+
},
60+
},
61+
// Modification of previous where deduplication with max age is used.
62+
Param {
63+
slug: "field_match_message_timed",
64+
input: fixed_stream.clone(),
65+
dedupe_config: DedupeConfig {
66+
fields: Some(FieldMatchConfig::MatchFields(vec!["message".into()])),
67+
cache: cache.clone(),
68+
time_settings: Some(TimedCacheConfig {
69+
max_age_ms: Duration::from_secs(5),
70+
refresh_on_drop: false,
71+
}),
72+
},
73+
},
74+
// Modification of previous where refresh on drop is enabled.
75+
Param {
76+
slug: "field_match_message_timed_refresh_on_drop",
77+
input: fixed_stream.clone(),
78+
dedupe_config: DedupeConfig {
79+
fields: Some(FieldMatchConfig::MatchFields(vec!["message".into()])),
80+
cache: cache.clone(),
81+
time_settings: Some(TimedCacheConfig {
82+
max_age_ms: Duration::from_secs(5),
83+
refresh_on_drop: true,
84+
}),
5785
},
5886
},
5987
// Measurement where ignore fields do not exist in the event.
@@ -69,6 +97,7 @@ fn dedupe(c: &mut Criterion) {
6997
"cdeab".into(),
7098
"bcdea".into(),
7199
])),
100+
time_settings: None,
72101
},
73102
},
74103
// Modification of previous where match fields do not exist in the
@@ -85,6 +114,7 @@ fn dedupe(c: &mut Criterion) {
85114
"cdeab".into(),
86115
"bcdea".into(),
87116
])),
117+
time_settings: None,
88118
},
89119
},
90120
] {
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fixed an issue with the `dnstap` `tcp` source, where it could have drastically reduced throughput after the number of connections exceeded the number of available cores.
2+
3+
authors: esensar Quad9DNS
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Adds a `time_settings` configuration option for the `dedupe` transform, enabling configuration of a `max_age` for items in the deduplication cache. This helps distinguish real duplicates from duplication that is expected in the incoming data over a longer period of time.
2+
3+
authors: esensar Quad9DNS
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Prevent panic on non-UTF8 environment variables.
2+
3+
authors: kurochan
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Prevents a panic when DD_API_KEY contains newline characters.
2+
3+
authors: kurochan
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Adds support for the Redis ZADD command in the Redis sink.
2+
3+
authors: 5Dev24

lib/vector-buffers/src/buffer_usage_data.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,10 @@ impl BufferUsage {
242242
/// The `buffer_id` should be a unique name -- ideally the `component_id` of the sink using this buffer -- but is
243243
/// not used for anything other than reporting, and so has no _requirement_ to be unique.
244244
pub fn install(self, buffer_id: &str) {
245+
let buffer_id = buffer_id.to_string();
245246
let span = self.span;
246247
let stages = self.stages;
248+
let task_name = format!("buffer usage reporter ({buffer_id})");
247249

248250
let task = async move {
249251
let mut interval = interval(Duration::from_secs(2));
@@ -264,6 +266,7 @@ impl BufferUsage {
264266
let received = stage.received.consume();
265267
if received.has_updates() {
266268
emit(BufferEventsReceived {
269+
buffer_id: buffer_id.clone(),
267270
idx: stage.idx,
268271
count: received.event_count,
269272
byte_size: received.event_byte_size,
@@ -273,6 +276,7 @@ impl BufferUsage {
273276
let sent = stage.sent.consume();
274277
if sent.has_updates() {
275278
emit(BufferEventsSent {
279+
buffer_id: buffer_id.clone(),
276280
idx: stage.idx,
277281
count: sent.event_count,
278282
byte_size: sent.event_byte_size,
@@ -282,6 +286,7 @@ impl BufferUsage {
282286
let dropped = stage.dropped.consume();
283287
if dropped.has_updates() {
284288
emit(BufferEventsDropped {
289+
buffer_id: buffer_id.clone(),
285290
idx: stage.idx,
286291
intentional: false,
287292
reason: "corrupted_events",
@@ -293,6 +298,7 @@ impl BufferUsage {
293298
let dropped_intentional = stage.dropped_intentional.consume();
294299
if dropped_intentional.has_updates() {
295300
emit(BufferEventsDropped {
301+
buffer_id: buffer_id.clone(),
296302
idx: stage.idx,
297303
intentional: true,
298304
reason: "drop_newest",
@@ -304,7 +310,6 @@ impl BufferUsage {
304310
}
305311
};
306312

307-
let task_name = format!("buffer usage reporter ({buffer_id})");
308313
spawn_named(task.instrument(span.or_current()), task_name.as_str());
309314
}
310315
}

0 commit comments

Comments
 (0)