Skip to content

Commit 4be8682

Browse files
committed
add logging and checks
1 parent 405ee53 commit 4be8682

File tree

2 files changed

+84
-0
lines changed

2 files changed

+84
-0
lines changed

gcsfs/prefetcher.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import asyncio
22
import ctypes
3+
import logging
34
import threading
45
from collections import deque
56

67
import fsspec.asyn
78

9+
logger = logging.getLogger(__name__)
10+
811
PyBytes_FromStringAndSize = ctypes.pythonapi.PyBytes_FromStringAndSize
912
PyBytes_FromStringAndSize.restype = ctypes.py_object
1013
PyBytes_FromStringAndSize.argtypes = [ctypes.c_void_p, ctypes.c_ssize_t]
@@ -63,11 +66,26 @@ def __init__(
6366
concurrency=4,
6467
**kwargs,
6568
):
69+
logger.debug(
70+
"Initializing BackgroundPrefetcher: blocksize=%d, size=%d, max_prefetch_size=%s, concurrency=%d",
71+
blocksize,
72+
size,
73+
max_prefetch_size,
74+
concurrency,
75+
)
6676
self.blocksize = blocksize
6777
self.size = size
6878
self.fetcher = fetcher
6979
self.concurrency = concurrency
80+
81+
if max_prefetch_size is not None and max_prefetch_size < blocksize:
82+
# Ensure we have a positive prefetch window.
83+
raise ValueError(
84+
f"User max_prefetch_size ({max_prefetch_size}) is smaller than blocksize ({blocksize})."
85+
)
86+
7087
self._user_max_prefetch_size = max_prefetch_size
88+
7189
self.sequential_streak = 0
7290
self.user_offset = 0
7391
self.current_offset = 0
@@ -82,6 +100,7 @@ def __init__(
82100
self._error = None
83101

84102
async def _start_producer():
103+
logger.debug("Starting background producer task")
85104
self.queue = asyncio.Queue()
86105
self._wakeup_producer = asyncio.Event()
87106
self._producer_task = asyncio.create_task(self._producer_loop())
@@ -111,6 +130,10 @@ def max_prefetch_size(self) -> int:
111130
return max(2 * self._get_adaptive_blocksize(), self.DEFAULT_PREFETCH_SIZE)
112131

113132
async def _cancel_all_tasks(self):
133+
logger.debug(
134+
"Cancelling all active producer tasks. Task count: %d",
135+
len(self._active_tasks),
136+
)
114137
self.is_stopped = True
115138
self._wakeup_producer.set()
116139

@@ -142,9 +165,11 @@ async def _cancel_all_tasks(self):
142165
break
143166

144167
if tasks_to_wait:
168+
logger.debug("Waiting for %d cancelled tasks to finish", len(tasks_to_wait))
145169
await asyncio.gather(*tasks_to_wait, return_exceptions=True)
146170

147171
async def _restart_producer(self):
172+
logger.info("Restarting producer loop (likely due to seek operation)")
148173
await self._cancel_all_tasks()
149174
self.is_stopped = False
150175
self._error = None
@@ -154,6 +179,7 @@ async def _restart_producer(self):
154179
self._producer_task = asyncio.create_task(self._producer_loop())
155180

156181
async def _producer_loop(self):
182+
logger.debug("Producer loop running")
157183
try:
158184
while not self.is_stopped:
159185
await self._wakeup_producer.wait()
@@ -164,6 +190,12 @@ async def _producer_loop(self):
164190
(self.sequential_streak + 1) * block_size,
165191
self.max_prefetch_size,
166192
)
193+
logger.debug(
194+
"Producer awake. Current offset: %d, User offset: %d, Prefetch size: %d",
195+
self.current_offset,
196+
self.user_offset,
197+
prefetch_size,
198+
)
167199

168200
while (
169201
not self.is_stopped
@@ -207,6 +239,12 @@ async def _producer_loop(self):
207239
else 1
208240
) # sequential usecase
209241

242+
logger.debug(
243+
"Producer spawning fetch task at offset %d, size %d, split_factor %d",
244+
self.current_offset,
245+
actual_size,
246+
sfactor,
247+
)
210248
download_task = asyncio.create_task(
211249
self.fetcher(
212250
self.current_offset, actual_size, split_factor=sfactor
@@ -219,27 +257,37 @@ async def _producer_loop(self):
219257
self.current_offset += actual_size
220258

221259
except asyncio.CancelledError:
260+
logger.debug("Producer loop cancelled")
222261
pass
223262
except Exception as e:
263+
logger.error("Producer loop encountered an error: %s", e, exc_info=True)
224264
self.is_stopped = True
225265
self._error = e
226266
await self.queue.put(e)
227267

228268
async def read(self):
229269
"""Reads the next chunk from the object."""
230270
if self.user_offset >= self.size:
271+
logger.debug(
272+
"Read requested but user_offset (%d) is >= size (%d). Returning empty bytes.",
273+
self.user_offset,
274+
self.size,
275+
)
231276
return b""
232277
if self.is_stopped and self.queue.empty():
233278
# This may happen if user read despite previous read produced an exception.
279+
logger.error("Read attempted on a stopped producer with an empty queue.")
234280
raise RuntimeError("Could not fetch data, the producer is stopped")
235281

236282
if self.queue.empty():
283+
logger.debug("Queue empty during read, waking up producer.")
237284
self._wakeup_producer.set()
238285

239286
task = await self.queue.get()
240287

241288
# Check if the producer pushed an exception
242289
if isinstance(task, Exception):
290+
logger.error("Exception retrieved from read queue: %s", task)
243291
self.is_stopped = True
244292
self._error = task
245293
raise task
@@ -251,8 +299,10 @@ async def read(self):
251299
self._wakeup_producer.set() # starts prefetching.
252300
return block
253301
except asyncio.CancelledError:
302+
logger.debug("Read task was cancelled.")
254303
raise
255304
except Exception as e:
305+
logger.error("Error awaiting read task: %s", e, exc_info=True)
256306
self.is_stopped = True
257307
self._error = e
258308
raise e
@@ -261,13 +311,16 @@ async def seek(self, new_offset):
261311
if new_offset == self.user_offset:
262312
return
263313

314+
logger.info("Seeking from %d to %d", self.user_offset, new_offset)
264315
self.user_offset = new_offset
265316
self.current_offset = new_offset
266317
await self._restart_producer()
267318

268319
async def _async_fetch(self, start, end):
320+
logger.debug("_async_fetch called for range %d - %d", start, end)
269321
if start != self.user_offset:
270322
# We seeked elsewhere, reset the current block
323+
logger.debug("Seek detected in _async_fetch. Resetting block and seeking.")
271324
self._current_block = b""
272325
self._current_block_idx = 0
273326
await self.seek(start)
@@ -325,21 +378,30 @@ async def _async_fetch(self, start, end):
325378
else:
326379
out = b"".join(chunks)
327380

381+
logger.debug("Finished _async_fetch. Total bytes collected: %d", len(out))
328382
return out
329383

330384
def _fetch(self, start: int | None, end: int | None) -> bytes:
331385
if start is None:
332386
start = 0
333387
if end is None:
334388
end = self.size
389+
390+
logger.debug("_fetch called for start=%s, end=%s", start, end)
391+
335392
if start >= self.size or start >= end:
393+
logger.debug(
394+
"Invalid bounds in _fetch or EOF reached. Returning empty bytes."
395+
)
336396
return b""
337397

338398
with self._lock:
339399
if self._error:
400+
logger.error("Cannot _fetch: instance has an active error state.")
340401
raise self._error
341402

342403
if self.is_stopped:
404+
logger.error("Cannot _fetch: instance is closed/stopped.")
343405
raise RuntimeError(
344406
"The file instance has been closed. This can occur if a close operation"
345407
"is executed concurrently while a read operation is still in progress."
@@ -348,11 +410,13 @@ def _fetch(self, start: int | None, end: int | None) -> bytes:
348410
try:
349411
result = fsspec.asyn.sync(self.loop, self._async_fetch, start, end)
350412
except Exception as e:
413+
logger.error("Exception during synchronous fetch: %s", e, exc_info=True)
351414
self.is_stopped = True
352415
self._error = e
353416
raise
354417

355418
if self.is_stopped:
419+
logger.error("Instance stopped during fetch operation.")
356420
raise RuntimeError(
357421
"The file instance has been closed. This can occur if a close operation"
358422
"is executed concurrently while a read operation is still in progress."
@@ -362,7 +426,9 @@ def _fetch(self, start: int | None, end: int | None) -> bytes:
362426

363427
def close(self):
364428
"""Clean shutdown. Cancels tasks and waits for them to abort."""
429+
logger.info("Closing BackgroundPrefetcher")
365430
if self.is_stopped:
431+
logger.debug("BackgroundPrefetcher already stopped")
366432
return
367433
self.is_stopped = True
368434
with self._lock:

gcsfs/tests/test_prefetcher.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,3 +529,21 @@ def test_read_runtime_error_on_stopped_empty():
529529
):
530530
fsspec.asyn.sync(bp.loop, bp.read)
531531
bp.close()
532+
533+
534+
def test_init_invalid_max_prefetch_size():
535+
with pytest.raises(
536+
ValueError,
537+
match=r"User max_prefetch_size \(50\) is smaller than blocksize \(100\)\.",
538+
):
539+
BackgroundPrefetcher(
540+
blocksize=100, fetcher=MockFetcher(b""), size=1000, max_prefetch_size=50
541+
)
542+
543+
544+
def test_init_valid_max_prefetch_size_edge_case():
545+
bp = BackgroundPrefetcher(
546+
blocksize=100, fetcher=MockFetcher(b""), size=1000, max_prefetch_size=100
547+
)
548+
assert bp._user_max_prefetch_size == 100
549+
bp.close()

0 commit comments

Comments
 (0)