Skip to content

Commit c582a48

Browse files
committed
chore: add checkpoint mechanism
1 parent c705f91 commit c582a48

File tree

2 files changed

+32
-0
lines changed

2 files changed

+32
-0
lines changed

src/sinks/delta_lake/service.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::sync::Arc;
33
use std::task::{Context, Poll};
44

55
use deltalake::DeltaTableError;
6+
use deltalake::checkpoints::create_checkpoint;
67
use deltalake::datafusion::datasource::TableProvider;
78
use deltalake::operations::write::SchemaMode;
89
use deltalake::protocol::SaveMode;
@@ -13,6 +14,11 @@ use crate::sinks::prelude::*;
1314

1415
use super::request_builder::{DeltaLakeRequest, SharedSchema};
1516

17+
/// Checkpoint interval - a checkpoint is created whenever the table version is a multiple of this value.
18+
/// Checkpoints consolidate the transaction log into a single parquet file,
19+
/// dramatically improving read performance for tables with many versions.
20+
const CHECKPOINT_INTERVAL: i64 = 10;
21+
1622
/// Response from Delta Lake write operations.
1723
///
1824
/// Contains metrics about the write operation including the number of files
@@ -203,6 +209,30 @@ impl Service<DeltaLakeRequest> for DeltaLakeService {
203209
shared_schema.store(new_schema);
204210
}
205211

212+
// Create a checkpoint when the new table version is a multiple of CHECKPOINT_INTERVAL.
213+
// Checkpoints consolidate the transaction log into a single parquet file,
214+
// which dramatically reduces memory usage when opening the table.
215+
if let Some(version) = new_table.version() {
216+
if version % CHECKPOINT_INTERVAL == 0 {
217+
match create_checkpoint(&new_table, None).await {
218+
Ok(()) => {
219+
info!(
220+
message = "Created checkpoint",
221+
version = version,
222+
);
223+
}
224+
Err(e) => {
225+
// Log but don't fail the write - checkpoint is best-effort
226+
warn!(
227+
message = "Failed to create checkpoint",
228+
version = version,
229+
error = %e,
230+
);
231+
}
232+
}
233+
}
234+
}
235+
206236
// Get the byte size from the request (Arrow in-memory size)
207237
let bytes_written = request.byte_size;
208238

src/sinks/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ pub mod databend;
4747
pub mod datadog;
4848
#[cfg(feature = "sinks-delta_lake")]
4949
pub mod delta_lake;
50+
#[cfg(feature = "sinks-iceberg")]
51+
pub mod iceberg;
5052
#[cfg(feature = "sinks-elasticsearch")]
5153
pub mod elasticsearch;
5254
#[cfg(feature = "sinks-file")]

0 commit comments

Comments
 (0)