diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs
index 35889d451a..c1040c65a0 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -765,13 +765,30 @@ impl Catalog for SqlCatalog {
async fn register_table(
&self,
- _table_ident: &TableIdent,
- _metadata_location: String,
+ table_ident: &TableIdent,
+ metadata_location: String,
) -> Result
{
- Err(Error::new(
- ErrorKind::FeatureUnsupported,
- "Registering a table is not supported yet",
- ))
+ if self.table_exists(table_ident).await? {
+ return table_already_exists_err(table_ident);
+ }
+
+ let metadata = TableMetadata::read_from(&self.fileio, &metadata_location).await?;
+
+ let namespace = table_ident.namespace();
+ let tbl_name = table_ident.name().to_string();
+
+ self.execute(&format!(
+ "INSERT INTO {CATALOG_TABLE_NAME}
+ ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
+ VALUES (?, ?, ?, ?, ?)
+ "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name), Some(&metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
+
+ Ok(Table::builder()
+ .identifier(table_ident.clone())
+ .metadata_location(metadata_location)
+ .metadata(metadata)
+ .file_io(self.fileio.clone())
+ .build()?)
}
async fn update_table(&self, _commit: TableCommit) -> Result {
@@ -1908,4 +1925,63 @@ mod tests {
"Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
);
}
+
+ #[tokio::test]
+ async fn test_register_table_throws_error_if_table_with_same_name_already_exists() {
+ let warehouse_loc = temp_path();
+ let catalog = new_sql_catalog(warehouse_loc.clone()).await;
+ let namespace_ident = NamespaceIdent::new("a".into());
+ create_namespace(&catalog, &namespace_ident).await;
+ let table_name = "tbl1";
+ let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
+ create_table(&catalog, &table_ident).await;
+
+ assert_eq!(
+ catalog
+ .register_table(&table_ident, warehouse_loc)
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!("Unexpected => Table {:?} already exists.", &table_ident)
+ );
+ }
+
+ #[tokio::test]
+ async fn test_register_table() {
+ let warehouse_loc = temp_path();
+ let catalog = new_sql_catalog(warehouse_loc.clone()).await;
+ let namespace_ident = NamespaceIdent::new("a".into());
+ create_namespace(&catalog, &namespace_ident).await;
+
+ let table_name = "abc";
+ let location = warehouse_loc.clone();
+ let table_creation = TableCreation::builder()
+ .name(table_name.into())
+ .location(location.clone())
+ .schema(simple_table_schema())
+ .build();
+
+ let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
+ let expected_table = catalog
+ .create_table(&namespace_ident, table_creation)
+ .await
+ .unwrap();
+
+ let metadata_location = expected_table
+ .metadata_location()
+ .expect("Expected metadata location to be set")
+ .to_string();
+
+ assert_table_eq(&expected_table, &table_ident, &simple_table_schema());
+
+ let _ = catalog.drop_table(&table_ident).await;
+
+ let table = catalog
+ .register_table(&table_ident, metadata_location.clone())
+ .await
+ .unwrap();
+
+ assert_eq!(table.identifier(), expected_table.identifier());
+ assert_eq!(table.metadata_location(), Some(metadata_location.as_str()));
+ }
}