diff --git a/.github/workflows/changes.yml b/.github/workflows/changes.yml index ada9463bb9133..31dd60615a50f 100644 --- a/.github/workflows/changes.yml +++ b/.github/workflows/changes.yml @@ -92,6 +92,8 @@ on: value: ${{ jobs.int_tests.outputs.dnstap }} docker-logs: value: ${{ jobs.int_tests.outputs.docker-logs }} + doris: + value: ${{ jobs.int_tests.outputs.doris }} elasticsearch: value: ${{ jobs.int_tests.outputs.elasticsearch }} eventstoredb: @@ -299,6 +301,7 @@ jobs: datadog-traces: ${{ steps.filter.outputs.datadog-traces }} dnstap: ${{ steps.filter.outputs.dnstap }} docker-logs: ${{ steps.filter.outputs.docker-logs }} + doris: ${{ steps.filter.outputs.doris }} elasticsearch: ${{ steps.filter.outputs.elasticsearch }} eventstoredb: ${{ steps.filter.outputs.eventstoredb }} fluent: ${{ steps.filter.outputs.fluent }} @@ -364,6 +367,7 @@ jobs: "datadog-traces": ${{ steps.filter.outputs.datadog-traces }}, "dnstap": ${{ steps.filter.outputs.dnstap }}, "docker-logs": ${{ steps.filter.outputs.docker-logs }}, + "doris": ${{ steps.filter.outputs.doris }}, "elasticsearch": ${{ steps.filter.outputs.elasticsearch }}, "eventstoredb": ${{ steps.filter.outputs.eventstoredb }}, "fluent": ${{ steps.filter.outputs.fluent }}, diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index a92c7f3d27593..de2cc01c29c7b 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -72,7 +72,7 @@ jobs: # If you modify this list, please also update the `int_tests` job in changes.yml. service: [ "amqp", "appsignal", "axiom", "aws", "azure", "clickhouse", "databend", "datadog-agent", - "datadog-logs", "datadog-metrics", "datadog-traces", "dnstap", "docker-logs", "elasticsearch", + "datadog-logs", "datadog-metrics", "datadog-traces", "dnstap", "docker-logs", "doris", "elasticsearch", "eventstoredb", "fluent", "gcp", "greptimedb", "http-client", "influxdb", "kafka", "logstash", "loki", "mqtt", "mongodb", "nats", "nginx", "opentelemetry", "postgres", "prometheus", "pulsar", "redis", "webhdfs" diff --git a/.github/workflows/semantic.yml b/.github/workflows/semantic.yml index 40a237787c53d..88fd9e4cb6451 100644 --- a/.github/workflows/semantic.yml +++ b/.github/workflows/semantic.yml @@ -215,6 +215,7 @@ jobs: datadog_events sink datadog_logs sink datadog_metrics sink + doris sink elasticsearch sink file sink gcp_chronicle sink diff --git a/Cargo.toml b/Cargo.toml index e4b501795ccfd..03fb3a3afe8fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -793,6 +793,7 @@ sinks-logs = [ "sinks-datadog_events", "sinks-datadog_logs", "sinks-datadog_traces", + "sinks-doris", "sinks-elasticsearch", "sinks-file", "sinks-gcp", @@ -860,6 +861,7 @@ sinks-datadog_events = [] sinks-datadog_logs = [] sinks-datadog_metrics = ["protobuf-build", "dep:prost", "dep:prost-reflect"] sinks-datadog_traces = ["protobuf-build", "dep:prost", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"] +sinks-doris = ["sqlx/mysql"] sinks-elasticsearch = ["transforms-metric_to_log"] sinks-file = ["dep:async-compression"] sinks-gcp = ["sinks-gcp-chronicle", "dep:base64", "gcp"] @@ -913,6 +915,7 @@ all-integration-tests = [ "datadog-traces-integration-tests", "dnstap-integration-tests", "docker-logs-integration-tests", + "doris-integration-tests", "es-integration-tests", "eventstoredb_metrics-integration-tests", "fluent-integration-tests", @@ -978,6 +981,7 @@ datadog-logs-integration-tests = ["sinks-datadog_logs"] datadog-metrics-integration-tests = ["sinks-datadog_metrics", "dep:prost"] 
 datadog-traces-integration-tests = ["sources-datadog_agent", "sinks-datadog_traces", "axum/tokio"]
 docker-logs-integration-tests = ["sources-docker_logs", "unix"]
+doris-integration-tests = ["sinks-doris"]
 es-integration-tests = ["sinks-elasticsearch", "aws-core"]
 eventstoredb_metrics-integration-tests = ["sources-eventstoredb_metrics"]
 fluent-integration-tests = ["docker", "sources-fluent"]
diff --git a/changelog.d/doris_sink_support.feature.md b/changelog.d/doris_sink_support.feature.md
new file mode 100644
index 0000000000000..7a82356f93dcd
--- /dev/null
+++ b/changelog.d/doris_sink_support.feature.md
@@ -0,0 +1,3 @@
+Add a new Apache Doris sink that delivers log data to Apache Doris using the Stream Load API. The sink supports configurable batching, custom HTTP headers for Doris-specific Stream Load options, authentication, rate limiting, and adaptive concurrency control, and it performs health checks against the configured endpoints.
+
+authors: bingquanzhao
diff --git a/src/internal_events/doris.rs b/src/internal_events/doris.rs
new file mode 100644
index 0000000000000..0a2430869b7c5
--- /dev/null
+++ b/src/internal_events/doris.rs
@@ -0,0 +1,42 @@
+use metrics::counter;
+use vector_lib::{NamedInternalEvent, internal_event::InternalEvent};
+
+/// Emitted when rows are successfully loaded into Doris.
+#[derive(Debug, NamedInternalEvent)]
+pub struct DorisRowsLoaded {
+    pub loaded_rows: i64,
+    pub load_bytes: i64,
+}
+
+impl InternalEvent for DorisRowsLoaded {
+    fn emit(self) {
+        trace!(
+            message = "Doris rows loaded successfully.",
+            loaded_rows = %self.loaded_rows,
+            load_bytes = %self.load_bytes
+        );
+
+        // Record the number of rows loaded
+        counter!("doris_rows_loaded_total").increment(self.loaded_rows as u64);
+
+        // Record the number of bytes loaded
+        counter!("doris_bytes_loaded_total").increment(self.load_bytes as u64);
+    }
+}
+
+/// Emitted when rows are filtered by Doris during loading.
+#[derive(Debug, NamedInternalEvent)] +pub struct DorisRowsFiltered { + pub filtered_rows: i64, +} + +impl InternalEvent for DorisRowsFiltered { + fn emit(self) { + warn!( + message = "Doris rows filtered during loading.", + filtered_rows = %self.filtered_rows + ); + + counter!("doris_rows_filtered_total").increment(self.filtered_rows as u64); + } +} diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index 7a45737149708..f22f896336070 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -45,6 +45,8 @@ mod demo_logs; mod dnstap; #[cfg(feature = "sources-docker_logs")] mod docker_logs; +#[cfg(feature = "sinks-doris")] +mod doris; mod encoding_transcode; #[cfg(feature = "sources-eventstoredb_metrics")] mod eventstoredb_metrics; @@ -197,6 +199,8 @@ pub(crate) use self::demo_logs::*; pub(crate) use self::dnstap::*; #[cfg(feature = "sources-docker_logs")] pub(crate) use self::docker_logs::*; +#[cfg(feature = "sinks-doris")] +pub(crate) use self::doris::*; #[cfg(feature = "sources-eventstoredb_metrics")] pub(crate) use self::eventstoredb_metrics::*; #[cfg(feature = "sources-exec")] diff --git a/src/sinks/doris/client.rs b/src/sinks/doris/client.rs new file mode 100644 index 0000000000000..85bf9092829ba --- /dev/null +++ b/src/sinks/doris/client.rs @@ -0,0 +1,515 @@ +use crate::{ + http::{Auth, HttpClient}, + internal_events::EndpointBytesSent, + sinks::util::Compression, +}; +use bytes::Bytes; +use http::{ + Method, Response, StatusCode, Uri, + header::{CONTENT_LENGTH, CONTENT_TYPE, EXPECT}, +}; +use http_body::{Body as _, Collected}; +use hyper::{Body, Request}; +use serde_json::Value; +use snafu::Snafu; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::SystemTime, +}; +use tracing::debug; +use uuid::Uuid; + +/// Content-Type header value for Doris Stream Load requests. +/// Doris expects plain text with UTF-8 encoding for JSON data. +const DORIS_CONTENT_TYPE: &str = "text/plain;charset=utf-8"; + +/// Expect header value for Doris Stream Load requests. +/// The "100-continue" mechanism allows the client to wait for server acknowledgment +/// before sending the request body, which is required by Doris Stream Load protocol. +const DORIS_EXPECT_HEADER: &str = "100-continue"; + +/// Group commit header name for Doris Stream Load. +const GROUP_COMMIT_HEADER: &str = "group_commit"; + +/// Group commit sync mode - multiple imports merged into one transaction, +/// returns after transaction commit. Data is visible immediately after import. +const GROUP_COMMIT_SYNC_MODE: &str = "sync_mode"; + +/// Group commit async mode - data written to WAL first, returns immediately. +/// Data is visible after async commit based on group_commit_interval. 
+const GROUP_COMMIT_ASYNC_MODE: &str = "async_mode";
+
+/// Thread-safe version of the DorisSinkClient, wrapped in an Arc
+pub type ThreadSafeDorisSinkClient = Arc<DorisSinkClient>;
+
+/// `DorisSinkClient` handles the HTTP communication with the Doris server.
+/// This client is thread-safe by design.
+#[derive(Clone, Debug)]
+pub struct DorisSinkClient {
+    http_client: HttpClient,
+    base_url: Uri,
+    auth: Option<Auth>,
+    compression: Compression,
+    label_prefix: String,
+    headers: Arc<HashMap<String, String>>,
+}
+
+impl DorisSinkClient {
+    pub async fn new(
+        http_client: HttpClient,
+        base_url: Uri,
+        auth: Option<Auth>,
+        compression: Compression,
+        label_prefix: String,
+        req_headers: HashMap<String, String>,
+    ) -> Self {
+        // Store custom headers (basic headers like Content-Type and Expect
+        // are set directly in the request builder)
+        let headers = req_headers;
+
+        Self {
+            http_client,
+            base_url,
+            auth,
+            compression,
+            label_prefix,
+            headers: Arc::new(headers),
+        }
+    }
+
+    /// Converts a DorisSinkClient into a thread-safe version
+    pub fn into_thread_safe(self) -> ThreadSafeDorisSinkClient {
+        Arc::new(self)
+    }
+
+    /// Generate a unique label for the stream load
+    fn generate_label(&self, database: &str, table: &str) -> String {
+        format!(
+            "{}_{}_{}_{}_{}",
+            self.label_prefix,
+            database,
+            table,
+            SystemTime::now()
+                .duration_since(SystemTime::UNIX_EPOCH)
+                .unwrap_or_default()
+                .as_millis(),
+            Uuid::new_v4()
+        )
+    }
+
+    /// Check if group commit is enabled in the custom headers
+    /// Group commit has three modes:
+    /// - off_mode: disabled, label is required
+    /// - sync_mode: enabled, label should be skipped
+    /// - async_mode: enabled, label should be skipped
+    fn is_group_commit_enabled(&self) -> bool {
+        self.headers.iter().any(|(k, v)| {
+            k.eq_ignore_ascii_case(GROUP_COMMIT_HEADER)
+                && (v.eq_ignore_ascii_case(GROUP_COMMIT_SYNC_MODE)
+                    || v.eq_ignore_ascii_case(GROUP_COMMIT_ASYNC_MODE))
+        })
+    }
+
+    /// Build a request for the Doris stream load
+    async fn build_request(
+        &self,
+        database: &str,
+        table: &str,
+        payload: &Bytes,
+        redirect_url: Option<&str>,
+    ) -> Result<Request<Body>, crate::Error> {
+        let uri = if let Some(redirect_url) = redirect_url {
+            debug!(%redirect_url, "Using redirect URL.");
+            redirect_url.parse::<Uri>().map_err(|source| {
+                debug!(
+                    message = "Failed to parse redirect URI.",
+                    %source,
+                    %redirect_url
+                );
+                StreamLoadError::InvalidRedirectUri { source }
+            })?
+        } else {
+            // Build original URL using Uri components to avoid trailing slash issues
+            let scheme = self.base_url.scheme_str().unwrap_or("http");
+            let authority = self.base_url.authority().map(|a| a.as_str()).unwrap_or("");
+            let stream_load_url = format!(
+                "{}://{}/api/{}/{}/_stream_load",
+                scheme, authority, database, table
+            );
+
+            stream_load_url.parse::<Uri>().map_err(|source| {
+                debug!(
+                    message = "Failed to parse URI.",
+                    %source,
+                    url = %stream_load_url
+                );
+                StreamLoadError::InvalidStreamLoadUri { source }
+            })?
+ }; + + let mut builder = Request::builder() + .method(Method::PUT) + .uri(uri.clone()) + .header(CONTENT_LENGTH, payload.len()) + .header(CONTENT_TYPE, DORIS_CONTENT_TYPE) + .header(EXPECT, DORIS_EXPECT_HEADER); + + // Only set label when group commit is not enabled + // When group commit is enabled, Doris server ignores the label anyway + // as multiple requests are merged into a single transaction + if self.is_group_commit_enabled() { + debug!(%uri, "Building request with group commit enabled (no label)."); + } else { + let label = self.generate_label(database, table); + debug!(%uri, %label, "Building request."); + builder = builder.header("label", &label); + }; + + // Add compression headers if needed + if let Some(ce) = self.compression.content_encoding() { + builder = builder.header("Content-Encoding", ce); + } + + if let Some(ae) = self.compression.accept_encoding() { + builder = builder.header("Accept-Encoding", ae); + } + + // Add custom headers + for (header, value) in self.headers.as_ref() { + builder = builder.header(&header[..], &value[..]); + } + + let body = Body::from(payload.clone()); + let mut request = builder.body(body).map_err(|source| { + debug!( + message = "Failed to build HTTP request.", + %source, + uri = %uri + ); + StreamLoadError::BuildRequest { source } + })?; + + if let Some(auth) = &self.auth { + auth.apply(&mut request); + } + + debug!( + message = "Request built successfully.", + method = %request.method(), + uri = %request.uri(), + headers_count = request.headers().len() + ); + + Ok(request) + } + + /// Handle redirects and send the HTTP request to Doris + /// Returns the HTTP response and event status + pub async fn send_stream_load( + &self, + database: String, + table: String, + payload: Bytes, + ) -> Result { + // Track visited URLs to prevent redirect loops + let mut visited_urls = HashSet::new(); + let mut redirect_count = 0; + // Doris Stream Load typically redirects once (FE -> BE), but we allow up to 3 + // redirects to handle potential multi-hop scenarios while preventing infinite loops. 
+ const MAX_REDIRECTS: u8 = 3; + + let payload_ref = &payload; + + // Build and send initial request + let request = self + .build_request(&database, &table, payload_ref, None) + .await?; + let endpoint = request.uri().to_string(); + let byte_size = payload.len(); + + let mut response = self.http_client.send(request).await?; + let mut status = response.status(); + + // Track protocol for metrics + let protocol = if endpoint.starts_with("https://") { + "https" + } else { + "http" + }; + + // Handle redirect loop + while (status == StatusCode::TEMPORARY_REDIRECT + || status == StatusCode::PERMANENT_REDIRECT + || status == StatusCode::FOUND) + && redirect_count < MAX_REDIRECTS + { + // Get redirect location + if let Some(location) = response.headers().get(http::header::LOCATION) { + if let Ok(location_str) = location.to_str() { + debug!( + message = "Following redirect.", + status = %status, + to = %location_str, + redirect_count = redirect_count + 1 + ); + + // Check for redirect loop + if !visited_urls.insert(location_str.to_string()) { + return Err(StreamLoadError::RedirectLoop.into()); + } + + // Build and send redirect request + let redirect_req = self + .build_request(&database, &table, payload_ref, Some(location_str)) + .await?; + + response = self.http_client.send(redirect_req).await?; + status = response.status(); + + // Increment redirect counter + redirect_count += 1; + + debug!( + message = "Received response after redirect.", + new_status = %status, + redirect_count = redirect_count + ); + } else { + return Err(StreamLoadError::InvalidLocationHeader.into()); + } + } else { + return Err(StreamLoadError::MissingLocationHeader.into()); + } + } + + // Check if maximum redirects exceeded + if redirect_count >= MAX_REDIRECTS { + return Err(StreamLoadError::MaxRedirectsExceeded { max: MAX_REDIRECTS }.into()); + } + + // Log endpoint bytes sent metric + emit!(EndpointBytesSent { + byte_size, + protocol, + endpoint: endpoint.as_str(), + }); + + // Extract response body + let (parts, body) = response.into_parts(); + let body_bytes = body + .collect() + .await + .map(Collected::to_bytes) + .map_err(|source| StreamLoadError::ReadResponseBody { source })?; + + let status = parts.status; + + let response_json = serde_json::from_slice::(&body_bytes) + .map_err(|source| StreamLoadError::ParseResponseJson { source })?; + + let stream_load_status = + if let Some(status_str) = response_json.get("Status").and_then(|v| v.as_str()) { + if status_str.to_lowercase() == "success" { + StreamLoadStatus::Successful + } else { + StreamLoadStatus::Failure + } + } else { + StreamLoadStatus::Failure + }; + + Ok(DorisStreamLoadResponse { + http_status_code: status, + stream_load_status, + response: Response::from_parts(parts, body_bytes), + response_json, + }) + } + + pub async fn healthcheck_fenode(&self, endpoint: &Uri) -> crate::Result<()> { + // Use Doris bootstrap API endpoint for health check, GET method + let scheme = endpoint.scheme_str().unwrap_or("http"); + let authority = endpoint.authority().map(|a| a.as_str()).unwrap_or(""); + let uri_str = format!("{}://{}/api/bootstrap", scheme, authority); + + let uri = uri_str.parse::().map_err(|source| { + debug!( + message = "Failed to parse health check URI.", + %source, + url = %uri_str + ); + HealthCheckError::InvalidUri { source } + })?; + + debug!( + message = "Sending health check request to Doris FE node.", + uri = %uri + ); + + let mut request = Request::builder() + .method(Method::GET) + .uri(uri) + .body(Body::empty()) + .map_err(|source| 
HealthCheckError::HealthCheckBuildRequest { source })?; + + if let Some(auth) = &self.auth { + auth.apply(&mut request); + } + + let response = self.http_client.send(request).await?; + let status = response.status(); + + let (_, body) = response.into_parts(); + let body_bytes = body + .collect() + .await + .map(Collected::to_bytes) + .map_err(|source| HealthCheckError::HealthCheckReadResponseBody { source })?; + + if status.is_success() { + // Parse the response JSON + match serde_json::from_slice::(&body_bytes) { + Ok(json) => { + // Check if the msg field is "success" + if let Some(msg) = json.get("msg").and_then(|m| m.as_str()) { + if msg.to_lowercase() == "success" { + debug!( + message = "Doris FE node is healthy.", + node = %endpoint + ); + return Ok(()); + } else { + debug!( + message = "Doris FE node returned non-success message.", + node = %endpoint, + message = %msg + ); + return Err(HealthCheckError::HealthCheckFailed { + message: msg.to_string(), + } + .into()); + } + } + } + Err(source) => { + return Err(HealthCheckError::HealthCheckParseResponse { source }.into()); + } + } + } + + debug!( + message = "Doris FE node health check failed.", + node = %endpoint, + status = %status + ); + + Err(HealthCheckError::HealthCheckFailed { + message: format!("HTTP status: {}", status), + } + .into()) + } +} + +#[allow(dead_code)] +#[derive(Debug)] +pub struct DorisStreamLoadResponse { + pub http_status_code: StatusCode, + pub stream_load_status: StreamLoadStatus, + pub response: Response, + pub response_json: Value, +} + +impl Clone for DorisStreamLoadResponse { + fn clone(&self) -> Self { + let cloned_response = Response::builder() + .status(self.http_status_code) + .body(self.response.body().clone()) + .unwrap_or_else(|_| { + Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Bytes::new()) + .unwrap() + }); + + Self { + http_status_code: self.http_status_code, + stream_load_status: self.stream_load_status.clone(), + response: cloned_response, + response_json: self.response_json.clone(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum StreamLoadStatus { + Successful, + Failure, +} +impl std::fmt::Display for StreamLoadStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Successful => write!(f, "Successful"), + Self::Failure => write!(f, "Failure"), + } + } +} + +impl From for vector_common::finalization::EventStatus { + fn from(status: StreamLoadStatus) -> Self { + match status { + StreamLoadStatus::Successful => vector_common::finalization::EventStatus::Delivered, + StreamLoadStatus::Failure => vector_common::finalization::EventStatus::Errored, + } + } +} + +/// Errors that can occur during Doris Stream Load operations. 
+#[derive(Debug, Snafu)] +pub enum StreamLoadError { + #[snafu(display("Invalid redirect URI: {}", source))] + InvalidRedirectUri { source: http::uri::InvalidUri }, + + #[snafu(display("Invalid stream load URI: {}", source))] + InvalidStreamLoadUri { source: http::uri::InvalidUri }, + + #[snafu(display("Detected redirect loop"))] + RedirectLoop, + + #[snafu(display("Invalid Location header in redirect response"))] + InvalidLocationHeader, + + #[snafu(display("Missing Location header in redirect response"))] + MissingLocationHeader, + + #[snafu(display("Exceeded maximum number of redirects ({})", max))] + MaxRedirectsExceeded { max: u8 }, + + #[snafu(display("Failed to build request: {}", source))] + BuildRequest { source: http::Error }, + + #[snafu(display("Failed to read response body: {}", source))] + ReadResponseBody { source: hyper::Error }, + + #[snafu(display("Failed to parse response JSON: {}", source))] + ParseResponseJson { source: serde_json::Error }, +} + +/// Errors that can occur during Doris health check operations. +#[derive(Debug, Snafu)] +pub enum HealthCheckError { + #[snafu(display("Invalid health check URI: {}", source))] + InvalidUri { source: http::uri::InvalidUri }, + + #[snafu(display("Failed to build health check request: {}", source))] + HealthCheckBuildRequest { source: http::Error }, + + #[snafu(display("Failed to read health check response body: {}", source))] + HealthCheckReadResponseBody { source: hyper::Error }, + + #[snafu(display("Failed to parse health check response: {}", source))] + HealthCheckParseResponse { source: serde_json::Error }, + + #[snafu(display("Doris health check failed: {}", message))] + HealthCheckFailed { message: String }, +} diff --git a/src/sinks/doris/common.rs b/src/sinks/doris/common.rs new file mode 100644 index 0000000000000..65af2023cb36d --- /dev/null +++ b/src/sinks/doris/common.rs @@ -0,0 +1,67 @@ +use crate::{ + codecs::Encoder, + http::{Auth, MaybeAuth}, + sinks::{ + doris::{ + DorisConfig, client::ThreadSafeDorisSinkClient, request_builder::DorisRequestBuilder, + }, + prelude::Compression, + util::UriSerde, + }, + tls::TlsSettings, +}; +use http::Uri; +use vector_lib::codecs::encoding::Framer; + +#[derive(Debug, Clone)] +pub struct DorisCommon { + pub base_url: Uri, + pub auth: Option, + pub request_builder: DorisRequestBuilder, + pub tls_settings: TlsSettings, +} + +impl DorisCommon { + pub async fn parse_config(config: &DorisConfig, endpoint: &UriSerde) -> crate::Result { + if endpoint.uri.host().is_none() { + return Err( + format!("Invalid host: {}, host must include hostname", endpoint.uri).into(), + ); + } + + // basic auth must be some for now + let auth = config.auth.choose_one(&endpoint.auth)?; + let base_url = endpoint.uri.clone(); + let tls_settings = TlsSettings::from_options(config.tls.as_ref())?; + + // Build encoder from the encoding configuration + let transformer = config.encoding.transformer(); + let (framer, serializer) = config + .encoding + .build(crate::codecs::SinkType::StreamBased)?; + let encoder = Encoder::::new(framer, serializer); + + let request_builder = DorisRequestBuilder { + compression: Compression::None, + encoder: (transformer, encoder), + }; + + Ok(Self { + base_url, + auth, + request_builder, + tls_settings, + }) + } + pub async fn parse_many(config: &DorisConfig) -> crate::Result> { + let mut commons = Vec::new(); + for endpoint in config.endpoints.iter() { + commons.push(Self::parse_config(config, endpoint).await?); + } + Ok(commons) + } + + pub async fn healthcheck(&self, client: 
ThreadSafeDorisSinkClient) -> crate::Result<()> { + client.healthcheck_fenode(&self.base_url).await + } +} diff --git a/src/sinks/doris/config.rs b/src/sinks/doris/config.rs new file mode 100644 index 0000000000000..5c26d3028e404 --- /dev/null +++ b/src/sinks/doris/config.rs @@ -0,0 +1,265 @@ +//! Configuration for the `Doris` sink. + +use super::sink::DorisSink; + +use crate::{ + codecs::EncodingConfigWithFraming, + http::{Auth, HttpClient}, + sinks::{ + doris::{ + client::DorisSinkClient, common::DorisCommon, health::DorisHealthLogic, + retry::DorisRetryLogic, service::DorisService, + }, + prelude::*, + util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, service::HealthConfig}, + }, +}; +use futures; +use futures_util::TryFutureExt; +use std::collections::HashMap; +use std::sync::Arc; + +/// Configuration for the `doris` sink. +#[configurable_component(sink("doris", "Deliver log data to an Apache Doris database."))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct DorisConfig { + /// A list of Doris endpoints to send logs to. + /// + /// The endpoint must contain an HTTP scheme, and may specify a + /// hostname or IP address and port. + #[serde(default)] + #[configurable(metadata(docs::examples = "http://127.0.0.1:8030"))] + pub endpoints: Vec, + + /// The database that contains the table data will be inserted into. + #[configurable(metadata(docs::examples = "mydatabase"))] + pub database: Template, + + /// The table data is inserted into. + #[configurable(metadata(docs::examples = "mytable"))] + pub table: Template, + + /// The prefix for Stream Load label. + /// The final label will be in format: `{label_prefix}_{database}_{table}_{timestamp}_{uuid}`. + #[configurable(metadata(docs::examples = "vector"))] + #[serde(default = "default_label_prefix")] + pub label_prefix: String, + + /// Enable request logging. + #[serde(default, skip_serializing_if = "crate::serde::is_default")] + pub log_request: bool, + + /// Custom HTTP headers to add to the request. + /// + /// These headers can be used to set Doris-specific Stream Load parameters: + /// - `format`: Data format (json, csv.) + /// - `read_json_by_line`: Whether to read JSON line by line + /// - `strip_outer_array`: Whether to strip outer array brackets + /// - Column mappings and transformations + /// + /// See [Doris Stream Load documentation](https://doris.apache.org/docs/data-operate/import/import-way/stream-load-manual) + /// for all available parameters. + #[serde(default)] + #[configurable(metadata(docs::additional_props_description = "An HTTP header value."))] + pub headers: HashMap, + + #[serde(flatten)] + pub encoding: EncodingConfigWithFraming, + + /// Compression algorithm to use for HTTP requests. + #[serde(default)] + pub compression: Compression, + + /// Number of retries attempted before failing. + #[serde(default = "default_max_retries")] + pub max_retries: isize, + + #[configurable(derived)] + #[serde(default)] + pub batch: BatchConfig, + + #[configurable(derived)] + pub auth: Option, + + #[serde(default)] + #[configurable(derived)] + pub request: TowerRequestConfig, + + #[configurable(derived)] + pub tls: Option, + + /// Options for determining the health of Doris endpoints. 
+ #[serde(default)] + #[configurable(derived)] + #[serde(rename = "distribution")] + pub endpoint_health: Option, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::is_default" + )] + pub acknowledgements: AcknowledgementsConfig, +} + +fn default_label_prefix() -> String { + "vector".to_string() +} + +const fn default_max_retries() -> isize { + -1 +} + +impl Default for DorisConfig { + fn default() -> Self { + Self { + endpoints: Vec::new(), + database: Template::try_from("").unwrap(), + table: Template::try_from("").unwrap(), + label_prefix: default_label_prefix(), + log_request: false, + headers: HashMap::new(), + encoding: ( + Some(vector_lib::codecs::encoding::FramingConfig::NewlineDelimited), + vector_lib::codecs::JsonSerializerConfig::default(), + ) + .into(), + compression: Compression::default(), + max_retries: default_max_retries(), + batch: BatchConfig::default(), + auth: None, + request: TowerRequestConfig::default(), + tls: None, + endpoint_health: None, + acknowledgements: AcknowledgementsConfig::default(), + } + } +} + +impl_generate_config_from_default!(DorisConfig); + +#[async_trait::async_trait] +#[typetag::serde(name = "doris")] +impl SinkConfig for DorisConfig { + async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let endpoints = self.endpoints.clone(); + + if endpoints.is_empty() { + return Err("No endpoints configured.'.".into()); + } + let commons = DorisCommon::parse_many(self).await?; + let common = commons[0].clone(); + + let client = HttpClient::new(common.tls_settings.clone(), &cx.proxy)?; + + // Setup retry logic using the configured request settings + let request_settings = self.request.into_settings(); + + let health_config = self.endpoint_health.clone().unwrap_or_default(); + + let services_futures = commons + .iter() + .map(|common| { + let client_clone = client.clone(); + let compression = self.compression; + let label_prefix = self.label_prefix.clone(); + let headers = self.headers.clone(); + let log_request = self.log_request; + let base_url = common.base_url.clone(); + let auth = common.auth.clone(); + + async move { + let endpoint = base_url.to_string(); + + let doris_client = DorisSinkClient::new( + client_clone, + base_url, + auth, + compression, + label_prefix, + headers, + ) + .await; + + let doris_client_safe = doris_client.into_thread_safe(); + + let service = DorisService::new(doris_client_safe, log_request); + + Ok::<_, crate::Error>((endpoint, service)) + } + }) + .collect::>(); + + // Wait for all futures to complete + let services_results = futures::future::join_all(services_futures).await; + + // Filter out successful results + let services = services_results + .into_iter() + .filter_map(Result::ok) + .collect::>(); + + let service = request_settings.distributed_service( + DorisRetryLogic {}, + services, + health_config, + DorisHealthLogic, + 1, // Buffer bound is hardcoded to 1 for sinks + ); + + // Create DorisSink with the configured service + let sink = DorisSink::new(service, self, &common)?; + + let sink = VectorSink::from_event_streamsink(sink); + + // Create a shared client instance to avoid repeated creation + let healthcheck_doris_client = { + let doris_client = DorisSinkClient::new( + client.clone(), + common.base_url.clone(), + common.auth.clone(), + self.compression, + self.label_prefix.clone(), + self.headers.clone(), + ) + .await; + doris_client.into_thread_safe() + }; + + // Use the previously saved 
client for health check, no need to create a new instance + let healthcheck = futures::future::select_ok(commons.into_iter().map(move |common| { + let client = Arc::clone(&healthcheck_doris_client); + async move { common.healthcheck(client).await }.boxed() + })) + .map_ok(|((), _)| ()) + .boxed(); + + Ok((sink, healthcheck)) + } + + fn input(&self) -> Input { + Input::log() + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn generate_config() { + crate::test_util::test_generate_config::(); + } + + #[test] + fn test_default_values() { + assert_eq!(default_label_prefix(), "vector"); + assert_eq!(default_max_retries(), -1); + } +} diff --git a/src/sinks/doris/health.rs b/src/sinks/doris/health.rs new file mode 100644 index 0000000000000..008a4aac9befd --- /dev/null +++ b/src/sinks/doris/health.rs @@ -0,0 +1,57 @@ +use crate::{ + http::HttpError, + sinks::{doris::service::DorisResponse, util::service::HealthLogic}, +}; +use tracing::{debug, error}; + +#[derive(Clone)] +pub struct DorisHealthLogic; + +impl HealthLogic for DorisHealthLogic { + type Error = crate::Error; + type Response = DorisResponse; + + fn is_healthy(&self, response: &Result) -> Option { + match response { + Ok(response) => { + let status = response.http_response.status(); + if status.is_success() { + debug!( + message = "Health check succeeded with success status code.", + status_code = %status + ); + Some(true) + } else if status.is_server_error() { + error!( + message = "Health check failed with server error status code.", + status_code = %status + ); + Some(false) + } else { + debug!( + message = "Health check returned non-success status code, but not determining health state.", + status_code = %status + ); + None + } + } + Err(error) => match error.downcast_ref::() { + Some(http_error) => { + error!( + message = "Health check failed with HTTP error.", + error_type = "HttpError::CallRequest", + %http_error + ); + Some(false) + } + _ => { + debug!( + message = "Health check failed with non-HTTP error, not determining health state.", + %error + ); + None + } + }, + } + } +} diff --git a/src/sinks/doris/integration_test.rs b/src/sinks/doris/integration_test.rs new file mode 100644 index 0000000000000..ffd864f0a9a9b --- /dev/null +++ b/src/sinks/doris/integration_test.rs @@ -0,0 +1,412 @@ +use futures::{future::ready, stream}; +use http::Uri; +use sqlx::{ + ConnectOptions, Connection, Executor as _, MySqlConnection, Row, mysql::MySqlConnectOptions, +}; +use std::collections::HashMap; +use tracing::{info, warn}; +use vector_lib::{ + codecs::{JsonSerializerConfig, MetricTagValues, encoding::FramingConfig}, + event::{BatchNotifier, BatchStatusReceiver, Event, LogEvent, Value}, +}; +// use vector_common::finalization::BatchStatus; +use vector_common::sensitive_string::SensitiveString; + +use super::*; +use crate::{ + config::{SinkConfig, SinkContext}, + sinks::util::BatchConfig, + test_util::{ + components::{SINK_TAGS, run_and_assert_sink_compliance}, + random_string, trace_init, + }, +}; + +fn doris_mysql_address_port() -> (String, u16) { + let host_port = doris_address(); + let uri = host_port.parse::().expect("invalid uri"); + let host = uri.host().unwrap_or("localhost").to_string(); + (host, 9030) +} + +// Set up Doris connection information +fn doris_address() -> String { + std::env::var("DORIS_ADDRESS").unwrap_or_else(|_| "http://localhost:8030".into()) +} + +fn make_event() -> (Event, BatchStatusReceiver) { + let 
(batch, receiver) = BatchNotifier::new_with_receiver(); + let mut event = LogEvent::from("raw log line").with_batch_notifier(&batch); + event.insert("host", "example.com"); + (event.into(), receiver) +} + +// Verify event fields match database row data +fn assert_fields_match( + event_log: &LogEvent, + db_row: &HashMap, + fields: &[&str], + table_name: Option<&str>, +) { + for field in fields { + // Get field value from event + let event_value = event_log.get(*field).cloned().unwrap_or(Value::Null); + + // Get field value from database row + let db_value = db_row.get(*field).cloned().unwrap_or(DbValue::Null); + + // Convert event value to string + let event_str = match &event_value { + Value::Bytes(bytes) => String::from_utf8_lossy(bytes).to_string(), + other => other.to_string(), + }; + // Database value already has Display implementation, use directly + let db_str = db_value.to_string(); + + // Build error message + let error_msg = if let Some(table) = table_name { + format!("Field '{}' mismatch in table {}", field, table) + } else { + format!("Field '{}' mismatch", field) + }; + + // Compare string representations + assert_eq!(event_str, db_str, "{}", error_msg); + } +} + +#[derive(Clone)] +struct DorisAuth { + user: String, + password: String, +} + +fn config_auth() -> DorisAuth { + DorisAuth { + user: "root".to_string(), + password: "".to_string(), + } +} + +fn default_headers() -> HashMap { + vec![ + ("format".to_string(), "json".to_string()), + ("strip_outer_array".to_string(), "false".to_string()), + ("read_json_by_line".to_string(), "true".to_string()), + ] + .into_iter() + .collect() +} + +#[tokio::test] +async fn insert_events() { + trace_init(); + + let database = format!("test_db_{}_point", random_string(5).to_lowercase()); + let table = format!("test_table_{}", random_string(5).to_lowercase()); + + let client = DorisTestClient::new(doris_mysql_address_port()).await; + info!( + message = "DorisTestClient created successfully, creating database...", + internal_log_rate_limit = true + ); + + client.create_database(&database).await; + + client + .create_table(&database, &table, "host Varchar(100), message String") + .await; + + let mut batch = BatchConfig::default(); + batch.max_events = Some(1); + + let config = DorisConfig { + endpoints: vec![doris_address().parse().unwrap()], + database: database.clone().try_into().unwrap(), + table: table.clone().try_into().unwrap(), + label_prefix: "vector_test".to_string(), + log_request: true, // Explicitly enable for integration tests + encoding: ( + Some(FramingConfig::NewlineDelimited), + JsonSerializerConfig::new(MetricTagValues::Full, Default::default()), + ) + .into(), + headers: default_headers(), + batch, + auth: Some(crate::http::Auth::Basic { + user: config_auth().user.clone(), + password: SensitiveString::from(config_auth().password.clone()), + }), + request: Default::default(), + ..Default::default() + }; + + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); + + let (input_event, _rc) = make_event(); + + run_and_assert_sink_compliance(sink, stream::once(ready(input_event.clone())), &SINK_TAGS) + .await; + + // assert_eq!(rc.try_recv(), Ok(BatchStatus::Delivered)); + + let row_count = client.count_rows(&database, &table).await; + assert_eq!(1, row_count); + + let db_row = client.get_first_row(&database, &table).await; + + // Use helper function to check field matching + assert_fields_match(input_event.as_log(), &db_row, &["host", "message"], None); + + client.drop_table(&database, &table).await; + 
client.drop_database(&database).await; +} + +// Define an enum type that can represent different types of values +#[derive(Debug, Clone)] +enum DbValue { + String(String), + Integer(i64), + Float(f64), + Null, +} + +impl std::fmt::Display for DbValue { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DbValue::String(s) => write!(f, "{}", s), + DbValue::Integer(i) => write!(f, "{}", i), + DbValue::Float(fl) => write!(f, "{}", fl), + DbValue::Null => write!(f, "null"), + } + } +} + +#[derive(Clone)] +struct DorisTestClient { + connect_options: MySqlConnectOptions, +} + +impl DorisTestClient { + async fn new(query_address_port: (String, u16)) -> Self { + let auth = config_auth(); + let (host, port) = query_address_port; + + info!( + message = "Connecting to Doris MySQL interface.", + %host, + %port, + user = %auth.user, + internal_log_rate_limit = true + ); + + // Configure MySQL connection parameters - For Doris specifically adjusted + // Disable these options to not send `SET @@sql_mode=CONCAT(@@sql_mode, {})` which is not supported on Doris. + let mut connect_options = MySqlConnectOptions::new() + .host(&host) + .port(port) + .username(&auth.user) + .no_engine_substitution(false) // Keep false to avoid SET statement + .pipes_as_concat(false) // Keep false to avoid SET statement + .ssl_mode(sqlx::mysql::MySqlSslMode::Disabled) + .disable_statement_logging(); + + // Only set password if it's not empty (Doris root user has no password by default) + if !auth.password.is_empty() { + connect_options = connect_options.password(&auth.password); + } + + info!( + message = "DorisTestClient initialized successfully.", + internal_log_rate_limit = true + ); + DorisTestClient { connect_options } + } + + /// Create a new database connection + async fn create_connection(&self) -> MySqlConnection { + MySqlConnection::connect_with(&self.connect_options) + .await + .unwrap_or_else(|e| panic!("Failed to connect to database: {}", e)) + } + + /// Execute a query that doesn't return data (DDL/DML operations) + async fn execute_query(&self, query: &str) { + info!( + message = "Executing SQL query.", + %query, + internal_log_rate_limit = true + ); + + let mut conn = self.create_connection().await; + + match conn.execute(query).await { + Ok(result) => { + info!( + message = "SQL query execution successful.", + %query, + rows_affected = %result.rows_affected(), + internal_log_rate_limit = true + ); + } + Err(e) => { + // Handle specific ignorable errors + if self.is_ignorable_error(query, &e) { + warn!( + message = "Ignoring expected error for query.", + %query, + %e, + internal_log_rate_limit = true + ); + return; + } + panic!("SQL query execution failed: {} - {}", query, e); + } + } + // Connection is automatically closed when it goes out of scope + } + + /// Check if an error can be safely ignored + fn is_ignorable_error(&self, query: &str, error: &sqlx::Error) -> bool { + let error_str = error.to_string(); + (query.starts_with("CREATE DATABASE") && error_str.contains("already exists")) + || (query.starts_with("CREATE TABLE") && error_str.contains("already exists")) + } + + /// Execute a query that returns a single row + async fn fetch_one_query(&self, query: &str, operation_name: &str) -> sqlx::mysql::MySqlRow { + info!( + message = "Executing fetch one query.", + %operation_name, + %query, + internal_log_rate_limit = true + ); + + let mut conn = self.create_connection().await; + + // Connection is automatically closed when it goes out of scope + conn.fetch_one(query) + 
.await + .unwrap_or_else(|e| panic!("{} failed: {} - {}", operation_name, query, e)) + } + + /// Execute a query that returns multiple rows + async fn fetch_all_query( + &self, + query: &str, + operation_name: &str, + ) -> Vec { + let mut conn = self.create_connection().await; + + // Connection is automatically closed when it goes out of scope + conn.fetch_all(query).await.unwrap_or_else(|e| { + warn!( + message = "Query failed.", + %operation_name, + %query, + %e, + internal_log_rate_limit = true + ); + Vec::new() + }) + } + + /// Create database using the common execute pattern + async fn create_database(&self, database: &str) { + let query = format!("CREATE DATABASE IF NOT EXISTS {}", database); + self.execute_query(&query).await; + } + + /// Create table using the common execute pattern + async fn create_table(&self, database: &str, table: &str, schema: &str) { + let query = format!( + "CREATE TABLE IF NOT EXISTS {}.{} ({}) ENGINE=OLAP + DISTRIBUTED BY HASH(`host`) BUCKETS 1 + PROPERTIES(\"replication_num\" = \"1\")", + database, table, schema + ); + self.execute_query(&query).await; + } + + /// Drop table using the common execute pattern + async fn drop_table(&self, database: &str, table: &str) { + let query = format!("DROP TABLE IF EXISTS {}.{}", database, table); + self.execute_query(&query).await; + } + + /// Drop database using the common execute pattern + async fn drop_database(&self, database: &str) { + let query = format!("DROP DATABASE IF EXISTS {}", database); + self.execute_query(&query).await; + } + + /// Count rows using the common fetch_one pattern + async fn count_rows(&self, database: &str, table: &str) -> i64 { + let query = format!("SELECT COUNT(*) FROM {}.{}", database, table); + let row = self.fetch_one_query(&query, "Counting rows").await; + + let count: i64 = row.get(0); + info!( + message = "Count result.", + rows = %count, + internal_log_rate_limit = true + ); + count + } + + /// Get column names using the common fetch_all pattern + async fn get_column_names(&self, database: &str, table: &str) -> Vec { + let query = format!( + "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}' ORDER BY ORDINAL_POSITION", + database, table + ); + + let rows = self.fetch_all_query(&query, "Getting column names").await; + rows.iter().map(|row| row.get::(0)).collect() + } + + /// Convert a database row value to DbValue enum + fn extract_db_value(row: &sqlx::mysql::MySqlRow, column_index: usize) -> DbValue { + // Try different types in order of preference + if let Ok(value) = row.try_get::, _>(column_index) { + match value { + Some(s) => DbValue::String(s), + None => DbValue::Null, + } + } else if let Ok(value) = row.try_get::, _>(column_index) { + match value { + Some(n) => DbValue::Integer(n), + None => DbValue::Null, + } + } else if let Ok(value) = row.try_get::, _>(column_index) { + match value { + Some(f) => DbValue::Float(f), + None => DbValue::Null, + } + } else { + DbValue::Null + } + } + + /// Get first row data using the refactored helper methods + async fn get_first_row(&self, database: &str, table: &str) -> HashMap { + let query = format!("SELECT * FROM {}.{} LIMIT 1", database, table); + + // Get column names and row data using helper methods + let columns = self.get_column_names(database, table).await; + let row = self.fetch_one_query(&query, "Getting first row data").await; + + // Build result using the helper method + let mut result = HashMap::new(); + for (i, column) in columns.iter().enumerate() { + let db_value = 
Self::extract_db_value(&row, i); + result.insert(column.clone(), db_value); + } + + info!( + message = "Getting first row data successful.", + internal_log_rate_limit = true + ); + result + } +} diff --git a/src/sinks/doris/mod.rs b/src/sinks/doris/mod.rs new file mode 100644 index 0000000000000..c3c250f8dfdba --- /dev/null +++ b/src/sinks/doris/mod.rs @@ -0,0 +1,24 @@ +//! The Doris [`vector_lib::sink::VectorSink`] +//! +//! This module contains the [`vector_lib::sink::VectorSink`] instance that is responsible for +//! taking a stream of [`vector_lib::event::Event`] instances and forwarding them to Apache Doris. +//! +//! Events are sent to Doris using the HTTP interface with Stream Load protocol. The event payload +//! is encoded as new-line delimited JSON or other formats specified by the user. +//! +//! This sink only supports logs for now but could support metrics and traces as well in the future. + +mod common; +mod config; +mod health; +mod request_builder; +mod retry; +mod service; +mod sink; + +#[cfg(all(test, feature = "doris-integration-tests"))] +mod integration_test; + +mod client; + +pub use self::config::DorisConfig; diff --git a/src/sinks/doris/request_builder.rs b/src/sinks/doris/request_builder.rs new file mode 100644 index 0000000000000..965960ce8d7c6 --- /dev/null +++ b/src/sinks/doris/request_builder.rs @@ -0,0 +1,58 @@ +//! `RequestBuilder` implementation for the `Doris` sink. + +use super::sink::DorisPartitionKey; +use crate::sinks::{prelude::*, util::http::HttpRequest}; +use bytes::Bytes; +use vector_lib::codecs::encoding::Framer; + +#[derive(Debug, Clone)] +pub struct DorisRequestBuilder { + pub(super) compression: Compression, + pub(super) encoder: (Transformer, Encoder), +} + +impl RequestBuilder<(DorisPartitionKey, Vec)> for DorisRequestBuilder { + type Metadata = (DorisPartitionKey, EventFinalizers); + type Events = Vec; + type Encoder = (Transformer, Encoder); + type Payload = Bytes; + type Request = HttpRequest; + type Error = std::io::Error; + + fn compression(&self) -> Compression { + self.compression + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoder + } + + fn split_input( + &self, + input: (DorisPartitionKey, Vec), + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let (key, mut events) = input; + + let finalizers = events.take_finalizers(); + let builder = RequestMetadataBuilder::from_events(&events); + ((key, finalizers), builder, events) + } + + fn build_request( + &self, + metadata: Self::Metadata, + request_metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + let (key, finalizers) = metadata; + HttpRequest::new( + payload.into_payload(), + finalizers, + request_metadata, + DorisPartitionKey { + database: key.database, + table: key.table, + }, + ) + } +} diff --git a/src/sinks/doris/retry.rs b/src/sinks/doris/retry.rs new file mode 100644 index 0000000000000..a5a49a9191beb --- /dev/null +++ b/src/sinks/doris/retry.rs @@ -0,0 +1,87 @@ +use serde::Deserialize; +use tracing::{debug, error}; + +use crate::{ + http::HttpError, + sinks::{ + doris::{service::DorisResponse, sink::DorisPartitionKey}, + prelude::*, + util::{http::HttpRequest, retries::RetryAction}, + }, +}; + +/// Internal struct for parsing Doris Stream Load API responses +#[derive(Debug, Deserialize)] +struct DorisStreamLoadResponse { + #[serde(rename = "Status")] + status: String, + + #[serde(rename = "Message")] + message: Option, +} + +#[derive(Debug, Clone)] +pub struct DorisRetryLogic; + +impl RetryLogic for DorisRetryLogic { + type 
Error = HttpError; + type Request = HttpRequest; + type Response = DorisResponse; + + fn is_retriable_error(&self, _error: &Self::Error) -> bool { + true + } + + fn should_retry_response(&self, response: &Self::Response) -> RetryAction { + let status = response.http_response.status(); + let body = response.http_response.body(); + let body_str = String::from_utf8_lossy(body); + + // Only return success when HTTP status is successful and Doris response status is "Success" + if status.is_success() { + // Try to parse Doris response + if let Ok(doris_resp) = serde_json::from_str::(&body_str) { + if doris_resp.status == "Success" { + debug!(message = "Doris stream load completed successfully."); + return RetryAction::Successful; + } + + // Retry for non-Success status + let message = doris_resp.message.unwrap_or_default(); + error!( + message = "Doris stream load failed, will retry.", + doris_status = %doris_resp.status, + error_message = %message + ); + return RetryAction::Retry( + format!("Doris error: {} - {}", doris_resp.status, message).into(), + ); + } else { + // HTTP success but failed to parse response + // Don't retry to avoid data duplication, but log the response for debugging + error!( + message = "Failed to parse Doris response, not retrying to avoid data duplication.", + status_code = %status, + body = %body_str + ); + return RetryAction::DontRetry("Failed to parse Doris response".into()); + } + } + + // Retry only for server errors (5xx) + if status.is_server_error() { + error!( + message = "Server error encountered, will retry.", + status_code = %status + ); + return RetryAction::Retry(format!("Server error from Doris: {}", status).into()); + } + + // Don't retry for client errors (4xx) and other cases + error!( + message = "Client error encountered, not retrying.", + status_code = %status + ); + RetryAction::DontRetry(format!("Client error from Doris: {}", status).into()) + } +} diff --git a/src/sinks/doris/service.rs b/src/sinks/doris/service.rs new file mode 100644 index 0000000000000..a28a42200f904 --- /dev/null +++ b/src/sinks/doris/service.rs @@ -0,0 +1,143 @@ +use crate::{ + internal_events::{DorisRowsFiltered, DorisRowsLoaded}, + sinks::{ + doris::{ + client::{DorisStreamLoadResponse, StreamLoadStatus, ThreadSafeDorisSinkClient}, + sink::DorisPartitionKey, + }, + prelude::{BoxFuture, DriverResponse, Service}, + util::http::HttpRequest, + }, +}; +use bytes::Bytes; +use http::Response; +use serde_json; +use snafu::Snafu; +use std::task::{Context, Poll}; +use tracing::{info, warn}; +use vector_common::{ + finalization::EventStatus, + request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}, +}; + +#[derive(Clone)] +pub struct DorisService { + client: ThreadSafeDorisSinkClient, + log_request: bool, +} + +impl DorisService { + pub const fn new(client: ThreadSafeDorisSinkClient, log_request: bool) -> DorisService { + DorisService { + client, + log_request, + } + } + pub(crate) async fn reporter_run(&self, response: DorisStreamLoadResponse) { + let stream_load_status = response.stream_load_status; + let http_status_code = response.http_status_code; + let response_json = response.response_json; + if self.log_request { + // Format the JSON with proper indentation for better readability + let formatted_json = match serde_json::to_string_pretty(&response_json) { + Ok(pretty_json) => pretty_json, + Err(err) => { + // Log the error but continue with the original format + warn!(message = "Failed to prettify JSON response.", error = %err); + response_json.to_string() 
+ } + }; + + info!( + message = "Doris stream load response received.", + status_code = %http_status_code, + stream_load_status = %stream_load_status, + response = %formatted_json, + ); + } + if http_status_code.is_success() && stream_load_status == StreamLoadStatus::Successful { + // Emit metrics for successfully loaded data + let load_bytes = response_json.get("LoadBytes").and_then(|b| b.as_i64()); + let loaded_rows = response_json + .get("NumberLoadedRows") + .and_then(|r| r.as_i64()); + if let Some(loaded_rows) = loaded_rows + && let Some(load_bytes) = load_bytes + { + emit!(DorisRowsLoaded { + loaded_rows, + load_bytes, + }); + } + + // Emit metrics for filtered rows + if let Some(filtered_rows) = response_json + .get("NumberFilteredRows") + .and_then(|r| r.as_i64()) + && filtered_rows > 0 + { + emit!(DorisRowsFiltered { filtered_rows }); + } + } + } +} +#[derive(Debug, Snafu)] +pub struct DorisResponse { + pub metadata: RequestMetadata, + pub http_response: Response, + pub event_status: EventStatus, +} + +impl DriverResponse for DorisResponse { + fn event_status(&self) -> EventStatus { + self.event_status + } + + fn events_sent(&self) -> &GroupedCountByteSize { + self.metadata.events_estimated_json_encoded_byte_size() + } + + fn bytes_sent(&self) -> Option { + Some(self.metadata.request_encoded_size()) + } +} + +impl Service> for DorisService { + type Response = DorisResponse; + type Error = crate::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: HttpRequest) -> Self::Future { + let service = self.clone(); + + let future = async move { + let mut request = req; + let table = request.get_additional_metadata().table.clone(); + let database = request.get_additional_metadata().database.clone(); + let doris_response = service + .client + .send_stream_load(database, table, request.take_payload()) + .await?; + let report_response = doris_response.clone(); + service.reporter_run(report_response).await; + + let event_status = if doris_response.stream_load_status == StreamLoadStatus::Successful + { + EventStatus::Delivered + } else { + EventStatus::Errored + }; + + Ok(DorisResponse { + metadata: request.get_metadata().clone(), + http_response: doris_response.response, + event_status, + }) + }; + Box::pin(future) + } +} diff --git a/src/sinks/doris/sink.rs b/src/sinks/doris/sink.rs new file mode 100644 index 0000000000000..ac9a77f09742e --- /dev/null +++ b/src/sinks/doris/sink.rs @@ -0,0 +1,115 @@ +use crate::sinks::{ + doris::{DorisConfig, common::DorisCommon, request_builder::DorisRequestBuilder}, + prelude::*, + util::http::HttpRequest, +}; + +pub struct DorisSink { + batch_settings: BatcherSettings, + service: S, + request_builder: DorisRequestBuilder, + database: Template, + table: Template, +} + +impl DorisSink +where + S: Service> + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ + pub fn new(service: S, config: &DorisConfig, common: &DorisCommon) -> crate::Result { + let batch_settings = config.batch.into_batcher_settings()?; + Ok(DorisSink { + batch_settings, + service, + request_builder: common.request_builder.clone(), + database: config.database.clone(), + table: config.table.clone(), + }) + } + + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + let batch_settings = self.batch_settings; + let key_partitioner = 
DorisKeyPartitioner::new(self.database, self.table); + input + .batched_partitioned(key_partitioner, || batch_settings.as_byte_size_config()) + .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) }) + .request_builder( + default_request_builder_concurrency_limit(), + self.request_builder, + ) + .filter_map(|request| async { + match request { + Err(error) => { + emit!(SinkRequestBuildError { error }); + None + } + Ok(req) => Some(req), + } + }) + .into_driver(self.service) + .run() + .await + } +} + +#[async_trait::async_trait] +impl StreamSink for DorisSink +where + S: Service> + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ + async fn run( + self: Box, + input: futures_util::stream::BoxStream<'_, Event>, + ) -> Result<(), ()> { + self.run_inner(input).await + } +} + +/// PartitionKey used to partition events by (database, table) pair. +#[derive(Hash, Eq, PartialEq, Clone, Debug)] +pub struct DorisPartitionKey { + pub database: String, + pub table: String, +} + +/// KeyPartitioner that partitions events by (database, table) pair. +struct DorisKeyPartitioner { + database: Template, + table: Template, +} + +impl DorisKeyPartitioner { + const fn new(database: Template, table: Template) -> Self { + Self { database, table } + } + + fn render(template: &Template, item: &Event, field: &'static str) -> Option { + template + .render_string(item) + .map_err(|error| { + emit!(TemplateRenderingError { + error, + field: Some(field), + drop_event: true, + }); + }) + .ok() + } +} + +impl Partitioner for DorisKeyPartitioner { + type Item = Event; + type Key = Option; + + fn partition(&self, item: &Self::Item) -> Self::Key { + let database = Self::render(&self.database, item, "database_key")?; + let table = Self::render(&self.table, item, "table_key")?; + Some(DorisPartitionKey { database, table }) + } +} diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index b5a45a462566e..75dcd683f46a5 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -45,6 +45,8 @@ pub mod databend; feature = "sinks-datadog_traces" ))] pub mod datadog; +#[cfg(feature = "sinks-doris")] +pub mod doris; #[cfg(feature = "sinks-elasticsearch")] pub mod elasticsearch; #[cfg(feature = "sinks-file")] diff --git a/tests/integration/doris/config/compose.yaml b/tests/integration/doris/config/compose.yaml new file mode 100644 index 0000000000000..5ea3e7ecf3624 --- /dev/null +++ b/tests/integration/doris/config/compose.yaml @@ -0,0 +1,25 @@ +version: "3" +services: + doris: + image: docker.io/yagagagaga/doris-standalone:${CONFIG_VERSION} + container_name: doris + hostname: doris + restart: unless-stopped + privileged: true + ports: + - "8030" + - "9030" + - "8040" + environment: + - SKIP_CHECK_ULIMIT=true + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8030/api/bootstrap"] + interval: 10s + timeout: 5s + retries: 30 + start_period: 60s + +networks: + default: + name: ${VECTOR_NETWORK} + external: true diff --git a/tests/integration/doris/config/test.yaml b/tests/integration/doris/config/test.yaml new file mode 100644 index 0000000000000..799a369d19a2f --- /dev/null +++ b/tests/integration/doris/config/test.yaml @@ -0,0 +1,19 @@ +features: +- sinks-doris +- doris-integration-tests + +test_filter: 'sinks::doris::integration_test::' + +env: + DORIS_ADDRESS: http://doris:8040 + +matrix: + version: ['2.1.7'] + +# changes to these files/paths will invoke the integration test in CI +# expressions are evaluated using 
https://github.com/micromatch/picomatch +paths: +- "src/internal_events/doris.rs" +- "src/sinks/doris/**" +- "src/sinks/util/**" +- "tests/integration/doris/**" diff --git a/website/cue/reference/components/sinks/doris.cue b/website/cue/reference/components/sinks/doris.cue new file mode 100644 index 0000000000000..820f6a7d61682 --- /dev/null +++ b/website/cue/reference/components/sinks/doris.cue @@ -0,0 +1,185 @@ +package metadata + +components: sinks: doris: { + title: "Doris" + + classes: { + commonly_used: false + delivery: "at_least_once" + development: "beta" + egress_method: "batch" + service_providers: ["Apache"] + stateful: false + } + + features: { + acknowledgements: true + auto_generated: true + healthcheck: enabled: true + send: { + batch: { + enabled: true + common: false + max_bytes: 10_000_000 + timeout_secs: 1.0 + } + compression: { + enabled: true + default: "none" + algorithms: ["none", "gzip"] + levels: ["none", "fast", "default", "best", 0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + } + encoding: { + enabled: true + codec: enabled: false + } + proxy: enabled: false + request: { + enabled: true + headers: true + } + tls: { + enabled: true + can_verify_certificate: true + can_verify_hostname: true + enabled_default: false + enabled_by_scheme: true + } + to: { + service: services.doris + + interface: { + socket: { + api: { + title: "Doris Stream Load API" + url: urls.doris_stream_load + } + direction: "outgoing" + protocols: ["http"] + ssl: "optional" + } + } + } + } + } + + support: { + requirements: [ + #""" + Doris version 1.0 or higher is required for optimal compatibility. + """#, + ] + warnings: [] + notices: [] + } + + configuration: generated.components.sinks.doris.configuration + + input: { + logs: true + metrics: null + traces: false + } + + how_it_works: { + stream_load: { + title: "Stream Load" + body: """ + Vector uses Doris's [Stream Load](\(urls.doris_stream_load)) API to efficiently + ingest data. Stream Load is Doris's primary method for real-time data ingestion, + providing high throughput and low latency. + + Each batch of events is sent as a single Stream Load request with a unique label + to ensure exactly-once semantics. The label is generated using the configured + `label_prefix` and a timestamp-based suffix. + """ + } + + batching: { + title: "Batching" + body: """ + Vector batches events before sending them to Doris to improve throughput and + reduce the number of Stream Load requests. The batching behavior is controlled + by the `batch` configuration options: + + - `max_events`: Maximum number of events per batch + - `max_bytes`: Maximum size of a batch in bytes + - `timeout_secs`: Maximum time to wait before flushing a partial batch + + When any of these limits is reached, the batch is flushed to Doris. + """ + } + + authentication: { + title: "Authentication" + body: """ + Vector supports HTTP basic authentication for connecting to Doris. The + credentials are configured using the `auth.user` and `auth.password` options. + + The authentication is performed on each Stream Load request to the Doris + Frontend (FE) nodes. + """ + } + + error_handling: { + title: "Error Handling" + body: """ + Vector implements comprehensive error handling for Doris Stream Load operations: + + - **Retries**: Failed requests are automatically retried based on the + `max_retries` configuration. Set to `-1` for unlimited retries. + - **Backoff**: Exponential backoff is used between retry attempts to avoid + overwhelming the Doris cluster. 
+ - **Partial Failures**: If a Stream Load request fails due to data format + issues, Vector logs the error and continues processing subsequent batches. + - **Connection Failures**: Network-level failures trigger automatic failover + to other configured endpoints if available. + """ + } + + load_balancing: { + title: "Load Balancing and Failover" + body: """ + When multiple endpoints are configured, Vector automatically distributes + Stream Load requests across all available Doris Frontend nodes. This provides + both load balancing and high availability: + + - **Round-robin distribution**: Requests are distributed evenly across endpoints + - **Health monitoring**: Unhealthy endpoints are automatically excluded + - **Automatic failover**: If an endpoint becomes unavailable, traffic is + redirected to healthy endpoints + - **Recovery**: Previously failed endpoints are periodically retested and + re-included when they become healthy again + """ + } + + data_format: { + title: "Data Format" + body: """ + Vector sends data to Doris in JSON format by default. Each event is serialized + as a JSON object, and multiple events are sent as newline-delimited JSON (NDJSON). + + The data format can be customized using the `headers` configuration to set + Doris-specific Stream Load parameters. See the `headers` configuration option + for available parameters. + """ + } + + exactly_once: { + title: "Exactly-Once Semantics" + body: """ + Vector ensures exactly-once delivery to Doris through the use of unique labels + for each Stream Load request. Each label is generated using: + + - The configured `label_prefix` + - A timestamp component + - A unique identifier for the batch + + Doris uses these labels to detect and reject duplicate Stream Load requests, + ensuring that data is not duplicated even if Vector retries a request. + + Labels follow the format: `{label_prefix}_{timestamp}_{batch_id}` + """ + } + } +} diff --git a/website/cue/reference/components/sinks/generated/doris.cue b/website/cue/reference/components/sinks/generated/doris.cue new file mode 100644 index 0000000000000..63e420e01f6e9 --- /dev/null +++ b/website/cue/reference/components/sinks/generated/doris.cue @@ -0,0 +1,1153 @@ +package metadata + +generated: components: sinks: doris: configuration: { + acknowledgements: { + description: """ + Controls how acknowledgements are handled for this sink. + + See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled. + + [e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/ + """ + required: false + type: object: options: enabled: { + description: """ + Controls whether or not end-to-end acknowledgements are enabled. + + When enabled for a sink, any source that supports end-to-end + acknowledgements that is connected to that sink waits for events + to be acknowledged by **all connected sinks** before acknowledging them at the source. + + Enabling or disabling acknowledgements at the sink level takes precedence over any global + [`acknowledgements`][global_acks] configuration. + + [global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements + """ + required: false + type: bool: {} + } + } + auth: { + description: """ + Configuration of the authentication strategy for HTTP requests. + + HTTP authentication should be used with HTTPS only, as the authentication credentials are passed as an + HTTP header without any additional encryption beyond what is provided by the transport itself. 
+ """ + required: false + type: object: options: { + auth: { + description: "The AWS authentication configuration." + relevant_when: "strategy = \"aws\"" + required: true + type: object: options: { + access_key_id: { + description: "The AWS access key ID." + required: true + type: string: examples: ["AKIAIOSFODNN7EXAMPLE"] + } + assume_role: { + description: """ + The ARN of an [IAM role][iam_role] to assume. + + [iam_role]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html + """ + required: true + type: string: examples: ["arn:aws:iam::123456789098:role/my_role"] + } + credentials_file: { + description: "Path to the credentials file." + required: true + type: string: examples: ["/my/aws/credentials"] + } + external_id: { + description: """ + The optional unique external ID in conjunction with role to assume. + + [external_id]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html + """ + required: false + type: string: examples: ["randomEXAMPLEidString"] + } + imds: { + description: "Configuration for authenticating with AWS through IMDS." + required: false + type: object: options: { + connect_timeout_seconds: { + description: "Connect timeout for IMDS." + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + max_attempts: { + description: "Number of IMDS retries for fetching tokens and metadata." + required: false + type: uint: default: 4 + } + read_timeout_seconds: { + description: "Read timeout for IMDS." + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + } + } + load_timeout_secs: { + description: """ + Timeout for successfully loading any credentials, in seconds. + + Relevant when the default credentials chain or `assume_role` is used. + """ + required: false + type: uint: { + examples: [30] + unit: "seconds" + } + } + profile: { + description: """ + The credentials profile to use. + + Used to select AWS credentials from a provided credentials file. + """ + required: false + type: string: { + default: "default" + examples: ["develop"] + } + } + region: { + description: """ + The [AWS region][aws_region] to send STS requests to. + + If not set, this defaults to the configured region + for the service itself. + + [aws_region]: https://docs.aws.amazon.com/general/latest/gr/rande.html#regional-endpoints + """ + required: false + type: string: examples: ["us-west-2"] + } + secret_access_key: { + description: "The AWS secret access key." + required: true + type: string: examples: ["wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"] + } + session_name: { + description: """ + The optional [RoleSessionName][role_session_name] is a unique session identifier for your assumed role. + + Should be unique per principal or reason. + If not set, the session name is autogenerated like assume-role-provider-1736428351340 + + [role_session_name]: https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html + """ + required: false + type: string: examples: ["vector-indexer-role"] + } + session_token: { + description: """ + The AWS session token. + See [AWS temporary credentials](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html) + """ + required: false + type: string: examples: ["AQoDYXdz...AQoDYXdz..."] + } + } + } + password: { + description: "The basic authentication password." + relevant_when: "strategy = \"basic\"" + required: true + type: string: examples: ["${PASSWORD}", "password"] + } + service: { + description: "The AWS service name to use for signing." 
+ relevant_when: "strategy = \"aws\"" + required: true + type: string: {} + } + strategy: { + description: "The authentication strategy to use." + required: true + type: string: enum: { + aws: "AWS authentication." + basic: """ + Basic authentication. + + The username and password are concatenated and encoded using [base64][base64]. + + [base64]: https://en.wikipedia.org/wiki/Base64 + """ + bearer: """ + Bearer authentication. + + The bearer token value (OAuth2, JWT, etc.) is passed as-is. + """ + custom: "Custom Authorization Header Value, will be inserted into the headers as `Authorization: < value >`" + } + } + token: { + description: "The bearer authentication token." + relevant_when: "strategy = \"bearer\"" + required: true + type: string: {} + } + user: { + description: "The basic authentication username." + relevant_when: "strategy = \"basic\"" + required: true + type: string: examples: ["${USERNAME}", "username"] + } + value: { + description: "Custom string value of the Authorization header" + relevant_when: "strategy = \"custom\"" + required: true + type: string: examples: ["${AUTH_HEADER_VALUE}", "CUSTOM_PREFIX ${TOKEN}"] + } + } + } + batch: { + description: "Event batching behavior." + required: false + type: object: options: { + max_bytes: { + description: """ + The maximum size of a batch that is processed by a sink. + + This is based on the uncompressed size of the batched events, before they are + serialized or compressed. + """ + required: false + type: uint: { + default: 10000000 + unit: "bytes" + } + } + max_events: { + description: "The maximum size of a batch before it is flushed." + required: false + type: uint: unit: "events" + } + timeout_secs: { + description: "The maximum age of a batch before it is flushed." + required: false + type: float: { + default: 1.0 + unit: "seconds" + } + } + } + } + compression: { + description: "Compression algorithm to use for HTTP requests." + required: false + type: string: { + default: "none" + enum: { + gzip: """ + [Gzip][gzip] compression. + + [gzip]: https://www.gzip.org/ + """ + none: "No compression." + snappy: """ + [Snappy][snappy] compression. + + [snappy]: https://github.com/google/snappy/blob/main/docs/README.md + """ + zlib: """ + [Zlib][zlib] compression. + + [zlib]: https://zlib.net/ + """ + zstd: """ + [Zstandard][zstd] compression. + + [zstd]: https://facebook.github.io/zstd/ + """ + } + } + } + database: { + description: "The database that contains the table data will be inserted into." + required: true + type: string: { + examples: ["mydatabase"] + syntax: "template" + } + } + distribution: { + description: "Options for determining the health of Doris endpoints." + required: false + type: object: options: { + retry_initial_backoff_secs: { + description: "Initial delay between attempts to reactivate endpoints once they become unhealthy." + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + retry_max_duration_secs: { + description: "Maximum delay between attempts to reactivate endpoints once they become unhealthy." + required: false + type: uint: { + default: 3600 + unit: "seconds" + } + } + } + } + encoding: { + description: """ + Encoding configuration. + Configures how events are encoded into raw bytes. + The selected encoding also determines which input types (logs, metrics, traces) are supported. + """ + required: true + type: object: options: { + avro: { + description: "Apache Avro-specific encoder options." 
+ relevant_when: "codec = \"avro\"" + required: true + type: object: options: schema: { + description: "The Avro schema." + required: true + type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"] + } + } + cef: { + description: "The CEF Serializer Options." + relevant_when: "codec = \"cef\"" + required: true + type: object: options: { + device_event_class_id: { + description: """ + Unique identifier for each event type. Identifies the type of event reported. + The value length must be less than or equal to 1023. + """ + required: true + type: string: {} + } + device_product: { + description: """ + Identifies the product of a vendor. + The part of a unique device identifier. No two products can use the same combination of device vendor and device product. + The value length must be less than or equal to 63. + """ + required: true + type: string: {} + } + device_vendor: { + description: """ + Identifies the vendor of the product. + The part of a unique device identifier. No two products can use the same combination of device vendor and device product. + The value length must be less than or equal to 63. + """ + required: true + type: string: {} + } + device_version: { + description: """ + Identifies the version of the problem. The combination of the device product, vendor, and this value make up the unique id of the device that sends messages. + The value length must be less than or equal to 31. + """ + required: true + type: string: {} + } + extensions: { + description: """ + The collection of key-value pairs. Keys are the keys of the extensions, and values are paths that point to the extension values of a log event. + The event can have any number of key-value pairs in any order. + """ + required: false + type: object: options: "*": { + description: "This is a path that points to the extension value of a log event." + required: true + type: string: {} + } + } + name: { + description: """ + This is a path that points to the human-readable description of a log event. + The value length must be less than or equal to 512. + Equals "cef.name" by default. + """ + required: true + type: string: {} + } + severity: { + description: """ + This is a path that points to the field of a log event that reflects importance of the event. + + It must point to a number from 0 to 10. + 0 = lowest_importance, 10 = highest_importance. + Set to "cef.severity" by default. + """ + required: true + type: string: {} + } + version: { + description: """ + CEF Version. Can be either 0 or 1. + Set to "0" by default. + """ + required: true + type: string: enum: { + V0: "CEF specification version 0.1." + V1: "CEF specification version 1.x." + } + } + } + } + codec: { + description: "The codec to use for encoding events." + required: true + type: string: enum: { + avro: """ + Encodes an event as an [Apache Avro][apache_avro] message. + + [apache_avro]: https://avro.apache.org/ + """ + cef: "Encodes an event as a CEF (Common Event Format) formatted message." + csv: """ + Encodes an event as a CSV message. + + This codec must be configured with fields to encode. + """ + gelf: """ + Encodes an event as a [GELF][gelf] message. + + This codec is experimental for the following reason: + + The GELF specification is more strict than the actual Graylog receiver. + Vector's encoder currently adheres more strictly to the GELF spec, with + the exception that some characters such as `@` are allowed in field names. 
+ + Other GELF codecs, such as Loki's, use a [Go SDK][implementation] that is maintained + by Graylog and is much more relaxed than the GELF spec. + + Going forward, Vector will use that [Go SDK][implementation] as the reference implementation, which means + the codec might continue to relax the enforcement of the specification. + + [gelf]: https://docs.graylog.org/docs/gelf + [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go + """ + json: """ + Encodes an event as [JSON][json]. + + [json]: https://www.json.org/ + """ + logfmt: """ + Encodes an event as a [logfmt][logfmt] message. + + [logfmt]: https://brandur.org/logfmt + """ + native: """ + Encodes an event in the [native Protocol Buffers format][vector_native_protobuf]. + + This codec is **[experimental][experimental]**. + + [vector_native_protobuf]: https://github.com/vectordotdev/vector/blob/master/lib/vector-core/proto/event.proto + [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs + """ + native_json: """ + Encodes an event in the [native JSON format][vector_native_json]. + + This codec is **[experimental][experimental]**. + + [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue + [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs + """ + otlp: """ + Encodes an event in the [OTLP (OpenTelemetry Protocol)][otlp] format. + + This codec uses protobuf encoding, which is the recommended format for OTLP. + The output is suitable for sending to OTLP-compatible endpoints with + `content-type: application/x-protobuf`. + + [otlp]: https://opentelemetry.io/docs/specs/otlp/ + """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ + raw_message: """ + No encoding. + + This encoding uses the `message` field of a log event. + + Be careful if you are modifying your log events (for example, by using a `remap` + transform) and removing the message field while doing additional parsing on it, as this + could lead to the encoding emitting empty strings for the given event. + """ + syslog: """ + Syslog encoding + RFC 3164 and 5424 are supported + """ + text: """ + Plain text encoding. + + This encoding uses the `message` field of a log event. For metrics, it uses an + encoding that resembles the Prometheus export format. + + Be careful if you are modifying your log events (for example, by using a `remap` + transform) and removing the message field while doing additional parsing on it, as this + could lead to the encoding emitting empty strings for the given event. + """ + } + } + csv: { + description: "The CSV Serializer Options." + relevant_when: "codec = \"csv\"" + required: true + type: object: options: { + capacity: { + description: """ + Sets the capacity (in bytes) of the internal buffer used in the CSV writer. + This defaults to 8192 bytes (8KB). + """ + required: false + type: uint: default: 8192 + } + delimiter: { + description: "The field delimiter to use when writing CSV." + required: false + type: ascii_char: default: "," + } + double_quote: { + description: """ + Enables double quote escapes. + + This is enabled by default, but you can disable it. When disabled, quotes in + field data are escaped instead of doubled. + """ + required: false + type: bool: default: true + } + escape: { + description: """ + The escape character to use when writing CSV. 
+ + In some variants of CSV, quotes are escaped using a special escape character + like \\ (instead of escaping quotes by doubling them). + + To use this, `double_quotes` needs to be disabled as well; otherwise, this setting is ignored. + """ + required: false + type: ascii_char: default: "\"" + } + fields: { + description: """ + Configures the fields that are encoded, as well as the order in which they + appear in the output. + + If a field is not present in the event, the output for that field is an empty string. + + Values of type `Array`, `Object`, and `Regex` are not supported, and the + output for any of these types is an empty string. + """ + required: true + type: array: items: type: string: {} + } + quote: { + description: "The quote character to use when writing CSV." + required: false + type: ascii_char: default: "\"" + } + quote_style: { + description: "The quoting style to use when writing CSV data." + required: false + type: string: { + default: "necessary" + enum: { + always: "Always puts quotes around every field." + necessary: """ + Puts quotes around fields only when necessary. + They are necessary when fields contain a quote, delimiter, or record terminator. + Quotes are also necessary when writing an empty record + (which is indistinguishable from a record with one empty field). + """ + never: "Never writes quotes, even if it produces invalid CSV data." + non_numeric: """ + Puts quotes around all fields that are non-numeric. + This means that when writing a field that does not parse as a valid float or integer, + quotes are used even if they aren't strictly necessary. + """ + } + } + } + } + } + except_fields: { + description: "List of fields that are excluded from the encoded event." + required: false + type: array: items: type: string: {} + } + gelf: { + description: "The GELF Serializer Options." + relevant_when: "codec = \"gelf\"" + required: false + type: object: options: max_chunk_size: { + description: """ + Maximum size for each GELF chunked datagram (including 12-byte header). + Chunking starts when datagrams exceed this size. + For Graylog target, keep at or below 8192 bytes; for Vector target (`gelf` decoding with `chunked_gelf` framing), up to 65,500 bytes is recommended. + """ + required: false + type: uint: default: 8192 + } + } + json: { + description: "Options for the JsonSerializer." + relevant_when: "codec = \"json\"" + required: false + type: object: options: pretty: { + description: "Whether to use pretty JSON formatting." + required: false + type: bool: default: false + } + } + metric_tag_values: { + description: """ + Controls how metric tag values are encoded. + + When set to `single`, only the last non-bare value of tags are displayed with the + metric. When set to `full`, all metric tags are exposed as separate assignments. + """ + relevant_when: "codec = \"json\" or codec = \"text\"" + required: false + type: string: { + default: "single" + enum: { + full: "All tags are exposed as arrays of either string or null values." + single: """ + Tag values are exposed as single strings, the same as they were before this config + option. Tags with multiple values show the last assigned value, and null values + are ignored. + """ + } + } + } + only_fields: { + description: "List of fields that are included in the encoded event." + required: false + type: array: items: type: string: {} + } + protobuf: { + description: "Options for the Protobuf serializer." 
+ relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -I -o ` + + You can read more [here](https://buf.build/docs/reference/images/#how-buf-images-work). + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + use_json_names: { + description: """ + Use JSON field names (camelCase) instead of protobuf field names (snake_case). + + When enabled, the serializer looks for fields using their JSON names as defined + in the `.proto` file (for example `jobDescription` instead of `job_description`). + + This is useful when working with data that has already been converted from JSON or + when interfacing with systems that use JSON naming conventions. + """ + required: false + type: bool: default: false + } + } + } + syslog: { + description: "Options for the Syslog serializer." + relevant_when: "codec = \"syslog\"" + required: false + type: object: options: { + app_name: { + description: """ + Path to a field in the event to use for the app name. + + If not provided, the encoder checks for a semantic "service" field. + If that is also missing, it defaults to "vector". + """ + required: false + type: string: {} + } + facility: { + description: "Path to a field in the event to use for the facility. Defaults to \"user\"." + required: false + type: string: {} + } + msg_id: { + description: "Path to a field in the event to use for the msg ID." + required: false + type: string: {} + } + proc_id: { + description: "Path to a field in the event to use for the proc ID." + required: false + type: string: {} + } + rfc: { + description: "RFC to use for formatting." + required: false + type: string: { + default: "rfc5424" + enum: { + rfc3164: "The legacy RFC3164 syslog format." + rfc5424: "The modern RFC5424 syslog format." + } + } + } + severity: { + description: "Path to a field in the event to use for the severity. Defaults to \"informational\"." + required: false + type: string: {} + } + } + } + timestamp_format: { + description: "Format used for timestamp fields." + required: false + type: string: enum: { + rfc3339: "Represent the timestamp as a RFC 3339 timestamp." + unix: "Represent the timestamp as a Unix timestamp." + unix_float: "Represent the timestamp as a Unix timestamp in floating point." + unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds." + unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds." + unix_us: "Represent the timestamp as a Unix timestamp in microseconds." + } + } + } + } + endpoints: { + description: """ + A list of Doris endpoints to send logs to. + + The endpoint must contain an HTTP scheme, and may specify a + hostname or IP address and port. + """ + required: false + type: array: { + default: [] + items: type: string: examples: ["http://127.0.0.1:8030"] + } + } + framing: { + description: "Framing configuration." + required: false + type: object: options: { + character_delimited: { + description: "Options for the character delimited encoder." + relevant_when: "method = \"character_delimited\"" + required: true + type: object: options: delimiter: { + description: "The ASCII (7-bit) character that delimits byte sequences." 
+ required: true + type: ascii_char: {} + } + } + length_delimited: { + description: "Options for the length delimited decoder." + relevant_when: "method = \"length_delimited\"" + required: true + type: object: options: { + length_field_is_big_endian: { + description: "Length field byte order (little or big endian)" + required: false + type: bool: default: true + } + length_field_length: { + description: "Number of bytes representing the field length" + required: false + type: uint: default: 4 + } + length_field_offset: { + description: "Number of bytes in the header before the length field" + required: false + type: uint: default: 0 + } + max_frame_length: { + description: "Maximum frame length" + required: false + type: uint: default: 8388608 + } + } + } + max_frame_length: { + description: "Maximum frame length" + relevant_when: "method = \"varint_length_delimited\"" + required: false + type: uint: default: 8388608 + } + method: { + description: "The framing method." + required: true + type: string: enum: { + bytes: "Event data is not delimited at all." + character_delimited: "Event data is delimited by a single ASCII (7-bit) character." + length_delimited: """ + Event data is prefixed with its length in bytes. + + The prefix is a 32-bit unsigned integer, little endian. + """ + newline_delimited: "Event data is delimited by a newline (LF) character." + varint_length_delimited: """ + Event data is prefixed with its length in bytes as a varint. + + This is compatible with protobuf's length-delimited encoding. + """ + } + } + } + } + headers: { + description: """ + Custom HTTP headers to add to the request. + + These headers can be used to set Doris-specific Stream Load parameters: + - `format`: Data format (json, csv.) + - `read_json_by_line`: Whether to read JSON line by line + - `strip_outer_array`: Whether to strip outer array brackets + - Column mappings and transformations + + See [Doris Stream Load documentation](https://doris.apache.org/docs/data-operate/import/import-way/stream-load-manual) + for all available parameters. + """ + required: false + type: object: options: "*": { + description: "An HTTP header value." + required: true + type: string: {} + } + } + label_prefix: { + description: """ + The prefix for Stream Load label. + The final label will be in format: `{label_prefix}_{database}_{table}_{timestamp}_{uuid}`. + """ + required: false + type: string: { + default: "vector" + examples: [ + "vector", + ] + } + } + log_request: { + description: "Enable request logging." + required: false + type: bool: default: false + } + max_retries: { + description: "Number of retries attempted before failing." + required: false + type: int: default: -1 + } + request: { + description: """ + Middleware settings for outbound requests. + + Various settings can be configured, such as concurrency and rate limits, timeouts, and retry behavior. + + Note that the retry backoff policy follows the Fibonacci sequence. + """ + required: false + type: object: options: { + adaptive_concurrency: { + description: """ + Configuration of adaptive concurrency parameters. + + These parameters typically do not require changes from the default, and incorrect values can lead to meta-stable or + unstable performance and sink behavior. Proceed with caution. + """ + required: false + type: object: options: { + decrease_ratio: { + description: """ + The fraction of the current value to set the new concurrency limit when decreasing the limit. + + Valid values are greater than `0` and less than `1`. 
Smaller values cause the algorithm to scale back rapidly + when latency increases. + + **Note**: The new limit is rounded down after applying this ratio. + """ + required: false + type: float: default: 0.9 + } + ewma_alpha: { + description: """ + The weighting of new measurements compared to older measurements. + + Valid values are greater than `0` and less than `1`. + + ARC uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with + the current RTT. Smaller values cause this reference to adjust more slowly, which may be useful if a service has + unusually high response variability. + """ + required: false + type: float: default: 0.4 + } + initial_concurrency: { + description: """ + The initial concurrency limit to use. If not specified, the initial limit is 1 (no concurrency). + + Datadog recommends setting this value to your service's average limit if you're seeing that it takes a + long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the + `adaptive_concurrency_limit` metric. + """ + required: false + type: uint: default: 1 + } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit does not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } + rtt_deviation_scale: { + description: """ + Scale of RTT deviations which are not considered anomalous. + + Valid values are greater than or equal to `0`, and reasonable values range from `1.0` to `3.0`. + + When calculating the past RTT average, a secondary “deviation” value is also computed that indicates how variable + those values are. That deviation is used when comparing the past RTT average to the current measurements, so we + can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to + an appropriate range. Larger values cause the algorithm to ignore larger increases in the RTT. + """ + required: false + type: float: default: 2.5 + } + } + } + concurrency: { + description: """ + Configuration for outbound request concurrency. + + This can be set either to one of the below enum values or to a positive integer, which denotes + a fixed concurrency limit. + """ + required: false + type: { + string: { + default: "adaptive" + enum: { + adaptive: """ + Concurrency is managed by Vector's [Adaptive Request Concurrency][arc] feature. + + [arc]: https://vector.dev/docs/architecture/arc/ + """ + none: """ + A fixed concurrency of 1. + + Only one request can be outstanding at any given time. + """ + } + } + uint: {} + } + } + rate_limit_duration_secs: { + description: "The time window used for the `rate_limit_num` option." + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + rate_limit_num: { + description: "The maximum number of requests allowed within the `rate_limit_duration_secs` time window." + required: false + type: uint: { + default: 9223372036854775807 + unit: "requests" + } + } + retry_attempts: { + description: "The maximum number of retries to make for failed requests." + required: false + type: uint: { + default: 9223372036854775807 + unit: "retries" + } + } + retry_initial_backoff_secs: { + description: """ + The amount of time to wait before attempting the first retry for a failed request. + + After the first retry has failed, the Fibonacci sequence is used to select future backoffs. 
+ """ + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + retry_jitter_mode: { + description: "The jitter mode to use for retry backoff behavior." + required: false + type: string: { + default: "Full" + enum: { + Full: """ + Full jitter. + + The random delay is anywhere from 0 up to the maximum current delay calculated by the backoff + strategy. + + Incorporating full jitter into your backoff strategy can greatly reduce the likelihood + of creating accidental denial of service (DoS) conditions against your own systems when + many clients are recovering from a failure state. + """ + None: "No jitter." + } + } + } + retry_max_duration_secs: { + description: "The maximum amount of time to wait between retries." + required: false + type: uint: { + default: 30 + unit: "seconds" + } + } + timeout_secs: { + description: """ + The time a request can take before being aborted. + + Datadog highly recommends that you do not lower this value below the service's internal timeout, as this could + create orphaned requests, pile on retries, and result in duplicate data downstream. + """ + required: false + type: uint: { + default: 60 + unit: "seconds" + } + } + } + } + table: { + description: "The table data is inserted into." + required: true + type: string: { + examples: ["mytable"] + syntax: "template" + } + } + tls: { + description: "TLS configuration." + required: false + type: object: options: { + alpn_protocols: { + description: """ + Sets the list of supported ALPN protocols. + + Declare the supported ALPN protocols, which are used during negotiation with a peer. They are prioritized in the order + that they are defined. + """ + required: false + type: array: items: type: string: examples: ["h2"] + } + ca_file: { + description: """ + Absolute path to an additional CA certificate file. + + The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format. + """ + required: false + type: string: examples: ["/path/to/certificate_authority.crt"] + } + crt_file: { + description: """ + Absolute path to a certificate file used to identify this server. + + The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as + an inline string in PEM format. + + If this is set _and_ is not a PKCS#12 archive, `key_file` must also be set. + """ + required: false + type: string: examples: ["/path/to/host_certificate.crt"] + } + key_file: { + description: """ + Absolute path to a private key file used to identify this server. + + The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format. + """ + required: false + type: string: examples: ["/path/to/host_certificate.key"] + } + key_pass: { + description: """ + Passphrase used to unlock the encrypted key file. + + This has no effect unless `key_file` is set. + """ + required: false + type: string: examples: ["${KEY_PASS_ENV_VAR}", "PassWord1"] + } + server_name: { + description: """ + Server name to use when using Server Name Indication (SNI). + + Only relevant for outgoing connections. + """ + required: false + type: string: examples: ["www.example.com"] + } + verify_certificate: { + description: """ + Enables certificate verification. For components that create a server, this requires that the + client connections have a valid client certificate. For components that initiate requests, + this validates that the upstream has a valid certificate. 
+ + If enabled, certificates must not be expired and must be issued by a trusted + issuer. This verification operates in a hierarchical manner, checking that the leaf certificate (the + certificate presented by the client/server) is not only valid, but that the issuer of that certificate is also valid, and + so on, until the verification process reaches a root certificate. + + Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates. + """ + required: false + type: bool: {} + } + verify_hostname: { + description: """ + Enables hostname verification. + + If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by + the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension. + + Only relevant for outgoing connections. + + Do NOT set this to `false` unless you understand the risks of not verifying the remote hostname. + """ + required: false + type: bool: {} + } + } + } +} diff --git a/website/cue/reference/components/sources/internal_metrics.cue b/website/cue/reference/components/sources/internal_metrics.cue index da94e39ecf645..61cdec4a02e72 100644 --- a/website/cue/reference/components/sources/internal_metrics.cue +++ b/website/cue/reference/components/sources/internal_metrics.cue @@ -235,6 +235,24 @@ components: sources: internal_metrics: { default_namespace: "vector" tags: _component_tags } + doris_bytes_loaded_total: { + description: "The total number of bytes loaded into Doris." + type: "counter" + default_namespace: "vector" + tags: _component_tags + } + doris_rows_filtered_total: { + description: "The total number of rows filtered by Doris during stream load." + type: "counter" + default_namespace: "vector" + tags: _component_tags + } + doris_rows_loaded_total: { + description: "The total number of rows successfully loaded into Doris." + type: "counter" + default_namespace: "vector" + tags: _component_tags + } k8s_format_picker_edge_cases_total: { description: "The total number of edge cases encountered while picking format of the Kubernetes log message." type: "counter" diff --git a/website/cue/reference/services/doris.cue b/website/cue/reference/services/doris.cue new file mode 100644 index 0000000000000..f10639b495462 --- /dev/null +++ b/website/cue/reference/services/doris.cue @@ -0,0 +1,10 @@ +package metadata + +services: doris: { + name: "Apache Doris" + thing: "an \(name) database" + url: urls.doris + versions: null + + description: "[Apache Doris](\(urls.doris)) is a modern MPP (Massively Parallel Processing) analytical database product. It can provide sub-second query response time on large datasets and can support both high-concurrent point queries and high-throughput complex analysis scenarios. Doris is widely used for Online Analytical Processing scenarios including real-time data warehouses, ad-hoc queries, unified data analytics, and log analysis." 
+} diff --git a/website/cue/reference/urls.cue b/website/cue/reference/urls.cue index 66dee24e23ea3..079d8378e2696 100644 --- a/website/cue/reference/urls.cue +++ b/website/cue/reference/urls.cue @@ -125,6 +125,8 @@ urls: { databend: "https://databend.rs" databend_rest: "https://databend.rs/doc/integrations/api/rest" databend_cloud: "https://www.databend.com" + doris: "https://doris.apache.org" + doris_stream_load: "https://doris.apache.org/docs/data-operate/import/import-way/stream-load-manual" datadog: "https://www.datadoghq.com" datadog_agent: "https://docs.datadoghq.com/agent/" datadog_agent_vector_config_template: "https://github.com/DataDog/datadog-agent/blob/7.64.1/pkg/config/config_template.yaml#L594-L650"
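
To make the configuration reference above concrete, the following is a minimal, untested sketch of a Vector config that wires the new sink to a test source. The `demo_logs` source, the endpoint address, and the `${DORIS_USERNAME}`/`${DORIS_PASSWORD}` placeholders are illustrative assumptions; the `doris` sink options shown (`endpoints`, `database`, `table`, `auth`, `batch`, `headers`, `label_prefix`) are taken directly from the `generated/doris.cue` reference in this change.

sources:
  demo_logs:
    type: demo_logs
    format: json

sinks:
  doris:
    type: doris
    inputs: ["demo_logs"]
    # Doris Frontend (FE) HTTP endpoints; listing several enables the
    # load balancing and failover behavior described in how_it_works.
    endpoints: ["http://127.0.0.1:8030"]
    database: "mydatabase"
    table: "mytable"
    auth:
      strategy: "basic"
      user: "${DORIS_USERNAME}"
      password: "${DORIS_PASSWORD}"
    batch:
      max_bytes: 10000000   # default; uncompressed batch size limit
      timeout_secs: 1.0     # default; flush partial batches after one second
    # Stream Load parameters are passed through as HTTP headers.
    headers:
      format: "json"
      read_json_by_line: "true"
    label_prefix: "vector"  # final label: {label_prefix}_{database}_{table}_{timestamp}_{uuid}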