From 1bcbbacb099257e5a2469cedd6cca972e29f93af Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Mon, 25 May 2026 12:48:46 +0800 Subject: [PATCH] [python] Support row-level Blob access MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `InternalRow.get_blob(pos)` to pypaimon, aligned with Java `InternalRow.getBlob`. Reads on BLOB columns return a `Blob` object (`BlobData` for inline storage, `BlobRef` for descriptor storage with lazy URI resolution). Also adds `Blob.from_bytes(data, file_io)` factory that auto-dispatches based on the BLOBDESC magic header (mirrors Java `Blob.fromBytes`). - `GetBlobTest` / `GetBlobMultiColumnTest` — row-level access on inline and descriptor blob storage - `GetBlobThroughDescriptorConvertReaderTest` — pins propagation through `BlobDescriptorConvertReader` - `GetBlobNonBlobColumnSecurityTest` — SSRF defence: non-BLOB columns containing magic-prefixed bytes never resolve a URI - `Blob.from_bytes` factory unit tests --- docs/docs/append-table/blob.mdx | 2 + docs/docs/pypaimon/blob.md | 155 +++++++++++ .../reader/blob_descriptor_convert_reader.py | 2 + .../read/reader/concat_batch_reader.py | 4 +- .../read/reader/data_file_batch_reader.py | 23 +- .../read/reader/filter_record_batch_reader.py | 2 + .../read/reader/iface/record_batch_reader.py | 13 +- .../reader/outer_projection_record_reader.py | 20 +- .../reader/row_range_filter_record_reader.py | 2 + paimon-python/pypaimon/read/split_read.py | 20 +- .../pypaimon/table/row/binary_row.py | 9 + paimon-python/pypaimon/table/row/blob.py | 20 ++ .../pypaimon/table/row/generic_row.py | 9 + .../pypaimon/table/row/internal_row.py | 6 +- .../pypaimon/table/row/offset_row.py | 16 +- .../pypaimon/table/row/projected_row.py | 6 + .../pypaimon/tests/blob_table_test.py | 255 ++++++++++++++++++ paimon-python/pypaimon/tests/blob_test.py | 32 +++ 18 files changed, 562 insertions(+), 34 deletions(-) create mode 100644 docs/docs/pypaimon/blob.md diff --git a/docs/docs/append-table/blob.mdx b/docs/docs/append-table/blob.mdx index 4f0ce604acea..7f0afc964e7d 100644 --- a/docs/docs/append-table/blob.mdx +++ b/docs/docs/append-table/blob.mdx @@ -730,6 +730,8 @@ For these configured fields: - writes can still start from raw BLOB input - the field is treated as descriptor-based for operations such as `MERGE INTO` +For the Python equivalent, see [Blob Storage in pypaimon](../pypaimon/blob). + ## Limitations 1. **Append Table Only**: Blob type is designed for append-only tables. Primary key tables are not supported. diff --git a/docs/docs/pypaimon/blob.md b/docs/docs/pypaimon/blob.md new file mode 100644 index 000000000000..916d893571d8 --- /dev/null +++ b/docs/docs/pypaimon/blob.md @@ -0,0 +1,155 @@ +--- +title: "Blob Storage" +sidebar_position: 7 +--- + + +# Blob Storage in pypaimon + +For Paimon's Blob storage concepts (storage modes, table options, SQL usage, +Java API), see [Blob Storage](../append-table/blob). + +This page covers the Python API for reading and writing BLOB columns. + +## Creating a Table + +A BLOB column maps to PyArrow `large_binary()`. The table must enable +`row-tracking.enabled` and `data-evolution.enabled`. + +```python +from pypaimon import CatalogFactory, Schema +import pyarrow as pa + +catalog = CatalogFactory.create({'warehouse': '/tmp/paimon-warehouse'}) +catalog.create_database('my_db', True) + +pa_schema = pa.schema([ + ('id', pa.int32()), + ('name', pa.string()), + ('image', pa.large_binary()), +]) +schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + }, +) +catalog.create_table('my_db.image_table', schema, True) +``` + +## Writing Blob Data + +Pass raw bytes for the blob column in a PyArrow Table; pypaimon writes them +to dedicated `.blob` files automatically. + +```python +table = catalog.get_table('my_db.image_table') +write_builder = table.new_batch_write_builder() +writer = write_builder.new_write() + +with open('cat.jpg', 'rb') as f1, open('dog.jpg', 'rb') as f2: + writer.write_arrow(pa.Table.from_pydict({ + 'id': [1, 2], + 'name': ['cat', 'dog'], + 'image': [f1.read(), f2.read()], + }, schema=pa_schema)) + +write_builder.new_commit().commit(writer.prepare_commit()) +writer.close() +``` + +## Reading Blob Data + +Use `row.get_blob(pos)` to access blob columns. It returns a `Blob` object +regardless of how the blob is stored. + +```python +read_builder = table.new_read_builder() +splits = read_builder.new_scan().plan().splits() +read = read_builder.new_read() + +for row in read.to_iterator(splits): + blob = row.get_blob(2) + if blob is None: + continue + data = blob.to_data() +``` + +## Streaming for Large Blobs + +`blob.new_input_stream()` returns a file-like object. Whether it is +genuinely lazy depends on how the table is configured: + +- Default mode (`blob-as-descriptor=false`): the read path materialises + the payload before it reaches `row.get_blob(pos)`. `Blob` is a + `BlobData` and `new_input_stream()` wraps the in-memory bytes — not + true streaming. For large blobs this can still OOM. +- Descriptor mode (`blob-as-descriptor=true`): the read path preserves + the descriptor. `Blob` is a `BlobRef` and `new_input_stream()` opens + the underlying file on demand. + +This mirrors Java's `BlobFormatReader` semantics. + +For genuine on-demand streaming of large blobs (videos, model weights), +configure `blob-as-descriptor=true` before reading: + +```python +schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'blob-as-descriptor': 'true', + }, +) +# Reads of this table return BlobRef whose new_input_stream() is lazy. +for row in read.to_iterator(splits): + with row.get_blob(2).new_input_stream() as stream: + chunk = stream.read(1024) +``` + +## Lower-level: `Blob.from_bytes` + +When you already have raw or descriptor bytes (for example from a custom +source) and want to wrap them as a `Blob`, use the factory: + +```python +from pypaimon.table.row.blob import Blob + +# Inline bytes → BlobData (no file_io required) +blob = Blob.from_bytes(b'hello') + +# Descriptor bytes → BlobRef (lazy; requires file_io to resolve the URI) +file_io = table.file_io +blob = Blob.from_bytes(descriptor_bytes, file_io) + +data = blob.to_data() +``` + +The factory auto-dispatches based on the bytes content (BLOBDESC magic +header). This mirrors Java's `Blob.fromBytes(...)`. + +## See Also + +- [Blob Storage](../append-table/blob) — concept, storage modes, + SQL/Java API +- [Data Evolution](./data-evolution) — required for + blob tables diff --git a/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py b/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py index 9ee1a33c3841..35fe046a03ce 100644 --- a/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py +++ b/paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py @@ -28,6 +28,8 @@ def __init__(self, inner: RecordBatchReader, table): self._inner = inner self._table = table self._descriptor_fields = CoreOptions.blob_descriptor_fields(table.options) + self.file_io = inner.file_io + self.blob_field_indices = inner.blob_field_indices def read_arrow_batch(self) -> Optional[RecordBatch]: import pyarrow diff --git a/paimon-python/pypaimon/read/reader/concat_batch_reader.py b/paimon-python/pypaimon/read/reader/concat_batch_reader.py index 564f015d245d..722c976a510e 100644 --- a/paimon-python/pypaimon/read/reader/concat_batch_reader.py +++ b/paimon-python/pypaimon/read/reader/concat_batch_reader.py @@ -29,9 +29,11 @@ class ConcatBatchReader(RecordBatchReader): - def __init__(self, reader_suppliers: List[Callable]): + def __init__(self, reader_suppliers: List[Callable], file_io=None, blob_field_indices=None): self.queue: collections.deque[Callable] = collections.deque(reader_suppliers) self.current_reader: Optional[RecordBatchReader] = None + self.file_io = file_io + self.blob_field_indices = blob_field_indices def read_arrow_batch(self) -> Optional[RecordBatch]: while True: diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py index f71a8ed9ebfd..64da0cc8400e 100644 --- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py +++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py @@ -25,7 +25,7 @@ from pypaimon.read.reader.format_blob_reader import FormatBlobReader from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader from pypaimon.schema.data_types import DataField, PyarrowFieldParser -from pypaimon.table.row.blob import Blob, BlobDescriptor +from pypaimon.table.row.blob import Blob from pypaimon.table.special_fields import SpecialFields @@ -181,28 +181,9 @@ def _blob_cell_to_data(self, value): value = self._normalize_blob_cell(value) if value is None: return None - if not isinstance(value, bytes): return value - - descriptor = self._deserialize_descriptor_or_none(value) - if descriptor is None: - return value - - try: - uri_reader = self.file_io.uri_reader_factory.create(descriptor.uri) - blob = Blob.from_descriptor(uri_reader, descriptor) - return blob.to_data() - except Exception as e: - raise RuntimeError( - "Failed to read blob bytes from descriptor URI while converting blob value." - ) from e - - @staticmethod - def _deserialize_descriptor_or_none(raw: bytes): - if not BlobDescriptor.is_blob_descriptor(raw): - return None - return BlobDescriptor.deserialize(raw) + return Blob.from_bytes(value, self.file_io).to_data() def _assign_row_tracking(self, record_batch: RecordBatch) -> RecordBatch: """Assign row tracking meta fields (_ROW_ID and _SEQUENCE_NUMBER).""" diff --git a/paimon-python/pypaimon/read/reader/filter_record_batch_reader.py b/paimon-python/pypaimon/read/reader/filter_record_batch_reader.py index 518828e207b5..a6039c46a8c4 100644 --- a/paimon-python/pypaimon/read/reader/filter_record_batch_reader.py +++ b/paimon-python/pypaimon/read/reader/filter_record_batch_reader.py @@ -46,6 +46,8 @@ def __init__( self.predicate = predicate self.field_names = field_names self.schema_fields = schema_fields + self.file_io = reader.file_io + self.blob_field_indices = reader.blob_field_indices def read_arrow_batch(self) -> Optional[pa.RecordBatch]: while True: diff --git a/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py b/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py index 3b29383c2253..e869bd9c8d96 100644 --- a/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py +++ b/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py @@ -34,6 +34,9 @@ class RecordBatchReader(RecordReader): The reader that reads the pyarrow batches of records. """ + file_io = None + blob_field_indices = None + @abstractmethod def read_arrow_batch(self) -> Optional[RecordBatch]: """ @@ -61,13 +64,17 @@ def read_batch(self) -> Optional[RecordIterator[InternalRow]]: df = self.read_next_df() if df is None: return None - return InternalRowWrapperIterator(df.iter_rows(), df.width) + return InternalRowWrapperIterator( + df.iter_rows(), df.width, self.file_io, self.blob_field_indices) class InternalRowWrapperIterator(RecordIterator[InternalRow]): - def __init__(self, iterator: Iterator[tuple], width: int): + def __init__(self, iterator: Iterator[tuple], width: int, + file_io=None, blob_field_indices=None): self._iterator = iterator - self._reused_row = OffsetRow(None, 0, width) + self._reused_row = OffsetRow(None, 0, width, + file_io=file_io, + blob_field_indices=blob_field_indices) def next(self) -> Optional[InternalRow]: row_tuple = next(self._iterator, None) diff --git a/paimon-python/pypaimon/read/reader/outer_projection_record_reader.py b/paimon-python/pypaimon/read/reader/outer_projection_record_reader.py index be17382a1b5f..23e67e023158 100644 --- a/paimon-python/pypaimon/read/reader/outer_projection_record_reader.py +++ b/paimon-python/pypaimon/read/reader/outer_projection_record_reader.py @@ -41,6 +41,8 @@ def __init__( inner: RecordReader[InternalRow], inner_top_names: List[str], name_paths: List[List[str]], + file_io=None, + blob_field_indices=None, ): if not name_paths: raise ValueError("name_paths must be non-empty") @@ -58,12 +60,22 @@ def __init__( self._specs.append(_PathSpec(name_to_top_idx[top_name], list(path[1:]))) self._inner = inner self._flat_arity = len(name_paths) + self._file_io = file_io + self._blob_field_indices = None + if blob_field_indices is not None: + self._blob_field_indices = { + proj_pos + for proj_pos, spec in enumerate(self._specs) + if not spec.sub_names and spec.top_idx in blob_field_indices + } def read_batch(self) -> Optional[RecordIterator[InternalRow]]: inner_batch = self._inner.read_batch() if inner_batch is None: return None - return _OuterProjectionIterator(inner_batch, self._specs, self._flat_arity) + return _OuterProjectionIterator( + inner_batch, self._specs, self._flat_arity, self._file_io, + self._blob_field_indices) def close(self) -> None: self._inner.close() @@ -77,11 +89,15 @@ def __init__( inner: RecordIterator[InternalRow], specs: List["_PathSpec"], flat_arity: int, + file_io=None, + blob_field_indices=None, ): self._inner = inner self._specs = specs self._flat_arity = flat_arity - self._reused_row = OffsetRow(None, 0, flat_arity) + self._reused_row = OffsetRow(None, 0, flat_arity, + file_io=file_io, + blob_field_indices=blob_field_indices) def next(self) -> Optional[InternalRow]: inner_row = self._inner.next() diff --git a/paimon-python/pypaimon/read/reader/row_range_filter_record_reader.py b/paimon-python/pypaimon/read/reader/row_range_filter_record_reader.py index 8f747a7e719e..5f97fb4ad566 100644 --- a/paimon-python/pypaimon/read/reader/row_range_filter_record_reader.py +++ b/paimon-python/pypaimon/read/reader/row_range_filter_record_reader.py @@ -32,6 +32,8 @@ def __init__(self, reader: RecordBatchReader, first_row_id: int, row_id_ranges: self.reader = reader self.current_row_id = first_row_id self.row_id_ranges = row_id_ranges + self.file_io = reader.file_io + self.blob_field_indices = reader.blob_field_indices def read_arrow_batch(self) -> Optional[RecordBatch]: while True: diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index c90d406019a9..b6b81028c5d0 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -71,6 +71,11 @@ _COMPRESS_EXTENSIONS = frozenset(['gz', 'bz2', 'deflate', 'snappy', 'lz4', 'zst']) +def _blob_field_indices(fields: List[DataField]) -> set: + return {i for i, f in enumerate(fields) + if hasattr(f.type, 'type') and f.type.type == 'BLOB'} + + def format_identifier(file_name): idx = file_name.rfind('.') assert idx != -1, "%s is not a legal file name." % file_name @@ -572,7 +577,9 @@ def create_reader(self) -> RecordReader: if not data_readers: return EmptyFileRecordReader() - concat_reader = ConcatBatchReader(data_readers) + concat_reader = ConcatBatchReader( + data_readers, file_io=self.table.file_io, + blob_field_indices=_blob_field_indices(self.read_fields)) # if the table is appendonly table, we don't need extra filter, all predicates has pushed down if self.table.is_primary_key_table and self.predicate_for_reader: return FilterRecordReader(concat_reader, self.predicate_for_reader) @@ -673,9 +680,12 @@ def create_reader(self) -> RecordReader: if self.outer_extract_name_paths: from pypaimon.read.reader.outer_projection_record_reader import \ OuterProjectionRecordReader - inner_top_names = [f.name for f in self.read_fields[-self.value_arity:]] + inner_value_fields = self.read_fields[-self.value_arity:] reader = OuterProjectionRecordReader( - reader, inner_top_names, self.outer_extract_name_paths) + reader, [f.name for f in inner_value_fields], + self.outer_extract_name_paths, + file_io=self.table.file_io, + blob_field_indices=_blob_field_indices(inner_value_fields)) if self.limit is not None: from pypaimon.read.reader.limited_record_reader import \ LimitedRecordReader @@ -729,7 +739,9 @@ def create_reader(self) -> RecordReader: lambda files=need_merge_files: self._create_union_reader(files) ) - merge_reader = ConcatBatchReader(suppliers) + merge_reader = ConcatBatchReader( + suppliers, file_io=self.table.file_io, + blob_field_indices=_blob_field_indices(self.read_fields)) if self.predicate_for_reader is not None: reader = FilterRecordBatchReader( merge_reader, diff --git a/paimon-python/pypaimon/table/row/binary_row.py b/paimon-python/pypaimon/table/row/binary_row.py index f6122bd3683b..8eaaf8525786 100644 --- a/paimon-python/pypaimon/table/row/binary_row.py +++ b/paimon-python/pypaimon/table/row/binary_row.py @@ -51,6 +51,15 @@ def get_field(self, index: int) -> Any: self.arity), index, self.fields[index].type) + def get_blob(self, pos: int): + from pypaimon.table.row.blob import Blob + value = self.get_field(pos) + if value is None: + return None + if isinstance(value, Blob): + return value + raise TypeError(f"Cannot get Blob from {type(value)} at position {pos}") + def get_row_kind(self) -> RowKind: return self.row_kind diff --git a/paimon-python/pypaimon/table/row/blob.py b/paimon-python/pypaimon/table/row/blob.py index b619b6a76aec..19a932a9f921 100644 --- a/paimon-python/pypaimon/table/row/blob.py +++ b/paimon-python/pypaimon/table/row/blob.py @@ -276,6 +276,26 @@ def from_file(file_io, file_path: str, offset: int, length: int) -> 'Blob': def from_descriptor(uri_reader: UriReader, descriptor: BlobDescriptor) -> 'Blob': return BlobRef(uri_reader, descriptor) + @staticmethod + def from_bytes(data: Optional[bytes], file_io=None, allow_blob_data: bool = True) -> Optional['Blob']: + if data is None: + return None + if not isinstance(data, (bytes, bytearray)): + raise TypeError(f"Blob.from_bytes expects bytes, got {type(data)}") + data = bytes(data) + is_descriptor = BlobDescriptor.is_blob_descriptor(data) + if not allow_blob_data and not is_descriptor: + raise ValueError( + "Expected BlobDescriptor bytes, got raw bytes (allow_blob_data=False)" + ) + if is_descriptor: + if file_io is None: + raise ValueError("file_io is required to resolve BlobDescriptor bytes") + descriptor = BlobDescriptor.deserialize(data) + uri_reader = file_io.uri_reader_factory.create(descriptor.uri) + return BlobRef(uri_reader, descriptor) + return BlobData(data) + class BlobData(Blob): diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py index 79f898d1f2e2..0c2458eff9d5 100644 --- a/paimon-python/pypaimon/table/row/generic_row.py +++ b/paimon-python/pypaimon/table/row/generic_row.py @@ -114,6 +114,15 @@ def get_field(self, pos: int) -> Any: raise IndexError(f"Position {pos} is out of bounds for row arity {len(self.values)}") return self.values[pos] + def get_blob(self, pos: int): + from pypaimon.table.row.blob import Blob + value = self.get_field(pos) + if value is None: + return None + if isinstance(value, Blob): + return value + raise TypeError(f"Cannot get Blob from {type(value)} at position {pos}") + def get_row_kind(self) -> RowKind: return self.row_kind diff --git a/paimon-python/pypaimon/table/row/internal_row.py b/paimon-python/pypaimon/table/row/internal_row.py index ec89a743772a..50d08811d70d 100644 --- a/paimon-python/pypaimon/table/row/internal_row.py +++ b/paimon-python/pypaimon/table/row/internal_row.py @@ -16,7 +16,7 @@ # under the License. from abc import ABC, abstractmethod -from typing import Any +from typing import Any, Optional from pypaimon.table.row.row_kind import RowKind @@ -45,6 +45,10 @@ def __len__(self) -> int: The number does not include RowKind. It is kept separately. """ + @abstractmethod + def get_blob(self, pos: int) -> Optional[Any]: + """Returns the Blob at the given position, or None if null.""" + def __str__(self) -> str: fields = [] for pos in range(self.__len__()): diff --git a/paimon-python/pypaimon/table/row/offset_row.py b/paimon-python/pypaimon/table/row/offset_row.py index a9f02b18678a..1705ae130b8c 100644 --- a/paimon-python/pypaimon/table/row/offset_row.py +++ b/paimon-python/pypaimon/table/row/offset_row.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -from typing import Optional +from typing import FrozenSet, Iterable, Optional from pypaimon.table.row.internal_row import InternalRow, RowKind @@ -23,11 +23,16 @@ class OffsetRow(InternalRow): """A InternalRow to wrap row with offset.""" - def __init__(self, row_tuple: Optional[tuple], offset: int, arity: int): + def __init__(self, row_tuple: Optional[tuple], offset: int, arity: int, + file_io=None, blob_field_indices: Optional[Iterable[int]] = None): self.row_tuple = row_tuple self.offset = offset self.arity = arity self.row_kind_byte: int = 1 + self._file_io = file_io + self._blob_field_indices: FrozenSet[int] = ( + frozenset(blob_field_indices) if blob_field_indices is not None else frozenset() + ) def replace(self, row_tuple: tuple) -> 'OffsetRow': self.row_tuple = row_tuple @@ -46,6 +51,13 @@ def get_field(self, pos: int): raise IndexError(f"Position {pos} is out of bounds for row arity {self.arity}") return self.row_tuple[self.offset + pos] + def get_blob(self, pos: int): + from pypaimon.table.row.blob import Blob + + if pos not in self._blob_field_indices: + raise TypeError(f"Field at position {pos} is not a BLOB field") + return Blob.from_bytes(self.get_field(pos), self._file_io) + def get_row_kind(self) -> RowKind: return RowKind(self.row_kind_byte) diff --git a/paimon-python/pypaimon/table/row/projected_row.py b/paimon-python/pypaimon/table/row/projected_row.py index 0af44d9502e8..5d7cbd375e4e 100644 --- a/paimon-python/pypaimon/table/row/projected_row.py +++ b/paimon-python/pypaimon/table/row/projected_row.py @@ -48,6 +48,12 @@ def get_field(self, pos: int) -> Any: return None return self.row.get_field(self.index_mapping[pos]) + def get_blob(self, pos: int): + """Returns the Blob at the projected position; delegates to the inner row.""" + if self.index_mapping[pos] < 0: + return None + return self.row.get_blob(self.index_mapping[pos]) + def get_row_kind(self) -> RowKind: """Returns the kind of change that this row describes in a changelog.""" return self.row.get_row_kind() diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py index 95f89ff8de32..c4e5a4d1bd3f 100755 --- a/paimon-python/pypaimon/tests/blob_table_test.py +++ b/paimon-python/pypaimon/tests/blob_table_test.py @@ -3233,5 +3233,260 @@ def test_rename_blob_column_should_fail(self): self.assertIn('Cannot rename BLOB column', str(ctx.exception)) +class GetBlobTest(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.temp_dir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.temp_dir, 'warehouse') + cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse}) + cls.catalog.create_database('test_db', False) + + pa_schema = pa.schema([ + ('id', pa.int32()), + ('name', pa.string()), + ('picture', pa.large_binary()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + }) + cls.catalog.create_table('test_db.get_blob_test', schema, False) + cls.table = cls.catalog.get_table('test_db.get_blob_test') + + data = pa.Table.from_pydict({ + 'id': [1, 2, 3], + 'name': ['a', 'b', 'c'], + 'picture': [b'img_data_1', b'img_data_2', b'img_data_3'], + }, schema=pa_schema) + + write_builder = cls.table.new_batch_write_builder() + writer = write_builder.new_write() + writer.write_arrow(data) + commit_messages = writer.prepare_commit() + commit = write_builder.new_commit() + commit.commit(commit_messages) + writer.close() + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.temp_dir, ignore_errors=True) + + def test_get_blob_access(self): + read_builder = self.table.new_read_builder() + splits = read_builder.new_scan().plan().splits() + read = read_builder.new_read() + + results = [] + for row in read.to_iterator(splits): + blob = row.get_blob(2) + self.assertIsNotNone(blob) + results.append((row.get_field(0), blob.to_data())) + + self.assertEqual(len(results), 3) + results.sort(key=lambda x: x[0]) + self.assertEqual(results[0], (1, b'img_data_1')) + self.assertEqual(results[1], (2, b'img_data_2')) + self.assertEqual(results[2], (3, b'img_data_3')) + + def test_get_blob_streaming(self): + read_builder = self.table.new_read_builder() + splits = read_builder.new_scan().plan().splits() + read = read_builder.new_read() + + results = [] + for row in read.to_iterator(splits): + with row.get_blob(2).new_input_stream() as stream: + results.append((row.get_field(0), stream.read())) + self.assertEqual(len(results), 3) + results.sort(key=lambda x: x[0]) + self.assertEqual(results[0], (1, b'img_data_1')) + self.assertEqual(results[1], (2, b'img_data_2')) + self.assertEqual(results[2], (3, b'img_data_3')) + + def test_get_blob_non_blob_field_raises(self): + read_builder = self.table.new_read_builder() + splits = read_builder.new_scan().plan().splits() + read = read_builder.new_read() + + for row in read.to_iterator(splits): + with self.assertRaises(TypeError): + row.get_blob(0) + break + + def test_to_iterator_yields_all_rows(self): + read_builder = self.table.new_read_builder() + splits = read_builder.new_scan().plan().splits() + read = read_builder.new_read() + + count = 0 + for row in read.to_iterator(splits): + self.assertIsNotNone(row.get_field(0)) + self.assertIsNotNone(row.get_field(1)) + count += 1 + self.assertEqual(count, 3) + + +class GetBlobMultiColumnTest(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.temp_dir = tempfile.mkdtemp() + cls.catalog = CatalogFactory.create({'warehouse': os.path.join(cls.temp_dir, 'warehouse')}) + cls.catalog.create_database('test_db', False) + + pa_schema = pa.schema([ + ('id', pa.int32()), + ('cover', pa.large_binary()), + ('thumb', pa.large_binary()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + }) + cls.catalog.create_table('test_db.get_blob_multi', schema, False) + cls.table = cls.catalog.get_table('test_db.get_blob_multi') + + write_builder = cls.table.new_batch_write_builder() + writer = write_builder.new_write() + writer.write_arrow(pa.Table.from_pydict({ + 'id': [1, 2], + 'cover': [b'cover_1', b'cover_2'], + 'thumb': [b'thumb_1', b'thumb_2'], + }, schema=pa_schema)) + write_builder.new_commit().commit(writer.prepare_commit()) + writer.close() + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.temp_dir, ignore_errors=True) + + def test_get_blob_each_column_independent(self): + read_builder = self.table.new_read_builder() + splits = read_builder.new_scan().plan().splits() + read = read_builder.new_read() + + results = [] + for row in read.to_iterator(splits): + results.append((row.get_field(0), + row.get_blob(1).to_data(), + row.get_blob(2).to_data())) + results.sort(key=lambda x: x[0]) + self.assertEqual(results, [(1, b'cover_1', b'thumb_1'), + (2, b'cover_2', b'thumb_2')]) + + +class GetBlobThroughDescriptorConvertReaderTest(unittest.TestCase): + """Pin BlobDescriptorConvertReader's file_io / blob_field_indices + propagation. Configuring blob-descriptor-field puts the wrapper on the + read chain; reading via to_iterator() + row.get_blob() is the only path + that consumes those propagated attributes. Regressions on either line + would silently pass the to_arrow()-based blob_descriptor tests.""" + + @classmethod + def setUpClass(cls): + cls.temp_dir = tempfile.mkdtemp() + cls.catalog = CatalogFactory.create({'warehouse': os.path.join(cls.temp_dir, 'warehouse')}) + cls.catalog.create_database('test_db', False) + + pa_schema = pa.schema([ + ('id', pa.int32()), + ('pic1', pa.large_binary()), + ('pic2', pa.large_binary()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'blob-descriptor-field': 'pic2', + }) + cls.catalog.create_table('test_db.get_blob_via_descriptor_convert', schema, False) + cls.table = cls.catalog.get_table('test_db.get_blob_via_descriptor_convert') + + from pypaimon.table.row.blob import BlobDescriptor + cls.pic1_data = b'inline_pic1_payload' + cls.pic2_data = b'external_pic2_payload' + cls.pic2_path = os.path.join(cls.temp_dir, 'pic2_external_blob') + with open(cls.pic2_path, 'wb') as f: + f.write(cls.pic2_data) + + write_builder = cls.table.new_batch_write_builder() + writer = write_builder.new_write() + writer.write_arrow(pa.Table.from_pydict({ + 'id': [1], + 'pic1': [cls.pic1_data], + 'pic2': [BlobDescriptor(cls.pic2_path, 0, len(cls.pic2_data)).serialize()], + }, schema=pa_schema)) + write_builder.new_commit().commit(writer.prepare_commit()) + writer.close() + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.temp_dir, ignore_errors=True) + + def test_get_blob_resolves_through_descriptor_convert_reader(self): + read_builder = self.table.new_read_builder() + splits = read_builder.new_scan().plan().splits() + read = read_builder.new_read() + + for row in read.to_iterator(splits): + self.assertEqual(row.get_blob(1).to_data(), self.pic1_data) + self.assertEqual(row.get_blob(2).to_data(), self.pic2_data) + break + + +class GetBlobNonBlobColumnSecurityTest(unittest.TestCase): + """SSRF defence: get_blob on a non-BLOB column must NOT resolve a + descriptor URI even when the cell starts with the BLOBDESC magic header.""" + + @classmethod + def setUpClass(cls): + cls.temp_dir = tempfile.mkdtemp() + cls.catalog = CatalogFactory.create({'warehouse': os.path.join(cls.temp_dir, 'warehouse')}) + cls.catalog.create_database('test_db', False) + + pa_schema = pa.schema([ + ('id', pa.int32()), + ('payload', pa.binary()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema) + cls.catalog.create_table('test_db.no_blob_col', schema, False) + cls.table = cls.catalog.get_table('test_db.no_blob_col') + + from pypaimon.table.row.blob import BlobDescriptor + attacker_bytes = BlobDescriptor( + "/etc/should-never-be-read-by-paimon", 0, 32 + ).serialize() + + write_builder = cls.table.new_batch_write_builder() + writer = write_builder.new_write() + writer.write_arrow(pa.Table.from_pydict({ + 'id': [1], + 'payload': [attacker_bytes], + }, schema=pa_schema)) + write_builder.new_commit().commit(writer.prepare_commit()) + writer.close() + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.temp_dir, ignore_errors=True) + + def test_get_blob_on_non_blob_column_with_magic_bytes_raises(self): + from unittest.mock import patch + + read_builder = self.table.new_read_builder() + splits = read_builder.new_scan().plan().splits() + read = read_builder.new_read() + + with patch.object( + self.table.file_io.uri_reader_factory, 'create', + side_effect=AssertionError("URI resolution must not happen on non-BLOB column") + ) as mock_create: + for row in read.to_iterator(splits): + with self.assertRaises(TypeError): + row.get_blob(1) + break + mock_create.assert_not_called() + + if __name__ == '__main__': unittest.main() diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py index d9cf64210d61..db0c63988863 100644 --- a/paimon-python/pypaimon/tests/blob_test.py +++ b/paimon-python/pypaimon/tests/blob_test.py @@ -134,6 +134,38 @@ def test_from_http(self): self.assertEqual(descriptor.offset, 0) self.assertEqual(descriptor.length, -1) + def test_from_bytes_with_raw_data(self): + raw = b"hello blob" + blob = Blob.from_bytes(raw) + self.assertIsInstance(blob, BlobData) + self.assertEqual(blob.to_data(), raw) + + def test_from_bytes_with_none(self): + self.assertIsNone(Blob.from_bytes(None)) + + def test_from_bytes_with_descriptor(self): + from pypaimon.common.file_io import FileIO + data = b"actual blob content" + with tempfile.TemporaryDirectory() as tmp_dir: + blob_path = os.path.join(tmp_dir, "blob.bin") + with open(blob_path, 'wb') as f: + f.write(data) + descriptor = BlobDescriptor(blob_path, 0, len(data)) + file_io = FileIO.get(f"file://{tmp_dir}", {}) + blob = Blob.from_bytes(descriptor.serialize(), file_io) + self.assertIsInstance(blob, BlobRef) + self.assertEqual(blob.to_data(), data) + + def test_from_bytes_descriptor_without_file_io_raises(self): + descriptor = BlobDescriptor("/tmp/fake", 0, 10) + serialized = descriptor.serialize() + with self.assertRaises(ValueError): + Blob.from_bytes(serialized) + + def test_from_bytes_invalid_type_raises(self): + with self.assertRaises(TypeError): + Blob.from_bytes(12345) + def test_blob_data_interface_compliance(self): """Test that BlobData properly implements Blob interface.""" test_data = b"interface test data"