11# pylint: disable=invalid-name, missing-docstring, line-too-long, too-many-arguments
22
33import base64
4+ import logging
45import typing
56import struct
67from collections import defaultdict
@@ -200,22 +201,9 @@ def _put_rows_with_retry(self, body: dict, slow_retry: bool = True):
200201
201202 def lock_root (self , root_id : np .uint64 , operation_id : np .uint64 ) -> bool :
202203 lock_column = attributes .Concurrency .Lock
203- indefinite_lock_column = attributes .Concurrency .IndefiniteLock
204204 lock_expiry = self ._lock_expiry
205205 row_key = serialize_uint64 (root_id )
206206
207- # Check indefinite lock
208- indef_row = self ._read_byte_row (row_key , columns = indefinite_lock_column )
209- if indef_row :
210- return False
211-
212- # Check new parents
213- new_parents_col = attributes .Hierarchy .NewParent
214- np_row = self ._read_byte_row (row_key , columns = new_parents_col )
215- if np_row :
216- return False
217-
218- # Try checkAndPut: set lock only if column is empty
219207 lock_value = serialize_uint64 (operation_id )
220208 timestamp = get_valid_timestamp (None )
221209 success = self ._check_and_put (
@@ -229,26 +217,30 @@ def lock_root(self, root_id: np.uint64, operation_id: np.uint64) -> bool:
229217 put_timestamp = timestamp ,
230218 )
231219
232- if not success :
233- cells = self ._read_byte_row (row_key , columns = lock_column )
234- if cells :
235- lock_ts = cells [0 ].timestamp
236- if lock_ts and datetime .now (timezone .utc ) - lock_ts > lock_expiry :
237- self ._delete_cell (row_key , lock_column .family_id , lock_column .key )
238- return self .lock_root (root_id , operation_id )
220+ if success :
221+ # Verify no indefinite lock (mirrors BigTable's RowFilterUnion check).
222+ # HBase REST checkAndPut only supports single-column checks, so we
223+ # acquire then verify, rolling back if an indefinite lock exists.
224+ indefinite_lock_column = attributes .Concurrency .IndefiniteLock
225+ if self ._read_byte_row (row_key , columns = indefinite_lock_column ):
226+ self ._delete_cell (row_key , lock_column .family_id , lock_column .key )
227+ return False
228+ return True
229+
230+ cells = self ._read_byte_row (row_key , columns = lock_column )
231+ if cells :
232+ lock_ts = cells [0 ].timestamp
233+ if lock_ts and datetime .now (timezone .utc ) - lock_ts > lock_expiry :
234+ self ._delete_cell (row_key , lock_column .family_id , lock_column .key )
235+ return self .lock_root (root_id , operation_id )
236+ if self .logger .isEnabledFor (logging .DEBUG ):
239237 self .logger .debug (f"Locked operation ids: { [c .value for c in cells ]} " )
240- return False
241- return True
238+ return False
242239
243240 def lock_root_indefinitely (self , root_id : np .uint64 , operation_id : np .uint64 ) -> bool :
244241 lock_column = attributes .Concurrency .IndefiniteLock
245242 row_key = serialize_uint64 (root_id )
246243
247- new_parents_col = attributes .Hierarchy .NewParent
248- np_row = self ._read_byte_row (row_key , columns = new_parents_col )
249- if np_row :
250- return False
251-
252244 lock_value = serialize_uint64 (operation_id )
253245 timestamp = get_valid_timestamp (None )
254246 success = self ._check_and_put (
@@ -262,9 +254,10 @@ def lock_root_indefinitely(self, root_id: np.uint64, operation_id: np.uint64) ->
262254 put_timestamp = timestamp ,
263255 )
264256 if not success :
265- cells = self ._read_byte_row (row_key , columns = lock_column )
266- if cells :
267- self .logger .debug (f"Indefinitely locked operation ids: { [c .value for c in cells ]} " )
257+ if self .logger .isEnabledFor (logging .DEBUG ):
258+ cells = self ._read_byte_row (row_key , columns = lock_column )
259+ if cells :
260+ self .logger .debug (f"Indefinitely locked operation ids: { [c .value for c in cells ]} " )
268261 return False
269262 return True
270263
@@ -295,11 +288,6 @@ def renew_lock(self, root_id: np.uint64, operation_id: np.uint64) -> bool:
295288 row_key = serialize_uint64 (root_id )
296289 lock_value = serialize_uint64 (operation_id )
297290
298- new_parents_col = attributes .Hierarchy .NewParent
299- np_row = self ._read_byte_row (row_key , columns = new_parents_col )
300- if np_row :
301- return False
302-
303291 return self ._check_and_put (
304292 row_key = row_key ,
305293 check_family = lock_column .family_id ,
@@ -414,31 +402,21 @@ def _read_byte_row(self, row_key, columns=None, start_time=None, end_time=None,
414402 return row .get (row_key , [] if single_column else {})
415403
416404 def _fetch_rows_by_keys (self , row_keys , col_specs , start_time , end_time , end_time_inclusive , single_column = False ):
417- result = {}
418- for row_key in row_keys :
419- key_b64 = hbase_utils .encode_value (row_key )
420- col_path = "," .join (col_specs ) if col_specs else ""
421- url = self ._table_url (f"/{ key_b64 } " )
422- if col_path :
423- url = self ._table_url (f"/{ key_b64 } /{ col_path } " )
424-
425- params = {"v" : "1000" }
426- if start_time :
427- ts = hbase_utils .datetime_to_hbase_ts (
428- hbase_utils .get_hbase_compatible_time_stamp (start_time )
429- )
430- params ["ts.from" ] = str (ts )
431- if end_time :
432- et = hbase_utils .get_hbase_compatible_time_stamp (end_time , round_up = end_time_inclusive )
433- params ["ts.to" ] = str (hbase_utils .datetime_to_hbase_ts (et ))
434-
435- resp = self ._session .get (url , params = params )
436- if resp .status_code == 404 :
437- continue
438- resp .raise_for_status ()
439- parsed = hbase_utils .parse_cell_response (resp .json (), single_column = single_column )
440- result .update (parsed )
441- return result
405+ if not row_keys :
406+ return {}
407+ sorted_keys = sorted (row_keys )
408+ all_rows = self ._fetch_rows_by_range (
409+ start_key = sorted_keys [0 ],
410+ end_key = sorted_keys [- 1 ],
411+ end_key_inclusive = True ,
412+ col_specs = col_specs ,
413+ start_time = start_time ,
414+ end_time = end_time ,
415+ end_time_inclusive = end_time_inclusive ,
416+ single_column = single_column ,
417+ )
418+ key_set = set (row_keys )
419+ return {k : v for k , v in all_rows .items () if k in key_set }
442420
443421 def _fetch_rows_by_range (self , start_key , end_key , end_key_inclusive , col_specs , start_time , end_time , end_time_inclusive , single_column = False ):
444422 scanner_spec = {
0 commit comments