Skip to content

Commit 73cecee

Browse files
Append support (#16)
1 parent a2575b7 commit 73cecee

3 files changed

Lines changed: 61 additions & 5 deletions

File tree

src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,4 +20,6 @@ pub enum DataLoadingError {
2020
JoinError(#[from] tokio::task::JoinError),
2121
#[error("optimistic concurrency error")]
2222
OptimisticConcurrencyError(),
23+
#[error("bad input error")]
24+
BadInputError(String),
2325
}

src/iceberg_destination.rs

Lines changed: 48 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,14 @@ use uuid::Uuid;
2727

2828
use crate::error::DataLoadingError;
2929

30+
// Defines how to behave with existing tables
31+
#[derive(Debug, Clone, PartialEq)]
32+
enum WriteMode {
33+
CreateExclusive, // Error out if the table already exists
34+
Overwrite, // Overwrite existing table data
35+
Append, // Append to existing table data
36+
}
37+
3038
fn create_file_io(target_url: String) -> Result<FileIO, DataLoadingError> {
3139
let mut file_io_props: Vec<(String, String)> = vec![];
3240
if let Ok(aws_endpoint) = std::env::var("AWS_ENDPOINT") {
@@ -96,14 +104,38 @@ fn update_metadata_snapshot(
96104
Ok(new_metadata)
97105
}
98106

107+
async fn get_manifest_files(
108+
file_io: &FileIO,
109+
table_metadata: &TableMetadata,
110+
) -> Result<Option<Vec<ManifestFile>>, DataLoadingError> {
111+
let snapshot = match table_metadata.current_snapshot() {
112+
None => return Ok(None),
113+
Some(s) => s,
114+
};
115+
let manifest_list = snapshot.load_manifest_list(file_io, table_metadata).await?;
116+
Ok(Some(manifest_list.consume_entries().into_iter().collect()))
117+
}
118+
99119
const DEFAULT_SCHEMA_ID: i32 = 0;
100120

101121
pub async fn record_batches_to_iceberg(
102122
record_batch_stream: impl TryStream<Item = Result<RecordBatch, DataLoadingError>>,
103123
arrow_schema: SchemaRef,
104124
target_url: Url,
105125
overwrite: bool,
126+
append: bool,
106127
) -> Result<(), DataLoadingError> {
128+
let write_mode = match (overwrite, append) {
129+
(false, false) => WriteMode::CreateExclusive,
130+
(true, false) => WriteMode::Overwrite,
131+
(false, true) => WriteMode::Append,
132+
(true, true) => {
133+
return Err(DataLoadingError::BadInputError(
134+
"Cannot use overwrite flag with append flag".to_string(),
135+
));
136+
}
137+
};
138+
107139
pin_mut!(record_batch_stream);
108140

109141
let file_io = create_file_io(target_url.to_string())?;
@@ -115,7 +147,7 @@ pub async fn record_batches_to_iceberg(
115147
let version_hint_location = format!("{}/metadata/version-hint.text", target_url);
116148
let version_hint_input = file_io.new_input(&version_hint_location)?;
117149
let old_version_hint: Option<u64> = if version_hint_input.exists().await? {
118-
if !overwrite {
150+
if write_mode == WriteMode::CreateExclusive {
119151
return Err(DataLoadingError::IoError(std::io::Error::other(
120152
"Table exists. Pass the overwrite flag to lakehouse-loader to overwrite data",
121153
)));
@@ -233,8 +265,20 @@ pub async fn record_batches_to_iceberg(
233265
})
234266
.collect(),
235267
);
236-
let manifest_file: ManifestFile = manifest_writer.write(manifest).await?;
237-
info!("Wrote manifest file: {:?}", manifest_file.manifest_path);
268+
let new_manifest_file: ManifestFile = manifest_writer.write(manifest).await?;
269+
info!("Wrote manifest file: {:?}", new_manifest_file.manifest_path);
270+
271+
let new_manifest_files_vec: Vec<ManifestFile> = match write_mode {
272+
WriteMode::CreateExclusive | WriteMode::Overwrite => vec![new_manifest_file], // Only include new manifest
273+
WriteMode::Append => match get_manifest_files(&file_io, &previous_metadata).await? {
274+
Some(mut manifest_files) => {
275+
// Include new manifest and all manifests from previous snapshot
276+
manifest_files.push(new_manifest_file);
277+
manifest_files
278+
}
279+
None => vec![new_manifest_file], // Only include new manifest
280+
},
281+
};
238282

239283
let manifest_list_path = format!(
240284
"{}/metadata/manifest-list-{}.avro",
@@ -244,7 +288,7 @@ pub async fn record_batches_to_iceberg(
244288
let manifest_file_output = file_io.new_output(manifest_list_path.clone())?;
245289
let mut manifest_list_writer: ManifestListWriter =
246290
ManifestListWriter::v2(manifest_file_output, snapshot_id, None, sequence_number);
247-
manifest_list_writer.add_manifests(vec![manifest_file].into_iter())?;
291+
manifest_list_writer.add_manifests(new_manifest_files_vec.into_iter())?;
248292
manifest_list_writer.close().await?;
249293
info!("Wrote manifest list: {:?}", manifest_list_path);
250294

src/lib.rs

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -55,6 +55,8 @@ enum Commands {
5555
target_url: Url,
5656
#[clap(long, short, action)]
5757
overwrite: bool,
58+
#[clap(long, action)]
59+
append: bool,
5860
},
5961
#[command(arg_required_else_help = true)]
6062
PgToIceberg {
@@ -64,6 +66,8 @@ enum Commands {
6466
query: String,
6567
#[clap(long, short, action)]
6668
overwrite: bool,
69+
#[clap(long, action)]
70+
append: bool,
6771
#[clap(
6872
long,
6973
short,
@@ -118,6 +122,7 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
118122
source_file,
119123
target_url,
120124
overwrite,
125+
append,
121126
} => {
122127
for _ in 0..OPTIMISTIC_CONCURRENCY_RETRIES {
123128
let file = tokio::fs::File::open(&source_file).await?;
@@ -126,12 +131,15 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
126131
.build()
127132
.unwrap();
128133
let arrow_schema = record_batch_reader.schema().clone();
134+
let record_batch_stream =
135+
record_batch_reader.map_err(DataLoadingError::ParquetError);
129136
info!("File schema: {}", arrow_schema);
130137
match record_batches_to_iceberg(
131-
record_batch_reader.map_err(DataLoadingError::ParquetError),
138+
record_batch_stream,
132139
arrow_schema,
133140
target_url.clone(),
134141
overwrite,
142+
append,
135143
)
136144
.await
137145
{
@@ -154,6 +162,7 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
154162
target_url,
155163
query,
156164
overwrite,
165+
append,
157166
batch_size,
158167
} => {
159168
for _ in 0..OPTIMISTIC_CONCURRENCY_RETRIES {
@@ -168,6 +177,7 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
168177
arrow_schema,
169178
target_url.clone(),
170179
overwrite,
180+
append,
171181
)
172182
.await
173183
{

0 commit comments

Comments (0)