|
21 | 21 | import io
|
22 | 22 | import json
|
23 | 23 | import logging
|
| 24 | +import time |
24 | 25 | import typing
|
25 | 26 | import urllib.request
|
26 | 27 |
|
@@ -93,13 +94,16 @@ def __init__(
|
93 | 94 | self._pool = pool
|
94 | 95 | self._ns = ns
|
95 | 96 | self._key = key
|
| 97 | + self._lock_description = "sambacc RADOS library" |
| 98 | + self._lock_duration = None |
96 | 99 |
|
97 | 100 | self._open(interface)
|
98 | 101 | if must_exist:
|
99 | 102 | self._test()
|
100 | 103 |
|
101 | 104 | def _open(self, interface: _RADOSInterface) -> None:
|
102 | 105 | # TODO: connection caching
|
| 106 | + self._api = interface.api |
103 | 107 | self._conn = interface.Rados()
|
104 | 108 | self._conn.connect()
|
105 | 109 | self._connected = True
|
@@ -205,6 +209,38 @@ def write(self, s: typing.Any) -> int:
|
205 | 209 | def writelines(self, ls: typing.Iterable[typing.Any]) -> None:
|
206 | 210 | raise NotImplementedError()
|
207 | 211 |
|
| 212 | + def write_full(self, data: bytes) -> None: |
| 213 | + """Write the object such that its contents are exactly `data`.""" |
| 214 | + self._ioctx.write_full(self._key, data) |
| 215 | + |
| 216 | + def _lock_exclusive(self, name: str, cookie: str) -> None: |
| 217 | + self._ioctx.lock_exclusive( |
| 218 | + self._key, |
| 219 | + name, |
| 220 | + cookie, |
| 221 | + desc=self._lock_description, |
| 222 | + duration=self._lock_duration, |
| 223 | + ) |
| 224 | + |
| 225 | + def _acquire_lock_exclusive( |
| 226 | + self, name: str, cookie: str, *, delay: int = 1 |
| 227 | + ) -> None: |
| 228 | + while True: |
| 229 | + try: |
| 230 | + self._lock_exclusive(name, cookie) |
| 231 | + return |
| 232 | + except self._api.ObjectBusy: |
| 233 | + _logger.debug( |
| 234 | + "lock failed: %r, %r, %r: object busy", |
| 235 | + self._key, |
| 236 | + name, |
| 237 | + cookie, |
| 238 | + ) |
| 239 | + time.sleep(delay) |
| 240 | + |
| 241 | + def _unlock(self, name: str, cookie: str) -> None: |
| 242 | + self._ioctx.unlock(self._key, name, cookie) |
| 243 | + |
208 | 244 |
|
209 | 245 | def _get_mon_config_key(interface: _RADOSInterface, key: str) -> io.BytesIO:
|
210 | 246 | mcmd = json.dumps(
|
|
0 commit comments