396 changes: 141 additions & 255 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 5 additions & 12 deletions Cargo.toml
@@ -209,6 +209,7 @@ serial_test = { version = "3.2" }

[dependencies]
cfg-if.workspace = true
reqwest_0_12_24 = { package = "reqwest", version = "0.12.24", features = ["json"] }
clap.workspace = true
indoc.workspace = true
paste.workspace = true
@@ -287,15 +288,11 @@ aws-smithy-runtime-api = { version = "1.7.3", default-features = false, optional
aws-smithy-types = { version = "1.2.11", default-features = false, features = ["rt-tokio"], optional = true }

# Azure
azure_core = { version = "0.25", default-features = false, features = ["reqwest", "hmac_openssl"], optional = true }
azure_identity = { version = "0.25", default-features = false, features = ["reqwest"], optional = true }
azure_core = { version = "0.30", default-features = false, features = ["reqwest", "hmac_openssl"] }
azure_identity = { version = "0.30", default-features = false, optional = true }

# Azure Storage
azure_storage = { version = "0.21", default-features = false, optional = true }
azure_storage_blobs = { version = "0.21", default-features = false, optional = true }

# Needed to bridge with outdated version of azure_core used in azure_storage*
azure_core_for_storage = { package = "azure_core", version = "0.21.0", default-features = false, features = ["enable_reqwest", "hmac_openssl"] }
azure_storage_blob = { version = "0.7", default-features = false, optional = true }


# OpenDAL
@@ -464,10 +461,6 @@ openssl-src = { version = "300", default-features = false, features = ["force-en
approx = "0.5.1"
assert_cmd = { version = "2.0.17", default-features = false }
aws-smithy-runtime = { version = "1.8.3", default-features = false, features = ["tls-rustls"] }
azure_core = { version = "0.25", default-features = false, features = ["reqwest", "hmac_openssl", "azurite_workaround"] }
azure_identity = { version = "0.25", default-features = false, features = ["reqwest"] }
azure_storage = { version = "0.21", default-features = false, features = ["enable_reqwest", "hmac_openssl"] }
azure_storage_blobs = { version = "0.21", default-features = false, features = ["enable_reqwest", "hmac_openssl", "azurite_workaround"] }
base64 = "0.22.1"
criterion = { version = "0.7.0", features = ["html_reports", "async_tokio"] }
itertools.workspace = true
@@ -853,7 +846,7 @@ sinks-aws_s3 = ["dep:base64", "dep:md-5", "aws-core", "dep:aws-sdk-s3"]
sinks-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"]
sinks-aws_sns = ["aws-core", "dep:aws-sdk-sns"]
sinks-axiom = ["sinks-http"]
sinks-azure_blob = ["dep:azure_core", "dep:azure_identity", "dep:azure_storage", "dep:azure_storage_blobs"]
sinks-azure_blob = ["dep:azure_storage_blob"]
sinks-azure_monitor_logs = []
sinks-blackhole = []
sinks-chronicle = []
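Note: with this change the sinks-azure_blob feature pulls in only azure_storage_blob. As a sketch of what a flag like this typically gates (the exact module layout in this repository is an assumption, not read from this diff), the cfg attribute on the sink module would look like:

    // Sketch only: cfg-gating implied by the narrowed feature above.
    #[cfg(feature = "sinks-azure_blob")]
    pub mod azure_blob;

A quick local check that the narrowed feature still compiles on its own is cargo check --no-default-features --features sinks-azure_blob.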
14 changes: 4 additions & 10 deletions LICENSE-3rdparty.csv
@@ -1,6 +1,5 @@
Component,Origin,License,Copyright
Inflector,https://github.com/whatisinternet/inflector,BSD-2-Clause,Josh Teeter<[email protected]>
RustyXML,https://github.com/Florob/RustyXML,MIT OR Apache-2.0,Florian Zeitz <[email protected]>
addr2line,https://github.com/gimli-rs/addr2line,Apache-2.0 OR MIT,The addr2line Authors
adler2,https://github.com/oyvindln/adler2,0BSD OR MIT OR Apache-2.0,"Jonas Schievink <[email protected]>, oyvindln <[email protected]>"
adler32,https://github.com/remram44/adler32-rs,Zlib,Remi Rampin <[email protected]>
@@ -104,11 +103,8 @@ aws-types,https://github.com/smithy-lang/smithy-rs,Apache-2.0,"AWS Rust SDK Team
axum,https://github.com/tokio-rs/axum,MIT,The axum Authors
axum-core,https://github.com/tokio-rs/axum,MIT,The axum-core Authors
azure_core,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft
azure_core,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft Corp.
azure_identity,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft
azure_storage,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft Corp.
azure_storage_blobs,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft Corp.
azure_svc_blobstorage,https://github.com/azure/azure-sdk-for-rust,MIT,The azure_svc_blobstorage Authors
azure_core_macros,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft
azure_storage_blob,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft
backoff,https://github.com/ihrwein/backoff,MIT OR Apache-2.0,Tibor Benke <[email protected]>
backon,https://github.com/Xuanwo/backon,Apache-2.0,The backon Authors
backtrace,https://github.com/rust-lang/backtrace-rs,MIT OR Apache-2.0,The Rust Project Developers
@@ -358,7 +354,6 @@ http-body,https://github.com/hyperium/http-body,MIT,"Carl Lerche <me@carllerche.
http-body-util,https://github.com/hyperium/http-body,MIT,"Carl Lerche <[email protected]>, Lucio Franco <[email protected]>, Sean McArthur <[email protected]>"
http-range-header,https://github.com/MarcusGrass/parse-range-headers,MIT,The http-range-header Authors
http-serde,https://gitlab.com/kornelski/http-serde,Apache-2.0 OR MIT,Kornel <[email protected]>
http-types,https://github.com/http-rs/http-types,MIT OR Apache-2.0,Yoshua Wuyts <[email protected]>
httparse,https://github.com/seanmonstar/httparse,MIT OR Apache-2.0,Sean McArthur <[email protected]>
httpdate,https://github.com/pyfisch/httpdate,MIT OR Apache-2.0,Pyfisch <[email protected]>
humantime,https://github.com/chronotope/humantime,MIT OR Apache-2.0,The humantime Authors
@@ -389,7 +384,6 @@ idna_adapter,https://github.com/hsivonen/idna_adapter,Apache-2.0 OR MIT,The rust
indexmap,https://github.com/bluss/indexmap,Apache-2.0 OR MIT,The indexmap Authors
indexmap,https://github.com/indexmap-rs/indexmap,Apache-2.0 OR MIT,The indexmap Authors
indoc,https://github.com/dtolnay/indoc,MIT OR Apache-2.0,David Tolnay <[email protected]>
infer,https://github.com/bojand/infer,MIT,Bojan <[email protected]>
influxdb-line-protocol,https://github.com/influxdata/influxdb_iox/tree/main/influxdb_line_protocol,MIT OR Apache-2.0,InfluxDB IOx Project Developers
inotify,https://github.com/hannobraun/inotify,ISC,"Hanno Braun <[email protected]>, Félix Saparelli <[email protected]>, Cristian Kubis <[email protected]>, Frank Denis <[email protected]>"
inotify-sys,https://github.com/hannobraun/inotify-sys,ISC,Hanno Braun <[email protected]>
@@ -404,6 +398,7 @@ ipconfig,https://github.com/liranringel/ipconfig,MIT OR Apache-2.0,Liran Ringel
ipcrypt-rs,https://github.com/jedisct1/rust-ipcrypt2,ISC,Frank Denis <[email protected]>
ipnet,https://github.com/krisprice/ipnet,MIT OR Apache-2.0,Kris Price <[email protected]>
ipnetwork,https://github.com/achanda/ipnetwork,MIT OR Apache-2.0,"Abhishek Chanda <[email protected]>, Linus Färnstrand <[email protected]>"
iri-string,https://github.com/lo48576/iri-string,MIT OR Apache-2.0,YOSHIOKA Takuma <[email protected]>
is-terminal,https://github.com/sunfishcode/is-terminal,MIT,"softprops <[email protected]>, Dan Gohman <[email protected]>"
is_ci,https://github.com/zkat/is_ci,ISC,Kat Marchán <[email protected]>
itertools,https://github.com/rust-itertools/itertools,MIT OR Apache-2.0,bluss
@@ -621,7 +616,6 @@ rand,https://github.com/rust-random/rand,MIT OR Apache-2.0,"The Rand Project Dev
rand_chacha,https://github.com/rust-random/rand,MIT OR Apache-2.0,"The Rand Project Developers, The Rust Project Developers, The CryptoCorrosion Contributors"
rand_core,https://github.com/rust-random/rand,MIT OR Apache-2.0,"The Rand Project Developers, The Rust Project Developers"
rand_distr,https://github.com/rust-random/rand_distr,MIT OR Apache-2.0,The Rand Project Developers
rand_hc,https://github.com/rust-random/rand,MIT OR Apache-2.0,The Rand Project Developers
rand_xorshift,https://github.com/rust-random/rngs,MIT OR Apache-2.0,"The Rand Project Developers, The Rust Project Developers"
ratatui,https://github.com/ratatui/ratatui,MIT,"Florian Dehau <[email protected]>, The Ratatui Developers"
raw-cpuid,https://github.com/gz/rust-cpuid,MIT,Gerd Zellweger <[email protected]>
@@ -700,7 +694,6 @@ serde_json,https://github.com/serde-rs/json,MIT OR Apache-2.0,"Erick Tryzelaar <
serde_nanos,https://github.com/caspervonb/serde_nanos,MIT OR Apache-2.0,Casper Beyer <[email protected]>
serde_path_to_error,https://github.com/dtolnay/path-to-error,MIT OR Apache-2.0,David Tolnay <[email protected]>
serde_plain,https://github.com/mitsuhiko/serde-plain,MIT OR Apache-2.0,Armin Ronacher <[email protected]>
serde_qs,https://github.com/samscott89/serde_qs,MIT OR Apache-2.0,Sam Scott <[email protected]>
serde_repr,https://github.com/dtolnay/serde-repr,MIT OR Apache-2.0,David Tolnay <[email protected]>
serde_spanned,https://github.com/toml-rs/toml,MIT OR Apache-2.0,The serde_spanned Authors
serde_urlencoded,https://github.com/nox/serde_urlencoded,MIT OR Apache-2.0,Anthony Ramine <[email protected]>
@@ -876,6 +869,7 @@ wasm-timer,https://github.com/tomaka/wasm-timer,MIT,Pierre Krieger <pierre.krieg
web-sys,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/web-sys,MIT OR Apache-2.0,The wasm-bindgen Developers
web-time,https://github.com/daxpedda/web-time,MIT OR Apache-2.0,The web-time Authors
webbrowser,https://github.com/amodm/webbrowser-rs,MIT OR Apache-2.0,Amod Malviya @amodm
webpki-roots,https://github.com/rustls/webpki-roots,CDLA-Permissive-2.0,The webpki-roots Authors
webpki-roots,https://github.com/rustls/webpki-roots,MPL-2.0,The webpki-roots Authors
whoami,https://github.com/ardaku/whoami,Apache-2.0 OR BSL-1.0 OR MIT,The whoami Authors
widestring,https://github.com/starkat99/widestring-rs,MIT OR Apache-2.0,Kathryn Long <[email protected]>
4 changes: 2 additions & 2 deletions src/sinks/azure_blob/config.rs
@@ -1,6 +1,6 @@
use std::sync::Arc;

use azure_storage_blobs::prelude::*;
use azure_storage_blob::BlobContainerClient;
use tower::ServiceBuilder;
use vector_lib::{
codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer},
@@ -193,7 +193,7 @@ const DEFAULT_FILENAME_TIME_FORMAT: &str = "%s";
const DEFAULT_FILENAME_APPEND_UUID: bool = true;

impl AzureBlobSinkConfig {
pub fn build_processor(&self, client: Arc<ContainerClient>) -> crate::Result<VectorSink> {
pub fn build_processor(&self, client: Arc<BlobContainerClient>) -> crate::Result<VectorSink> {
let request_limits = self.request.into_settings();
let service = ServiceBuilder::new()
.settings(request_limits, AzureBlobRetryLogic)
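For orientation, a minimal sketch of the call-site shape implied by the new Arc<BlobContainerClient> parameter. AzureBlobSinkConfig, build_processor, and azure_common::config::build_client all appear in this diff; the field accesses and the Result-returning signature of build_client used below are assumptions, not the repository's actual code:

    use std::sync::Arc;
    use azure_storage_blob::BlobContainerClient;

    // Sketch: construct the new-SDK container client, then hand it to the sink.
    // Only build_processor's new parameter type is taken from this diff; the
    // config fields and build_client's return type are assumed.
    fn build_sink(config: &AzureBlobSinkConfig) -> crate::Result<VectorSink> {
        let client: Arc<BlobContainerClient> = azure_common::config::build_client(
            config.connection_string.clone().into(),
            config.container_name.clone(),
        )?;
        config.build_processor(client)
    }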
133 changes: 72 additions & 61 deletions src/sinks/azure_blob/integration_tests.rs
@@ -1,11 +1,7 @@
use std::{
io::{BufRead, BufReader},
num::NonZeroU32,
};
use std::io::{BufRead, BufReader};

use azure_core::http::StatusCode;
use azure_core_for_storage::prelude::Range;
use azure_storage_blobs::prelude::*;

use bytes::{Buf, BytesMut};
use flate2::read::GzDecoder;
use futures::{Stream, StreamExt, stream};
@@ -83,8 +79,8 @@ async fn azure_blob_insert_lines_into_blob() {
let blobs = config.list_blobs(blob_prefix).await;
assert_eq!(blobs.len(), 1);
assert!(blobs[0].clone().ends_with(".log"));
let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await;
assert_eq!(blob.properties.content_type, String::from("text/plain"));
let (content_type, _content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await;
assert_eq!(content_type, Some(String::from("text/plain")));
assert_eq!(lines, blob_lines);
}

@@ -108,12 +104,9 @@ async fn azure_blob_insert_json_into_blob() {
let blobs = config.list_blobs(blob_prefix).await;
assert_eq!(blobs.len(), 1);
assert!(blobs[0].clone().ends_with(".log"));
let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await;
assert_eq!(blob.properties.content_encoding, None);
assert_eq!(
blob.properties.content_type,
String::from("application/x-ndjson")
);
let (content_type, content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await;
assert_eq!(content_encoding, None);
assert_eq!(content_type, Some(String::from("application/x-ndjson")));
let expected = events
.iter()
.map(|event| serde_json::to_string(&event.as_log().all_event_fields().unwrap()).unwrap())
@@ -141,9 +134,9 @@ async fn azure_blob_insert_lines_into_blob_gzip() {
let blobs = config.list_blobs(blob_prefix).await;
assert_eq!(blobs.len(), 1);
assert!(blobs[0].clone().ends_with(".log.gz"));
let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await;
assert_eq!(blob.properties.content_encoding, Some(String::from("gzip")));
assert_eq!(blob.properties.content_type, String::from("text/plain"));
let (content_type, content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await;
assert_eq!(content_encoding, Some(String::from("gzip")));
assert_eq!(content_type, Some(String::from("text/plain")));
assert_eq!(lines, blob_lines);
}

@@ -171,12 +164,9 @@ async fn azure_blob_insert_json_into_blob_gzip() {
let blobs = config.list_blobs(blob_prefix).await;
assert_eq!(blobs.len(), 1);
assert!(blobs[0].clone().ends_with(".log.gz"));
let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await;
assert_eq!(blob.properties.content_encoding, Some(String::from("gzip")));
assert_eq!(
blob.properties.content_type,
String::from("application/x-ndjson")
);
let (content_type, content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await;
assert_eq!(content_encoding, Some(String::from("gzip")));
assert_eq!(content_type, Some(String::from("application/x-ndjson")));
let expected = events
.iter()
.map(|event| serde_json::to_string(&event.as_log().all_event_fields().unwrap()).unwrap())
@@ -207,7 +197,7 @@ async fn azure_blob_rotate_files_after_the_buffer_size_is_reached() {
assert_eq!(blobs.len(), 3);
let response = stream::iter(blobs)
.fold(Vec::new(), |mut acc, blob| async {
let (_, lines) = config.get_blob(blob).await;
let (_, _, lines) = config.get_blob(blob).await;
acc.push(lines);
acc
})
@@ -262,45 +252,69 @@ impl AzureBlobSinkConfig {
self.container_name.clone(),
)
.unwrap();
let response = client
.list_blobs()
.prefix(prefix)
.max_results(NonZeroU32::new(1000).unwrap())
.delimiter("/")
.include_metadata(true)
.into_stream()
.next()
.await
.expect("Failed to fetch blobs")
.unwrap();

response
.blobs
.blobs()
.map(|blob| blob.name.clone())
.collect::<Vec<_>>()

// Iterate pager results and collect blob names, filtering by prefix client-side.
let mut pager = client
.list_blobs(None)
.expect("Failed to start list blobs pager");
let mut names = Vec::new();
while let Some(result) = pager.next().await {
let item = result.expect("Failed to fetch blobs");
if let Some(name) = item.name.and_then(|bn| bn.content)
&& name.starts_with(&prefix)
{
names.push(name);
}
}

names
}

pub async fn get_blob(&self, blob: String) -> (Blob, Vec<String>) {
pub async fn get_blob(&self, blob: String) -> (Option<String>, Option<String>, Vec<String>) {
let client = azure_common::config::build_client(
self.connection_string.clone().into(),
self.container_name.clone(),
)
.unwrap();
let response = client
.blob_client(blob)
.get()
.range(Range::new(0, 1024 * 1024))
.into_stream()
.next()

let blob_client = client.blob_client(&blob);

// Fetch properties to obtain content-type and content-encoding
let props_resp = blob_client
.get_properties(None)
.await
.expect("Failed to get blob")
.unwrap();
.expect("Failed to get blob properties");
let headers = props_resp.headers();
let content_type = headers.iter().find_map(|(name, value)| {
let key = name.as_str();
if key.eq_ignore_ascii_case("content-type") {
Some(value.as_str().to_string())
} else {
None
}
});
let content_encoding = headers.iter().find_map(|(name, value)| {
let key = name.as_str();
if key.eq_ignore_ascii_case("content-encoding") {
Some(value.as_str().to_string())
} else {
None
}
});

(
response.blob,
self.get_blob_content(response.data.collect().await.unwrap().to_vec()),
)
// Download the full blob content (the previous code requested only the first 1 MiB range)
let downloaded = blob_client
.download(None)
.await
.expect("Failed to download blob");
let body_bytes = downloaded
.into_body()
.collect()
.await
.expect("Failed to read blob body");
let data = body_bytes.to_vec();

(content_type, content_encoding, self.get_blob_content(data))
}

fn get_blob_content(&self, data: Vec<u8>) -> Vec<String> {
@@ -322,15 +336,12 @@ impl AzureBlobSinkConfig {
self.container_name.clone(),
)
.unwrap();
let request = client
.create()
.public_access(PublicAccess::None)
.into_future();
let result = client.create_container(None).await;

let response = match request.await {
let response = match result {
Ok(_) => Ok(()),
Err(error) => match error.as_http_error() {
Some(http_error) if http_error.status() as u16 == StatusCode::Conflict => Ok(()),
Err(error) => match error.http_status() {
Some(StatusCode::Conflict) => Ok(()),
_ => Err(error),
},
};
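One small follow-up the new get_blob invites: the two header lookups differ only in the header name, so a helper could remove the duplication. This is a sketch only, not part of this change; the Headers import path is assumed to be the one exposed by azure_core 0.30, while iter(), as_str(), and the case-insensitive match mirror the code above:

    use azure_core::http::headers::Headers; // assumed path in azure_core 0.30

    // Case-insensitive lookup of a single response header, matching the
    // find_map pattern used in get_blob above.
    fn header_value(headers: &Headers, key: &str) -> Option<String> {
        headers.iter().find_map(|(name, value)| {
            name.as_str()
                .eq_ignore_ascii_case(key)
                .then(|| value.as_str().to_string())
        })
    }

    // Usage: let content_type = header_value(props_resp.headers(), "content-type");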