-
Notifications
You must be signed in to change notification settings - Fork 216
Expand file tree
/
Copy pathfifo_lock.py
More file actions
133 lines (110 loc) · 4.14 KB
/
fifo_lock.py
File metadata and controls
133 lines (110 loc) · 4.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""
FIFO Lock implementation that guarantees first-in-first-out access ordering.
This provides fair lock access where threads acquire the lock in the exact order
they requested it, preventing starvation that can occur with standard RLock.
"""
import threading
import time
from collections import deque
from typing import Any, Self
class FIFOLock:
"""
A reentrant lock that guarantees FIFO (first-in-first-out) access ordering.
Unlike Python's standard RLock, this lock ensures that threads acquire
the lock in the exact order they requested it, providing fairness and
preventing lock starvation.
Features:
- Reentrant: Same thread can acquire multiple times
- FIFO ordering: Threads get lock in request order
- Context manager support: Use with 'with' statement
- Thread-safe: Safe for concurrent access
"""
_mutex: threading.Lock
_count: int
def __init__(self) -> None:
self._mutex = threading.Lock() # Protects internal state
self._waiters: deque[threading.Condition] = (
deque()
) # FIFO queue of waiting threads
self._owner: int | None = None # Current lock owner thread ID
self._count = 0 # Reentrancy counter
def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
"""
Acquire the lock.
Args:
blocking: If True, block until lock is acquired. If False, return
immediately.
timeout: Maximum time to wait for lock (ignored if blocking=False).
-1 means wait indefinitely.
Returns:
True if lock was acquired, False otherwise.
"""
ident = threading.get_ident()
start = time.monotonic()
with self._mutex:
# Reentrant case
if self._owner == ident:
self._count += 1
return True
if self._owner is None and not self._waiters:
self._owner = ident
self._count = 1
return True
if not blocking:
# Give up immediately
return False
# Add to wait queue
me = threading.Condition(self._mutex)
self._waiters.append(me)
while True:
# If I'm at the front of the queue and nobody owns it → acquire
if self._waiters[0] is me and self._owner is None:
self._waiters.popleft()
self._owner = ident
self._count = 1
return True
if timeout >= 0:
remaining = timeout - (time.monotonic() - start)
if remaining <= 0:
self._waiters.remove(me)
return False
me.wait(remaining)
else:
me.wait()
def release(self) -> None:
"""
Release the lock.
Raises:
RuntimeError: If the current thread doesn't own the lock.
"""
ident = threading.get_ident()
with self._mutex:
if self._owner != ident:
raise RuntimeError("Cannot release lock not owned by current thread")
assert self._count >= 1, (
"When releasing the resource, the count must be >= 1"
)
self._count -= 1
if self._count == 0:
self._owner = None
if self._waiters:
self._waiters[0].notify()
def __enter__(self: Self) -> Self:
"""Context manager entry."""
self.acquire()
return self
def __exit__(self, exc_type: Any, _exc_val: Any, _exc_tb: Any) -> None:
"""Context manager exit."""
self.release()
def locked(self) -> bool:
"""
Return True if the lock is currently held by any thread.
"""
with self._mutex:
return self._owner is not None
def owned(self) -> bool:
"""
Return True if the lock is currently held by the calling thread.
"""
with self._mutex:
return self._owner == threading.get_ident()