From 16b633f4bf14c9010481f4790db94bb1801c1262 Mon Sep 17 00:00:00 2001 From: anant Date: Tue, 7 Jan 2025 17:50:03 +0530 Subject: [PATCH 1/2] Build fix for Kafka --- Cargo.toml | 5 ++++- src/cli.rs | 30 +++++++++++++++++------------ src/handlers/http/health_check.rs | 2 +- src/lib.rs | 1 + src/main.rs | 6 +++++- src/query/stream_schema_provider.rs | 4 +++- 6 files changed, 32 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d054735a8..8cc07430f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,7 +67,6 @@ once_cell = "1.17.1" opentelemetry-proto = {git = "https://github.com/parseablehq/opentelemetry-rust", branch="fix-metrics-u64-serialization"} prometheus = { version = "0.13", features = ["process"] } rand = "0.8.5" -rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] } regex = "1.7.3" relative-path = { version = "1.7", features = ["serde"] } reqwest = { version = "0.11.27", default-features = false, features = [ @@ -134,3 +133,7 @@ debug = [] inherits = "release" lto = "fat" codegen-units = 1 + +# adding rdkafka here because, for unsupported platforms, cargo skips other deps which come after this +[target.'cfg(all(target_os = "linux", target_arch = "x86_64"))'.dependencies] +rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] } \ No newline at end of file diff --git a/src/cli.rs b/src/cli.rs index aab90eb1e..423339516 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -22,11 +22,16 @@ use std::path::PathBuf; use url::Url; use crate::{ - kafka::SslProtocol, oidc::{self, OpenidConfig}, option::{validation, Compression, Mode}, }; +#[cfg(all(target_os = "linux", target_arch = "x86_64"))] +use crate::kafka::SslProtocol as KafkaSslProtocol; + +#[cfg(not(all(target_os = "linux", target_arch = "x86_64")))] +use std::string::String as KafkaSslProtocol; + #[derive(Debug, Default)] pub struct Cli { /// The location of TLS Cert file @@ -107,7 +112,7 @@ pub struct Cli { pub kafka_host: Option, pub kafka_group: Option, pub kafka_client_id: Option, - pub kafka_security_protocol: Option, + pub kafka_security_protocol: Option, pub kafka_partitions: Option, // Audit Logging env vars @@ -502,16 +507,17 @@ impl FromArgMatches for Cli { } fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> { - self.kafka_topics = m.get_one::(Self::KAFKA_TOPICS).cloned(); - self.kafka_security_protocol = m - .get_one::(Self::KAFKA_SECURITY_PROTOCOL) - .cloned(); - self.kafka_group = m.get_one::(Self::KAFKA_GROUP).cloned(); - self.kafka_client_id = m.get_one::(Self::KAFKA_CLIENT_ID).cloned(); - self.kafka_security_protocol = m - .get_one::(Self::KAFKA_SECURITY_PROTOCOL) - .cloned(); - self.kafka_partitions = m.get_one::(Self::KAFKA_PARTITIONS).cloned(); + #[cfg(all(target_os = "linux", target_arch = "x86_64"))] + { + self.kafka_topics = m.get_one::(Self::KAFKA_TOPICS).cloned(); + self.kafka_security_protocol = m + .get_one::(Self::KAFKA_SECURITY_PROTOCOL) + .cloned(); + self.kafka_group = m.get_one::(Self::KAFKA_GROUP).cloned(); + self.kafka_client_id = m.get_one::(Self::KAFKA_CLIENT_ID).cloned(); + self.kafka_host = m.get_one::(Self::KAFKA_HOST).cloned(); + self.kafka_partitions = m.get_one::(Self::KAFKA_PARTITIONS).cloned(); + } self.audit_logger = m.get_one::(Self::AUDIT_LOGGER).cloned(); self.audit_username = m.get_one::(Self::AUDIT_USERNAME).cloned(); diff --git a/src/handlers/http/health_check.rs b/src/handlers/http/health_check.rs index 1b97df121..85156a77c 100644 --- a/src/handlers/http/health_check.rs +++ b/src/handlers/http/health_check.rs @@ -58,7 +58,7 @@ pub async fn handle_signals(shutdown_signal: Arc { - log::info!("Received SIGINT signal at Readiness Probe Handler"); + info!("Received SIGINT signal at Readiness Probe Handler"); shutdown(shutdown_signal).await; } } diff --git a/src/lib.rs b/src/lib.rs index cd703c4dd..e6300ca2e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,6 +27,7 @@ pub mod correlation; mod event; pub mod handlers; pub mod hottier; +#[cfg(all(target_os = "linux", target_arch = "x86_64"))] pub mod kafka; mod livetail; mod metadata; diff --git a/src/main.rs b/src/main.rs index d1663d539..798372339 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,12 +17,15 @@ */ use parseable::{ - banner, kafka, + banner, option::{Mode, CONFIG}, rbac, storage, IngestServer, ParseableServer, QueryServer, Server, }; use tracing_subscriber::EnvFilter; +#[cfg(all(target_os = "linux", target_arch = "x86_64"))] +use parseable::kafka; + #[actix_web::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt() @@ -46,6 +49,7 @@ async fn main() -> anyhow::Result<()> { // keep metadata info in mem metadata.set_global(); + #[cfg(all(target_os = "linux", target_arch = "x86_64"))] // load kafka server if CONFIG.parseable.mode != Mode::Query { tokio::task::spawn(kafka::setup_integration()); diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index 662ad0b99..7b83b9099 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -301,7 +301,9 @@ impl StandardTableProvider { #[cfg(windows)] { if CONFIG.storage_name.eq("drive") { - file_path = object_store::path::Path::from_absolute_path(file_path).unwrap(); + file_path = object_store::path::Path::from_absolute_path(file_path) + .unwrap() + .to_string(); } } let pf = PartitionedFile::new(file_path, file.file_size); From 4bb829c9e5fe7ccf7d487152f11ca9cb9596e944 Mon Sep 17 00:00:00 2001 From: parmesant Date: Wed, 8 Jan 2025 13:06:09 +0530 Subject: [PATCH 2/2] Update Cargo.toml Co-authored-by: Devdutt Shenoi Signed-off-by: parmesant --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index c397d83d3..879fe2572 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -138,4 +138,4 @@ codegen-units = 1 # adding rdkafka here because, for unsupported platforms, cargo skips other deps which come after this [target.'cfg(all(target_os = "linux", target_arch = "x86_64"))'.dependencies] -rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] } \ No newline at end of file +rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] }