Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
8e722a5
chore(dev): upgrade to Rust 2024 edition
pront Aug 4, 2025
d20b90a
cargo clippy --fix
pront Aug 4, 2025
d4d9ed2
bump to 2024
pront Aug 5, 2025
17b58e1
fixes and updates
pront Aug 5, 2025
2c3c241
cargo fmt --all
pront Aug 5, 2025
ad9eb24
Merge remote-tracking branch 'origin' into pront-rust-2024
pront Aug 5, 2025
17363f6
fmt
pront Aug 5, 2025
ec3dd82
more fixes
pront Aug 5, 2025
e085251
more fixes
pront Aug 5, 2025
0a60e33
Update src/sinks/socket.rs
pront Aug 6, 2025
3c1af5a
ran cargo fmt
pront Aug 5, 2025
f805af8
Merge remote-tracking branch 'origin' into pront-rust-2024
pront Aug 6, 2025
937cf07
change edition in rustfmt
pront Aug 6, 2025
5b2db9b
ran cargo fmt
pront Aug 6, 2025
7cbf78d
Merge remote-tracking branch 'origin' into pront-rust-2024
pront Aug 7, 2025
469610c
ran cargo fmt
pront Aug 7, 2025
90ef1ef
Merge remote-tracking branch 'origin' into pront-rust-2024
pront Aug 13, 2025
132e745
address reporter future return type clippy failure
pront Aug 13, 2025
b016290
ran cargo fmt
pront Aug 13, 2025
38e3ba5
events check
pront Aug 13, 2025
bbbadc3
ran cargo fmt
pront Aug 13, 2025
5b828c2
running.rs changes
pront Aug 13, 2025
47d322e
upgrade cargo-deb
pront Aug 13, 2025
c28f976
ran cargo fmt
pront Aug 13, 2025
1cc55d1
Merge remote-tracking branch 'origin' into pront-rust-2024
pront Aug 18, 2025
a65dc47
Merge remote-tracking branch 'origin' into pront-rust-2024
pront Aug 18, 2025
af7f293
fix
pront Aug 18, 2025
20e1332
ran cargo fmt
pront Aug 18, 2025
b0a0a8a
cargo-deb override
pront Aug 19, 2025
3eb2634
older cargo-deb
pront Aug 19, 2025
a0e1626
downgrade cargo-deb
pront Aug 19, 2025
8a253c4
fix one windows failure
pront Aug 19, 2025
37049f3
fix windows error[E0521]
pront Aug 19, 2025
5d7d20e
Merge branch 'master' into pront-rust-2024
pront Aug 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "vector"
version = "0.49.0"
authors = ["Vector Contributors <vector@datadoghq.com>"]
edition = "2021"
edition = "2024"
description = "A lightweight and ultra-fast tool for building observability pipelines"
homepage = "https://vector.dev"
license = "MPL-2.0"
Expand Down
6 changes: 3 additions & 3 deletions benches/batch.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::{convert::Infallible, time::Duration};

use bytes::{BufMut, Bytes, BytesMut};
use criterion::{criterion_group, Criterion, SamplingMode, Throughput};
use futures::{future, stream, SinkExt, StreamExt};
use criterion::{Criterion, SamplingMode, Throughput, criterion_group};
use futures::{SinkExt, StreamExt, future, stream};
use vector::{
sinks::util::{
batch::{Batch, BatchConfig, BatchError, BatchSettings, BatchSize, PushResult},
BatchSink, Buffer, Compression, EncodedEvent, Merged, Partition, PartitionBatchSink,
SinkBatchSettings,
batch::{Batch, BatchConfig, BatchError, BatchSettings, BatchSize, PushResult},
},
test_util::{random_lines, runtime},
};
Expand Down
6 changes: 3 additions & 3 deletions benches/codecs/character_delimited_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use std::{fmt, time::Duration};

use bytes::BytesMut;
use criterion::{
criterion_group, measurement::WallTime, BatchSize, BenchmarkGroup, BenchmarkId, Criterion,
SamplingMode, Throughput,
BatchSize, BenchmarkGroup, BenchmarkId, Criterion, SamplingMode, Throughput, criterion_group,
measurement::WallTime,
};
use tokio_util::codec::Decoder;
use vector_lib::codecs::{
decoding::{Deserializer, Framer},
BytesDeserializer, CharacterDelimitedDecoder,
decoding::{Deserializer, Framer},
};

#[derive(Debug)]
Expand Down
6 changes: 3 additions & 3 deletions benches/codecs/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use std::time::Duration;

use bytes::{BufMut, BytesMut};
use criterion::{
criterion_group, measurement::WallTime, BatchSize, BenchmarkGroup, Criterion, SamplingMode,
Throughput,
BatchSize, BenchmarkGroup, Criterion, SamplingMode, Throughput, criterion_group,
measurement::WallTime,
};
use tokio_util::codec::Encoder;
use vector::event::{Event, LogEvent};
use vector_lib::codecs::{encoding::Framer, JsonSerializerConfig, NewlineDelimitedEncoder};
use vector_lib::codecs::{JsonSerializerConfig, NewlineDelimitedEncoder, encoding::Framer};
use vector_lib::{btreemap, byte_size_of::ByteSizeOf};

#[derive(Debug, Clone)]
Expand Down
6 changes: 3 additions & 3 deletions benches/codecs/newline_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use std::{fmt, time::Duration};

use bytes::BytesMut;
use criterion::{
criterion_group, measurement::WallTime, BatchSize, BenchmarkGroup, BenchmarkId, Criterion,
SamplingMode, Throughput,
BatchSize, BenchmarkGroup, BenchmarkId, Criterion, SamplingMode, Throughput, criterion_group,
measurement::WallTime,
};
use tokio_util::codec::Decoder;
use vector_lib::codecs::{
decoding::Deserializer, decoding::Framer, BytesDeserializer, NewlineDelimitedDecoder,
BytesDeserializer, NewlineDelimitedDecoder, decoding::Deserializer, decoding::Framer,
};

#[derive(Debug)]
Expand Down
2 changes: 1 addition & 1 deletion benches/distribution_statistic.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use criterion::{BatchSize, Criterion, criterion_group, criterion_main};
use rand::distr::Distribution;
use rand::{distr::Uniform, seq::SliceRandom};
use vector::{event::metric::Sample, sinks::util::statistic::DistributionStatistic};
Expand Down
4 changes: 2 additions & 2 deletions benches/dnstap/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use base64::engine::general_purpose::STANDARD;
use base64::Engine;
use base64::engine::general_purpose::STANDARD;
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion, Throughput};
use criterion::{BatchSize, Criterion, Throughput, criterion_group, criterion_main};
use dnsmsg_parser::dns_message_parser::DnsParserOptions;
use dnstap_parser::parser::DnstapParser;
use vector::event::LogEvent;
Expand Down
4 changes: 2 additions & 2 deletions benches/enrichment_tables.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::time::SystemTime;

use chrono::prelude::*;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use criterion::{BatchSize, Criterion, criterion_group, criterion_main};
use vector::enrichment_tables::file::FileData;
use vector::enrichment_tables::{
Condition, Table,
file::File,
geoip::{Geoip, GeoipConfig},
mmdb::{Mmdb, MmdbConfig},
Condition, Table,
};
use vector_lib::enrichment::Case;
use vrl::value::{ObjectMap, Value};
Expand Down
2 changes: 1 addition & 1 deletion benches/event.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::Bytes;
use criterion::{criterion_group, BatchSize, Criterion};
use criterion::{BatchSize, Criterion, criterion_group};
use vector::event::LogEvent;
use vrl::event_path;

Expand Down
6 changes: 3 additions & 3 deletions benches/files.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use std::{convert::TryInto, path::PathBuf, time::Duration};

use bytes::Bytes;
use criterion::{criterion_group, BatchSize, Criterion, SamplingMode, Throughput};
use futures::{stream, SinkExt, StreamExt};
use criterion::{BatchSize, Criterion, SamplingMode, Throughput, criterion_group};
use futures::{SinkExt, StreamExt, stream};
use tempfile::tempdir;
use tokio::fs::OpenOptions;
use tokio_util::codec::{BytesCodec, FramedWrite};
use vector::{
config, sinks, sources,
test_util::{random_lines, runtime, start_topology},
};
use vector_lib::codecs::{encoding::FramingConfig, TextSerializerConfig};
use vector_lib::codecs::{TextSerializerConfig, encoding::FramingConfig};

fn benchmark_files_no_partitions(c: &mut Criterion) {
let num_lines: usize = 10_000;
Expand Down
9 changes: 4 additions & 5 deletions benches/http.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
use std::net::SocketAddr;

use criterion::{criterion_group, BatchSize, BenchmarkId, Criterion, SamplingMode, Throughput};
use criterion::{BatchSize, BenchmarkId, Criterion, SamplingMode, Throughput, criterion_group};
use futures::TryFutureExt;
use hyper::{
service::{make_service_fn, service_fn},
Body, Response, Server,
service::{make_service_fn, service_fn},
};
use tokio::runtime::Runtime;
use vector::{
config,
Error, config,
sinks::{
self,
util::{BatchConfig, Compression},
},
sources,
template::Template,
test_util::{next_addr, random_lines, runtime, send_lines, start_topology, wait_for_tcp},
Error,
};
use vector_lib::codecs::{encoding::FramingConfig, TextSerializerConfig};
use vector_lib::codecs::{TextSerializerConfig, encoding::FramingConfig};

fn benchmark_http(c: &mut Criterion) {
let num_lines: usize = 1_000;
Expand Down
4 changes: 2 additions & 2 deletions benches/languages.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use criterion::{criterion_group, criterion_main, BatchSize, Criterion, SamplingMode, Throughput};
use criterion::{BatchSize, Criterion, SamplingMode, Throughput, criterion_group, criterion_main};
use indoc::indoc;
use vector::{
config,
test_util::{next_addr, runtime, send_lines, start_topology, wait_for_tcp, CountReceiver},
test_util::{CountReceiver, next_addr, runtime, send_lines, start_topology, wait_for_tcp},
};

criterion_group!(
Expand Down
2 changes: 1 addition & 1 deletion benches/loki.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::convert::TryFrom;

use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use criterion::{BatchSize, Criterion, criterion_group, criterion_main};
use vector::{sinks::loki::valid_label_name, template::Template};

const VALID: [&str; 4] = ["name", " name ", "bee_bop", "a09b"];
Expand Down
4 changes: 2 additions & 2 deletions benches/lua.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::pin::Pin;

use criterion::{criterion_group, BatchSize, Criterion, Throughput};
use futures::{stream, SinkExt, Stream, StreamExt};
use criterion::{BatchSize, Criterion, Throughput, criterion_group};
use futures::{SinkExt, Stream, StreamExt, stream};
use indoc::indoc;
use transforms::lua::v2::LuaConfig;
use vector::{
Expand Down
2 changes: 1 addition & 1 deletion benches/metrics_snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use criterion::{criterion_group, BenchmarkId, Criterion};
use criterion::{BenchmarkId, Criterion, criterion_group};

fn benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("metrics_snapshot");
Expand Down
4 changes: 2 additions & 2 deletions benches/remap.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::collections::HashMap;

use chrono::{DateTime, Utc};
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use criterion::{BatchSize, Criterion, criterion_group, criterion_main};
use vector::{
config::{DataType, TransformOutput},
event::{Event, LogEvent, Value},
transforms::{
remap::{Remap, RemapConfig},
SyncTransform, TransformOutputsBuf,
remap::{Remap, RemapConfig},
},
};
use vrl::event_path;
Expand Down
2 changes: 1 addition & 1 deletion benches/template.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::convert::TryFrom;

use chrono::Utc;
use criterion::{criterion_group, BatchSize, Criterion};
use criterion::{BatchSize, Criterion, criterion_group};
use vector::{config::log_schema, event::Event, event::LogEvent};

fn bench_elasticsearch_index(c: &mut Criterion) {
Expand Down
2 changes: 1 addition & 1 deletion benches/transform/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
task::{Context, Poll},
};

use futures::{task::noop_waker, Stream};
use futures::{Stream, task::noop_waker};
use vector::event::{Event, LogEvent};

// == Streams ==
Expand Down
6 changes: 3 additions & 3 deletions benches/transform/dedupe.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use core::fmt;
use std::{num::NonZeroUsize, time::Duration};

use crate::common::{consume, FixedLogStream};
use crate::common::{FixedLogStream, consume};
use criterion::{
criterion_group, measurement::WallTime, BatchSize, BenchmarkGroup, BenchmarkId, Criterion,
SamplingMode, Throughput,
BatchSize, BenchmarkGroup, BenchmarkId, Criterion, SamplingMode, Throughput, criterion_group,
measurement::WallTime,
};
use vector::transforms::dedupe::common::{CacheConfig, FieldMatchConfig};
use vector::transforms::dedupe::config::DedupeConfig;
Expand Down
6 changes: 3 additions & 3 deletions benches/transform/filter.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::time::Duration;

use criterion::{
criterion_group, measurement::WallTime, BatchSize, BenchmarkGroup, Criterion, SamplingMode,
Throughput,
BatchSize, BenchmarkGroup, Criterion, SamplingMode, Throughput, criterion_group,
measurement::WallTime,
};
use vector::{
conditions::Condition,
transforms::{filter::Filter, FunctionTransform, OutputBuffer},
transforms::{FunctionTransform, OutputBuffer, filter::Filter},
};
use vector_lib::event::{Event, LogEvent};

Expand Down
6 changes: 3 additions & 3 deletions benches/transform/reduce.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use core::fmt;
use std::{num::NonZeroUsize, time::Duration};

use crate::common::{consume, FixedLogStream};
use crate::common::{FixedLogStream, consume};
use criterion::{
criterion_group, measurement::WallTime, BatchSize, BenchmarkGroup, BenchmarkId, Criterion,
SamplingMode, Throughput,
BatchSize, BenchmarkGroup, BenchmarkId, Criterion, SamplingMode, Throughput, criterion_group,
measurement::WallTime,
};
use indexmap::IndexMap;
use vector::transforms::reduce::config::ReduceConfig;
Expand Down
6 changes: 3 additions & 3 deletions benches/transform/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use std::time::Duration;

use bytes::Bytes;
use criterion::{
criterion_group, measurement::WallTime, BatchSize, BenchmarkGroup, BenchmarkId, Criterion,
SamplingMode, Throughput,
BatchSize, BenchmarkGroup, BenchmarkId, Criterion, SamplingMode, Throughput, criterion_group,
measurement::WallTime,
};
use vector::config::TransformContext;
use vector::transforms::{
route::{Route, RouteConfig},
TransformOutputsBuf,
route::{Route, RouteConfig},
};
use vector_lib::{
config::{DataType, TransformOutput},
Expand Down
4 changes: 2 additions & 2 deletions src/api/handler.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::sync::{
atomic::{self, AtomicBool},
Arc,
atomic::{self, AtomicBool},
};

use serde_json::json;
use warp::{reply::json, Rejection, Reply};
use warp::{Rejection, Reply, reply::json};

// Health handler, responds with '{ ok: true }' when running and '{ ok: false}'
// when shutting down
Expand Down
4 changes: 2 additions & 2 deletions src/api/schema/components/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
};

use async_graphql::{Enum, InputObject, Interface, Object, Subscription};
use tokio_stream::{wrappers::BroadcastStream, Stream, StreamExt};
use tokio_stream::{Stream, StreamExt, wrappers::BroadcastStream};
use vector_lib::internal_event::DEFAULT_OUTPUT;

use crate::{
Expand All @@ -19,7 +19,7 @@ use crate::{
filter::{self, filter_items},
relay, sort,
},
config::{get_transform_output_ids, ComponentKey, Config},
config::{ComponentKey, Config, get_transform_output_ids},
filter_check,
};

Expand Down
2 changes: 1 addition & 1 deletion src/api/schema/components/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::cmp;

use async_graphql::{Enum, InputObject, Object};

use super::{source, state, transform, Component};
use super::{Component, source, state, transform};
use crate::{
api::schema::{
filter,
Expand Down
4 changes: 2 additions & 2 deletions src/api/schema/components/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use std::cmp;

use async_graphql::{Enum, InputObject, Object};

use super::{sink, state, transform, Component};
use super::{Component, sink, state, transform};
use crate::{
api::schema::{
filter,
metrics::{self, outputs_by_component_key, IntoSourceMetrics, Output},
metrics::{self, IntoSourceMetrics, Output, outputs_by_component_key},
sort,
},
config::{ComponentKey, DataType, OutputId},
Expand Down
2 changes: 1 addition & 1 deletion src/api/schema/components/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
sync::{Arc, LazyLock, RwLock},
};

use super::{sink, source, transform, Component};
use super::{Component, sink, source, transform};
use crate::config::{ComponentKey, OutputId};

pub const INVARIANT: &str = "Couldn't acquire lock on Vector components. Please report this.";
Expand Down
4 changes: 2 additions & 2 deletions src/api/schema/components/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use std::cmp;

use async_graphql::{Enum, InputObject, Object};

use super::{sink, source, state, Component};
use super::{Component, sink, source, state};
use crate::{
api::schema::{
filter,
metrics::{self, outputs_by_component_key, IntoTransformMetrics, Output},
metrics::{self, IntoTransformMetrics, Output, outputs_by_component_key},
sort,
},
config::{ComponentKey, Inputs, OutputId},
Expand Down
8 changes: 6 additions & 2 deletions src/api/schema/events/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,14 @@ impl Metric {
.expect("logfmt serialization of metric event failed: conversion to serde Value failed. Please report.");
match json {
Value::Object(map) => encode_logfmt::encode_map(
&map.into_iter().map(|(k,v)| (event::KeyString::from(k), v)).collect(),
&map.into_iter()
.map(|(k, v)| (event::KeyString::from(k), v))
.collect(),
)
.expect("logfmt serialization of metric event failed. Please report."),
_ => panic!("logfmt serialization of metric event failed: metric converted to unexpected serde Value. Please report."),
_ => panic!(
"logfmt serialization of metric event failed: metric converted to unexpected serde Value. Please report."
),
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/api/schema/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ pub mod trace;

use async_graphql::{Context, Subscription};
use encoding::EventEncodingType;
use futures::{stream, Stream, StreamExt};
use output::{from_tap_payload_to_output_events, OutputEventsPayload};
use rand::{rngs::SmallRng, Rng, SeedableRng};
use futures::{Stream, StreamExt, stream};
use output::{OutputEventsPayload, from_tap_payload_to_output_events};
use rand::{Rng, SeedableRng, rngs::SmallRng};
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::{select, sync::mpsc, time};
use tokio_stream::wrappers::ReceiverStream;
Expand Down
Loading
Loading