Skip to content

Commit 5308c94

Browse files
committed
feat: multi part upload in compaction log
Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
1 parent 82301bd commit 5308c94

File tree

2 files changed: +35 / -7 lines changed

crates/core/src/protocol/log_compaction.rs

Lines changed: 18 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -10,8 +10,9 @@ use uuid::Uuid;
1010
use crate::kernel::{Snapshot, spawn_blocking_with_span};
1111
use crate::logstore::LogStore;
1212
use crate::protocol::to_rb;
13-
1413
use crate::{DeltaResult, DeltaTable, DeltaTableError};
14+
use arrow_json::LineDelimitedWriter;
15+
use object_store::MultipartUpload;
1516

1617
#[tracing::instrument(skip(log_store, snapshot), fields(operation = "log_compaction", start_version = start_version, end_version = end_version, table_uri = %log_store.root_url()))]
1718
pub(crate) async fn compact_logs_for(
@@ -51,11 +52,11 @@ pub(crate) async fn compact_logs_for(
5152

5253
let root_store = log_store.root_object_store(operation_id);
5354

54-
let mut writer = arrow_json::LineDelimitedWriter::new(Vec::new());
55+
let mut upload = root_store.put_multipart(&lc_path).await?;
56+
let mut buffer = Vec::with_capacity(8 * 1024 * 1024);
5557

56-
let mut current_batch;
5758
loop {
58-
(current_batch, lc_data) = spawn_blocking_with_span(move || {
59+
let (current_batch, lc_data_next) = spawn_blocking_with_span(move || {
5960
let Some(first_batch) = lc_data.next() else {
6061
return Ok::<_, DeltaTableError>((None, lc_data));
6162
};
@@ -64,16 +65,26 @@ pub(crate) async fn compact_logs_for(
6465
.await
6566
.map_err(|e| DeltaTableError::Generic(e.to_string()))??;
6667

68+
lc_data = lc_data_next;
69+
6770
let Some(batch) = current_batch else {
6871
break;
6972
};
73+
74+
let mut writer = LineDelimitedWriter::new(&mut buffer);
7075
writer.write(&batch)?;
76+
writer.finish()?;
77+
78+
if buffer.len() >= 5 * 1024 * 1024 {
79+
upload.put_part(std::mem::take(&mut buffer).into()).await?;
80+
}
7181
}
72-
writer.finish()?;
7382

74-
let bytes = writer.into_inner();
83+
if !buffer.is_empty() {
84+
upload.put_part(buffer.into()).await?;
85+
}
7586

76-
root_store.put(&lc_path, bytes.into()).await?;
87+
upload.complete().await?;
7788

7889
Ok(())
7990
}

python/tests/test_log_compaction.py

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
import json
12
import pathlib
23
from typing import TYPE_CHECKING
34

@@ -19,6 +20,7 @@ def test_log_compaction(tmp_path: pathlib.Path, sample_table: Table):
1920
/ "_delta_log"
2021
/ "00000000000000000000.00000000000000000003.compacted.json"
2122
)
23+
2224
for i in range(4):
2325
write_deltalake(str(tmp_table_path), sample_table, mode="append")
2426

@@ -28,3 +30,18 @@ def test_log_compaction(tmp_path: pathlib.Path, sample_table: Table):
2830
delta_table.compact_logs(starting_version=0, ending_version=3)
2931

3032
assert compaction_log_path.exists()
33+
34+
with open(compaction_log_path) as fp:
35+
compaction_data = fp.readlines()
36+
compaction_data = [json.loads(row) for row in compaction_data]
37+
38+
action_types = {}
39+
for action in compaction_data:
40+
key = next(iter(action.keys()))
41+
if key not in action_types:
42+
action_types[key] = 1
43+
else:
44+
action_types[next(iter(action.keys()))] += 1
45+
46+
assert len(compaction_data) == 6
47+
assert action_types == {"add": 4, "protocol": 1, "metaData": 1}

0 commit comments

Comments
 (0)