
Commit 7d63832

mount: thread safety
There is actual concurrency when using `pyfuse3` in Borg, which necessitates thread safety mechanisms like the recently added `threading.Lock`. Borg's `pyfuse3` implementation is built on top of the **`trio`** async framework. When Borg is started in `pyfuse3` mode, it calls `trio.run(llfuse.main)`.

1. **Async/Await Model**: Unlike the classic `llfuse` (which Borg runs with `workers=1` to remain single-threaded), `pyfuse3` uses an asynchronous event loop. While it often runs on a single OS thread, `trio` allows multiple tasks to be "in-flight" simultaneously.
2. **Context Switching**: When an operation (like reading metadata or data) hits an `await` point, such as during network I/O with a remote repository or disk I/O, `trio` can suspend that task and switch to another one.
3. **Parallel Archive Loading**: In `borg mount` (without a specific archive specified), archives are loaded lazily. If a user or a process (like `find` or a file manager) triggers a `lookup` or `opendir` on multiple archive directories nearly simultaneously, multiple `check_pending_archive` calls can be active at once.
4. **Race Conditions**: Because `iter_archive_items` is a generator that yields control back to the caller (which, in the `pyfuse3` case, happens within an async wrapper), and because it performs I/O via `get_many`, it creates windows where one archive's loading process can be suspended while another begins.

Even when running on a single OS thread, the interleaved execution of async tasks can lead to the same data corruption issues as multi-threading (see the sketch below):

- **Shared State**: Both tasks access the same `ItemCache` instance.
- **Inode Collisions**: If two tasks read `write_offset` before either has incremented it, they will assign the same inode numbers to different files and overwrite each other's metadata in the `meta` bytearray.
- **Global Interpreter Lock (GIL)**: While the GIL protects Python's internal memory integrity, it does not prevent logical race conditions at the application level when tasks are interleaved.

The use of `threading.Lock` in `check_pending_archive` and the direct instance variable access in `ItemCache` ensure that even when `trio` switches between concurrent FUSE requests, the internal state remains consistent and inode assignment remains unique across all archives.
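A minimal `trio` sketch of that inode-collision race, using a hypothetical `ToyCache` stand-in rather than Borg's `ItemCache` (and `trio.Lock` purely for the demo, while the commit itself uses `threading.Lock`):

```python
import trio

class ToyCache:
    """Toy stand-in for ItemCache: hands out 9-byte inode slots from a shared offset."""
    def __init__(self):
        self.write_offset = 0

async def allocate_racy(cache, inodes):
    offset = cache.write_offset      # read shared state
    await trio.sleep(0)              # checkpoint: trio may switch to the other task here
    cache.write_offset = offset + 9  # lost update if the other task read in between
    inodes.append(offset)

async def allocate_locked(cache, inodes, lock):
    async with lock:                 # serialize the whole read-modify-write
        offset = cache.write_offset
        await trio.sleep(0)
        cache.write_offset = offset + 9
    inodes.append(offset)

async def main():
    cache, inodes = ToyCache(), []
    async with trio.open_nursery() as nursery:
        nursery.start_soon(allocate_racy, cache, inodes)
        nursery.start_soon(allocate_racy, cache, inodes)
    print('racy:  ', inodes)    # both tasks read offset 0 -> duplicate inodes [0, 0]

    cache, inodes, lock = ToyCache(), [], trio.Lock()
    async with trio.open_nursery() as nursery:
        nursery.start_soon(allocate_locked, cache, inodes, lock)
        nursery.start_soon(allocate_locked, cache, inodes, lock)
    print('locked:', inodes)    # read-modify-write is atomic -> unique inodes [0, 9]

trio.run(main)
```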
1 parent d54996f commit 7d63832

1 file changed: +17 -18 lines

src/borg/fuse.py

Lines changed: 17 additions & 18 deletions
@@ -6,6 +6,7 @@
 import struct
 import sys
 import tempfile
+import threading
 import time
 from collections import defaultdict
 from signal import SIGINT
@@ -155,18 +156,15 @@ def iter_archive_items(self, archive_item_ids, filter=None, consider_part_files=
         last_chunk_length = 0
         msgpacked_bytes = b''

-        write_offset = self.write_offset
-        meta = self.meta
         pack_indirect_into = self.indirect_entry_struct.pack_into

         for key, (csize, data) in zip(archive_item_ids, self.decrypted_repository.get_many(archive_item_ids)):
             # Store the chunk ID in the meta-array
-            if write_offset + 32 >= len(meta):
-                self.meta = meta = meta + bytes(self.GROW_META_BY)
-            meta[write_offset:write_offset + 32] = key
-            current_id_offset = write_offset
-            write_offset += 32
-            self.write_offset = write_offset
+            if self.write_offset + 32 >= len(self.meta):
+                self.meta = self.meta + bytes(self.GROW_META_BY)
+            self.meta[self.write_offset:self.write_offset + 32] = key
+            current_id_offset = self.write_offset
+            self.write_offset += 32

             chunk_begin += last_chunk_length
             last_chunk_length = len(data)
@@ -200,8 +198,8 @@ def iter_archive_items(self, archive_item_ids, filter=None, consider_part_files=
                 current_spans_chunks = stream_offset - current_item_length < chunk_begin
                 msgpacked_bytes = b''

-                if write_offset + 9 >= len(meta):
-                    self.meta = meta = meta + bytes(self.GROW_META_BY)
+                if self.write_offset + 9 >= len(self.meta):
+                    self.meta = self.meta + bytes(self.GROW_META_BY)

                 # item entries in the meta-array come in two different flavours, both nine bytes long.
                 # (1) for items that span chunks:
@@ -222,15 +220,14 @@ def iter_archive_items(self, archive_item_ids, filter=None, consider_part_files=
                 if current_spans_chunks:
                     pos = self.fd.seek(0, io.SEEK_END)
                     self.fd.write(current_item)
-                    meta[write_offset:write_offset + 9] = b'S' + pos.to_bytes(8, 'little')
+                    self.meta[self.write_offset:self.write_offset + 9] = b'S' + pos.to_bytes(8, 'little')
                     self.direct_items += 1
                 else:
                     item_offset = stream_offset - current_item_length - chunk_begin
-                    pack_indirect_into(meta, write_offset, b'I', write_offset - current_id_offset, item_offset)
+                    pack_indirect_into(self.meta, self.write_offset, b'I', self.write_offset - current_id_offset, item_offset)
                     self.indirect_items += 1
-                inode = write_offset + self.offset
-                write_offset += 9
-                self.write_offset = write_offset
+                inode = self.write_offset + self.offset
+                self.write_offset += 9

                 yield inode, item

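The hunks above drop the local `write_offset`/`meta` aliases in favor of direct instance attribute access. A plain-Python sketch (hypothetical `Cache` class, not Borg code) of why a stale local snapshot corrupts inode assignment when two generators are interleaved, and why re-reading the attribute at each use avoids it:

```python
class Cache:
    # hypothetical stand-in for ItemCache's shared state
    def __init__(self):
        self.write_offset = 0

def iter_items_stale(cache, n):
    write_offset = cache.write_offset      # snapshot taken once, when the generator first runs
    for _ in range(n):
        inode = write_offset
        write_offset += 9
        cache.write_offset = write_offset  # writes back a possibly stale value
        yield inode

def iter_items_fresh(cache, n):
    for _ in range(n):
        inode = cache.write_offset         # re-read shared state at every use
        cache.write_offset += 9
        yield inode

cache = Cache()
a, b = iter_items_stale(cache, 2), iter_items_stale(cache, 2)
print(next(a), next(b), next(a), next(b))  # 0 9 9 18 -> inode 9 assigned twice

cache = Cache()
a, b = iter_items_fresh(cache, 2), iter_items_fresh(cache, 2)
print(next(a), next(b), next(a), next(b))  # 0 9 18 27 -> all inodes unique
```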
@@ -269,6 +266,7 @@ def __init__(self, key, manifest, repository, args, decrypted_repository):
         self.uid_forced = None
         self.gid_forced = None
         self.umask = 0
+        self.lock = threading.Lock()

     def _create_filesystem(self):
         self._create_dir(parent=1)  # first call, create root dir (inode == 1)
@@ -304,9 +302,10 @@ def get_item(self, inode):

     def check_pending_archive(self, inode):
         # Check if this is an archive we need to load
-        archive_name = self.pending_archives.pop(inode, None)
-        if archive_name is not None:
-            self._process_archive(archive_name, [os.fsencode(archive_name)])
+        with self.lock:
+            archive_name = self.pending_archives.pop(inode, None)
+            if archive_name is not None:
+                self._process_archive(archive_name, [os.fsencode(archive_name)])

     def _allocate_inode(self):
         self.inode_count += 1
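A small sketch of why holding the lock around the `pop` makes archive loading idempotent (stand-in names and toy data, not Borg's actual classes): the first caller inside the critical section removes the pending entry and processes it, so any concurrent caller for the same inode sees `None` and returns without loading the archive twice:

```python
import threading

pending_archives = {42: 'archive-2024-01-01'}   # inode -> archive name (toy data)
lock = threading.Lock()
loads = []

def check_pending_archive(inode):
    # mirrors the committed pattern: pop-and-process inside one critical section
    with lock:
        archive_name = pending_archives.pop(inode, None)
        if archive_name is not None:
            loads.append(archive_name)          # stand-in for _process_archive(...)

threads = [threading.Thread(target=check_pending_archive, args=(42,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
assert loads == ['archive-2024-01-01']          # the archive was loaded exactly once
```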

0 commit comments