
Commit b3fcdcf

Merge branch 'main' of github.com:apache/iceberg-python into fd-add-ability-to-delete-full-data-files
2 parents 5adf3f0 + b8023d2

22 files changed (+612, -706 lines)

.asf.yaml

Lines changed: 1 addition & 0 deletions
@@ -45,6 +45,7 @@ github:
   collaborators: # Note: the number of collaborators is limited to 10
     - ajantha-bhat
     - syun64
+    - kevinjqliu
   ghp_branch: gh-pages
   ghp_path: /

.github/workflows/python-release.yml

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ jobs:
         if: startsWith(matrix.os, 'ubuntu')
 
       - name: Build wheels
-        uses: pypa/cibuildwheel@v2.17.0
+        uses: pypa/cibuildwheel@v2.18.1
        with:
          output-dir: wheelhouse
          config-file: "pyproject.toml"

mkdocs/docs/api.md

Lines changed: 50 additions & 0 deletions
@@ -606,6 +606,56 @@ min_snapshots_to_keep: [[null,10]]
 max_snapshot_age_in_ms: [[null,604800000]]
 ```
 
+### Manifests
+
+To show a table's current file manifests:
+
+```python
+table.inspect.manifests()
+```
+
+```
+pyarrow.Table
+content: int8 not null
+path: string not null
+length: int64 not null
+partition_spec_id: int32 not null
+added_snapshot_id: int64 not null
+added_data_files_count: int32 not null
+existing_data_files_count: int32 not null
+deleted_data_files_count: int32 not null
+added_delete_files_count: int32 not null
+existing_delete_files_count: int32 not null
+deleted_delete_files_count: int32 not null
+partition_summaries: list<item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>> not null
+  child 0, item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>
+      child 0, contains_null: bool not null
+      child 1, contains_nan: bool
+      child 2, lower_bound: string
+      child 3, upper_bound: string
+----
+content: [[0]]
+path: [["s3://warehouse/default/table_metadata_manifests/metadata/3bf5b4c6-a7a4-4b43-a6ce-ca2b4887945a-m0.avro"]]
+length: [[6886]]
+partition_spec_id: [[0]]
+added_snapshot_id: [[3815834705531553721]]
+added_data_files_count: [[1]]
+existing_data_files_count: [[0]]
+deleted_data_files_count: [[0]]
+added_delete_files_count: [[0]]
+existing_delete_files_count: [[0]]
+deleted_delete_files_count: [[0]]
+partition_summaries: [[ -- is_valid: all not null
+ -- child 0 type: bool
+[false]
+ -- child 1 type: bool
+[false]
+ -- child 2 type: string
+["test"]
+ -- child 3 type: string
+["test"]]]
+```
+
 ## Add Files
 
 Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
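The `## Add Files` context above introduces the `add_files` API, which registers already-written Parquet files as Iceberg data files without rewriting them. A minimal usage sketch, assuming a catalog named "default" and hypothetical table and file paths:

```python
from pyiceberg.catalog import load_catalog

# Hypothetical catalog name configured in .pyiceberg.yaml
catalog = load_catalog("default")
tbl = catalog.load_table("default.my_table")

# Commit existing Parquet files as data files; the files themselves are not rewritten,
# so their schemas must be compatible with the table schema.
tbl.add_files(file_paths=[
    "s3://warehouse/default/my_table/data/part-00000.parquet",
    "s3://warehouse/default/my_table/data/part-00001.parquet",
])
```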

mkdocs/docs/configuration.md

Lines changed: 1 addition & 1 deletion
@@ -298,4 +298,4 @@ PyIceberg uses multiple threads to parallelize operations. The number of workers
 
 # Backward Compatibility
 
-Previous versions of Java (`<1.4.0`) implementations incorrectly assume the optional attribute `current-snapshot-id` to be a required attribute in TableMetadata. This means that if `current-snapshot-id` is missing in the metadata file (e.g. on table creation), the application will throw an exception without being able to load the table. This assumption has been corrected in more recent Iceberg versions. However, it is possible to force PyIceberg to create a table with a metadata file that will be compatible with previous versions. This can be configured by setting the `legacy-current-snapshot-id` entry as "True" in the configuration file, or by setting the `LEGACY_CURRENT_SNAPSHOT_ID` environment variable. Refer to the [PR discussion](https://github.com/apache/iceberg-python/pull/473) for more details on the issue
+Previous versions of Java (`<1.4.0`) implementations incorrectly assume the optional attribute `current-snapshot-id` to be a required attribute in TableMetadata. This means that if `current-snapshot-id` is missing in the metadata file (e.g. on table creation), the application will throw an exception without being able to load the table. This assumption has been corrected in more recent Iceberg versions. However, it is possible to force PyIceberg to create a table with a metadata file that will be compatible with previous versions. This can be configured by setting the `legacy-current-snapshot-id` entry as "True" in the configuration file, or by setting the `PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID` environment variable. Refer to the [PR discussion](https://github.com/apache/iceberg-python/pull/473) for more details on the issue
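The corrected docs name the `PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID` environment variable; a minimal sketch of enabling it from Python, assuming a hypothetical catalog named "default" (the configuration-file entry `legacy-current-snapshot-id: "True"` is the equivalent route):

```python
import os

# Set before importing pyiceberg so the setting is in place when configuration is read.
os.environ["PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID"] = "True"

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")  # hypothetical catalog name from .pyiceberg.yaml
# Tables created from here on write metadata that pre-1.4.0 Java readers can load.
```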

mkdocs/requirements.txt

Lines changed: 3 additions & 3 deletions
@@ -16,13 +16,13 @@
 # under the License.
 
 mkdocs==1.6.0
-griffe==0.44.0
+griffe==0.45.2
 jinja2==3.1.4
 mkdocstrings==0.25.1
-mkdocstrings-python==1.10.0
+mkdocstrings-python==1.10.3
 mkdocs-literate-nav==0.6.1
 mkdocs-autorefs==1.0.1
 mkdocs-gen-files==0.5.0
-mkdocs-material==9.5.21
+mkdocs-material==9.5.24
 mkdocs-material-extensions==1.3.1
 mkdocs-section-index==0.3.9

poetry.lock

Lines changed: 128 additions & 128 deletions
Some generated files are not rendered by default.

pyiceberg/catalog/__init__.py

Lines changed: 19 additions & 1 deletion
@@ -36,7 +36,13 @@
     cast,
 )
 
-from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, NotInstalledError, TableAlreadyExistsError
+from pyiceberg.exceptions import (
+    NamespaceAlreadyExistsError,
+    NoSuchNamespaceError,
+    NoSuchTableError,
+    NotInstalledError,
+    TableAlreadyExistsError,
+)
 from pyiceberg.io import FileIO, load_file_io
 from pyiceberg.manifest import ManifestFile
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
@@ -477,6 +483,18 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper
             NamespaceAlreadyExistsError: If a namespace with the given name already exists.
         """
 
+    def create_namespace_if_not_exists(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
+        """Create a namespace if it does not exist.
+
+        Args:
+            namespace (str | Identifier): Namespace identifier.
+            properties (Properties): A string dictionary of properties for the given namespace.
+        """
+        try:
+            self.create_namespace(namespace, properties)
+        except NamespaceAlreadyExistsError:
+            pass
+
     @abstractmethod
     def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
         """Drop a namespace.

pyiceberg/catalog/glue.py

Lines changed: 8 additions & 1 deletion
@@ -417,7 +417,14 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
         Raises:
             TableAlreadyExistsError: If the table already exists
         """
-        raise NotImplementedError
+        database_name, table_name = self.identifier_to_database_and_table(identifier)
+        properties = EMPTY_DICT
+        io = self._load_file_io(location=metadata_location)
+        file = io.new_input(metadata_location)
+        metadata = FromInputFile.table_metadata(file)
+        table_input = _construct_table_input(table_name, metadata_location, properties, metadata)
+        self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)
+        return self.load_table(identifier=identifier)
 
     def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
         """Update the table.

pyiceberg/catalog/hive.py

Lines changed: 59 additions & 2 deletions
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import getpass
+import logging
 import socket
 import time
 from types import TracebackType
@@ -33,6 +34,7 @@
 from hive_metastore.ThriftHiveMetastore import Client
 from hive_metastore.ttypes import (
     AlreadyExistsException,
+    CheckLockRequest,
     FieldSchema,
     InvalidOperationException,
     LockComponent,
@@ -49,6 +51,7 @@
 )
 from hive_metastore.ttypes import Database as HiveDatabase
 from hive_metastore.ttypes import Table as HiveTable
+from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
 from thrift.protocol import TBinaryProtocol
 from thrift.transport import TSocket, TTransport
 
@@ -69,12 +72,20 @@
     NoSuchNamespaceError,
     NoSuchTableError,
     TableAlreadyExistsError,
+    WaitingForLockException,
 )
 from pyiceberg.io import FileIO, load_file_io
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema, SchemaVisitor, visit
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableRequest, CommitTableResponse, PropertyUtil, Table, TableProperties, update_table_metadata
+from pyiceberg.table import (
+    CommitTableRequest,
+    CommitTableResponse,
+    PropertyUtil,
+    Table,
+    TableProperties,
+    update_table_metadata,
+)
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
 from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -111,6 +122,15 @@
 HIVE2_COMPATIBLE = "hive.hive2-compatible"
 HIVE2_COMPATIBLE_DEFAULT = False
 
+LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time"
+LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time"
+LOCK_CHECK_RETRIES = "lock-check-retries"
+DEFAULT_LOCK_CHECK_MIN_WAIT_TIME = 0.1  # 100 milliseconds
+DEFAULT_LOCK_CHECK_MAX_WAIT_TIME = 60  # 1 min
+DEFAULT_LOCK_CHECK_RETRIES = 4
+
+logger = logging.getLogger(__name__)
+
 
 class _HiveClient:
     """Helper class to nicely open and close the transport."""
@@ -240,6 +260,18 @@ def __init__(self, name: str, **properties: str):
         super().__init__(name, **properties)
         self._client = _HiveClient(properties["uri"], properties.get("ugi"))
 
+        self._lock_check_min_wait_time = PropertyUtil.property_as_float(
+            properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME
+        )
+        self._lock_check_max_wait_time = PropertyUtil.property_as_float(
+            properties, LOCK_CHECK_MAX_WAIT_TIME, DEFAULT_LOCK_CHECK_MAX_WAIT_TIME
+        )
+        self._lock_check_retries = PropertyUtil.property_as_float(
+            properties,
+            LOCK_CHECK_RETRIES,
+            DEFAULT_LOCK_CHECK_RETRIES,
+        )
+
     def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table:
         properties: Dict[str, str] = table.parameters
         if TABLE_TYPE not in properties:
@@ -356,6 +388,26 @@ def _create_lock_request(self, database_name: str, table_name: str) -> LockReque
 
         return lock_request
 
+    def _wait_for_lock(self, database_name: str, table_name: str, lockid: int, open_client: Client) -> LockResponse:
+        @retry(
+            retry=retry_if_exception_type(WaitingForLockException),
+            wait=wait_exponential(multiplier=2, min=self._lock_check_min_wait_time, max=self._lock_check_max_wait_time),
+            stop=stop_after_attempt(self._lock_check_retries),
+            reraise=True,
+        )
+        def _do_wait_for_lock() -> LockResponse:
+            response: LockResponse = open_client.check_lock(CheckLockRequest(lockid=lockid))
+            if response.state == LockState.ACQUIRED:
+                return response
+            elif response.state == LockState.WAITING:
+                msg = f"Wait on lock for {database_name}.{table_name}"
+                logger.warning(msg)
+                raise WaitingForLockException(msg)
+            else:
+                raise CommitFailedException(f"Failed to check lock for {database_name}.{table_name}, state: {response.state}")
+
+        return _do_wait_for_lock()
+
     def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
         """Update the table.
 
@@ -380,7 +432,10 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
 
             try:
                 if lock.state != LockState.ACQUIRED:
-                    raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
+                    if lock.state == LockState.WAITING:
+                        self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
+                    else:
+                        raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
 
                 hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
                 io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location)
@@ -406,6 +461,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
                 open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table)
             except NoSuchObjectException as e:
                 raise NoSuchTableError(f"Table does not exist: {table_name}") from e
+            except WaitingForLockException as e:
+                raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") from e
             finally:
                 open_client.unlock(UnlockRequest(lockid=lock.lockid))

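The lock-check constants added above are read in `__init__` as ordinary catalog properties and bound the exponential backoff used by `_wait_for_lock`. A hedged configuration sketch; the URI and values are hypothetical (the defaults in the diff are 0.1 s, 60 s, and 4 attempts):

```python
from pyiceberg.catalog import load_catalog

# When the metastore reports the commit lock as WAITING, _commit_table now polls
# check_lock with exponential backoff instead of failing immediately; these
# properties tune that polling.
catalog = load_catalog(
    "hive",
    **{
        "type": "hive",
        "uri": "thrift://localhost:9083",   # hypothetical metastore URI
        "lock-check-min-wait-time": "0.2",  # seconds, lower bound of the backoff
        "lock-check-max-wait-time": "30",   # seconds, cap on the backoff
        "lock-check-retries": "5",          # attempts before giving up
    },
)
```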
pyiceberg/catalog/rest.py

Lines changed: 2 additions & 2 deletions
@@ -715,7 +715,7 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper
         try:
             response.raise_for_status()
         except HTTPError as exc:
-            self._handle_non_200_response(exc, {404: NoSuchNamespaceError, 409: NamespaceAlreadyExistsError})
+            self._handle_non_200_response(exc, {409: NamespaceAlreadyExistsError})
 
     @retry(**_RETRY_ARGS)
     def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
@@ -790,4 +790,4 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool:
         response = self._session.head(
             self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier_tuple))
         )
-        return response.status_code == 200
+        return response.status_code in (200, 204)
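A short sketch of the `table_exists` check whose accepted status codes were widened above; the catalog name and table identifier are hypothetical:

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("rest")  # hypothetical REST catalog from .pyiceberg.yaml

# Issues a HEAD request against the load-table endpoint; servers that answer an
# existing table with 204 instead of 200 are now also treated as "exists".
if catalog.table_exists("default.my_table"):
    print("table is present")
```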
