1414from struct import Struct
1515from sys import modules
1616from tempfile import NamedTemporaryFile
17- from threading import Semaphore
17+ from threading import Semaphore , Lock
1818from time import sleep
19- from typing import IO , ClassVar , Generator , List , Optional , Set , Tuple , Union
19+ from typing import IO , ClassVar , Dict , Generator , List , Optional , Set , Tuple , Union
2020from zlib import compress , crc32 , decompress
2121
2222try :
@@ -70,11 +70,17 @@ class CaptureFile:
7070 _filenames_opened_for_write : ClassVar [Set [Path ]] = set ()
7171 """For in-process double checking to prevent multiple to-write opens."""
7272
73+ _filenames_with_master_node_lock_sem : ClassVar [Semaphore ] = Semaphore ()
74+ _filenames_with_master_node_lock : ClassVar [
75+ Dict [Path , "ReferenceCountedLock" ]
76+ ] = dict ()
77+
7378 file_name : str
7479 to_write : bool = False
7580 initial_metadata : InitVar [Optional [bytes ]] = None
7681 force_new_empty_file : InitVar [bool ] = False
7782 encoding : Optional [str ] = "utf_8"
83+ use_os_file_locking : bool = False
7884
7985 _file_name : Path = field (init = False )
8086 """A "Path" instance of file_name set during __post_init__"""
@@ -88,6 +94,8 @@ class CaptureFile:
8894 _compression_block : "BytesStream" = field (init = False )
8995
9096 _current_master_node : "MasterNode" = field (init = False )
97+
98+ _new_is_in_progress : bool = field (init = False )
9199
92100 _record_count : int = field (init = False )
93101
@@ -105,7 +113,9 @@ def __post_init__(
105113 self ._file_name = Path (self .file_name )
106114
107115 if force_new_empty_file or (self .to_write and not self ._file_name .is_file ()):
116+ self ._new_is_in_progress = True
108117 self ._new_file (initial_metadata )
118+ self ._new_is_in_progress = False
109119 self .open (self .to_write )
110120
111121 def __str__ (self ):
@@ -167,6 +177,15 @@ def open(self, to_write: bool = False):
167177 )
168178 CaptureFile ._filenames_opened_for_write .add (self ._file_name )
169179
180+ with CaptureFile ._filenames_with_master_node_lock_sem :
181+ if self ._file_name not in CaptureFile ._filenames_with_master_node_lock :
182+ CaptureFile ._filenames_with_master_node_lock [
183+ self ._file_name
184+ ] = ReferenceCountedLock ()
185+ CaptureFile ._filenames_with_master_node_lock [
186+ self ._file_name
187+ ].add_reference ()
188+
170189 self ._config = CaptureFileConfiguration .read (self ._file )
171190 self .refresh ()
172191
@@ -183,6 +202,14 @@ def close(self):
183202 if self .to_write :
184203 with CaptureFile ._filenames_opened_for_write_sem :
185204 CaptureFile ._filenames_opened_for_write .remove (self ._file_name )
205+ if not self ._new_is_in_progress :
206+ # Master node locks are not used for new files since the
207+ # temporary file cannot be in use by any other process
208+ with CaptureFile ._filenames_with_master_node_lock_sem :
209+ if CaptureFile ._filenames_with_master_node_lock [
210+ self ._file_name
211+ ].drop_reference ():
212+ del CaptureFile ._filenames_with_master_node_lock [self ._file_name ]
186213
187214 def __del__ (self ):
188215 self .close ()
@@ -382,20 +409,21 @@ def __exit__(self, exc_type, exc_val, exc_tb):
382409 self .close ()
383410
384411 def _acquire_lock_for_writing (self , / ):
385- if "msvcrt" in modules :
386- # we must be on windows
387- lseek (self ._file .fileno (), CaptureFile ._lock_start_position , SEEK_SET )
388- msvcrt .locking ( # noqa
389- self ._file .fileno (), msvcrt .LK_LOCK , CaptureFile ._lock_size # noqa
390- )
391- else :
392- # we are probably on some Unix variant
393- result = fcntl .lockf (
394- self ._file .fileno (),
395- fcntl .LOCK_EX | fcntl .LOCK_NB ,
396- CaptureFile ._lock_size ,
397- CaptureFile ._lock_start_position ,
398- )
412+ if self .use_os_file_locking :
413+ if "msvcrt" in modules :
414+ # we must be on windows
415+ lseek (self ._file .fileno (), CaptureFile ._lock_start_position , SEEK_SET )
416+ msvcrt .locking ( # noqa
417+ self ._file .fileno (), msvcrt .LK_LOCK , CaptureFile ._lock_size # noqa
418+ )
419+ else :
420+ # we are probably on some Unix variant
421+ result = fcntl .lockf (
422+ self ._file .fileno (),
423+ fcntl .LOCK_EX | fcntl .LOCK_NB ,
424+ CaptureFile ._lock_size ,
425+ CaptureFile ._lock_start_position ,
426+ )
399427
400428 @contextmanager
401429 def _acquire_master_nodes_lock (self , / ):
@@ -407,35 +435,41 @@ def _acquire_master_nodes_lock(self, /):
407435
408436 def _acquire_master_nodes_lock_internal (self , lock : bool , / ):
409437 assert self ._file
410- lock_size = self ._config .master_node_size * 2
411- if "msvcrt" in modules :
412- lseek (self ._file .fileno (), self ._config .page_size , SEEK_SET )
413- # we must be on windows
414- # added comments below to suppress my-py errors when we are viewing code on Linux
415- lock_mode = msvcrt .LK_LOCK if lock else msvcrt .LK_UNLCK # type: ignore[attr-defined]
416- msvcrt .locking (self ._file .fileno (), lock_mode , lock_size ) # type: ignore[attr-defined]
417-
418- # On 2021-09-26 discovered that on Windows 10 buffer reads after the
419- # lock have incorrect bytes after the first 4k bytes if read in
420- # partial pages. E.g. read 4 bytes then read more than 4k more
421- # bytes. The bytes that appeared after the 4k were the original 4
422- # bytes read. This only happened if a read from position 0 happened
423- # before the lock. Reading a page from the file after the lock
424- # seemed to fix the issue. Reading more than 4k to start did not
425- # help
426- self ._file .seek (self ._config .page_size )
427- self ._file .read (self ._config .page_size )
428- else :
429- # we are probably on some Unix variant
430- # added comments below to suppress my-py errors when we are viewing code on Windows
431- lock_type = fcntl .LOCK_EX if self .to_write else fcntl .LOCK_SH # type: ignore[attr-defined]
432- lock_mode = lock_type if lock else fcntl .LOCK_UN # type: ignore[attr-defined]
433- fcntl .lockf ( # type: ignore[attr-defined]
434- self ._file .fileno (),
435- lock_mode ,
436- lock_size ,
437- self ._config .page_size ,
438- )
438+ if not self ._new_is_in_progress :
439+ # Master node locks are not used for new files since the temporary
440+ # file cannot be in use by any other process
441+ CaptureFile ._filenames_with_master_node_lock [self ._file_name ].lock (lock )
442+ if self .use_os_file_locking :
443+ lock_size = self ._config .master_node_size * 2
444+ if "msvcrt" in modules :
445+ # we must be on windows
446+ lseek (self ._file .fileno (), self ._config .page_size , SEEK_SET )
447+ lock_mode = msvcrt .LK_RLCK if lock else msvcrt .LK_UNLCK # type: ignore[attr-defined]
448+ # lock_mode = msvcrt.LK_LOCK if lock else msvcrt.LK_UNLCK # type: ignore[attr-defined]
449+ msvcrt .locking (self ._file .fileno (), lock_mode , lock_size ) # type: ignore[attr-defined]
450+ # added line comments above to suppress my-py errors when we are viewing code on Linux
451+
452+ # On 2021-09-26 discovered that on Windows 10 buffer reads after the
453+ # lock have incorrect bytes after the first 4k bytes if read in
454+ # partial pages. E.g. read 4 bytes then read more than 4k more
455+ # bytes. The bytes that appeared after the 4k were the original 4
456+ # bytes read. This only happened if a read from position 0 happened
457+ # before the lock. Reading a page from the file after the lock
458+ # seemed to fix the issue. Reading more than 4k to start did not
459+ # help
460+ self ._file .seek (self ._config .page_size )
461+ self ._file .read (self ._config .page_size )
462+ else :
463+ # we are probably on some Unix variant
464+ lock_type = fcntl .LOCK_EX if self .to_write else fcntl .LOCK_SH # type: ignore[attr-defined]
465+ lock_mode = lock_type if lock else fcntl .LOCK_UN # type: ignore[attr-defined]
466+ # added line comments above to suppress my-py errors when we are viewing code on Windows
467+ fcntl .lockf ( # type: ignore[attr-defined]
468+ self ._file .fileno (),
469+ lock_mode ,
470+ lock_size ,
471+ self ._config .page_size ,
472+ )
439473
440474 def get_metadata (self , / ) -> Optional [bytes ]:
441475 """Returns the binary metadata that was stored in the capture file on
@@ -860,7 +894,7 @@ def commit(self, /):
860894
861895 if not self .to_write :
862896 raise CaptureFileNotOpenForWrite (
863- f'Cannot commit "{ self .file_name } " because it is not open for writting .'
897+ f'Cannot commit "{ self .file_name } " because it is not open for writing .'
864898 )
865899
866900 self ._file .flush ()
@@ -945,9 +979,20 @@ def __post_init__(self, /):
945979 self .full_node_struct = Struct (">" + "QL" * self .fan_out )
946980
947981 @classmethod
948- def read (cls , file , / ) -> "CaptureFileConfiguration" :
982+ def read (cls , file : IO [ bytes ] , / ) -> "CaptureFileConfiguration" :
949983 file .seek (0 )
950- buffer = file .read (cls .struct .size )
984+ for _ in range (30 ):
985+ # Retry every 0.1 seconds for 3 seconds in case the master node lock
986+ # interferes with the reading. This interference was observed in
987+ # Windows 11 on 2022-06-04 where simply having a master node lock
988+ # prevented the first few bytes of the file from being read even
989+ # though they are not part of the lock range.
990+ try :
991+ buffer = file .read (cls .struct .size )
992+ except PermissionError :
993+ sleep (0.1 )
994+ else :
995+ break
951996 (
952997 header ,
953998 version ,
@@ -1379,6 +1424,39 @@ def zero_fill_to(self, end_position: int, /):
13791424 self .write (b"\0 " * (end_position - self .tell ()))
13801425
13811426
1427+ @dataclass
1428+ class ReferenceCountedLock :
1429+ _reference_count : int = 0
1430+ _lock : Lock = Lock ()
1431+
1432+ def add_reference (self ) -> None :
1433+ self ._reference_count += 1
1434+
1435+ def drop_reference (self ) -> bool :
1436+ """Returns True when the last reference has been dropped to indicate
1437+ this lock is no longer in use and it is time to clear any references to
1438+ this lock"""
1439+ self ._reference_count -= 1
1440+ return self ._reference_count == 0
1441+
1442+ def lock (self , lock : bool ) -> None :
1443+ if lock :
1444+ self ._lock .acquire ()
1445+ else :
1446+ self ._lock .release ()
1447+
1448+
1449+ def leaf_to_root_path (position : int , height : int , fan_out : int , / ) -> List [int ]:
1450+ """Compute the path of child indexes from the leaf through the nodes to the
1451+ root."""
1452+
1453+ path = [0 ] * height
1454+ for i in range (height ):
1455+ position , path [i ] = divmod (position , fan_out )
1456+
1457+ return path
1458+
1459+
13821460class CaptureFileAlreadyOpen (Exception ):
13831461 pass
13841462
@@ -1393,14 +1471,3 @@ class CaptureFileNotOpenForWrite(Exception):
13931471
13941472class InvalidCaptureFile (Exception ):
13951473 pass
1396-
1397-
1398- def leaf_to_root_path (position : int , height : int , fan_out : int , / ) -> List [int ]:
1399- """Compute the path of child indexes from the leaf through the nodes to the
1400- root."""
1401-
1402- path = [0 ] * height
1403- for i in range (height ):
1404- position , path [i ] = divmod (position , fan_out )
1405-
1406- return path
0 commit comments