This repository was archived by the owner on May 5, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 11
feat: support zstd compression in miniostorage #405
Merged
Merged
Changes from 1 commit
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
1ad7fa7
feat: support zstd compression in miniostorage
joseph-sentry a69e79b
fix: address feedback
joseph-sentry d579531
fix: update MinioStorageService
joseph-sentry 70549aa
Merge branch 'main' of github.com:codecov/shared into joseph/zstandard
joseph-sentry 200f3fc
fix(minio): check urllib3 version in read_file
joseph-sentry f30ca66
feat: add feature flag for new minio storage
joseph-sentry 54bfa03
fix: revert changes to old minio
joseph-sentry File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,13 +1,15 @@ | ||
| import gzip | ||
| import datetime | ||
| import json | ||
| import logging | ||
| import os | ||
| import shutil | ||
| import sys | ||
| import tempfile | ||
| from io import BytesIO | ||
| from typing import BinaryIO, overload | ||
| from typing import BinaryIO, Protocol, overload | ||
|
|
||
| import sentry_sdk | ||
| import sentry_sdk.scope | ||
| import zstandard | ||
| from minio import Minio | ||
| from minio.credentials import ( | ||
| ChainedProvider, | ||
|
|
@@ -17,13 +19,29 @@ | |
| ) | ||
| from minio.deleteobjects import DeleteObject | ||
| from minio.error import MinioException, S3Error | ||
| from urllib3.response import HTTPResponse | ||
|
|
||
| from shared.storage.base import CHUNK_SIZE, BaseStorageService | ||
Swatinem marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| from shared.storage.base import BaseStorageService | ||
| from shared.storage.exceptions import BucketAlreadyExistsError, FileNotInStorageError | ||
|
|
||
| log = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class Readable(Protocol): | ||
| def read(self, size: int = -1) -> bytes: ... | ||
|
|
||
|
|
||
| class GetObjectToFileResponse(Protocol): | ||
| bucket_name: str | ||
| object_name: str | ||
| last_modified: datetime.datetime | None | ||
| etag: str | ||
| size: int | ||
| content_type: str | None | ||
| metadata: dict[str, str] | ||
| version_id: str | None | ||
Swatinem marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| # Service class for interfacing with codecov's underlying storage layer, minio | ||
| class MinioStorageService(BaseStorageService): | ||
| def __init__(self, minio_config): | ||
|
|
@@ -57,20 +75,21 @@ def init_minio_client( | |
| region: str = None, | ||
| ): | ||
| """ | ||
| Initialize the minio client | ||
| Initialize the minio client | ||
|
|
||
| `iam_auth` adds support for IAM base authentication in a fallback pattern. | ||
| The following will be checked in order: | ||
| The following will be checked in order: | ||
|
|
||
| * EC2 metadata -- a custom endpoint can be provided, default is None. | ||
| * AWS env vars, specifically AWS_ACCESS_KEY and AWS_SECRECT_KEY | ||
| * Minio env vars, specifically MINIO_ACCESS_KEY and MINIO_SECRET_KEY | ||
| * AWS env vars, specifically AWS_ACCESS_KEY and AWS_SECRECT_KEY | ||
|
|
||
| to support backward compatibility, the iam_auth setting should be used in the installation | ||
| configuration | ||
| to support backward compatibility, the iam_auth setting should be used | ||
| in the installation configuration | ||
|
|
||
| Args: | ||
| host (str): The address of the host where minio lives | ||
|
|
||
| port (str): The port number (as str or int should be ok) | ||
| access_key (str, optional): The access key (optional if IAM is being used) | ||
| secret_key (str, optional): The secret key (optional if IAM is being used) | ||
|
|
@@ -143,50 +162,64 @@ def create_root_storage(self, bucket_name="archive", region="us-east-1"): | |
| # Writes a file to storage will gzip if not compressed already | ||
| def write_file( | ||
| self, | ||
| bucket_name, | ||
| path, | ||
| data, | ||
| reduced_redundancy=False, | ||
| bucket_name: str, | ||
| path: str, | ||
| data: BinaryIO, | ||
| reduced_redundancy: bool = False, | ||
| *, | ||
| is_already_gzipped: bool = False, | ||
| is_already_gzipped: bool = False, # deprecated | ||
| is_compressed: bool = False, | ||
| compression_type: str = "zstd", | ||
| ): | ||
| if is_already_gzipped: | ||
| log.warning( | ||
Swatinem marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| "is_already_gzipped is deprecated and will be removed in a future version, instead compress using zstd and use the is_already_zstd_compressed argument" | ||
| ) | ||
| with sentry_sdk.new_scope() as scope: | ||
| scope.set_extra("bucket_name", bucket_name) | ||
| scope.set_extra("path", path) | ||
| sentry_sdk.capture_message("is_already_gzipped passed with True") | ||
| is_compressed = True | ||
| compression_type = "gzip" | ||
|
|
||
| if isinstance(data, str): | ||
| data = data.encode() | ||
| log.warning( | ||
Swatinem marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| "passing data as a str to write_file is deprecated and will be removed in a future version, instead pass an object compliant with the BinaryIO type" | ||
| ) | ||
| with sentry_sdk.new_scope() as scope: | ||
| scope.set_extra("bucket_name", bucket_name) | ||
| scope.set_extra("path", path) | ||
| sentry_sdk.capture_message("write_file data argument passed as str") | ||
|
|
||
| if isinstance(data, bytes): | ||
| if not is_already_gzipped: | ||
| out = BytesIO() | ||
| with gzip.GzipFile(fileobj=out, mode="w", compresslevel=9) as gz: | ||
| gz.write(data) | ||
| else: | ||
| out = BytesIO(data) | ||
|
|
||
| # get file size | ||
| out.seek(0, os.SEEK_END) | ||
| out_size = out.tell() | ||
| else: | ||
| # data is already a file-like object | ||
| if not is_already_gzipped: | ||
| _, filename = tempfile.mkstemp() | ||
| with gzip.open(filename, "wb") as f: | ||
| shutil.copyfileobj(data, f) | ||
| out = open(filename, "rb") | ||
| else: | ||
| out = data | ||
| data = BytesIO(data.encode()) | ||
|
|
||
| out_size = os.stat(filename).st_size | ||
| if not is_compressed: | ||
| cctx = zstandard.ZstdCompressor() | ||
Swatinem marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| reader: zstandard.ZstdCompressionReader = cctx.stream_reader(data) | ||
| _, filepath = tempfile.mkstemp() | ||
| with open(filepath, "wb") as f: | ||
| while chunk := reader.read(16384): | ||
| f.write(chunk) | ||
| data = open(filepath, "rb") | ||
|
|
||
| try: | ||
| # reset pos for minio reading. | ||
| out.seek(0) | ||
| out_size = data.seek(0, os.SEEK_END) | ||
Swatinem marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| data.seek(0) | ||
|
|
||
| if compression_type == "gzip": | ||
| content_encoding = "gzip" | ||
| elif compression_type == "zstd": | ||
| content_encoding = "zstd" | ||
|
|
||
| headers = {"Content-Encoding": content_encoding} | ||
|
|
||
| headers = {"Content-Encoding": "gzip"} | ||
| if reduced_redundancy: | ||
| headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY" | ||
|
|
||
| self.minio_client.put_object( | ||
| bucket_name, | ||
| path, | ||
| out, | ||
| data, | ||
| out_size, | ||
| metadata=headers, | ||
| content_type="text/plain", | ||
|
|
@@ -195,25 +228,65 @@ def write_file( | |
|
|
||
| except MinioException: | ||
| raise | ||
| finally: | ||
| if not is_compressed: | ||
| data.close() | ||
| os.unlink(filepath) | ||
|
|
||
| @overload | ||
| def read_file(self, bucket_name: str, path: str) -> bytes: ... | ||
| def read_file( | ||
| self, bucket_name: str, path: str, file_obj: None = None | ||
| ) -> bytes: ... | ||
|
|
||
| @overload | ||
| def read_file(self, bucket_name: str, path: str, file_obj: BinaryIO) -> None: ... | ||
| def read_file(self, bucket_name: str, path: str, file_obj: str) -> None: ... | ||
Swatinem marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| def read_file(self, bucket_name, path, file_obj=None) -> bytes | None: | ||
| try: | ||
| res = self.minio_client.get_object(bucket_name, path) | ||
| if file_obj is None: | ||
| data = BytesIO() | ||
| for d in res.stream(CHUNK_SIZE): | ||
| data.write(d) | ||
| data.seek(0) | ||
| return data.getvalue() | ||
| headers = {"Accept-Encoding": "gzip, zstd"} | ||
| if file_obj: | ||
| _, tmpfilepath = tempfile.mkstemp() | ||
| to_file_response: GetObjectToFileResponse = ( | ||
| self.minio_client.fget_object( | ||
Swatinem marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| bucket_name, path, tmpfilepath, request_headers=headers | ||
| ) | ||
| ) | ||
| data = open(tmpfilepath, "rb") | ||
| content_encoding = to_file_response.metadata.get( | ||
| "Content-Encoding", None | ||
| ) | ||
| else: | ||
| response: HTTPResponse = self.minio_client.get_object( | ||
| bucket_name, path, request_headers=headers | ||
| ) | ||
| data = response | ||
| content_encoding = response.headers.get("Content-Encoding", None) | ||
|
|
||
| reader: Readable | None = None | ||
| if content_encoding == "gzip": | ||
| # HTTPResponse automatically decodes gzipped data for us | ||
|
||
| # minio_client.fget_object uses HTTPResponse under the hood, | ||
| # so this applies to both get_object and fget_object | ||
| reader = data | ||
| elif content_encoding == "zstd": | ||
| # we have to manually decompress zstandard compressed data | ||
| cctx = zstandard.ZstdDecompressor() | ||
| reader = cctx.stream_reader(data) | ||
| else: | ||
| with sentry_sdk.new_scope() as scope: | ||
| scope.set_extra("bucket_name", bucket_name) | ||
| scope.set_extra("path", path) | ||
| raise ValueError("Blob does not have Content-Encoding set") | ||
Swatinem marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| if file_obj: | ||
| while chunk := reader.read(16384): | ||
| file_obj.write(chunk) | ||
| return None | ||
Swatinem marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| else: | ||
| for d in res.stream(CHUNK_SIZE): | ||
| file_obj.write(d) | ||
| res = BytesIO() | ||
| while chunk := reader.read(16384): | ||
| res.write(chunk) | ||
| return res.getvalue() | ||
| except S3Error as e: | ||
| if e.code == "NoSuchKey": | ||
| raise FileNotInStorageError( | ||
|
|
@@ -222,6 +295,10 @@ def read_file(self, bucket_name, path, file_obj=None) -> bytes | None: | |
| raise e | ||
| except MinioException: | ||
| raise | ||
| finally: | ||
| if file_obj: | ||
| data.close() | ||
| os.unlink(tmpfilepath) | ||
|
|
||
| """ | ||
| Deletes file url in specified bucket. | ||
|
|
||
Empty file.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.