Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions components/log-ingestor/src/compression/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ impl<Submitter: BufferSubmitter> Buffer<Submitter> {
/// Returns an error if:
///
/// * Forwards [`Self::submit`]'s return values on failure.
pub async fn add(&mut self, object_metadata: ObjectMetadata) -> Result<()> {
self.total_size += object_metadata.size;
self.buf.push(object_metadata);
pub async fn add(&mut self, object_metadata_to_ingest: Vec<ObjectMetadata>) -> Result<()> {
for object_metadata in object_metadata_to_ingest {
self.total_size += object_metadata.size;
self.buf.push(object_metadata);

if self.total_size >= self.size_threshold {
self.submit().await?;
if self.total_size >= self.size_threshold {
self.submit().await?;
}
}

Ok(())
Expand Down
17 changes: 7 additions & 10 deletions components/log-ingestor/src/compression/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::compression::{Buffer, BufferSubmitter};
struct ListenerTask<Submitter: BufferSubmitter> {
buffer: Buffer<Submitter>,
timeout: Duration,
receiver: mpsc::Receiver<ObjectMetadata>,
receiver: mpsc::Receiver<Vec<ObjectMetadata>>,
}

impl<Submitter: BufferSubmitter + Send + 'static> ListenerTask<Submitter> {
Expand Down Expand Up @@ -59,12 +59,8 @@ impl<Submitter: BufferSubmitter + Send + 'static> ListenerTask<Submitter> {
anyhow::anyhow!("Listener channel has been closed unexpectedly")
);
}
Some(object_metadata) => {
tracing::debug!(
object = ? object_metadata,
"Received new object metadata."
);
self.buffer.add(object_metadata).await?;
Some(object_metadata_to_ingest) => {
self.buffer.add(object_metadata_to_ingest).await?;
}
}
}
Expand All @@ -83,7 +79,7 @@ impl<Submitter: BufferSubmitter + Send + 'static> ListenerTask<Submitter> {
/// Represents a listener that accepts S3 object metadata from multiple senders and buffers them
/// for submission.
pub struct Listener {
sender: mpsc::Sender<ObjectMetadata>,
sender: mpsc::Sender<Vec<ObjectMetadata>>,
cancel_token: CancellationToken,
handle: tokio::task::JoinHandle<Result<()>>,
}
Expand Down Expand Up @@ -146,14 +142,15 @@ impl Listener {
}

/// # Returns
/// A new `mpsc::Sender<ObjectMetadata>` that can be used to send metadata to this listener.
/// A new `mpsc::Sender<Vec<ObjectMetadata>>` that can be used to send metadata to this
/// listener.
///
/// The returned sender is a cheap clone of the listener's internal channel sender. It can be
/// freely cloned and moved to other tasks; multiple senders may concurrently send to the same
/// listener. Messages sent by a single sender preserve order; messages from different senders
/// are interleaved in the order they are received by the runtime.
#[must_use]
pub fn get_new_sender(&self) -> mpsc::Sender<ObjectMetadata> {
pub fn get_new_sender(&self) -> mpsc::Sender<Vec<ObjectMetadata>> {
self.sender.clone()
}
}
27 changes: 19 additions & 8 deletions components/log-ingestor/src/ingestion_job.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
mod s3_scanner;
mod sqs_listener;
mod state;

use anyhow::Result;
pub use s3_scanner::*;
pub use sqs_listener::*;
pub use state::*;

/// Enum for different types of ingestion jobs.
pub enum IngestionJob {
S3Scanner(S3Scanner),
SqsListener(SqsListener),
///
/// # Type Parameters
///
/// * [`State`]: A type that implements all required ingestion job state traits for state
/// management.
pub enum IngestionJob<State: IngestionJobState + S3ScannerState + SqsListenerState> {
S3Scanner(S3Scanner<State>),
SqsListener(SqsListener<State>),
}

impl IngestionJob {
impl<State: IngestionJobState + S3ScannerState + SqsListenerState> IngestionJob<State> {
/// Shuts down and waits for the job to complete.
///
/// # Returns
Expand Down Expand Up @@ -45,14 +52,18 @@ impl IngestionJob {
}
}

impl From<S3Scanner> for IngestionJob {
fn from(scanner: S3Scanner) -> Self {
impl<State: IngestionJobState + S3ScannerState + SqsListenerState> From<S3Scanner<State>>
for IngestionJob<State>
{
fn from(scanner: S3Scanner<State>) -> Self {
Self::S3Scanner(scanner)
}
}

impl From<SqsListener> for IngestionJob {
fn from(listener: SqsListener) -> Self {
impl<State: IngestionJobState + S3ScannerState + SqsListenerState> From<SqsListener<State>>
for IngestionJob<State>
{
fn from(listener: SqsListener<State>) -> Self {
Self::SqsListener(listener)
}
}
48 changes: 37 additions & 11 deletions components/log-ingestor/src/ingestion_job/s3_scanner.rs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably need to update the doc strings. How about:

diff --git a/components/log-ingestor/src/ingestion_job/s3_scanner.rs b/components/log-ingestor/src/ingestion_job/s3_scanner.rs
index 71f0a418..65e4deb0 100644
--- a/components/log-ingestor/src/ingestion_job/s3_scanner.rs
+++ b/components/log-ingestor/src/ingestion_job/s3_scanner.rs
@@ -73,8 +73,8 @@ impl<S3ClientManager: AwsClientManagerType<Client>, State: S3ScannerState>
         }
     }
 
-    /// Scans the configured S3 bucket and prefix for new objects and sends their metadata to the
-    /// underlying channel sender.
+    /// Scans the configured S3 bucket and prefix for new objects and ingests their metadata through
+    /// the underlying state.
     ///
     /// # Note
     ///
@@ -168,7 +168,7 @@ impl<State: S3ScannerState> S3Scanner<State> {
     /// Creates and spawns a new [`S3Scanner`] backed by a [`Task`].
     ///
     /// This function spawns a [`Task`]. The spawned task will periodically scan the configured S3
-    /// bucket and prefix for new objects and send their metadata to the provided channel sender.
+    /// bucket and prefix for new objects and ingest their metadata through the provided state.
     ///
     /// # Type parameters
     ///
diff --git a/components/log-ingestor/src/ingestion_job/sqs_listener.rs b/components/log-ingestor/src/ingestion_job/sqs_listener.rs
index a3a8314b..015d4fb1 100644
--- a/components/log-ingestor/src/ingestion_job/sqs_listener.rs
+++ b/components/log-ingestor/src/ingestion_job/sqs_listener.rs
@@ -37,7 +37,7 @@ impl<SqsClientManager: AwsClientManagerType<Client>, State: SqsListenerState>
     Task<SqsClientManager, State>
 {
     /// Runs the SQS listener task to listen to SQS messages and extract S3 object metadata. The
-    /// extracted metadata is sent to the provided channel sender.
+    /// extracted metadata is ingested through the underlying state.
     ///
     /// # Returns
     ///
@@ -77,7 +77,8 @@ impl<SqsClientManager: AwsClientManagerType<Client>, State: SqsListenerState>
         }
     }
 
-    /// Processes the SQS response to extract S3 object metadata and send it to the channel sender.
+    /// Processes the SQS response to extract S3 object metadata and ingest it through the
+    /// underlying state.
     ///
     /// # NOTE
     ///
@@ -263,7 +264,7 @@ impl<State: SqsListenerState> SqsListener<State> {
     /// Creates and spawns a new [`SqsListener`] backed by a [`Task`].
     ///
     /// This function spawns a series of [`Task`]. Each spawned task will listen to SQS messages,
-    /// extract relevant S3 object metadata, and send the metadata to the provided channel sender.
+    /// extract relevant S3 object metadata, and ingest the metadata through the provided state.
     ///
     /// # Type parameters
     ///

Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,30 @@ use anyhow::Result;
use aws_sdk_s3::Client;
use clp_rust_utils::{job_config::ingestion::s3::S3ScannerConfig, s3::ObjectMetadata};
use non_empty_string::NonEmptyString;
use tokio::{select, sync::mpsc};
use tokio::select;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

use crate::aws_client_manager::AwsClientManagerType;
use crate::{aws_client_manager::AwsClientManagerType, ingestion_job::S3ScannerState};

/// Represents an S3 scanner task that periodically scans a given prefix under the bucket to fetch
/// object metadata for newly created objects.
///
/// # Type Parameters
///
/// * [`S3ClientManager`]: The type of the AWS S3 client manager.
struct Task<S3ClientManager: AwsClientManagerType<Client>> {
/// * [`State`]: The type that implements [`S3ScannerState`] for managing S3 scanner states.
struct Task<S3ClientManager: AwsClientManagerType<Client>, State: S3ScannerState> {
s3_client_manager: S3ClientManager,
scanning_interval: Duration,
config: S3ScannerConfig,
sender: mpsc::Sender<ObjectMetadata>,
start_after: Option<String>,
state: State,
}

impl<S3ClientManager: AwsClientManagerType<Client>> Task<S3ClientManager> {
impl<S3ClientManager: AwsClientManagerType<Client>, State: S3ScannerState>
Task<S3ClientManager, State>
{
/// Runs the S3 scanner task to scan the given bucket.
///
/// This is a wrapper of [`Self::scan_once`] that supports cancellation via the provided
Expand Down Expand Up @@ -92,6 +95,8 @@ impl<S3ClientManager: AwsClientManagerType<Client>> Task<S3ClientManager> {
/// [`aws_sdk_s3::operation::list_objects_v2::builders::ListObjectsV2FluentBuilder::send`]'s
/// return values on failure.
/// * Forwards [`NonEmptyString::new`]'s return values on failure.
/// * Forwards [`S3ScannerState::ingest`]'s return values on failure.
/// * Forwards [`i64::try_into`]'s return values when failing to convert object size to [`u64`].
pub async fn scan_once(&mut self) -> Result<bool> {
let client = self.s3_client_manager.get().await?;
let response = client
Expand All @@ -105,6 +110,7 @@ impl<S3ClientManager: AwsClientManagerType<Client>> Task<S3ClientManager> {
return Ok(false);
};

let mut object_metadata_to_ingest = Vec::with_capacity(contents.len());
for content in contents {
let (Some(key), Some(size)) = (content.key, content.size) else {
continue;
Expand All @@ -119,10 +125,24 @@ impl<S3ClientManager: AwsClientManagerType<Client>> Task<S3ClientManager> {
size: size.try_into()?,
};
tracing::info!(object = ? object_metadata, "Scanned new object metadata on S3.");
self.sender.send(object_metadata).await?;
self.start_after = Some(key);
object_metadata_to_ingest.push(object_metadata);
}

if object_metadata_to_ingest.is_empty() {
return Ok(response.is_truncated.unwrap_or(false));
}

let last_ingested_key: String = object_metadata_to_ingest
.last()
.expect("`object_metadata_to_ingest` should not be empty")
.key
.clone()
.into();
self.state
.ingest(object_metadata_to_ingest, last_ingested_key.as_str())
.await?;
self.start_after = Some(last_ingested_key);

Ok(response.is_truncated.unwrap_or(false))
}

Expand All @@ -133,13 +153,18 @@ impl<S3ClientManager: AwsClientManagerType<Client>> Task<S3ClientManager> {
}

/// Represents an S3 scanner job that manages the lifecycle of an S3 scanner task.
pub struct S3Scanner {
///
/// # Type Parameters
///
/// * [`State`]: The type that implements [`S3ScannerState`] for managing S3 scanner states.
pub struct S3Scanner<State: S3ScannerState> {
id: Uuid,
cancel_token: CancellationToken,
handle: tokio::task::JoinHandle<Result<()>>,
_state: State,
}

impl S3Scanner {
impl<State: S3ScannerState> S3Scanner<State> {
/// Creates and spawns a new [`S3Scanner`] backed by a [`Task`].
///
/// This function spawns a [`Task`]. The spawned task will periodically scan the configured S3
Expand All @@ -157,16 +182,16 @@ impl S3Scanner {
id: Uuid,
s3_client_manager: S3ClientManager,
config: S3ScannerConfig,
sender: mpsc::Sender<ObjectMetadata>,
state: State,
) -> Self {
let scanning_interval = Duration::from_secs(u64::from(config.scanning_interval_sec));
let start_after = config.start_after.clone().map(Into::into);
let task = Task {
s3_client_manager,
scanning_interval,
config,
sender,
start_after,
state: state.clone(),
};
let cancel_token = CancellationToken::new();
let child_cancel_token = cancel_token.clone();
Expand All @@ -179,6 +204,7 @@ impl S3Scanner {
id,
cancel_token,
handle,
_state: state,
}
}

Expand Down
Loading
Loading