Skip to content

Commit a0f0933

Browse files
feat(log-ingestor): Add support for recovering log-ingestor on restart (resolves #1978). (#2053)
1 parent a91e5f7 commit a0f0933

File tree

6 files changed

+841
-212
lines changed

6 files changed

+841
-212
lines changed

components/log-ingestor/src/compression/compression_job_submitter.rs

Lines changed: 56 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use clp_rust_utils::{
99
},
1010
job_config::{
1111
ClpIoConfig,
12+
CompressionJobId,
1213
CompressionJobStatus,
1314
InputConfig,
1415
OutputConfig,
@@ -108,6 +109,55 @@ impl CompressionJobSubmitter {
108109
}
109110
}
110111

112+
/// Waits for the compression job to complete and updates the submitted metadata in the state
113+
/// accordingly.
114+
///
115+
/// # NOTE
116+
///
117+
/// This function will be called inside a detached coroutine. Errors are logged only instead of
118+
/// returning them to the caller.
119+
pub async fn wait_for_compression_job_completion_and_update_metadata(
120+
state: ClpCompressionState,
121+
compression_job_id: CompressionJobId,
122+
num_objects_submitted: usize,
123+
) {
124+
let ingestion_job_id = state.get_ingestion_job_id();
125+
match state
126+
.wait_for_compression_and_update_submitted_metadata(
127+
compression_job_id,
128+
num_objects_submitted,
129+
)
130+
.await
131+
{
132+
Ok((compression_job_status, message)) => match compression_job_status {
133+
CompressionJobStatus::Succeeded => tracing::info!(
134+
ingestion_job_id = ? ingestion_job_id,
135+
compression_job_id = ? compression_job_id,
136+
"Compression job succeeded."
137+
),
138+
CompressionJobStatus::Failed | CompressionJobStatus::Killed => tracing::warn!(
139+
ingestion_job_id = ? ingestion_job_id,
140+
compression_job_id = ? compression_job_id,
141+
compression_job_status = ? compression_job_status,
142+
compression_job_status_msg = ? message,
143+
"Compression job failed."
144+
),
145+
_ => unreachable!(
146+
"Unknown compression job status: {:?}",
147+
compression_job_status
148+
),
149+
},
150+
Err(e) => {
151+
tracing::error!(
152+
ingestion_job_id = ? ingestion_job_id,
153+
compression_job_id = ? compression_job_id,
154+
error = ? e,
155+
"Failed to wait for CLP compression job completion."
156+
);
157+
}
158+
}
159+
}
160+
111161
/// Submits a CLP compression job with the given IO config template and waits for its completion.
112162
///
113163
/// # NOTE
@@ -143,41 +193,10 @@ async fn submit_clp_compression_job_and_wait_for_completion(
143193
"Compression job submitted."
144194
);
145195

146-
let (compression_job_status, message) = match state
147-
.wait_for_compression_and_update_submitted_metadata(
148-
compression_job_id,
149-
num_objects_submitted,
150-
)
151-
.await
152-
{
153-
Ok(result_pair) => result_pair,
154-
Err(e) => {
155-
tracing::error!(
156-
ingestion_job_id = ? ingestion_job_id,
157-
compression_job_id = ? compression_job_id,
158-
error = ? e,
159-
"Failed to wait for CLP compression job completion."
160-
);
161-
return;
162-
}
163-
};
164-
165-
match compression_job_status {
166-
CompressionJobStatus::Succeeded => tracing::info!(
167-
ingestion_job_id = ? ingestion_job_id,
168-
compression_job_id = ? compression_job_id,
169-
"Compression job succeeded."
170-
),
171-
CompressionJobStatus::Failed | CompressionJobStatus::Killed => tracing::warn!(
172-
ingestion_job_id = ? ingestion_job_id,
173-
compression_job_id = ? compression_job_id,
174-
compression_job_status = ? compression_job_status,
175-
compression_job_status_msg = ? message,
176-
"Compression job failed."
177-
),
178-
_ => unreachable!(
179-
"Unknown compression job status: {:?}",
180-
compression_job_status
181-
),
182-
}
196+
wait_for_compression_job_completion_and_update_metadata(
197+
state,
198+
compression_job_id,
199+
num_objects_submitted,
200+
)
201+
.await;
183202
}

components/log-ingestor/src/compression/listener.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,10 @@ impl<Submitter: BufferSubmitter + Send + 'static> ListenerTask<Submitter> {
5555
match optional_object_metadata {
5656
None => {
5757
self.buffer.submit().await?;
58-
return Err(
59-
anyhow::anyhow!("Listener channel has been closed unexpectedly")
58+
tracing::info!(
59+
"All senders have been dropped. The channel will be closed."
6060
);
61+
return Ok(());
6162
}
6263
Some(object_metadata_to_ingest) => {
6364
self.buffer.add(object_metadata_to_ingest).await?;

0 commit comments

Comments
 (0)