Skip to content

Commit c2baf4e

Browse files
committed
Have tx own base table, mark tx_commit_res as not used, need to fix retry
1 parent e881811 commit c2baf4e

File tree

6 files changed

+54
-54
lines changed

6 files changed

+54
-54
lines changed

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ arrow-select = { workspace = true }
5555
arrow-string = { workspace = true }
5656
async-std = { workspace = true, optional = true, features = ["attributes"] }
5757
async-trait = { workspace = true }
58+
tokio-retry2 = { version = "0.5.7" }
5859
base64 = { workspace = true }
5960
bimap = { workspace = true }
6061
bytes = { workspace = true }

crates/iceberg/src/transaction/action/mod.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,29 @@
1616
// under the License.
1717

1818
use std::mem::take;
19-
19+
use std::sync::Arc;
20+
use async_trait::async_trait;
2021
use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate};
22+
use crate::transaction::Transaction;
2123

2224
pub type BoxedTransactionAction = Arc<dyn TransactionAction>;
2325

2426
#[async_trait]
2527
pub(crate) trait TransactionAction: Sync + Send {
2628
/// Commit the changes and apply the changes to the transaction,
2729
/// return the transaction with the updated current_table
28-
fn commit(self: Arc<Self>, tx: &Transaction) -> Result<TransactionActionCommit>;
30+
fn commit(self: Arc<Self>, tx: &mut Transaction) -> Result<()>;
2931
}
3032

3133
pub struct TransactionActionCommit {
32-
action: Option<PendingAction>,
34+
action: Option<BoxedTransactionAction>,
3335
updates: Vec<TableUpdate>,
3436
requirements: Vec<TableRequirement>,
3537
}
3638

39+
// TODO probably we don't need this?
3740
impl TransactionActionCommit {
38-
pub fn take_action(&mut self) -> Option<PendingAction> {
41+
pub fn take_action(&mut self) -> Option<BoxedTransactionAction> {
3942
take(&mut self.action)
4043
}
4144

@@ -64,7 +67,7 @@ impl SetLocation {
6467
}
6568

6669
impl TransactionAction for SetLocation {
67-
fn commit(self: Box<Self>) -> Result<TransactionActionCommit> {
70+
fn commit(self: Arc<Self>, tx: &mut Transaction) -> Result<()> {
6871
let updates: Vec<TableUpdate>;
6972
let requirements: Vec<TableRequirement>;
7073
if let Some(location) = self.location.clone() {
@@ -76,11 +79,9 @@ impl TransactionAction for SetLocation {
7679
"Location is not set for SetLocation!",
7780
));
7881
}
79-
80-
Ok(TransactionActionCommit {
81-
action: Some(self),
82-
updates,
83-
requirements,
84-
})
82+
83+
tx.actions.push(self);
84+
85+
tx.apply(updates, requirements)
8586
}
8687
}

crates/iceberg/src/transaction/append.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@ use crate::writer::file_writer::ParquetWriter;
3131
use crate::{Error, ErrorKind};
3232

3333
/// FastAppendAction is a transaction action for fast append data files to the table.
34-
pub struct FastAppendAction<'a> {
35-
snapshot_produce_action: SnapshotProduceAction<'a>,
34+
pub struct FastAppendAction {
35+
snapshot_produce_action: SnapshotProduceAction,
3636
check_duplicate: bool,
3737
}
3838

39-
impl<'a> FastAppendAction<'a> {
39+
impl FastAppendAction {
4040
#[allow(clippy::too_many_arguments)]
4141
pub(crate) fn new(
42-
tx: Transaction<'a>,
42+
tx: Transaction,
4343
snapshot_id: i64,
4444
commit_uuid: Uuid,
4545
key_metadata: Vec<u8>,
@@ -89,7 +89,7 @@ impl<'a> FastAppendAction<'a> {
8989
/// Specifically, schema compatibility checks and support for adding to partitioned tables
9090
/// have not yet been implemented.
9191
#[allow(dead_code)]
92-
async fn add_parquet_files(mut self, file_path: Vec<String>) -> Result<Transaction<'a>> {
92+
async fn add_parquet_files(mut self, file_path: Vec<String>) -> Result<Transaction> {
9393
if !self
9494
.snapshot_produce_action
9595
.tx
@@ -119,7 +119,7 @@ impl<'a> FastAppendAction<'a> {
119119
}
120120

121121
/// Finished building the action and apply it to the transaction.
122-
pub async fn apply(self) -> Result<Transaction<'a>> {
122+
pub async fn apply(self) -> Result<Transaction> {
123123
// Checks duplicate files
124124
if self.check_duplicate {
125125
let new_files: HashSet<&str> = self
@@ -185,14 +185,14 @@ impl SnapshotProduceOperation for FastAppendOperation {
185185

186186
async fn delete_entries(
187187
&self,
188-
_snapshot_produce: &SnapshotProduceAction<'_>,
188+
_snapshot_produce: &SnapshotProduceAction,
189189
) -> Result<Vec<ManifestEntry>> {
190190
Ok(vec![])
191191
}
192192

193193
async fn existing_manifest(
194194
&self,
195-
snapshot_produce: &SnapshotProduceAction<'_>,
195+
snapshot_produce: &SnapshotProduceAction,
196196
) -> Result<Vec<ManifestFile>> {
197197
let Some(snapshot) = snapshot_produce
198198
.tx
@@ -234,7 +234,7 @@ mod tests {
234234
#[tokio::test]
235235
async fn test_empty_data_append_action() {
236236
let table = make_v2_minimal_table();
237-
let tx = Transaction::new(&table);
237+
let tx = Transaction::new(table);
238238
let mut action = tx.fast_append(None, vec![]).unwrap();
239239
action.add_data_files(vec![]).unwrap();
240240
assert!(action.apply().await.is_err());
@@ -243,7 +243,7 @@ mod tests {
243243
#[tokio::test]
244244
async fn test_set_snapshot_properties() {
245245
let table = make_v2_minimal_table();
246-
let tx = Transaction::new(&table);
246+
let tx = Transaction::new(table.clone());
247247
let mut action = tx.fast_append(None, vec![]).unwrap();
248248

249249
let mut snapshot_properties = HashMap::new();
@@ -281,7 +281,7 @@ mod tests {
281281
#[tokio::test]
282282
async fn test_fast_append_action() {
283283
let table = make_v2_minimal_table();
284-
let tx = Transaction::new(&table);
284+
let tx = Transaction::new(table.clone());
285285
let mut action = tx.fast_append(None, vec![]).unwrap();
286286

287287
// check add data file with incompatible partition value
@@ -367,7 +367,7 @@ mod tests {
367367
async fn test_add_existing_parquet_files_to_unpartitioned_table() {
368368
let mut fixture = TableTestFixture::new_unpartitioned();
369369
fixture.setup_unpartitioned_manifest_files().await;
370-
let tx = crate::transaction::Transaction::new(&fixture.table);
370+
let tx = crate::transaction::Transaction::new(fixture.table.clone());
371371

372372
let file_paths = vec![
373373
format!("{}/1.parquet", &fixture.table_location),

crates/iceberg/src/transaction/mod.rs

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,26 +35,26 @@ use crate::error::Result;
3535
use crate::spec::FormatVersion;
3636
use crate::table::Table;
3737
use crate::transaction::action::{
38-
PendingAction, SetLocation, TransactionAction, TransactionActionCommit,
38+
BoxedTransactionAction, SetLocation, TransactionAction, TransactionActionCommit,
3939
};
4040
use crate::transaction::append::FastAppendAction;
4141
use crate::transaction::sort_order::ReplaceSortOrderAction;
4242
use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
4343

4444
/// Table transaction.
45-
pub struct Transaction<'a> {
46-
base_table: &'a Table,
45+
pub struct Transaction {
46+
base_table: Table,
4747
current_table: Table,
48-
actions: Vec<PendingAction>,
48+
actions: Vec<BoxedTransactionAction>,
4949
updates: Vec<TableUpdate>,
5050
requirements: Vec<TableRequirement>,
5151
}
5252

53-
impl<'a> Transaction<'a> {
53+
impl Transaction {
5454
/// Creates a new transaction.
55-
pub fn new(table: &'a Table) -> Self {
55+
pub fn new(table: Table) -> Self {
5656
Self {
57-
base_table: table,
57+
base_table: table.clone(),
5858
current_table: table.clone(),
5959
actions: vec![],
6060
updates: vec![],
@@ -174,7 +174,7 @@ impl<'a> Transaction<'a> {
174174
self,
175175
commit_uuid: Option<Uuid>,
176176
key_metadata: Vec<u8>,
177-
) -> Result<FastAppendAction<'a>> {
177+
) -> Result<FastAppendAction> {
178178
let snapshot_id = self.generate_unique_snapshot_id();
179179
FastAppendAction::new(
180180
self,
@@ -186,7 +186,7 @@ impl<'a> Transaction<'a> {
186186
}
187187

188188
/// Creates replace sort order action.
189-
pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> {
189+
pub fn replace_sort_order(self) -> ReplaceSortOrderAction {
190190
ReplaceSortOrderAction {
191191
tx: self,
192192
sort_fields: vec![],
@@ -205,8 +205,7 @@ impl<'a> Transaction<'a> {
205205
/// Set the location of table
206206
pub fn set_location(mut self, location: String) -> Result<Self> {
207207
let set_location = SetLocation::new().set_location(location);
208-
self.apply_commit_result(Box::new(set_location).commit()?)
209-
.expect("Some error msg");
208+
Arc::new(set_location).commit(&mut self)?;
210209
Ok(self)
211210
}
212211

@@ -243,14 +242,13 @@ impl<'a> Transaction<'a> {
243242
|| self.base_table.metadata_location() != refreshed.metadata_location()
244243
{
245244
// current base is stale, use refreshed as base and re-apply transaction actions
246-
self.base_table = &refreshed.clone();
245+
self.base_table = refreshed.clone();
247246
self.current_table = refreshed.clone();
248247

249248
let pending_actions = take(&mut self.actions);
250249

251250
for action in pending_actions {
252-
self.apply_commit_result(action.commit()?)
253-
.expect("Failed to apply updates!");
251+
action.commit(self).expect("Failed to apply updates!");
254252
}
255253
}
256254

@@ -347,7 +345,7 @@ mod tests {
347345
#[test]
348346
fn test_upgrade_table_version_v1_to_v2() {
349347
let table = make_v1_table();
350-
let tx = Transaction::new(&table);
348+
let tx = Transaction::new(table);
351349
let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap();
352350

353351
assert_eq!(
@@ -361,7 +359,7 @@ mod tests {
361359
#[test]
362360
fn test_upgrade_table_version_v2_to_v2() {
363361
let table = make_v2_table();
364-
let tx = Transaction::new(&table);
362+
let tx = Transaction::new(table);
365363
let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap();
366364

367365
assert!(
@@ -377,7 +375,7 @@ mod tests {
377375
#[test]
378376
fn test_downgrade_table_version() {
379377
let table = make_v2_table();
380-
let tx = Transaction::new(&table);
378+
let tx = Transaction::new(table);
381379
let tx = tx.upgrade_table_version(FormatVersion::V1);
382380

383381
assert!(tx.is_err(), "Downgrade table version should fail!");
@@ -386,7 +384,7 @@ mod tests {
386384
#[test]
387385
fn test_set_table_property() {
388386
let table = make_v2_table();
389-
let tx = Transaction::new(&table);
387+
let tx = Transaction::new(table);
390388
let tx = tx
391389
.set_properties(HashMap::from([("a".to_string(), "b".to_string())]))
392390
.unwrap();
@@ -402,7 +400,7 @@ mod tests {
402400
#[test]
403401
fn test_remove_property() {
404402
let table = make_v2_table();
405-
let tx = Transaction::new(&table);
403+
let tx = Transaction::new(table);
406404
let tx = tx
407405
.remove_properties(vec!["a".to_string(), "b".to_string()])
408406
.unwrap();
@@ -418,7 +416,7 @@ mod tests {
418416
#[test]
419417
fn test_set_location() {
420418
let table = make_v2_table();
421-
let tx = Transaction::new(&table);
419+
let tx = Transaction::new(table);
422420
let tx = tx
423421
.set_location(String::from("s3://bucket/prefix/new_table"))
424422
.unwrap();
@@ -434,7 +432,7 @@ mod tests {
434432
#[tokio::test]
435433
async fn test_transaction_apply_upgrade() {
436434
let table = make_v1_table();
437-
let tx = Transaction::new(&table);
435+
let tx = Transaction::new(table);
438436
// Upgrade v1 to v1, do nothing.
439437
let tx = tx.upgrade_table_version(FormatVersion::V1).unwrap();
440438
// Upgrade v1 to v2, success.

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ pub(crate) trait ManifestProcess: Send + Sync {
5959
fn process_manifests(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
6060
}
6161

62-
pub(crate) struct SnapshotProduceAction<'a> {
63-
pub tx: Transaction<'a>,
62+
pub(crate) struct SnapshotProduceAction {
63+
pub tx: Transaction,
6464
snapshot_id: i64,
6565
key_metadata: Vec<u8>,
6666
commit_uuid: Uuid,
@@ -72,9 +72,9 @@ pub(crate) struct SnapshotProduceAction<'a> {
7272
manifest_counter: RangeFrom<u64>,
7373
}
7474

75-
impl<'a> SnapshotProduceAction<'a> {
75+
impl SnapshotProduceAction {
7676
pub(crate) fn new(
77-
tx: Transaction<'a>,
77+
tx: Transaction,
7878
snapshot_id: i64,
7979
key_metadata: Vec<u8>,
8080
commit_uuid: Uuid,
@@ -310,7 +310,7 @@ impl<'a> SnapshotProduceAction<'a> {
310310
mut self,
311311
snapshot_produce_operation: OP,
312312
process: MP,
313-
) -> Result<Transaction<'a>> {
313+
) -> Result<Transaction> {
314314
let new_manifests = self
315315
.manifest_file(&snapshot_produce_operation, &process)
316316
.await?;

crates/iceberg/src/transaction/sort_order.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ use crate::transaction::Transaction;
2121
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
2222

2323
/// Transaction action for replacing sort order.
24-
pub struct ReplaceSortOrderAction<'a> {
25-
pub tx: Transaction<'a>,
24+
pub struct ReplaceSortOrderAction {
25+
pub tx: Transaction,
2626
pub sort_fields: Vec<SortField>,
2727
}
2828

29-
impl<'a> ReplaceSortOrderAction<'a> {
29+
impl ReplaceSortOrderAction {
3030
/// Adds a field for sorting in ascending order.
3131
pub fn asc(self, name: &str, null_order: NullOrder) -> Result<Self> {
3232
self.add_sort_field(name, SortDirection::Ascending, null_order)
@@ -38,7 +38,7 @@ impl<'a> ReplaceSortOrderAction<'a> {
3838
}
3939

4040
/// Finished building the action and apply it to the transaction.
41-
pub fn apply(mut self) -> Result<Transaction<'a>> {
41+
pub fn apply(mut self) -> Result<Transaction> {
4242
let unbound_sort_order = SortOrder::builder()
4343
.with_fields(self.sort_fields)
4444
.build_unbound()?;
@@ -114,7 +114,7 @@ mod tests {
114114
#[test]
115115
fn test_replace_sort_order() {
116116
let table = make_v2_table();
117-
let tx = Transaction::new(&table);
117+
let tx = Transaction::new(table);
118118
let tx = tx.replace_sort_order().apply().unwrap();
119119

120120
assert_eq!(

0 commit comments

Comments
 (0)