
Commit 54a1e6f

chore: don't cache entire table in memory
1 parent ab51706

2 files changed: +42 -50 lines


src/sinks/delta_lake/service.rs

Lines changed: 38 additions & 49 deletions
@@ -47,8 +47,13 @@ impl DriverResponse for DeltaLakeResponse {
 /// This service writes Arrow RecordBatches directly to Delta Lake tables,
 /// avoiding the overhead of Parquet serialization/deserialization round-trips.
 ///
-/// The table is wrapped in RwLock to allow updating the cached snapshot after
-/// successful commits, reducing unnecessary conflict retries.
+/// ## Memory Management
+///
+/// The table is reloaded from storage before each write operation. This bounds
+/// memory usage by starting each write with a fresh snapshot from the latest
+/// checkpoint, rather than accumulating transaction log state across writes.
+/// This adds ~100-200ms latency per write but prevents OOM in long-running
+/// processes with high-version-count tables.
 ///
 /// ## Schema Evolution Support
 ///
@@ -115,12 +120,20 @@ impl Service<DeltaLakeRequest> for DeltaLakeService {
         // Use batches directly from request - no Parquet deserialization needed
         let batches = request.batches;
 
-        // Get a clone of the current table snapshot
+        // Get a clone of the table and reload fresh state from storage.
+        // This bounds memory usage by not accumulating transaction log state
+        // across writes - each write starts with a fresh snapshot loaded from
+        // the latest checkpoint.
         let mut table = {
             let table_guard = table_lock.read().await;
             table_guard.clone()
         };
 
+        // Reload to get fresh state from latest checkpoint
+        table.load().await.map_err(|e| {
+            DeltaTableError::Generic(format!("Failed to load table before write: {}", e))
+        })?;
+
         // Retry loop for handling concurrent transaction conflicts and schema evolution
         // When optimization operations (like z-order) rewrite files, or when schema changes,
         // we need to reload the table snapshot and retry the write
@@ -155,52 +168,28 @@ impl Service<DeltaLakeRequest> for DeltaLakeService {
             // Execute write and commit
             match write_builder.await {
                 Ok(new_table) => {
-                    // Success! Update the cached table to the latest version
-                    // Only update if our committed version is newer than the cached version
-                    // This prevents "race to the bottom" where slower requests could
-                    // overwrite newer state with older state
-                    {
-                        let mut table_guard = table_lock.write().await;
-
-                        let new_version = new_table.version();
-                        let cached_version = table_guard.version();
-
-                        if new_version > cached_version {
-                            // Get schema before moving new_table to avoid unnecessary clone
-                            let new_schema = new_table.schema();
-
-                            // Log schema evolution if new fields were added
-                            let old_schema = shared_schema.load();
-                            let new_fields: Vec<_> = new_schema
-                                .fields()
-                                .iter()
-                                .filter(|f| old_schema.field_with_name(f.name()).is_err())
-                                .map(|f| f.name().as_str())
-                                .collect();
-
-                            if !new_fields.is_empty() {
-                                info!(
-                                    message = "Schema evolution: new fields added to table",
-                                    new_fields = ?new_fields,
-                                    total_fields = new_schema.fields().len(),
-                                    version = new_version,
-                                );
-                            }
-
-                            // Update schema cache while holding table lock to keep them in sync
-                            // TableProvider::schema() returns the Arrow schema directly
-                            shared_schema.store(new_schema);
-
-                            // Update table cache - move instead of clone
-                            *table_guard = new_table;
-                        } else {
-                            debug!(
-                                message =
-                                    "Skipping cache update - cached version is newer or equal",
-                                new_version = new_version,
-                                cached_version = cached_version,
-                            );
-                        }
+                    // Update schema cache for request builder (used for schema evolution)
+                    // We don't cache the full table state since we reload before each write,
+                    // which bounds memory usage by not accumulating transaction log state.
+                    let new_schema = new_table.schema();
+                    let old_schema = shared_schema.load();
+
+                    // Log and update schema if new fields were added
+                    let new_fields: Vec<_> = new_schema
+                        .fields()
+                        .iter()
+                        .filter(|f| old_schema.field_with_name(f.name()).is_err())
+                        .map(|f| f.name().as_str())
+                        .collect();
+
+                    if !new_fields.is_empty() {
+                        info!(
+                            message = "Schema evolution: new fields added to table",
+                            new_fields = ?new_fields,
+                            total_fields = new_schema.fields().len(),
+                            version = new_table.version(),
+                        );
+                        shared_schema.store(new_schema);
                     }
 
                     // Get the byte size from the request (Arrow in-memory size)
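For orientation, below is a minimal sketch of the clone-then-reload flow the hunks above introduce. It assumes the deltalake and tokio crates this sink already depends on; the function names write_with_fresh_snapshot and write_batches, and the exact shape of the table_lock parameter, are illustrative stand-ins rather than the service's actual code, and the real implementation drives a WriteBuilder with conflict retries and schema-cache updates instead of the placeholder shown here.

use deltalake::arrow::record_batch::RecordBatch;
use deltalake::{DeltaTable, DeltaTableError};
use tokio::sync::RwLock;

// Sketch of the per-write pattern from this commit (names are illustrative).
async fn write_with_fresh_snapshot(
    table_lock: &RwLock<DeltaTable>,
    batches: Vec<RecordBatch>,
) -> Result<DeltaTable, DeltaTableError> {
    // Hold the read lock only long enough to clone the table handle; the
    // write runs against the clone, never against the shared cached table.
    let mut table = {
        let table_guard = table_lock.read().await;
        table_guard.clone()
    };

    // Reload from storage so this write starts from the latest checkpoint
    // instead of reusing (and growing) a long-lived in-memory snapshot.
    table.load().await.map_err(|e| {
        DeltaTableError::Generic(format!("Failed to load table before write: {}", e))
    })?;

    // Hypothetical stand-in for the service's write/commit step; the real
    // code builds and awaits a Delta write operation and retries on conflict.
    write_batches(table, batches).await
}

// Placeholder for the actual write path (illustrative only).
async fn write_batches(
    table: DeltaTable,
    _batches: Vec<RecordBatch>,
) -> Result<DeltaTable, DeltaTableError> {
    Ok(table)
}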

src/sources/delta_lake_cdf/source.rs

Lines changed: 4 additions & 1 deletion
@@ -304,7 +304,10 @@ async fn create_cdf_stream(
     table: &DeltaTable,
     start_version: i64,
     end_version: i64,
-) -> Result<impl futures::Stream<Item = Result<deltalake::arrow::record_batch::RecordBatch, DeltaTableError>>, DeltaTableError> {
+) -> Result<
+    impl futures::Stream<Item = Result<deltalake::arrow::record_batch::RecordBatch, DeltaTableError>>,
+    DeltaTableError,
+> {
     // Clone table and create CDF builder
     let cdf_builder = table
         .clone()
