1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -28,6 +28,7 @@ repos:
'pandas-stubs>=2.0.0',
'types-requests',
]
files: ^python/bucketbase/
args: [
--ignore-missing-imports,
--disallow-untyped-defs,
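The new `files: ^python/bucketbase/` entry scopes the mypy hook to the library sources only. As a rough, hedged illustration (not part of the PR), pre-commit treats this value as a Python regular expression matched against each repository-relative path, so the anchored pattern keeps tests and top-level files out of the type check:

```python
import re

# Illustration only: the hook's `files` value acts as a path filter.
PATTERN = re.compile(r"^python/bucketbase/")

candidates = [
    "python/bucketbase/file_lock.py",   # matches -> passed to mypy
    "python/tests/bucket_tester.py",    # no match -> skipped
    ".pre-commit-config.yaml",          # no match -> skipped
]
for path in candidates:
    print(path, "->", "checked" if PATTERN.search(path) else "skipped")
```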
11 changes: 5 additions & 6 deletions python/bucketbase/file_lock.py
@@ -1,5 +1,4 @@
import os
import sys
from pathlib import Path

import filelock
@@ -20,9 +19,9 @@ def _acquire(self):
return super()._acquire()

def _release(self):
if sys.platform.startswith("darwin"):
try:
os.remove(self._lock_file_path)
except OSError:
pass
"""Release the lock and remove the underlying lock file if present."""
try:
os.remove(self._lock_file_path)
except OSError:
pass
return super()._release()
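With the platform guard removed, `_release` now deletes the lock file (ignoring a missing file) before deferring to the parent implementation, so stale `*.lock` files stop accumulating on Linux and Windows as well as macOS. A minimal sketch of the same behaviour using only the public `filelock` API — the path below is a placeholder, not taken from the project:

```python
import os

import filelock

lock_path = "/tmp/bucketbase-demo.lock"  # placeholder path for the sketch
lock = filelock.FileLock(lock_path)

with lock:   # acquire() on enter, release() on exit
    pass     # ... critical section guarded by the lock ...

# Equivalent of the cleanup the overridden _release() folds in:
try:
    os.remove(lock_path)
except OSError:
    pass  # already removed or re-acquired elsewhere; safe to ignore
```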
6 changes: 3 additions & 3 deletions python/bucketbase/fs_bucket.py
@@ -191,7 +191,7 @@ def exists(self, name: PurePosixPath | str) -> bool:
_obj_path = self._root / _name
return _obj_path.exists() and _obj_path.is_file()

def _try_remove_empty_dirs(self, p):
def _try_remove_empty_dirs(self, p: Path) -> None:
dir_to_remove = p.parent
while dir_to_remove.relative_to(self._root).parts:
try:
@@ -240,11 +240,11 @@ def __init__(self, base: IBucket, locks_path: Path) -> None:
self._locks_path = locks_path
self._lock_manager = FileLockManager(locks_path)

def _lock_object(self, name: PurePosixPath | str):
def _lock_object(self, name: PurePosixPath | str) -> None:
lock = self._lock_manager.get_lock(name)
lock.acquire()

def _unlock_object(self, name: PurePosixPath | str):
def _unlock_object(self, name: PurePosixPath | str) -> None:
lock = self._lock_manager.get_lock(name, only_existing=True)
lock.release()
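The annotated pair above forms the locking contract: `_lock_object` obtains (creating if needed) a per-object lock from the `FileLockManager` and acquires it, while `_unlock_object` looks up the already-existing lock (`only_existing=True`) and releases it. A hedged sketch of how a caller might bracket a write with that pair — the method names come from this diff, but the wrapper below is purely illustrative:

```python
from pathlib import PurePosixPath


def guarded_put(bucket, name: PurePosixPath | str, payload: bytes) -> None:
    """Illustration only: serialize concurrent writers of the same object."""
    bucket._lock_object(name)             # create/acquire the per-object file lock
    try:
        bucket.put_object(name, payload)  # IBucket.put_object, as used elsewhere in this PR
    finally:
        bucket._unlock_object(name)       # release the lock acquired above
```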

35 changes: 23 additions & 12 deletions python/bucketbase/minio_bucket.py
@@ -3,7 +3,7 @@
import os
import traceback
from pathlib import Path, PurePosixPath
from typing import Iterable, Union, BinaryIO
from typing import BinaryIO, Iterable, Union

import certifi
import minio
@@ -15,14 +15,14 @@
from streamerate import slist, stream
from urllib3 import BaseHTTPResponse

from bucketbase.ibucket import ShallowListing, IBucket, ObjectStream
from bucketbase.ibucket import IBucket, ObjectStream, ShallowListing


class MinioObjectStream(ObjectStream):
def __init__(self, response: BaseHTTPResponse, object_name: PurePosixPath) -> None:
super().__init__(response, object_name)
self._response = response
self._size = int(response.headers.get('content-length', -1))
self._size = int(response.headers.get("content-length", -1))

def __enter__(self) -> ObjectStream:
return self._response
@@ -32,13 +32,15 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self._response.release_conn()


def build_minio_client(endpoints: str,
access_key: str,
secret_key: str,
secure: bool = True,
region: str | None = "custom",
conn_pool_size: int = 128,
timeout: int = 5) -> Minio:
def build_minio_client( # pylint: disable=too-many-arguments
endpoints: str | None,
access_key: str | None,
secret_key: str | None,
secure: bool = True,
region: str | None = "custom",
conn_pool_size: int = 128,
timeout: int = 5,
) -> Minio:
"""
:param endpoints: comma separated list of endpoints
:param access_key: access key
@@ -48,6 +50,9 @@ def build_minio_client(endpoints: str,
:param conn_pool_size: connection pool size
:param timeout: timeout in seconds
"""
if not endpoints or not access_key or not secret_key:
raise ValueError("Minio endpoints, access_key and secret_key must be provided")

ca_certs = os.environ.get("SSL_CERT_FILE") or certifi.where()
https_pool_manager = urllib3.PoolManager(
timeout=timeout,
@@ -135,9 +140,15 @@ def put_object(self, name: PurePosixPath | str, content: Union[str, bytes, bytea
f = io.BytesIO(_content)
self._minio_client.put_object(bucket_name=self._bucket_name, object_name=_name, data=f, length=len(_content))

def put_object_stream(self, name: PurePosixPath | str, stream: BinaryIO) -> None:
def put_object_stream(self, name: PurePosixPath | str, data_stream: BinaryIO) -> None: # pylint: disable=arguments-renamed
_name = self._validate_name(name)
self._minio_client.put_object(bucket_name=self._bucket_name, object_name=_name, data=stream, length=-1, part_size=self.PART_SIZE)
self._minio_client.put_object(
bucket_name=self._bucket_name,
object_name=_name,
data=data_stream,
length=-1,
part_size=self.PART_SIZE,
)

def fput_object(self, name: PurePosixPath | str, file_path: Path) -> None:
_name = self._validate_name(name)
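Taken together, the changes in this file make `build_minio_client` fail fast with a `ValueError` when the endpoint or credentials are missing, and keep the streaming upload path (length -1 with `PART_SIZE` parts) under the renamed `data_stream` parameter. A hedged usage sketch based only on the signatures visible in this diff — the endpoint, credentials and object name are placeholders:

```python
import io
from pathlib import PurePosixPath

from bucketbase.minio_bucket import build_minio_client

# Placeholder connection details; real values would come from tests.config.CONFIG
# or the environment. Empty/None values now raise ValueError immediately.
client = build_minio_client(
    endpoints="minio.example.com:9000",   # comma-separated list of endpoints
    access_key="demo-access-key",
    secret_key="demo-secret-key",
    secure=True,
    timeout=5,
)

# A MinioBucket wrapping this client (its constructor is outside this diff) can
# then stream a file-like object as a multipart upload:
payload = io.BytesIO(b"example payload")
name = PurePosixPath("dir/example.bin")
# bucket.put_object_stream(name, payload)   # `bucket` construction not shown here
```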
49 changes: 27 additions & 22 deletions python/tests/bucket_tester.py
@@ -24,7 +24,7 @@ def __init__(self, storage: IBucket, test_case: TestCase) -> None:
def cleanup(self):
self.storage.remove_prefix(f"dir{self.us}")

def test_put_and_get_object(self):
def test_put_and_get_object(self) -> None:
unique_dir = f"dir{self.us}"
# binary content
path = PurePosixPath(f"{unique_dir}/file1.bin")
@@ -58,11 +58,11 @@ def test_put_and_get_object(self):
path = f"{unique_dir}/inexistent.txt"
self.test_case.assertRaises(FileNotFoundError, self.storage.get_object, path)

def validated_put_object_stream(self, name: PurePosixPath | str, stream: BinaryIO) -> None:
assert isinstance(stream, io.IOBase), f"stream must be a BinaryIO, but got {type(stream)}"
return self.storage.put_object_stream(name, stream)
def validated_put_object_stream(self, name: PurePosixPath | str, data_stream: BinaryIO) -> None:
assert isinstance(data_stream, io.IOBase), f"stream must be a BinaryIO, but got {type(data_stream)}"
return self.storage.put_object_stream(name, data_stream)
Comment on lines +61 to +63
🛠️ Refactor suggestion

Type-hint contract is violated and the diagnostic message is stale

  1. The helper advertises -> None but immediately returns the inner call’s result.
  2. The assertion message still references the old parameter name.
-    def validated_put_object_stream(self, name: PurePosixPath | str, data_stream: BinaryIO) -> None:
-        assert isinstance(data_stream, io.IOBase), f"stream must be a BinaryIO, but got {type(data_stream)}"
-        return self.storage.put_object_stream(name, data_stream)
+    def validated_put_object_stream(self, name: PurePosixPath | str, data_stream: BinaryIO) -> None:
+        assert isinstance(data_stream, io.IOBase), f"data_stream must be a BinaryIO, but got {type(data_stream)}"
+        self.storage.put_object_stream(name, data_stream)

This aligns the implementation with its static signature and removes dead-weight return values.

🤖 Prompt for AI Agents
In python/tests/bucket_tester.py around lines 61 to 63, the method
validated_put_object_stream is declared to return None but actually returns the
result of self.storage.put_object_stream, violating the type hint. Also, the
assertion message references an outdated parameter name. To fix this, remove the
return statement so the method returns None as declared, and update the
assertion message to correctly reference the current parameter name data_stream.


def test_put_and_get_object_stream(self):
def test_put_and_get_object_stream(self) -> None:
unique_dir = f"dir{self.us}"
# binary content
path = PurePosixPath(f"{unique_dir}/file1.bin")
Expand All @@ -72,15 +72,15 @@ def test_put_and_get_object_stream(self):

self.validated_put_object_stream(path, gzipped_stream)
with self.storage.get_object_stream(path) as file:
with gzip.open(file, 'rt') as file:
with gzip.open(file, "rt") as file:
result = [file.readline() for _ in range(3)]
self.test_case.assertEqual(result, ['Test\n', 'content', ''])
self.test_case.assertEqual(result, ["Test\n", "content", ""])

# string path
path = f"{unique_dir}/file1.bin"
retrieved_content = self.storage.get_object_stream(path)
with retrieved_content as file:
with gzip.open(file, 'rt') as file:
with gzip.open(file, "rt") as file:
result = file.read()
self.test_case.assertEqual(result, "Test\ncontent")

Expand All @@ -90,23 +90,27 @@ def test_put_and_get_object_stream(self):
self.validated_put_object_stream(path_out, file)

with self.storage.get_object_stream(path_out) as file:
with gzip.open(file, 'rt') as file:
with gzip.open(file, "rt") as file:
result = file.read()
self.test_case.assertEqual(result, "Test\ncontent")

# inexistent path
path = f"{unique_dir}/inexistent.txt"
self.test_case.assertRaises(FileNotFoundError, self.storage.get_object_stream, path)

def test_list_objects(self):
def test_list_objects(self) -> None:
unique_dir = f"dir{self.us}"
self.storage.put_object(PurePosixPath(f"{unique_dir}/file1.txt"), b"Content 1")
self.storage.put_object(PurePosixPath(f"{unique_dir}/dir2/file2.txt"), b"Content 2")
self.storage.put_object(PurePosixPath(f"{unique_dir}file1.txt"), b"Content 3")
objects = self.storage.list_objects(PurePosixPath(f"{unique_dir}"))
objects.sort()
self.test_case.assertIsInstance(objects, slist)
expected_objects_all = [PurePosixPath(f"{unique_dir}/dir2/file2.txt"), PurePosixPath(f"{unique_dir}/file1.txt"), PurePosixPath(f"{unique_dir}file1.txt")]
expected_objects_all = [
PurePosixPath(f"{unique_dir}/dir2/file2.txt"),
PurePosixPath(f"{unique_dir}/file1.txt"),
PurePosixPath(f"{unique_dir}file1.txt"),
]
self.test_case.assertListEqual(objects, expected_objects_all)

objects = self.storage.list_objects(f"{unique_dir}/")
Expand All @@ -126,13 +130,13 @@ def test_list_objects(self):
for prefix in self.INVALID_PREFIXES:
self.test_case.assertRaises(ValueError, self.storage.list_objects, prefix)

def test_list_objects_with_over1000keys(self):
def test_list_objects_with_over1000keys(self) -> None:
path_with2025_keys = self._ensure_dir_with_2025_keys()

objects = self.storage.list_objects(path_with2025_keys)
self.test_case.assertEqual(2025, objects.size())

def test_shallow_list_objects(self):
def test_shallow_list_objects(self) -> None:
unique_dir = f"dir{self.us}"
self.storage.put_object(PurePosixPath(f"{unique_dir}/file1.txt"), b"Content 1")
self.storage.put_object(PurePosixPath(f"{unique_dir}/dir2/file2.txt"), b"Content 2")
Expand All @@ -157,30 +161,30 @@ def test_shallow_list_objects(self):

# here we expect that on Minio there will be other dirs/objects, since the bucket is shared, so we just check whether our objects exist
shallow_listing = self.storage.shallow_list_objects("")
expected_objects = {PurePosixPath(f"{unique_dir}file1.txt")}
expected_prefixes = {f"{unique_dir}/"}
self.test_case.assertTrue(expected_objects.issubset(shallow_listing.objects.toSet()))
self.test_case.assertTrue(expected_prefixes.issubset(shallow_listing.prefixes.toSet()))
expected_obj_set = {PurePosixPath(f"{unique_dir}file1.txt")}
expected_prefix_set = {f"{unique_dir}/"}
self.test_case.assertTrue(expected_obj_set.issubset(set(shallow_listing.objects)))
self.test_case.assertTrue(expected_prefix_set.issubset(set(shallow_listing.prefixes)))

# Invalid Prefix cases
for prefix in self.INVALID_PREFIXES:
self.test_case.assertRaises(ValueError, self.storage.shallow_list_objects, prefix)

def test_shallow_list_objects_with_over1000keys(self):
def test_shallow_list_objects_with_over1000keys(self) -> None:
path_with2025_keys = self._ensure_dir_with_2025_keys()
shallow_listing = self.storage.shallow_list_objects(path_with2025_keys)
self.test_case.assertEqual(2025, shallow_listing.objects.size())
self.test_case.assertEqual(0, shallow_listing.prefixes.size())

def test_exists(self):
def test_exists(self) -> None:
unique_dir = f"dir{self.us}"
path = PurePosixPath(f"{unique_dir}/file.txt")
self.storage.put_object(path, b"Content")
self.test_case.assertTrue(self.storage.exists(path))
self.test_case.assertFalse(self.storage.exists(f"{unique_dir}"))
self.test_case.assertRaises(ValueError, self.storage.exists, f"{unique_dir}/")

def test_remove_objects(self):
def test_remove_objects(self) -> None:
# Setup the test
unique_dir = f"dir{self.us}"
path1 = PurePosixPath(f"{unique_dir}/file1.txt")
Expand All @@ -207,15 +211,16 @@ def test_remove_objects(self):
def _ensure_dir_with_2025_keys(self) -> str:
existing_keys = self.storage.list_objects(self.PATH_WITH_2025_KEYS)
if not existing_keys:
def upload_file(i):

def upload_file(i: int) -> None:
path = PurePosixPath(self.PATH_WITH_2025_KEYS) / f"file{i}.txt"
content = f"Content {i}".encode("utf-8")
self.storage.put_object(path, content)

stream(range(2025)).fastmap(upload_file, poolSize=100).to_list()
return self.PATH_WITH_2025_KEYS

def test_get_size(self):
def test_get_size(self) -> None:
# Setup the test
unique_dir = f"dir{self.us}"
path1 = PurePosixPath(f"{unique_dir}/file1.txt")
26 changes: 15 additions & 11 deletions python/tests/test_minio_bucket.py
@@ -1,10 +1,14 @@
from unittest import TestCase
from unittest import TestCase, skipUnless

from bucketbase.minio_bucket import build_minio_client, MinioBucket
from bucketbase.minio_bucket import MinioBucket, build_minio_client
from tests.bucket_tester import IBucketTester
from tests.config import CONFIG


@skipUnless(
CONFIG.MINIO_PUBLIC_SERVER and CONFIG.MINIO_ACCESS_KEY and CONFIG.MINIO_SECRET_KEY,
"Minio configuration not provided",
)
class TestIntegratedMinioBucket(TestCase):
def setUp(self) -> None:
minio_client = build_minio_client(endpoints=CONFIG.MINIO_PUBLIC_SERVER, access_key=CONFIG.MINIO_ACCESS_KEY, secret_key=CONFIG.MINIO_SECRET_KEY)
@@ -14,29 +18,29 @@ def setUp(self) -> None:
def tearDown(self) -> None:
self.tester.cleanup()

def test_put_and_get_object(self):
def test_put_and_get_object(self) -> None:
self.tester.test_put_and_get_object()

def test_put_and_get_object_stream(self):
def test_put_and_get_object_stream(self) -> None:
self.tester.test_put_and_get_object_stream()

def test_list_objects(self):
def test_list_objects(self) -> None:
self.tester.test_list_objects()

def test_list_objects_with_2025_keys(self):
def test_list_objects_with_2025_keys(self) -> None:
self.tester.test_list_objects_with_over1000keys()

def test_shallow_list_objects(self):
def test_shallow_list_objects(self) -> None:
self.tester.test_shallow_list_objects()

def test_shallow_list_objects_with_2025_keys(self):
def test_shallow_list_objects_with_2025_keys(self) -> None:
self.tester.test_shallow_list_objects_with_over1000keys()

def test_exists(self):
def test_exists(self) -> None:
self.tester.test_exists()

def test_remove_objects(self):
def test_remove_objects(self) -> None:
self.tester.test_remove_objects()

def test_get_size(self):
def test_get_size(self) -> None:
self.tester.test_get_size()
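The new `skipUnless` guard means incomplete Minio settings now skip the integration suite instead of failing in `setUp`. A hypothetical sketch of the kind of environment-driven CONFIG it assumes — the attribute names come from this diff, but the env-var wiring below is an assumption, not the project's actual tests/config.py:

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class _DemoConfig:
    # Attribute names mirror those read by the skipUnless guard; sourcing them
    # from environment variables here is only an assumption for illustration.
    MINIO_PUBLIC_SERVER: str | None = os.environ.get("MINIO_PUBLIC_SERVER")
    MINIO_ACCESS_KEY: str | None = os.environ.get("MINIO_ACCESS_KEY")
    MINIO_SECRET_KEY: str | None = os.environ.get("MINIO_SECRET_KEY")


CONFIG = _DemoConfig()

# With any attribute unset, TestIntegratedMinioBucket is skipped; with all three
# present, the tests run against the configured Minio server.
```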