Skip to content

Commit d5e8348

Browse files
CTTY, DerGut, liurenjie1024
authored
feat(catalog): Implement update_table for MemoryCatalog (#1549)
## Which issue does this PR close?

- A part of #1389

## What changes are included in this PR?

- Implemented `update_table` for `MemoryCatalog`
- Cherry-picked the metadata parsing logic and some parts of the `update_table` impl from #1405
- Metadata parsing logic is put under `MetadataLocationParser`
- Switched to using `MetadataLocationParser` for multiple catalogs, and removed the previous metadata location generator
- Fixed an issue with `TableCommit::apply` that caused it to not update `MetadataLog`

## Are these changes tested?

Added unit tests

---------

Co-authored-by: DerGut <[email protected]>
Co-authored-by: Renjie Liu <[email protected]>
Co-authored-by: Jannik Steinmann <[email protected]>
1 parent b3ea8d1 commit d5e8348

File tree

7 files changed

+471
-47
lines changed

7 files changed

+471
-47
lines changed

crates/iceberg/src/catalog/memory/catalog.rs

Lines changed: 156 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,16 @@
2020
use std::collections::HashMap;
2121

2222
use async_trait::async_trait;
23-
use futures::lock::Mutex;
23+
use futures::lock::{Mutex, MutexGuard};
2424
use itertools::Itertools;
25-
use uuid::Uuid;
2625

2726
use super::namespace_state::NamespaceState;
2827
use crate::io::FileIO;
2928
use crate::spec::{TableMetadata, TableMetadataBuilder};
3029
use crate::table::Table;
3130
use crate::{
32-
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
33-
TableIdent,
31+
Catalog, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit,
32+
TableCreation, TableIdent,
3433
};
3534

3635
/// namespace `location` property
@@ -45,14 +44,31 @@ pub struct MemoryCatalog {
4544
}
4645

4746
impl MemoryCatalog {
48-
/// Creates an memory catalog.
47+
/// Creates a memory catalog.
4948
pub fn new(file_io: FileIO, warehouse_location: Option<String>) -> Self {
5049
Self {
5150
root_namespace_state: Mutex::new(NamespaceState::default()),
5251
file_io,
5352
warehouse_location,
5453
}
5554
}
55+
56+
/// Loads a table from the locked namespace state.
57+
async fn load_table_from_locked_state(
58+
&self,
59+
table_ident: &TableIdent,
60+
root_namespace_state: &MutexGuard<'_, NamespaceState>,
61+
) -> Result<Table> {
62+
let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
63+
let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
64+
65+
Table::builder()
66+
.identifier(table_ident.clone())
67+
.metadata(metadata)
68+
.metadata_location(metadata_location.to_string())
69+
.file_io(self.file_io.clone())
70+
.build()
71+
}
5672
}
5773

5874
#[async_trait]
@@ -203,12 +219,7 @@ impl Catalog for MemoryCatalog {
203219
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
204220
.build()?
205221
.metadata;
206-
let metadata_location = format!(
207-
"{}/metadata/{}-{}.metadata.json",
208-
&location,
209-
0,
210-
Uuid::new_v4()
211-
);
222+
let metadata_location = MetadataLocation::new_with_table_location(location).to_string();
212223

213224
metadata.write_to(&self.file_io, &metadata_location).await?;
214225

@@ -226,15 +237,8 @@ impl Catalog for MemoryCatalog {
226237
async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
227238
let root_namespace_state = self.root_namespace_state.lock().await;
228239

229-
let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
230-
let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
231-
232-
Table::builder()
233-
.file_io(self.file_io.clone())
234-
.metadata_location(metadata_location.clone())
235-
.metadata(metadata)
236-
.identifier(table_ident.clone())
237-
.build()
240+
self.load_table_from_locked_state(table_ident, &root_namespace_state)
241+
.await
238242
}
239243

240244
/// Drop a table from the catalog.
@@ -289,12 +293,30 @@ impl Catalog for MemoryCatalog {
289293
.build()
290294
}
291295

292-
/// Update a table to the catalog.
293-
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
294-
Err(Error::new(
295-
ErrorKind::FeatureUnsupported,
296-
"MemoryCatalog does not currently support updating tables.",
297-
))
296+
/// Update a table in the catalog.
297+
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
298+
let mut root_namespace_state = self.root_namespace_state.lock().await;
299+
300+
let current_table = self
301+
.load_table_from_locked_state(commit.identifier(), &root_namespace_state)
302+
.await?;
303+
304+
// Apply TableCommit to get staged table
305+
let staged_table = commit.apply(current_table)?;
306+
307+
// Write table metadata to the new location
308+
staged_table
309+
.metadata()
310+
.write_to(
311+
staged_table.file_io(),
312+
staged_table.metadata_location_result()?,
313+
)
314+
.await?;
315+
316+
// Flip the pointer to reference the new metadata file.
317+
let updated_table = root_namespace_state.commit_table_update(staged_table)?;
318+
319+
Ok(updated_table)
298320
}
299321
}
300322

@@ -303,13 +325,15 @@ mod tests {
303325
use std::collections::HashSet;
304326
use std::hash::Hash;
305327
use std::iter::FromIterator;
328+
use std::vec;
306329

307330
use regex::Regex;
308331
use tempfile::TempDir;
309332

310333
use super::*;
311334
use crate::io::FileIOBuilder;
312335
use crate::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
336+
use crate::transaction::{ApplyTransactionAction, Transaction};
313337

314338
fn temp_path() -> String {
315339
let temp_dir = TempDir::new().unwrap();
@@ -335,7 +359,7 @@ mod tests {
335359
}
336360
}
337361

338-
fn to_set<T: std::cmp::Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
362+
fn to_set<T: Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
339363
HashSet::from_iter(vec)
340364
}
341365

@@ -348,8 +372,8 @@ mod tests {
348372
.unwrap()
349373
}
350374

351-
async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
352-
let _ = catalog
375+
async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) -> Table {
376+
catalog
353377
.create_table(
354378
&table_ident.namespace,
355379
TableCreation::builder()
@@ -358,7 +382,7 @@ mod tests {
358382
.build(),
359383
)
360384
.await
361-
.unwrap();
385+
.unwrap()
362386
}
363387

364388
async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
@@ -367,6 +391,14 @@ mod tests {
367391
}
368392
}
369393

394+
async fn create_table_with_namespace<C: Catalog>(catalog: &C) -> Table {
395+
let namespace_ident = NamespaceIdent::new("abc".into());
396+
create_namespace(catalog, &namespace_ident).await;
397+
398+
let table_ident = TableIdent::new(namespace_ident, "test".to_string());
399+
create_table(catalog, &table_ident).await
400+
}
401+
370402
fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
371403
assert_eq!(table.identifier(), expected_table_ident);
372404

@@ -411,7 +443,12 @@ mod tests {
411443
fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
412444
let actual = table.metadata_location().unwrap().to_string();
413445
let regex = Regex::new(regex_str).unwrap();
414-
assert!(regex.is_match(&actual))
446+
assert!(
447+
regex.is_match(&actual),
448+
"Expected metadata location to match regex, but got location: {} and regex: {}",
449+
actual,
450+
regex
451+
)
415452
}
416453

417454
#[tokio::test]
@@ -1063,7 +1100,7 @@ mod tests {
10631100
let table_name = "tbl1";
10641101
let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
10651102
let expected_table_metadata_location_regex = format!(
1066-
"^{}/tbl1/metadata/0-{}.metadata.json$",
1103+
"^{}/tbl1/metadata/00000-{}.metadata.json$",
10671104
namespace_location, UUID_REGEX_STR,
10681105
);
10691106

@@ -1116,7 +1153,7 @@ mod tests {
11161153
let expected_table_ident =
11171154
TableIdent::new(nested_namespace_ident.clone(), table_name.into());
11181155
let expected_table_metadata_location_regex = format!(
1119-
"^{}/tbl1/metadata/0-{}.metadata.json$",
1156+
"^{}/tbl1/metadata/00000-{}.metadata.json$",
11201157
nested_namespace_location, UUID_REGEX_STR,
11211158
);
11221159

@@ -1157,7 +1194,7 @@ mod tests {
11571194
let table_name = "tbl1";
11581195
let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
11591196
let expected_table_metadata_location_regex = format!(
1160-
"^{}/a/tbl1/metadata/0-{}.metadata.json$",
1197+
"^{}/a/tbl1/metadata/00000-{}.metadata.json$",
11611198
warehouse_location, UUID_REGEX_STR
11621199
);
11631200

@@ -1205,7 +1242,7 @@ mod tests {
12051242
let expected_table_ident =
12061243
TableIdent::new(nested_namespace_ident.clone(), table_name.into());
12071244
let expected_table_metadata_location_regex = format!(
1208-
"^{}/a/b/tbl1/metadata/0-{}.metadata.json$",
1245+
"^{}/a/b/tbl1/metadata/00000-{}.metadata.json$",
12091246
warehouse_location, UUID_REGEX_STR
12101247
);
12111248

@@ -1705,7 +1742,7 @@ mod tests {
17051742
.unwrap_err()
17061743
.to_string(),
17071744
format!(
1708-
"TableAlreadyExists => Cannot create table {:? }. Table already exists.",
1745+
"TableAlreadyExists => Cannot create table {:?}. Table already exists.",
17091746
&dst_table_ident
17101747
),
17111748
);
@@ -1754,4 +1791,87 @@ mod tests {
17541791
metadata_location
17551792
);
17561793
}
1794+
1795+
#[tokio::test]
1796+
async fn test_update_table() {
1797+
let catalog = new_memory_catalog();
1798+
1799+
let table = create_table_with_namespace(&catalog).await;
1800+
1801+
// Assert the table doesn't contain the update yet
1802+
assert!(!table.metadata().properties().contains_key("key"));
1803+
1804+
// Update table metadata
1805+
let tx = Transaction::new(&table);
1806+
let updated_table = tx
1807+
.update_table_properties()
1808+
.set("key".to_string(), "value".to_string())
1809+
.apply(tx)
1810+
.unwrap()
1811+
.commit(&catalog)
1812+
.await
1813+
.unwrap();
1814+
1815+
assert_eq!(
1816+
updated_table.metadata().properties().get("key").unwrap(),
1817+
"value"
1818+
);
1819+
1820+
assert_eq!(table.identifier(), updated_table.identifier());
1821+
assert_eq!(table.metadata().uuid(), updated_table.metadata().uuid());
1822+
assert!(table.metadata().last_updated_ms() < updated_table.metadata().last_updated_ms());
1823+
assert_ne!(table.metadata_location(), updated_table.metadata_location());
1824+
1825+
assert!(
1826+
table.metadata().metadata_log().len() < updated_table.metadata().metadata_log().len()
1827+
);
1828+
}
1829+
1830+
#[tokio::test]
1831+
async fn test_update_table_fails_if_table_doesnt_exist() {
1832+
let catalog = new_memory_catalog();
1833+
1834+
let namespace_ident = NamespaceIdent::new("a".into());
1835+
create_namespace(&catalog, &namespace_ident).await;
1836+
1837+
// This table is not known to the catalog.
1838+
let table_ident = TableIdent::new(namespace_ident, "test".to_string());
1839+
let table = build_table(table_ident);
1840+
1841+
let tx = Transaction::new(&table);
1842+
let err = tx
1843+
.update_table_properties()
1844+
.set("key".to_string(), "value".to_string())
1845+
.apply(tx)
1846+
.unwrap()
1847+
.commit(&catalog)
1848+
.await
1849+
.unwrap_err();
1850+
assert_eq!(err.kind(), ErrorKind::TableNotFound);
1851+
}
1852+
1853+
fn build_table(ident: TableIdent) -> Table {
1854+
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1855+
1856+
let temp_dir = TempDir::new().unwrap();
1857+
let location = temp_dir.path().to_str().unwrap().to_string();
1858+
1859+
let table_creation = TableCreation::builder()
1860+
.name(ident.name().to_string())
1861+
.schema(simple_table_schema())
1862+
.location(location)
1863+
.build();
1864+
let metadata = TableMetadataBuilder::from_table_creation(table_creation)
1865+
.unwrap()
1866+
.build()
1867+
.unwrap()
1868+
.metadata;
1869+
1870+
Table::builder()
1871+
.identifier(ident)
1872+
.metadata(metadata)
1873+
.file_io(file_io)
1874+
.build()
1875+
.unwrap()
1876+
}
17571877
}

crates/iceberg/src/catalog/memory/namespace_state.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::collections::{HashMap, hash_map};
1919

2020
use itertools::Itertools;
2121

22+
use crate::table::Table;
2223
use crate::{Error, ErrorKind, NamespaceIdent, Result, TableIdent};
2324

2425
// Represents the state of a namespace
@@ -259,7 +260,7 @@ impl NamespaceState {
259260

260261
match namespace.table_metadata_locations.get(table_ident.name()) {
261262
None => no_such_table_err(table_ident),
262-
Some(table_metadadata_location) => Ok(table_metadadata_location),
263+
Some(table_metadata_location) => Ok(table_metadata_location),
263264
}
264265
}
265266

@@ -296,4 +297,22 @@ impl NamespaceState {
296297
Some(metadata_location) => Ok(metadata_location),
297298
}
298299
}
300+
301+
/// Updates the metadata location of the given table or returns an error if it doesn't exist
302+
pub(crate) fn commit_table_update(&mut self, staged_table: Table) -> Result<Table> {
303+
let namespace = self.get_mut_namespace(staged_table.identifier().namespace())?;
304+
305+
let _ = namespace
306+
.table_metadata_locations
307+
.insert(
308+
staged_table.identifier().name().to_string(),
309+
staged_table.metadata_location_result()?.to_string(),
310+
)
311+
.ok_or(Error::new(
312+
ErrorKind::TableNotFound,
313+
format!("No such table: {:?}", staged_table.identifier()),
314+
))?;
315+
316+
Ok(staged_table)
317+
}
299318
}

0 commit comments

Comments
 (0)