Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 83 additions & 3 deletions attic/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import struct
import sys
from zlib import crc32
import threading
import queue

from .hashindex import NSIndex
from .helpers import Error, IntegrityError, read_msgpack, write_msgpack, unhexlify, UpgradableLock
Expand Down Expand Up @@ -377,7 +379,6 @@ def preload(self, ids):
"""Preload objects (only applies to remote repositories
"""


class LoggedIO(object):

header_fmt = struct.Struct('<IIB')
Expand All @@ -400,11 +401,13 @@ def __init__(self, path, limit, segments_per_dir, capacity=90):
self.segments_per_dir = segments_per_dir
self.offset = 0
self._write_fd = None
self.writeback = FsyncWorker()

def close(self):
for segment in list(self.fds.keys()):
self.fds.pop(segment).close()
self.close_segment()
self.writeback.close()
self.fds = None # Just to make sure we're disabled

def segment_iterator(self, reverse=False):
Expand Down Expand Up @@ -572,11 +575,88 @@ def write_commit(self):
crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
fd.write(b''.join((crc, header)))
self.close_segment()
self.writeback.flush()

def close_segment(self):
if self._write_fd:
self.segment += 1
self.offset = 0
os.fsync(self._write_fd)
self._write_fd.close()
self.writeback.fsync_and_close_fd(self._write_fd)
self._write_fd = None

class FsyncWorker(object):
"""os.fsync() in a background thread.

One fd is processed at a time. If the thread is already working,
the caller will block. This provides double-buffering.

Any exceptions (from os.fsync() or fd.close()) will be re-raised
on the next call into FsyncWorker. (Naturally this applies to
the .flush() and .close() methods as well as .fsync_and_close_fd()).
"""

def __init__(self):
self.channel = Channel()
self.exception = None
thread = threading.Thread(target=self._run)
thread.daemon = True
thread.start()

def _run(self):
while True: # worker thread loop
task = self.channel.get()
if task is None:
break # thread shutdown requested
try:
task()
except Exception as e:
self.exception = e

def fsync_and_close_fd(self, fd):
"""fsync() and close() fd in the background"""
def task():
try:
os.fsync(fd)
finally:
fd.close()
self.flush() # raise any pending exception
self.channel.put(task)

def flush(self):
"""Wait for any pending fsync.

This will also make sure an IOError is re-raised
in the calling thread, if necessary.
"""
def task():
pass
self.channel.put(task)

if self.exception is not None:
e = self.exception
self.exception = None
raise e

def close(self):
try:
self.flush()
finally:
self.channel.put(None) # tell thread to shutdown

class Channel(object):
"""A blocking channel, like in CSP or Go.

This can also be considered as a Queue with zero buffer space.
"""

def __init__(self):
self.q = queue.Queue()

def get(self):
value = self.q.get()
self.q.task_done()
return value

def put(self, item):
self.q.put(item)
self.q.join() # wait for task_done(), in reader thread