Skip to content

Commit 84a2c04

Browse files
authored
Support CreateTableTransaction for HiveCatalog (#683)
1 parent 0339e7f commit 84a2c04

File tree

3 files changed

+104
-75
lines changed

3 files changed

+104
-75
lines changed

pyiceberg/catalog/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -761,7 +761,7 @@ def _create_staged_table(
761761
metadata = new_table_metadata(
762762
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
763763
)
764-
io = load_file_io(properties=self.properties, location=metadata_location)
764+
io = self._load_file_io(properties=properties, location=metadata_location)
765765
return StagedTable(
766766
identifier=(self.name, database_name, table_name),
767767
metadata=metadata,

pyiceberg/catalog/hive.py

Lines changed: 94 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -70,23 +70,22 @@
7070
NamespaceNotEmptyError,
7171
NoSuchIcebergTableError,
7272
NoSuchNamespaceError,
73+
NoSuchPropertyException,
7374
NoSuchTableError,
7475
TableAlreadyExistsError,
7576
WaitingForLockException,
7677
)
77-
from pyiceberg.io import FileIO, load_file_io
7878
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
7979
from pyiceberg.schema import Schema, SchemaVisitor, visit
8080
from pyiceberg.serializers import FromInputFile
8181
from pyiceberg.table import (
8282
CommitTableRequest,
8383
CommitTableResponse,
8484
PropertyUtil,
85+
StagedTable,
8586
Table,
8687
TableProperties,
87-
update_table_metadata,
8888
)
89-
from pyiceberg.table.metadata import new_table_metadata
9089
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
9190
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
9291
from pyiceberg.types import (
@@ -272,10 +271,12 @@ def __init__(self, name: str, **properties: str):
272271
DEFAULT_LOCK_CHECK_RETRIES,
273272
)
274273

275-
def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table:
274+
def _convert_hive_into_iceberg(self, table: HiveTable) -> Table:
276275
properties: Dict[str, str] = table.parameters
277276
if TABLE_TYPE not in properties:
278-
raise NoSuchTableError(f"Property table_type missing, could not determine type: {table.dbName}.{table.tableName}")
277+
raise NoSuchPropertyException(
278+
f"Property table_type missing, could not determine type: {table.dbName}.{table.tableName}"
279+
)
279280

280281
table_type = properties[TABLE_TYPE]
281282
if table_type.lower() != ICEBERG:
@@ -286,8 +287,9 @@ def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table:
286287
if prop_metadata_location := properties.get(METADATA_LOCATION):
287288
metadata_location = prop_metadata_location
288289
else:
289-
raise NoSuchTableError(f"Table property {METADATA_LOCATION} is missing")
290+
raise NoSuchPropertyException(f"Table property {METADATA_LOCATION} is missing")
290291

292+
io = self._load_file_io(location=metadata_location)
291293
file = io.new_input(metadata_location)
292294
metadata = FromInputFile.table_metadata(file)
293295
return Table(
@@ -298,6 +300,38 @@ def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table:
298300
catalog=self,
299301
)
300302

303+
def _convert_iceberg_into_hive(self, table: Table) -> HiveTable:
304+
identifier_tuple = self.identifier_to_tuple_without_catalog(table.identifier)
305+
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
306+
current_time_millis = int(time.time() * 1000)
307+
308+
return HiveTable(
309+
dbName=database_name,
310+
tableName=table_name,
311+
owner=table.properties[OWNER] if table.properties and OWNER in table.properties else getpass.getuser(),
312+
createTime=current_time_millis // 1000,
313+
lastAccessTime=current_time_millis // 1000,
314+
sd=_construct_hive_storage_descriptor(
315+
table.schema(),
316+
table.location(),
317+
PropertyUtil.property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT),
318+
),
319+
tableType=EXTERNAL_TABLE,
320+
parameters=_construct_parameters(table.metadata_location),
321+
)
322+
323+
def _create_hive_table(self, open_client: Client, hive_table: HiveTable) -> None:
324+
try:
325+
open_client.create_table(hive_table)
326+
except AlreadyExistsException as e:
327+
raise TableAlreadyExistsError(f"Table {hive_table.dbName}.{hive_table.tableName} already exists") from e
328+
329+
def _get_hive_table(self, open_client: Client, database_name: str, table_name: str) -> HiveTable:
330+
try:
331+
return open_client.get_table(dbname=database_name, tbl_name=table_name)
332+
except NoSuchObjectException as e:
333+
raise NoSuchTableError(f"Table does not exists: {table_name}") from e
334+
301335
def create_table(
302336
self,
303337
identifier: Union[str, Identifier],
@@ -324,45 +358,25 @@ def create_table(
324358
AlreadyExistsError: If a table with the name already exists.
325359
ValueError: If the identifier is invalid.
326360
"""
327-
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore
328-
329361
properties = {**DEFAULT_PROPERTIES, **properties}
330-
database_name, table_name = self.identifier_to_database_and_table(identifier)
331-
current_time_millis = int(time.time() * 1000)
332-
333-
location = self._resolve_table_location(location, database_name, table_name)
334-
335-
metadata_location = self._get_metadata_location(location=location)
336-
metadata = new_table_metadata(
337-
location=location,
362+
staged_table = self._create_staged_table(
363+
identifier=identifier,
338364
schema=schema,
365+
location=location,
339366
partition_spec=partition_spec,
340367
sort_order=sort_order,
341368
properties=properties,
342369
)
343-
io = load_file_io({**self.properties, **properties}, location=location)
344-
self._write_metadata(metadata, io, metadata_location)
370+
database_name, table_name = self.identifier_to_database_and_table(identifier)
345371

346-
tbl = HiveTable(
347-
dbName=database_name,
348-
tableName=table_name,
349-
owner=properties[OWNER] if properties and OWNER in properties else getpass.getuser(),
350-
createTime=current_time_millis // 1000,
351-
lastAccessTime=current_time_millis // 1000,
352-
sd=_construct_hive_storage_descriptor(
353-
schema, location, PropertyUtil.property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT)
354-
),
355-
tableType=EXTERNAL_TABLE,
356-
parameters=_construct_parameters(metadata_location),
357-
)
358-
try:
359-
with self._client as open_client:
360-
open_client.create_table(tbl)
361-
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
362-
except AlreadyExistsException as e:
363-
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
372+
self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location)
373+
tbl = self._convert_iceberg_into_hive(staged_table)
374+
375+
with self._client as open_client:
376+
self._create_hive_table(open_client, tbl)
377+
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
364378

365-
return self._convert_hive_into_iceberg(hive_table, io)
379+
return self._convert_hive_into_iceberg(hive_table)
366380

367381
def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
368382
"""Register a new table using existing metadata.
@@ -437,36 +451,52 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
437451
else:
438452
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
439453

440-
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
441-
io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location)
442-
current_table = self._convert_hive_into_iceberg(hive_table, io)
443-
444-
base_metadata = current_table.metadata
445-
for requirement in table_request.requirements:
446-
requirement.validate(base_metadata)
447-
448-
updated_metadata = update_table_metadata(base_metadata, table_request.updates)
449-
if updated_metadata == base_metadata:
454+
hive_table: Optional[HiveTable]
455+
current_table: Optional[Table]
456+
try:
457+
hive_table = self._get_hive_table(open_client, database_name, table_name)
458+
current_table = self._convert_hive_into_iceberg(hive_table)
459+
except NoSuchTableError:
460+
hive_table = None
461+
current_table = None
462+
463+
updated_staged_table = self._update_and_stage_table(current_table, table_request)
464+
if current_table and updated_staged_table.metadata == current_table.metadata:
450465
# no changes, do nothing
451-
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)
452-
453-
# write new metadata
454-
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
455-
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
456-
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)
457-
458-
hive_table.parameters = _construct_parameters(
459-
metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location
466+
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
467+
self._write_metadata(
468+
metadata=updated_staged_table.metadata,
469+
io=updated_staged_table.io,
470+
metadata_path=updated_staged_table.metadata_location,
460471
)
461-
open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table)
462-
except NoSuchObjectException as e:
463-
raise NoSuchTableError(f"Table does not exist: {table_name}") from e
472+
473+
if hive_table and current_table:
474+
# Table exists, update it.
475+
hive_table.parameters = _construct_parameters(
476+
metadata_location=updated_staged_table.metadata_location,
477+
previous_metadata_location=current_table.metadata_location,
478+
)
479+
open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table)
480+
else:
481+
# Table does not exist, create it.
482+
hive_table = self._convert_iceberg_into_hive(
483+
StagedTable(
484+
identifier=(self.name, database_name, table_name),
485+
metadata=updated_staged_table.metadata,
486+
metadata_location=updated_staged_table.metadata_location,
487+
io=updated_staged_table.io,
488+
catalog=self,
489+
)
490+
)
491+
self._create_hive_table(open_client, hive_table)
464492
except WaitingForLockException as e:
465493
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") from e
466494
finally:
467495
open_client.unlock(UnlockRequest(lockid=lock.lockid))
468496

469-
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
497+
return CommitTableResponse(
498+
metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
499+
)
470500

471501
def load_table(self, identifier: Union[str, Identifier]) -> Table:
472502
"""Load the table's metadata and return the table instance.
@@ -485,14 +515,11 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
485515
"""
486516
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
487517
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
488-
try:
489-
with self._client as open_client:
490-
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
491-
except NoSuchObjectException as e:
492-
raise NoSuchTableError(f"Table does not exists: {table_name}") from e
493518

494-
io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location)
495-
return self._convert_hive_into_iceberg(hive_table, io)
519+
with self._client as open_client:
520+
hive_table = self._get_hive_table(open_client, database_name, table_name)
521+
522+
return self._convert_hive_into_iceberg(hive_table)
496523

497524
def drop_table(self, identifier: Union[str, Identifier]) -> None:
498525
"""Drop a table.

tests/integration/test_writes/test_writes.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
from pyiceberg.catalog import Catalog
3636
from pyiceberg.catalog.hive import HiveCatalog
37+
from pyiceberg.catalog.rest import RestCatalog
3738
from pyiceberg.catalog.sql import SqlCatalog
3839
from pyiceberg.exceptions import NoSuchTableError
3940
from pyiceberg.partitioning import PartitionField, PartitionSpec
@@ -637,17 +638,18 @@ def test_write_and_evolve(session_catalog: Catalog, format_version: int) -> None
637638

638639

639640
@pytest.mark.integration
640-
@pytest.mark.parametrize("format_version", [2])
641-
def test_create_table_transaction(session_catalog: Catalog, format_version: int) -> None:
642-
if format_version == 1:
641+
@pytest.mark.parametrize("format_version", [1, 2])
642+
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
643+
def test_create_table_transaction(catalog: Catalog, format_version: int) -> None:
644+
if format_version == 1 and isinstance(catalog, RestCatalog):
643645
pytest.skip(
644646
"There is a bug in the REST catalog (maybe server side) that prevents create and commit a staged version 1 table"
645647
)
646648

647-
identifier = f"default.arrow_create_table_transaction{format_version}"
649+
identifier = f"default.arrow_create_table_transaction_{catalog.name}_{format_version}"
648650

649651
try:
650-
session_catalog.drop_table(identifier=identifier)
652+
catalog.drop_table(identifier=identifier)
651653
except NoSuchTableError:
652654
pass
653655

@@ -669,7 +671,7 @@ def test_create_table_transaction(session_catalog: Catalog, format_version: int)
669671
]),
670672
)
671673

672-
with session_catalog.create_table_transaction(
674+
with catalog.create_table_transaction(
673675
identifier=identifier, schema=pa_table.schema, properties={"format-version": str(format_version)}
674676
) as txn:
675677
with txn.update_snapshot().fast_append() as snapshot_update:
@@ -685,7 +687,7 @@ def test_create_table_transaction(session_catalog: Catalog, format_version: int)
685687
):
686688
snapshot_update.append_data_file(data_file)
687689

688-
tbl = session_catalog.load_table(identifier=identifier)
690+
tbl = catalog.load_table(identifier=identifier)
689691
assert tbl.format_version == format_version
690692
assert len(tbl.scan().to_arrow()) == 6
691693

0 commit comments

Comments (0)