
Commit 6d52325

Hive catalog: Add retry logic for hive locking (#701)
1 parent 20b7b53 commit 6d52325

5 files changed: +142 -3 lines

pyiceberg/catalog/hive.py

Lines changed: 59 additions & 2 deletions
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import getpass
+import logging
 import socket
 import time
 from types import TracebackType
@@ -33,6 +34,7 @@
 from hive_metastore.ThriftHiveMetastore import Client
 from hive_metastore.ttypes import (
     AlreadyExistsException,
+    CheckLockRequest,
     FieldSchema,
     InvalidOperationException,
     LockComponent,
@@ -49,6 +51,7 @@
 )
 from hive_metastore.ttypes import Database as HiveDatabase
 from hive_metastore.ttypes import Table as HiveTable
+from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
 from thrift.protocol import TBinaryProtocol
 from thrift.transport import TSocket, TTransport
 
@@ -69,12 +72,20 @@
     NoSuchNamespaceError,
     NoSuchTableError,
     TableAlreadyExistsError,
+    WaitingForLockException,
 )
 from pyiceberg.io import FileIO, load_file_io
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema, SchemaVisitor, visit
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableRequest, CommitTableResponse, PropertyUtil, Table, TableProperties, update_table_metadata
+from pyiceberg.table import (
+    CommitTableRequest,
+    CommitTableResponse,
+    PropertyUtil,
+    Table,
+    TableProperties,
+    update_table_metadata,
+)
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
 from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -111,6 +122,15 @@
 HIVE2_COMPATIBLE = "hive.hive2-compatible"
 HIVE2_COMPATIBLE_DEFAULT = False
 
+LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time"
+LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time"
+LOCK_CHECK_RETRIES = "lock-check-retries"
+DEFAULT_LOCK_CHECK_MIN_WAIT_TIME = 0.1  # 100 milliseconds
+DEFAULT_LOCK_CHECK_MAX_WAIT_TIME = 60  # 1 min
+DEFAULT_LOCK_CHECK_RETRIES = 4
+
+logger = logging.getLogger(__name__)
+
 
 class _HiveClient:
     """Helper class to nicely open and close the transport."""
@@ -240,6 +260,18 @@ def __init__(self, name: str, **properties: str):
         super().__init__(name, **properties)
         self._client = _HiveClient(properties["uri"], properties.get("ugi"))
 
+        self._lock_check_min_wait_time = PropertyUtil.property_as_float(
+            properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME
+        )
+        self._lock_check_max_wait_time = PropertyUtil.property_as_float(
+            properties, LOCK_CHECK_MAX_WAIT_TIME, DEFAULT_LOCK_CHECK_MAX_WAIT_TIME
+        )
+        self._lock_check_retries = PropertyUtil.property_as_float(
+            properties,
+            LOCK_CHECK_RETRIES,
+            DEFAULT_LOCK_CHECK_RETRIES,
+        )
+
     def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table:
         properties: Dict[str, str] = table.parameters
         if TABLE_TYPE not in properties:
@@ -356,6 +388,26 @@ def _create_lock_request(self, database_name: str, table_name: str) -> LockRequest:
 
         return lock_request
 
+    def _wait_for_lock(self, database_name: str, table_name: str, lockid: int, open_client: Client) -> LockResponse:
+        @retry(
+            retry=retry_if_exception_type(WaitingForLockException),
+            wait=wait_exponential(multiplier=2, min=self._lock_check_min_wait_time, max=self._lock_check_max_wait_time),
+            stop=stop_after_attempt(self._lock_check_retries),
+            reraise=True,
+        )
+        def _do_wait_for_lock() -> LockResponse:
+            response: LockResponse = open_client.check_lock(CheckLockRequest(lockid=lockid))
+            if response.state == LockState.ACQUIRED:
+                return response
+            elif response.state == LockState.WAITING:
+                msg = f"Wait on lock for {database_name}.{table_name}"
+                logger.warning(msg)
+                raise WaitingForLockException(msg)
+            else:
+                raise CommitFailedException(f"Failed to check lock for {database_name}.{table_name}, state: {response.state}")
+
+        return _do_wait_for_lock()
+
     def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
         """Update the table.
 
@@ -380,7 +432,10 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
 
         try:
             if lock.state != LockState.ACQUIRED:
-                raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
+                if lock.state == LockState.WAITING:
+                    self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
+                else:
+                    raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
 
             hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
             io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location)
@@ -406,6 +461,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
             open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table)
         except NoSuchObjectException as e:
             raise NoSuchTableError(f"Table does not exist: {table_name}") from e
+        except WaitingForLockException as e:
+            raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") from e
         finally:
            open_client.unlock(UnlockRequest(lockid=lock.lockid))
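Taken together, the new constants and constructor code make the lock-check retry behaviour configurable through catalog properties. A minimal usage sketch of passing those properties when constructing the catalog; the catalog name, metastore endpoint, and values below are illustrative, not part of this commit:

from pyiceberg.catalog.hive import (
    LOCK_CHECK_MAX_WAIT_TIME,
    LOCK_CHECK_MIN_WAIT_TIME,
    LOCK_CHECK_RETRIES,
    HiveCatalog,
)

# Defaults from this commit: 0.1 s (min wait), 60 s (max wait), 4 check-lock attempts.
catalog = HiveCatalog(
    "prod",  # catalog name, illustrative
    **{
        "uri": "thrift://hive-metastore:9083",  # assumed metastore endpoint
        LOCK_CHECK_MIN_WAIT_TIME: "0.2",  # lower bound (seconds) between check_lock calls
        LOCK_CHECK_MAX_WAIT_TIME: "30",  # upper bound (seconds) on the exponential backoff
        LOCK_CHECK_RETRIES: "5",  # total check_lock attempts before the commit fails
    },
)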

pyiceberg/exceptions.py

Lines changed: 4 additions & 0 deletions
@@ -110,3 +110,7 @@ class CommitFailedException(Exception):
 
 class CommitStateUnknownException(RESTError):
     """Commit failed due to unknown reason."""
+
+
+class WaitingForLockException(Exception):
+    """Need to wait for a lock, try again."""

pyiceberg/table/__init__.py

Lines changed: 10 additions & 0 deletions
@@ -251,6 +251,16 @@ def property_as_int(properties: Dict[str, str], property_name: str, default: Optional[int] = None) -> Optional[int]:
         else:
             return default
 
+    @staticmethod
+    def property_as_float(properties: Dict[str, str], property_name: str, default: Optional[float] = None) -> Optional[float]:
+        if value := properties.get(property_name):
+            try:
+                return float(value)
+            except ValueError as e:
+                raise ValueError(f"Could not parse table property {property_name} to a float: {value}") from e
+        else:
+            return default
+
     @staticmethod
     def property_as_bool(properties: Dict[str, str], property_name: str, default: bool) -> bool:
         if value := properties.get(property_name):
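The new PropertyUtil.property_as_float mirrors the existing property_as_int: it parses a present value, returns the default for a missing key, and raises ValueError for unparsable input. A small usage sketch; the property dict below is illustrative:

from pyiceberg.table import PropertyUtil

props = {"lock-check-min-wait-time": "0.25"}

# Present key: the string value is parsed to a float.
assert PropertyUtil.property_as_float(props, "lock-check-min-wait-time", 0.1) == 0.25
# Missing key: the supplied default is returned unchanged.
assert PropertyUtil.property_as_float(props, "lock-check-retries", 4) == 4
# An unparsable value such as "abc" raises ValueError("Could not parse table property ...").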

tests/catalog/test_hive.py

Lines changed: 38 additions & 1 deletion
@@ -24,6 +24,8 @@
     AlreadyExistsException,
     FieldSchema,
     InvalidOperationException,
+    LockResponse,
+    LockState,
     MetaException,
     NoSuchObjectException,
     SerDeInfo,
@@ -34,12 +36,19 @@
 from hive_metastore.ttypes import Table as HiveTable
 
 from pyiceberg.catalog import PropertiesUpdateSummary
-from pyiceberg.catalog.hive import HiveCatalog, _construct_hive_storage_descriptor
+from pyiceberg.catalog.hive import (
+    LOCK_CHECK_MAX_WAIT_TIME,
+    LOCK_CHECK_MIN_WAIT_TIME,
+    LOCK_CHECK_RETRIES,
+    HiveCatalog,
+    _construct_hive_storage_descriptor,
+)
 from pyiceberg.exceptions import (
     NamespaceAlreadyExistsError,
     NamespaceNotEmptyError,
     NoSuchNamespaceError,
     NoSuchTableError,
+    WaitingForLockException,
 )
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
@@ -1158,3 +1167,31 @@ def test_resolve_table_location_warehouse(hive_database: HiveDatabase) -> None:
 
     location = catalog._resolve_table_location(None, "database", "table")
     assert location == "/tmp/warehouse/database.db/table"
+
+
+def test_hive_wait_for_lock() -> None:
+    lockid = 12345
+    acquired = LockResponse(lockid=lockid, state=LockState.ACQUIRED)
+    waiting = LockResponse(lockid=lockid, state=LockState.WAITING)
+    prop = {
+        "uri": HIVE_METASTORE_FAKE_URL,
+        LOCK_CHECK_MIN_WAIT_TIME: 0.1,
+        LOCK_CHECK_MAX_WAIT_TIME: 0.5,
+        LOCK_CHECK_RETRIES: 5,
+    }
+    catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop)  # type: ignore
+    catalog._client = MagicMock()
+    catalog._client.lock.return_value = LockResponse(lockid=lockid, state=LockState.WAITING)
+
+    # lock will be acquired after 3 retries
+    catalog._client.check_lock.side_effect = [waiting if i < 2 else acquired for i in range(10)]
+    response: LockResponse = catalog._wait_for_lock("db", "tbl", lockid, catalog._client)
+    assert response.state == LockState.ACQUIRED
+    assert catalog._client.check_lock.call_count == 3
+
+    # lock wait should exit with WaitingForLockException finally after enough retries
+    catalog._client.check_lock.side_effect = [waiting for _ in range(10)]
+    catalog._client.check_lock.call_count = 0
+    with pytest.raises(WaitingForLockException):
+        catalog._wait_for_lock("db", "tbl", lockid, catalog._client)
+    assert catalog._client.check_lock.call_count == 5
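The counts asserted above follow from tenacity's semantics: stop_after_attempt bounds the total number of calls (the first call plus the retries), and reraise=True surfaces the last WaitingForLockException rather than a tenacity.RetryError. A standalone sketch of that behaviour, using a local stand-in exception and zero wait times so it runs instantly; nothing in it is part of the commit:

from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential


class WaitingForLockException(Exception):
    """Local stand-in for pyiceberg.exceptions.WaitingForLockException."""


calls = []


@retry(
    retry=retry_if_exception_type(WaitingForLockException),
    wait=wait_exponential(multiplier=0),  # no real waiting here, we only count calls
    stop=stop_after_attempt(5),
    reraise=True,
)
def check_lock() -> None:
    calls.append(1)
    raise WaitingForLockException("still waiting")


try:
    check_lock()
except WaitingForLockException:
    pass  # reraise=True gives back the original exception, not tenacity.RetryError

assert len(calls) == 5  # stop_after_attempt(5) means five calls in total, matching the test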

tests/integration/test_reads.py

Lines changed: 31 additions & 0 deletions
@@ -17,6 +17,7 @@
 # pylint:disable=redefined-outer-name
 
 import math
+import time
 import uuid
 from urllib.parse import urlparse
 
@@ -48,6 +49,7 @@
     StringType,
     TimestampType,
 )
+from pyiceberg.utils.concurrent import ExecutorFactory
 
 DEFAULT_PROPERTIES = {'write.parquet.compression-codec': 'zstd'}
 
@@ -506,3 +508,32 @@ def test_hive_locking(session_catalog_hive: HiveCatalog) -> None:
         table.transaction().set_properties(lock="fail").commit_transaction()
     finally:
         open_client.unlock(UnlockRequest(lock.lockid))
+
+
+@pytest.mark.integration
+def test_hive_locking_with_retry(session_catalog_hive: HiveCatalog) -> None:
+    table = create_table(session_catalog_hive)
+    database_name: str
+    table_name: str
+    _, database_name, table_name = table.identifier
+    session_catalog_hive._lock_check_min_wait_time = 0.1
+    session_catalog_hive._lock_check_max_wait_time = 0.5
+    session_catalog_hive._lock_check_retries = 5
+
+    hive_client: _HiveClient = _HiveClient(session_catalog_hive.properties["uri"])
+
+    executor = ExecutorFactory.get_or_create()
+
+    with hive_client as open_client:
+
+        def another_task() -> None:
+            lock: LockResponse = open_client.lock(session_catalog_hive._create_lock_request(database_name, table_name))
+            time.sleep(1)
+            open_client.unlock(UnlockRequest(lock.lockid))
+
+        # test transaction commit with concurrent locking
+        executor.submit(another_task)
+        time.sleep(0.5)
+
+        table.transaction().set_properties(lock="xxx").commit_transaction()
+        assert table.properties.get("lock") == "xxx"
