Skip to content

Commit 1c40b86

Browse files
committed
use backon, code cleanup
1 parent c2baf4e commit 1c40b86

File tree

15 files changed

+206
-188
lines changed

15 files changed

+206
-188
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/catalog/memory/src/catalog.rs

Lines changed: 39 additions & 65 deletions
Original file line number | Diff line number | Diff line change
@@ -26,10 +26,9 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder};
2626
use iceberg::table::Table;
2727
use iceberg::{
2828
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
29-
TableIdent, TableRequirement, TableUpdate,
29+
TableIdent,
3030
};
3131
use itertools::Itertools;
32-
use regex::Regex;
3332
use uuid::Uuid;
3433

3534
use crate::namespace_state::NamespaceState;
@@ -278,75 +277,50 @@ impl Catalog for MemoryCatalog {
278277
}
279278

280279
/// Update a table to the catalog.
281-
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
280+
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
282281
Err(Error::new(
283282
ErrorKind::FeatureUnsupported,
284283
"MemoryCatalog does not currently support updating tables.",
285284
))
286285
}
287286

288-
async fn commit_table(&self, base: &Table, current: Table) -> Result<Table> {
289-
if base.metadata() == current.metadata() {
290-
// no change
291-
return Ok(current);
292-
}
293-
294-
let mut root_namespace_state = self.root_namespace_state.lock().await;
295-
// TODO: caller needs to retry on the error below
296-
let _ = root_namespace_state
297-
.check_metadata_location(base.identifier(), base.metadata_location())?;
298-
299-
let next_metadata_version = if let Some(base_metadata_location) = base.metadata_location() {
300-
self.parse_metadata_version(base_metadata_location) + 1
301-
} else {
302-
0
303-
};
304-
305-
// write metadata
306-
let metadata_location = format!(
307-
"{}/metadata/{}-{}.metadata.json",
308-
current.metadata().location(),
309-
next_metadata_version,
310-
Uuid::new_v4()
311-
);
312-
313-
// TODO instead of using current.metadata(), build a new metadata with some properties like last_updated_ms updated
314-
self.file_io
315-
.new_output(&metadata_location)?
316-
.write(serde_json::to_vec(current.metadata())?.into())
317-
.await?;
318-
319-
root_namespace_state
320-
.update_existing_table_location(current.identifier(), current.metadata_location())?;
321-
322-
// TODO same here, need to update the metadata location
323-
Ok(current)
324-
}
325-
}
326-
327-
// todo move this to metadata?
328-
fn parse_metadata_version(metadata_location: &str) -> Result<i32> {
329-
let pattern = r"(\d+)-([\w-]{36})(?:\.\w+)?\.metadata\.json"; // todo make this constant
330-
331-
if let Some(metadata_file_name) = metadata_location.split('/').last() {
332-
let re = Regex::new(pattern).expect("Failed to parse regex for metadata file!");
333-
if let Some(caps) = re.captures(metadata_file_name) {
334-
let metadata_version_str = &caps[1];
335-
let uuid_str = &caps[2];
336-
337-
let metadata_version = metadata_version_str
338-
.parse()
339-
.expect(format!("Invalid metadata version: {metadata_version_str}").as_str());
340-
let uuid = Uuid::parse_str(uuid_str)?;
341-
342-
return Ok(metadata_version);
343-
}
344-
}
345-
346-
Err(Error::new(
347-
ErrorKind::Unexpected,
348-
format!("Unrecognizable metadata location: {metadata_location}"),
349-
))
287+
// async fn commit_table(&self, base: &Table, current: Table) -> Result<Table> {
288+
// if base.metadata() == current.metadata() {
289+
// // no change
290+
// return Ok(current);
291+
// }
292+
//
293+
// let mut root_namespace_state = self.root_namespace_state.lock().await;
294+
// // TODO: caller needs to retry on the error below
295+
// let _ = root_namespace_state
296+
// .check_metadata_location(base.identifier(), base.metadata_location())?;
297+
//
298+
// let next_metadata_version = if let Some(base_metadata_location) = base.metadata_location() {
299+
// self.parse_metadata_version(base_metadata_location) + 1
300+
// } else {
301+
// 0
302+
// };
303+
//
304+
// // write metadata
305+
// let metadata_location = format!(
306+
// "{}/metadata/{}-{}.metadata.json",
307+
// current.metadata().location(),
308+
// next_metadata_version,
309+
// Uuid::new_v4()
310+
// );
311+
//
312+
// // TODO instead of using current.metadata(), build a new metadata with some properties like last_updated_ms updated
313+
// self.file_io
314+
// .new_output(&metadata_location)?
315+
// .write(serde_json::to_vec(current.metadata())?.into())
316+
// .await?;
317+
//
318+
// root_namespace_state
319+
// .update_existing_table_location(current.identifier(), current.metadata_location())?;
320+
//
321+
// // TODO same here, need to update the metadata location
322+
// Ok(current)
323+
// }
350324
}
351325

352326
#[cfg(test)]

crates/catalog/memory/src/namespace_state.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -293,11 +293,11 @@ impl NamespaceState {
293293
return Ok(());
294294
}
295295

296-
let mut namespace = self.get_mut_namespace(table_ident.namespace())?;
296+
let namespace = self.get_mut_namespace(table_ident.namespace())?;
297297
namespace
298298
.table_metadata_locations
299299
.entry(table_ident.name().to_string())
300-
.insert_entry(new_metadata_location.unwrap().into_string());
300+
.insert_entry(new_metadata_location.unwrap().to_string());
301301
Ok(())
302302
}
303303

crates/catalog/rest/src/catalog.rs

Lines changed: 9 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -2104,7 +2104,9 @@ mod tests {
21042104
.create_async()
21052105
.await;
21062106

2107-
let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2107+
let catalog = Arc::new(RestCatalog::new(
2108+
RestCatalogConfig::builder().uri(server.url()).build(),
2109+
));
21082110

21092111
let table1 = {
21102112
let file = File::open(format!(
@@ -2125,10 +2127,10 @@ mod tests {
21252127
.unwrap()
21262128
};
21272129

2128-
let table = Transaction::new(&table1)
2130+
let table = Transaction::new(table1)
21292131
.upgrade_table_version(FormatVersion::V2)
21302132
.unwrap()
2131-
.commit(&catalog)
2133+
.commit(catalog)
21322134
.await
21332135
.unwrap();
21342136

@@ -2229,7 +2231,9 @@ mod tests {
22292231
.create_async()
22302232
.await;
22312233

2232-
let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
2234+
let catalog = Arc::new(RestCatalog::new(
2235+
RestCatalogConfig::builder().uri(server.url()).build(),
2236+
));
22332237

22342238
let table1 = {
22352239
let file = File::open(format!(
@@ -2253,7 +2257,7 @@ mod tests {
22532257
let table_result = Transaction::new(&table1)
22542258
.upgrade_table_version(FormatVersion::V2)
22552259
.unwrap()
2256-
.commit(&catalog)
2260+
.commit(catalog)
22572261
.await;
22582262

22592263
assert!(table_result.is_err());

crates/catalog/rest/tests/rest_catalog_test.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@
1919
2020
use std::collections::HashMap;
2121
use std::net::SocketAddr;
22-
use std::sync::RwLock;
22+
use std::sync::{Arc, RwLock};
2323

2424
use ctor::{ctor, dtor};
2525
use iceberg::spec::{FormatVersion, NestedField, PrimitiveType, Schema, Type};
@@ -347,10 +347,10 @@ async fn test_update_table() {
347347
);
348348

349349
// Update table by committing transaction
350-
let table2 = Transaction::new(&table)
350+
let table2 = Transaction::new(table)
351351
.set_properties(HashMap::from([("prop1".to_string(), "v1".to_string())]))
352352
.unwrap()
353-
.commit(&catalog)
353+
.commit(Arc::new(catalog))
354354
.await
355355
.unwrap();
356356

crates/iceberg/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -56,6 +56,7 @@ arrow-string = { workspace = true }
5656
async-std = { workspace = true, optional = true, features = ["attributes"] }
5757
async-trait = { workspace = true }
5858
tokio-retry2 = { version = "0.5.7" }
59+
backon = { version = "1.5.1"}
5960
base64 = { workspace = true }
6061
bimap = { workspace = true }
6162
bytes = { workspace = true }
@@ -87,6 +88,7 @@ typed-builder = { workspace = true }
8788
url = { workspace = true }
8889
uuid = { workspace = true }
8990
zstd = { workspace = true }
91+
regex = "1.11.1"
9092

9193
[dev-dependencies]
9294
ctor = { workspace = true }

crates/iceberg/src/catalog/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -95,8 +95,6 @@ pub trait Catalog: Debug + Sync + Send {
9595

9696
/// Update a table to the catalog.
9797
async fn update_table(&self, commit: TableCommit) -> Result<Table>;
98-
99-
async fn commit_table(&self, base: &Table, current: Table) -> Result<Table>;
10098
}
10199

102100
/// NamespaceIdent represents the identifier of a namespace in the catalog.

crates/iceberg/src/spec/table_metadata.rs

Lines changed: 26 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@ use std::sync::Arc;
2626

2727
use _serde::TableMetadataEnum;
2828
use chrono::{DateTime, Utc};
29+
use regex::Regex;
2930
use serde::{Deserialize, Serialize};
3031
use serde_repr::{Deserialize_repr, Serialize_repr};
3132
use uuid::Uuid;
@@ -40,6 +41,7 @@ use crate::error::{Result, timestamp_ms_to_utc};
4041
use crate::{Error, ErrorKind};
4142

4243
static MAIN_BRANCH: &str = "main";
44+
const TABLE_METADATA_FILE_NAME_REGEX: &str = r"(\d+)-([\w-]{36})(?:\.\w+)?\.metadata\.json";
4345
pub(crate) static ONE_MINUTE_MS: i64 = 60_000;
4446

4547
pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
@@ -432,6 +434,30 @@ impl TableMetadata {
432434
self.encryption_keys.get(key_id)
433435
}
434436

437+
/// Parse metadata version and uuid from metadata filename
438+
fn parse_metadata_filename(metadata_location: &str) -> Result<(i32, Uuid)> {
439+
if let Some(metadata_file_name) = metadata_location.split('/').last() {
440+
let re = Regex::new(TABLE_METADATA_FILE_NAME_REGEX)
441+
.expect("Failed to parse regex for metadata file!");
442+
if let Some(caps) = re.captures(metadata_file_name) {
443+
let metadata_version_str = &caps[1];
444+
let uuid_str = &caps[2];
445+
446+
let metadata_version = metadata_version_str
447+
.parse()
448+
.expect(format!("Invalid metadata version: {metadata_version_str}").as_str());
449+
let uuid = Uuid::parse_str(uuid_str)?;
450+
451+
return Ok((metadata_version, uuid));
452+
}
453+
}
454+
455+
Err(Error::new(
456+
ErrorKind::Unexpected,
457+
format!("Unrecognizable metadata location: {metadata_location}"),
458+
))
459+
}
460+
435461
/// Normalize this partition spec.
436462
///
437463
/// This is an internal method

crates/iceberg/src/transaction/action/mod.rs

Lines changed: 5 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -15,11 +15,12 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::mem::take;
1918
use std::sync::Arc;
19+
2020
use async_trait::async_trait;
21-
use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate};
21+
2222
use crate::transaction::Transaction;
23+
use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate};
2324

2425
pub type BoxedTransactionAction = Arc<dyn TransactionAction>;
2526

@@ -30,27 +31,6 @@ pub(crate) trait TransactionAction: Sync + Send {
3031
fn commit(self: Arc<Self>, tx: &mut Transaction) -> Result<()>;
3132
}
3233

33-
pub struct TransactionActionCommit {
34-
action: Option<BoxedTransactionAction>,
35-
updates: Vec<TableUpdate>,
36-
requirements: Vec<TableRequirement>,
37-
}
38-
39-
// TODO probably we don't need this?
40-
impl TransactionActionCommit {
41-
pub fn take_action(&mut self) -> Option<BoxedTransactionAction> {
42-
take(&mut self.action)
43-
}
44-
45-
pub fn take_updates(&mut self) -> Vec<TableUpdate> {
46-
take(&mut self.updates)
47-
}
48-
49-
pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
50-
take(&mut self.requirements)
51-
}
52-
}
53-
5434
pub struct SetLocation {
5535
pub location: Option<String>,
5636
}
@@ -79,9 +59,9 @@ impl TransactionAction for SetLocation {
7959
"Location is not set for SetLocation!",
8060
));
8161
}
82-
62+
8363
tx.actions.push(self);
84-
64+
8565
tx.apply(updates, requirements)
8666
}
8767
}

0 commit comments

Comments
 (0)