Skip to content

Commit 280e408

Browse files
committed
added ring buffer
1 parent b39cdca commit 280e408

File tree

1 file changed

+278
-0
lines changed

1 file changed

+278
-0
lines changed
Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
# MIT License
2+
3+
# Copyright (c) 2024-2025 GvozdevLeonid
4+
5+
# Permission is hereby granted, free of charge, to any person obtaining a copy
6+
# of this software and associated documentation files (the "Software"), to deal
7+
# in the Software without restriction, including without limitation the rights
8+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
# copies of the Software, and to permit persons to whom the Software is
10+
# furnished to do so, subject to the following conditions:
11+
12+
# The above copyright notice and this permission notice shall be included in all
13+
# copies or substantial portions of the Software.
14+
15+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
# SOFTWARE.
22+
23+
import atexit
24+
import io
25+
import os
26+
import sys
27+
import time
28+
from queue import Queue
29+
from tempfile import NamedTemporaryFile
30+
from threading import Event, RLock, Thread
31+
from typing import Any
32+
33+
import numpy as np
34+
35+
36+
class FileBuffer:
37+
'''
38+
A file-based buffer designed for efficient data transmission and reception, minimizing RAM usage.
39+
Provides methods for appending data, retrieving new data, accessing the entire buffer, and processing data in chunks.
40+
Supports ring-buffer behavior.
41+
'''
42+
def __init__(self, dtype: type = np.complex64, use_thread: bool = False) -> None:
43+
self._use_thread = use_thread
44+
self._dtype = dtype
45+
46+
self._read_ptr = 0
47+
self._write_ptr = 0
48+
49+
self._dtype_size = np.dtype(dtype).itemsize
50+
51+
self._temp_file = NamedTemporaryFile(mode='r+b', delete=True)
52+
self._writer = io.FileIO(self._temp_file.name, mode='w')
53+
self._reader = io.FileIO(self._temp_file.name, mode='r')
54+
self._not_empty = Event()
55+
self._rlock = RLock()
56+
self._wlock = RLock()
57+
58+
if use_thread:
59+
self._run_available = True
60+
self._queue = Queue()
61+
self._append_thread = Thread(target=self._append, daemon=True)
62+
self._append_thread.start()
63+
64+
self._register_cleanup()
65+
66+
def __del__(self) -> None:
67+
self._cleanup()
68+
69+
def __getitem__(self, index: int | slice) -> Any:
70+
with self._rlock:
71+
72+
write_ptr = self._write_ptr
73+
74+
if write_ptr == 0:
75+
self._not_empty.clear()
76+
self._not_empty.wait()
77+
write_ptr = self._write_ptr
78+
79+
size = write_ptr // self._dtype_size
80+
if isinstance(index, int):
81+
index = index + size if index < 0 else index
82+
if index < 0 or index >= size:
83+
raise IndexError('index out of range')
84+
self._reader.seek(index * self._dtype_size)
85+
result = np.frombuffer(self._reader.read(self._dtype_size), dtype=self._dtype, count=1)[0]
86+
self._reader.seek(self._read_ptr)
87+
return result
88+
89+
if isinstance(index, slice):
90+
start, stop, step = index.indices(size)
91+
byte_start, byte_stop = start * self._dtype_size, stop * self._dtype_size
92+
if byte_start < 0 or byte_start >= size or byte_stop < 0 or byte_stop >= size:
93+
raise IndexError('slice out of range')
94+
self._reader.seek(byte_start)
95+
result = np.frombuffer(self._reader.read(byte_stop - byte_start), dtype=self._dtype)[::step]
96+
self._reader.seek(self._read_ptr)
97+
return result
98+
99+
raise TypeError('index must be int or slice')
100+
101+
def _cleanup(self) -> None:
102+
if self._temp_file:
103+
filepath = self._temp_file.name
104+
105+
if self._use_thread:
106+
self._run_available = False
107+
108+
try:
109+
with self._wlock:
110+
with self._rlock:
111+
self._reader.close()
112+
self._writer.close()
113+
self._temp_file.close()
114+
115+
except Exception as er:
116+
print(f'Exception during cleanup: {er}', file=sys.stderr)
117+
118+
if os.path.exists(filepath) and not self._save_file:
119+
os.remove(filepath)
120+
121+
def _register_cleanup(self) -> None:
122+
atexit.register(self._cleanup)
123+
124+
def _append(self) -> None:
125+
while self._run_available:
126+
if not self._queue.empty():
127+
data, chunk_size = self._queue.get_nowait()
128+
129+
data = data.astype(self._dtype, copy=False)
130+
chunk_elements = chunk_size // self._dtype_size
131+
132+
with self._wlock:
133+
for i in range(0, len(data), chunk_elements):
134+
chunk = data[i:i + chunk_elements]
135+
136+
self._writer.write(chunk)
137+
self._write_ptr += self._dtype_size * chunk.size
138+
139+
self._not_empty.set()
140+
141+
if not self._run_available:
142+
break
143+
else:
144+
time.sleep(.035)
145+
146+
def append(self, data: np.ndarray, chunk_size: int = 131072) -> None:
147+
if len(data) == 0:
148+
return
149+
if self._use_thread:
150+
self._queue.put_nowait((data, chunk_size))
151+
else:
152+
data = data.astype(self._dtype, copy=False)
153+
chunk_elements = chunk_size // self._dtype_size
154+
155+
with self._wlock:
156+
for i in range(0, len(data), chunk_elements):
157+
chunk = data[i:i + chunk_elements]
158+
159+
self._writer.write(chunk)
160+
self._write_ptr += self._dtype_size * chunk.size
161+
162+
self._not_empty.set()
163+
164+
if not self._run_available:
165+
break
166+
167+
def get_all(self, use_memmap: bool = False, wait: bool = False, timeout: float | None = None) -> np.ndarray:
168+
with self._rlock:
169+
170+
if self._write_ptr == 0:
171+
self._not_empty.clear()
172+
if not self._not_empty.wait(timeout):
173+
return np.array([], dtype=self._dtype)
174+
175+
if not use_memmap:
176+
self._reader.seek(0)
177+
result = np.frombuffer(self._reader.read(), dtype=self._dtype)
178+
self._reader.seek(self._read_ptr)
179+
return result
180+
181+
return np.memmap(self._temp_file, dtype=self._dtype)
182+
183+
def get_new(self, wait: bool = False, timeout: float | None = None) -> np.ndarray:
184+
with self._rlock:
185+
186+
write_ptr = self._write_ptr
187+
while write_ptr in {0, self._read_ptr}:
188+
if not wait:
189+
return np.array([], dtype=self._dtype)
190+
191+
self._not_empty.clear()
192+
if not self._not_empty.wait(timeout):
193+
return np.array([], dtype=self._dtype)
194+
195+
write_ptr = self._write_ptr
196+
197+
result = np.frombuffer(self._reader.read(write_ptr - self._read_ptr), dtype=self._dtype)
198+
self._read_ptr = write_ptr
199+
return result
200+
201+
def get_chunk(self, num_elements: int, ring: bool = True, wait: bool = False, timeout: float | None = None) -> np.ndarray:
202+
with self._rlock:
203+
204+
if num_elements <= 0:
205+
return np.array([], dtype=self._dtype)
206+
207+
write_ptr = self._write_ptr
208+
while write_ptr in {0, self._read_ptr}:
209+
if not wait:
210+
return np.array([], dtype=self._dtype)
211+
212+
self._not_empty.clear()
213+
if not self._not_empty.wait(timeout):
214+
return np.array([], dtype=self._dtype)
215+
216+
write_ptr = self._write_ptr
217+
218+
total_bytes = num_elements * self._dtype_size
219+
available_bytes = write_ptr - self._read_ptr
220+
221+
if available_bytes >= total_bytes:
222+
result = np.frombuffer(self._reader.read(total_bytes), dtype=self._dtype)
223+
self._read_ptr += total_bytes
224+
return result
225+
226+
if not ring:
227+
result = np.frombuffer(self._reader.read(available_bytes), dtype=self._dtype)
228+
self._read_ptr += available_bytes
229+
return result
230+
231+
result = np.empty(num_elements, dtype=self._dtype)
232+
filled_elements = 0
233+
while filled_elements < num_elements:
234+
if available_bytes <= 0:
235+
available_bytes = write_ptr
236+
self._reader.seek(0)
237+
self._read_ptr = 0
238+
239+
to_read = min((num_elements - filled_elements) * self._dtype_size, available_bytes)
240+
new_elements = to_read // self._dtype_size
241+
result[filled_elements: filled_elements + new_elements] = np.frombuffer(self._reader.read(to_read), dtype=self._dtype)
242+
filled_elements += new_elements
243+
available_bytes -= to_read
244+
self._read_ptr += to_read
245+
246+
return result
247+
248+
def empty(self) -> bool:
249+
return self._write_ptr == 0
250+
251+
def has_new_data(self) -> bool:
252+
return self._read_ptr < self._write_ptr
253+
254+
def size(self) -> int:
255+
return self._write_ptr // self._dtype_size
256+
257+
def rewind(self) -> None:
258+
with self._rlock:
259+
self._reader.seek(0)
260+
self._read_ptr = 0
261+
262+
def clear(self) -> None:
263+
if self._use_thread:
264+
self._run_available = False
265+
266+
with self._wlock:
267+
with self._rlock:
268+
os.truncate(self._temp_file.fileno(), 0)
269+
self._reader.seek(0)
270+
self._writer.seek(0)
271+
self._write_ptr = 0
272+
self._read_ptr = 0
273+
274+
if self._use_thread:
275+
self._run_available = True
276+
self._queue = Queue()
277+
self._append_thread = Thread(target=self._append, daemon=True)
278+
self._append_thread.start()

0 commit comments

Comments
 (0)