) -> anyhow::Result<()> {
+ let len = records.len();
+ debug!("Processing {} records", len);
+
+ self.build_event_from_chunk(&records)
+ .await?
+ .process()
+ .await?;
+
+ debug!("Processed {} records", len);
+ Ok(())
+ }
+}
+
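+/// Per-partition worker: drains a partition's record stream in chunks, hands
+/// each chunk to the processor, then commits the chunk's offsets.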
+#[derive(Clone)]
+pub struct StreamWorker<P>
+where
+ P: Processor<Vec<ConsumerRecord>, ()>,
+{
+ processor: Arc<P>,
+ consumer: Arc<StreamConsumer<KafkaContext>>,
+ buffer_config: BufferConfig,
+}
+
+impl<P> StreamWorker<P>
+where
+ P: Processor<Vec<ConsumerRecord>, ()> + Send + Sync + 'static,
+{
+ pub fn new(processor: Arc<P>, consumer: Arc<StreamConsumer<KafkaContext>>) -> Self {
+ let buffer_config = consumer
+ .context()
+ .config()
+ .consumer()
+ .expect("Consumer config is missing")
+ .buffer_config();
+
+ Self {
+ processor,
+ consumer,
+ buffer_config,
+ }
+ }
+
+ pub async fn process_partition(
+ &self,
+ tp: TopicPartition,
+ record_stream: ReceiverStream<ConsumerRecord>,
+ ) -> anyhow::Result<()> {
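+ // Re-chunk the stream: a chunk is emitted once buffer_size records have
+ // accumulated or buffer_timeout elapses, whichever comes first.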
+ let chunked_stream = tokio_stream::StreamExt::chunks_timeout(
+ record_stream,
+ self.buffer_config.buffer_size,
+ self.buffer_config.buffer_timeout,
+ );
+
+ chunked_stream
+ .for_each_concurrent(None, |records| async {
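+ // Committing the highest offset in the chunk acknowledges every
+ // record in it for this partition.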
+ if let Some(last_record) = records.iter().max_by_key(|r| r.offset) {
+ let tpl = last_record.offset_to_commit().unwrap();
+
+ if let Err(e) = self.processor.process(records).await {
+ error!("Failed to process records for {:?}: {:?}", tp, e);
+ }
+
+ // Commit synchronously: CommitMode::Async is prone to a race
+ // condition during rebalances. Note that the offset is committed
+ // even when processing failed above, so failed chunks are not
+ // redelivered.
+ // @see https://github.com/confluentinc/librdkafka/issues/4534
+ // @see https://github.com/confluentinc/librdkafka/issues/4059
+ if let Err(e) = self.consumer.commit(&tpl, CommitMode::Sync) {
+ error!(error = %e, "Failed to commit offsets for {:?}", tpl);
+ } else {
+ debug!("Committed offsets for {:?}", tpl);
+ }
+ }
+ })
+ .await;
+
+ self.processor.post_stream().await?;
+
+ Ok(())
+ }
+}
diff --git a/src/connectors/kafka/rebalance_listener.rs b/src/connectors/kafka/rebalance_listener.rs
new file mode 100644
index 000000000..8372e1b5b
--- /dev/null
+++ b/src/connectors/kafka/rebalance_listener.rs
@@ -0,0 +1,87 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use crate::connectors::common::shutdown::Shutdown;
+use crate::connectors::kafka::state::StreamState;
+use crate::connectors::kafka::RebalanceEvent;
+use std::sync::Arc;
+use tokio::sync::RwLock;
+use tokio::{runtime::Handle, sync::mpsc::Receiver};
+use tracing::{info, warn};
+
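+/// Reacts to consumer-group rebalances: on revocation it terminates the
+/// affected partition streams before acknowledging the event back to the
+/// consumer context.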
+pub struct RebalanceListener {
+ rebalance_rx: Receiver<RebalanceEvent>,
+ stream_state: Arc<RwLock<StreamState>>,
+ shutdown_handle: Shutdown,
+}
+
+impl RebalanceListener {
+ pub fn new(
+ rebalance_rx: Receiver<RebalanceEvent>,
+ stream_state: Arc<RwLock<StreamState>>,
+ shutdown_handle: Shutdown,
+ ) -> Self {
+ Self {
+ rebalance_rx,
+ stream_state,
+ shutdown_handle,
+ }
+ }
+
+ pub fn start(self) {
+ let mut rebalance_receiver = self.rebalance_rx;
+ let stream_state = self.stream_state.clone();
+ let shutdown_handle = self.shutdown_handle.clone();
+ let tokio_runtime_handle = Handle::current();
+
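+ // Run on a dedicated OS thread so a long-running revoke cannot stall
+ // the runtime's workers; async work is driven via the captured handle.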
+ std::thread::Builder::new().name("rebalance-listener-thread".to_string()).spawn(move || {
+ tokio_runtime_handle.block_on(async move {
+ loop {
+ tokio::select! {
+ rebalance = rebalance_receiver.recv() => {
+ match rebalance {
+ Some(RebalanceEvent::Assign(tpl)) => info!("RebalanceEvent Assign: {:?}", tpl),
+ Some(RebalanceEvent::Revoke(tpl, callback)) => {
+ info!("RebalanceEvent Revoke: {:?}", tpl);
+ if let Ok(mut stream_state) = stream_state.try_write() {
+ stream_state.terminate_partition_streams(tpl).await;
+ drop(stream_state);
+ } else {
+ warn!("Stream state lock is busy, skipping rebalance revoke for {:?}", tpl);
+ }
+ if let Err(err) = callback.send(()) {
+ warn!("Error during sending response to context. Cause: {:?}", err);
+ }
+ info!("Finished Rebalance Revoke");
+ }
+ None => {
+ info!("Rebalance event sender is closed!");
+ break
+ }
+ }
+ },
+ _ = shutdown_handle.recv() => {
+ info!("Gracefully stopping rebalance listener!");
+ break;
+ },
+ }
+ }
+ })
+ }).expect("Failed to start rebalance listener thread");
+ }
+}
diff --git a/src/connectors/kafka/sink.rs b/src/connectors/kafka/sink.rs
new file mode 100644
index 000000000..447955686
--- /dev/null
+++ b/src/connectors/kafka/sink.rs
@@ -0,0 +1,94 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+use crate::connectors::common::build_runtime;
+use crate::connectors::common::processor::Processor;
+use crate::connectors::kafka::consumer::KafkaStreams;
+use crate::connectors::kafka::processor::StreamWorker;
+use crate::connectors::kafka::ConsumerRecord;
+use anyhow::Result;
+use futures_util::StreamExt;
+use rdkafka::consumer::Consumer;
+use std::sync::Arc;
+use tokio::runtime::Runtime;
+use tracing::{error, info};
+
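+/// Connector wiring Kafka partition streams to a processor: every assigned
+/// partition is drained by its own task on a dedicated runtime.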
+pub struct KafkaSinkConnector<P>
+where
+ P: Processor<Vec<ConsumerRecord>, ()>,
+{
+ streams: KafkaStreams,
+ stream_processor: Arc<StreamWorker<P>>,
+ runtime: Runtime,
+}
+
+impl<P> KafkaSinkConnector<P>
+where
+ P: Processor<Vec<ConsumerRecord>, ()> + Send + Sync + 'static,
+{
+ pub fn new(kafka_streams: KafkaStreams, processor: P) -> Self {
+ let consumer = kafka_streams.consumer();
+ let stream_processor = Arc::new(StreamWorker::new(
+ Arc::new(processor),
+ Arc::clone(&consumer),
+ ));
+
+ let runtime = build_runtime(
+ consumer.context().config.partition_listener_concurrency,
+ "kafka-sink-worker",
+ )
+ .expect("Failed to build runtime");
+ let _ = runtime.enter();
+
+ Self {
+ streams: kafka_streams,
+ stream_processor,
+ runtime,
+ }
+ }
+
+ pub async fn run(self) -> Result<()> {
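+ // Spawn one task per partition stream and await them all concurrently,
+ // so a slow partition never blocks the others.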
+ self.streams
+ .partitioned()
+ .map(|partition_stream| {
+ let worker = Arc::clone(&self.stream_processor);
+ let tp = partition_stream.topic_partition().clone();
+ self.runtime.spawn(async move {
+ partition_stream
+ .run_drain(|partition_records| async {
+ info!("Starting task for partition: {:?}", tp);
+
+ worker
+ .process_partition(tp.clone(), partition_records)
+ .await
+ .unwrap();
+ })
+ .await;
+
+ info!("Task completed for partition: {:?}", tp);
+ })
+ })
+ .for_each_concurrent(None, |task| async {
+ if let Err(e) = task.await {
+ error!("Task failed: {:?}", e);
+ }
+ })
+ .await;
+
+ Ok(())
+ }
+}
diff --git a/src/connectors/kafka/state.rs b/src/connectors/kafka/state.rs
new file mode 100644
index 000000000..7b20ae762
--- /dev/null
+++ b/src/connectors/kafka/state.rs
@@ -0,0 +1,68 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use crate::connectors::kafka::partition_stream::PartitionStreamSender;
+use crate::connectors::kafka::{TopicPartition, TopicPartitionList};
+use std::collections::HashMap;
+use tracing::info;
+
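+/// Tracks the live sender handle of every assigned partition stream so the
+/// streams can be torn down on rebalance or shutdown.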
+pub struct StreamState {
+ partition_senders: HashMap<TopicPartition, PartitionStreamSender>,
+}
+
+impl StreamState {
+ pub fn new(capacity: usize) -> Self {
+ Self {
+ partition_senders: HashMap::with_capacity(capacity),
+ }
+ }
+
+ pub fn insert_partition_sender(
+ &mut self,
+ tp: TopicPartition,
+ sender: PartitionStreamSender,
+ ) -> Option<PartitionStreamSender> {
+ self.partition_senders.insert(tp, sender)
+ }
+
+ pub fn get_partition_sender(&self, tp: &TopicPartition) -> Option<&PartitionStreamSender> {
+ self.partition_senders.get(tp)
+ }
+
+ pub async fn terminate_partition_streams(&mut self, tpl: TopicPartitionList) {
+ info!("Terminating streams: {:?}", tpl);
+
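+ // Removing and dropping a partition's sender closes its channel, which
+ // ends that partition's stream and lets its worker finish draining.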
+ for tp in tpl.tpl {
+ if let Some(sender) = self.partition_senders.remove(&tp) {
+ info!("Terminating stream for {:?}", tp);
+ sender.terminate();
+ drop(sender);
+ info!("Stream terminated for {:?}", tp);
+ } else {
+ info!("Stream already completed for {:?}", tp);
+ }
+ }
+
+ info!("All streams terminated!");
+ }
+
+ pub fn clear(&mut self) {
+ info!("Clearing all stream states...");
+ self.partition_senders.clear();
+ }
+}
diff --git a/src/connectors/mod.rs b/src/connectors/mod.rs
new file mode 100644
index 000000000..757c49a97
--- /dev/null
+++ b/src/connectors/mod.rs
@@ -0,0 +1,100 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use crate::connectors::common::processor::Processor;
+use crate::connectors::common::shutdown::Shutdown;
+use crate::connectors::kafka::config::KafkaConfig;
+use crate::connectors::kafka::consumer::KafkaStreams;
+use crate::connectors::kafka::metrics::KafkaMetricsCollector;
+use crate::connectors::kafka::processor::ParseableSinkProcessor;
+use crate::connectors::kafka::rebalance_listener::RebalanceListener;
+use crate::connectors::kafka::sink::KafkaSinkConnector;
+use crate::connectors::kafka::state::StreamState;
+use crate::connectors::kafka::{ConsumerRecord, KafkaContext};
+use crate::option::{Mode, CONFIG};
+use actix_web_prometheus::PrometheusMetrics;
+use prometheus::Registry;
+use std::sync::Arc;
+use tokio::sync::RwLock;
+use tracing::{info, warn};
+
+pub mod common;
+pub mod kafka;
+
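+/// Starts the Kafka connector when the server runs in `Ingest` or `All` mode
+/// and the Kafka configuration validates; an invalid config only logs a warning.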
+pub async fn init(prometheus: &PrometheusMetrics) -> anyhow::Result<()> {
+ if matches!(CONFIG.options.mode, Mode::Ingest | Mode::All) {
+ match CONFIG.kafka_config.validate() {
+ Err(e) => {
+ warn!("Kafka connector configuration invalid. {}", e);
+ }
+ Ok(_) => {
+ let config = CONFIG.kafka_config.clone();
+ let shutdown_handle = Shutdown::default();
+ let registry = prometheus.registry.clone();
+ let processor = ParseableSinkProcessor;
+
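+ // Log when the shutdown signal fires; the same handle is shared with
+ // the consumer and the rebalance listener for coordinated teardown.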
+ tokio::spawn({
+ let shutdown_handle = shutdown_handle.clone();
+ async move {
+ shutdown_handle.signal_listener().await;
+ info!("Connector received shutdown signal!");
+ }
+ });
+
+ run_kafka2parseable(config, registry, processor, shutdown_handle).await?;
+ }
+ }
+ }
+
+ Ok(())
+}
+
+async fn run_kafka2parseable<P>(
+ config: KafkaConfig,
+ registry: Registry,
+ processor: P,
+ shutdown_handle: Shutdown,
+) -> anyhow::Result<()>
+where
+ P: Processor<Vec<ConsumerRecord>, ()> + Send + Sync + 'static,
+{
+ info!("Initializing KafkaSink connector...");
+
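+ // KafkaContext wraps the client configuration and emits rebalance events
+ // on the receiver consumed by the RebalanceListener below.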
+ let kafka_config = Arc::new(config.clone());
+ let (kafka_context, rebalance_rx) = KafkaContext::new(kafka_config);
+
+ //TODO: fetch topics metadata from kafka then give dynamic value to StreamState
+ let stream_state = Arc::new(RwLock::new(StreamState::new(60)));
+ let rebalance_listener = RebalanceListener::new(
+ rebalance_rx,
+ Arc::clone(&stream_state),
+ shutdown_handle.clone(),
+ );
+
+ let kafka_streams = KafkaStreams::init(kafka_context, stream_state, shutdown_handle.clone())?;
+
+ let stats = kafka_streams.statistics();
+ registry.register(Box::new(KafkaMetricsCollector::new(stats)?))?;
+
+ let kafka_parseable_sink_connector = KafkaSinkConnector::new(kafka_streams, processor);
+
+ rebalance_listener.start();
+ kafka_parseable_sink_connector.run().await?;
+
+ Ok(())
+}
diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs
index bafb58080..4cdfb6428 100644
--- a/src/handlers/http/modal/ingest_server.rs
+++ b/src/handlers/http/modal/ingest_server.rs
@@ -31,7 +31,6 @@ use crate::handlers::http::logstream;
use crate::handlers::http::middleware::DisAllowRootUser;
use crate::handlers::http::middleware::RouteExt;
use crate::handlers::http::role;
-use crate::metrics;
use crate::migration;
use crate::migration::metadata_migration::migrate_ingester_metadata;
use crate::rbac::role::Action;
@@ -44,9 +43,10 @@ use crate::sync;
use crate::utils::get_ingestor_id;
use crate::utils::get_url;
use crate::{handlers::http::base_path, option::CONFIG};
+
use actix_web::web;
-use actix_web::web::resource;
use actix_web::Scope;
+use actix_web_prometheus::PrometheusMetrics;
use async_trait::async_trait;
use base64::Engine;
use bytes::Bytes;
@@ -177,8 +177,8 @@ impl ParseableServer for IngestServer {
// parseable can't use local storage for persistence when running a distributed setup
if CONFIG.get_storage_mode_string() == "Local drive" {
return Err(anyhow::Error::msg(
- "This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage.",
- ));
+ "This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage.",
+ ));
}
// check for querier state. Is it there, or was it there in the past
@@ -190,9 +190,12 @@ impl ParseableServer for IngestServer {
}
/// configure the server and start an instance to ingest data
- async fn init(&self, shutdown_rx: oneshot::Receiver<()>) -> anyhow::Result<()> {
- let prometheus = metrics::build_metrics_handler();
- CONFIG.storage().register_store_metrics(&prometheus);
+ async fn init(
+ &self,
+ prometheus: &PrometheusMetrics,
+ shutdown_rx: oneshot::Receiver<()>,
+ ) -> anyhow::Result<()> {
+ CONFIG.storage().register_store_metrics(prometheus);
migration::run_migration(&CONFIG).await?;
@@ -207,7 +210,7 @@ impl ParseableServer for IngestServer {
set_ingestor_metadata().await?;
// Ingestors shouldn't have to deal with OpenId auth flow
- let app = self.start(shutdown_rx, prometheus, None);
+ let app = self.start(shutdown_rx, prometheus.clone(), None);
tokio::pin!(app);
loop {
@@ -237,7 +240,7 @@ impl ParseableServer for IngestServer {
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
}
- };
+ }
}
}
}
@@ -258,21 +261,21 @@ impl IngestServer {
pub fn get_user_role_webscope() -> Scope {
web::scope("/role")
// GET Role List
- .service(resource("").route(web::get().to(role::list).authorize(Action::ListRole)))
+ .service(web::resource("").route(web::get().to(role::list).authorize(Action::ListRole)))
.service(
// PUT and GET Default Role
- resource("/default")
+ web::resource("/default")
.route(web::put().to(role::put_default).authorize(Action::PutRole))
.route(web::get().to(role::get_default).authorize(Action::GetRole)),
)
.service(
// PUT, GET, DELETE Roles
- resource("/{name}")
+ web::resource("/{name}")
.route(web::delete().to(role::delete).authorize(Action::DeleteRole))
.route(web::get().to(role::get).authorize(Action::GetRole)),
)
.service(
- resource("/{name}/sync")
+ web::resource("/{name}/sync")
.route(web::put().to(ingestor_role::put).authorize(Action::PutRole)),
)
}
diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs
index d36372b2b..f791d16b2 100644
--- a/src/handlers/http/modal/mod.rs
+++ b/src/handlers/http/modal/mod.rs
@@ -68,7 +68,11 @@ pub trait ParseableServer {
async fn load_metadata(&self) -> anyhow::Result