
Commit b9ccee6

[python] Support read blob row by offsets in with_shard feature (#6863)
1 parent c222436 commit b9ccee6

10 files changed: +366 -194 lines changed


paimon-python/pypaimon/read/reader/concat_batch_reader.py

Lines changed: 68 additions & 31 deletions
@@ -53,36 +53,6 @@ def close(self) -> None:
         self.queue.clear()
 
 
-class ShardBatchReader(ConcatBatchReader):
-
-    def __init__(self, readers, split_start_row, split_end_row):
-        super().__init__(readers)
-        self.split_start_row = split_start_row
-        self.split_end_row = split_end_row
-        self.cur_end = 0
-
-    def read_arrow_batch(self) -> Optional[RecordBatch]:
-        batch = super().read_arrow_batch()
-        if batch is None:
-            return None
-        if self.split_start_row is not None or self.split_end_row is not None:
-            cur_begin = self.cur_end  # begin idx of current batch based on the split
-            self.cur_end += batch.num_rows
-            # shard the first batch and the last batch
-            if self.split_start_row <= cur_begin < self.cur_end <= self.split_end_row:
-                return batch
-            elif cur_begin <= self.split_start_row < self.cur_end:
-                return batch.slice(self.split_start_row - cur_begin,
-                                   min(self.split_end_row, self.cur_end) - self.split_start_row)
-            elif cur_begin < self.split_end_row <= self.cur_end:
-                return batch.slice(0, self.split_end_row - cur_begin)
-            else:
-                # return empty RecordBatch if the batch size has not reached split_start_row
-                return pa.RecordBatch.from_arrays([], [])
-        else:
-            return batch
-
-
 class MergeAllBatchReader(RecordBatchReader):
     """
     A reader that accepts multiple reader suppliers and concatenates all their arrow batches
@@ -98,13 +68,18 @@ def __init__(self, reader_suppliers: List[Callable], batch_size: int = 4096):
 
     def read_arrow_batch(self) -> Optional[RecordBatch]:
         if self.reader:
-            return self.reader.read_next_batch()
+            try:
+                return self.reader.read_next_batch()
+            except StopIteration:
+                return None
 
         all_batches = []
 
         # Read all batches from all reader suppliers
         for supplier in self.reader_suppliers:
             reader = supplier()
+            if reader is None:
+                continue
             try:
                 while True:
                     batch = reader.read_arrow_batch()
@@ -149,3 +124,65 @@ def read_arrow_batch(self) -> Optional[RecordBatch]:
     def close(self) -> None:
         self.merged_batch = None
         self.reader = None
+
+
+class DataEvolutionMergeReader(RecordBatchReader):
+    """
+    A union reader that contains multiple inner readers; each inner reader is responsible for reading one file.
+
+    This reader assembles the inner readers into a single reader and merges the batches from all of them.
+
+    For example, if rowOffsets is {0, 2, 0, 1, 2, 1} and fieldOffsets is {0, 0, 1, 1, 1, 0}, it means:
+    - The first field comes from batch0, and it is at offset 0 in batch0.
+    - The second field comes from batch2, and it is at offset 0 in batch2.
+    - The third field comes from batch0, and it is at offset 1 in batch0.
+    - The fourth field comes from batch1, and it is at offset 1 in batch1.
+    - The fifth field comes from batch2, and it is at offset 1 in batch2.
+    - The sixth field comes from batch1, and it is at offset 0 in batch1.
+    """
+
+    def __init__(self, row_offsets: List[int], field_offsets: List[int], readers: List[Optional[RecordBatchReader]]):
+        if row_offsets is None:
+            raise ValueError("Row offsets must not be null")
+        if field_offsets is None:
+            raise ValueError("Field offsets must not be null")
+        if len(row_offsets) != len(field_offsets):
+            raise ValueError("Row offsets and field offsets must have the same length")
+        if not row_offsets:
+            raise ValueError("Row offsets must not be empty")
+        if not readers or len(readers) < 1:
+            raise ValueError("Readers should be more than 0")
+        self.row_offsets = row_offsets
+        self.field_offsets = field_offsets
+        self.readers = readers
+
+    def read_arrow_batch(self) -> Optional[RecordBatch]:
+        batches: List[Optional[RecordBatch]] = [None] * len(self.readers)
+        for i, reader in enumerate(self.readers):
+            if reader is not None:
+                batch = reader.read_arrow_batch()
+                if batch is None:
+                    # all readers are aligned; as soon as one returns None, the others have no data left either
+                    return None
+                batches[i] = batch
+        # Assemble the output record batch from the per-reader batches based on row_offsets and field_offsets
+        columns = []
+        names = []
+        for i in range(len(self.row_offsets)):
+            batch_index = self.row_offsets[i]
+            field_index = self.field_offsets[i]
+            if batches[batch_index] is not None:
+                column = batches[batch_index].column(field_index)
+                columns.append(column)
+                names.append(batches[batch_index].schema.names[field_index])
+        if columns:
+            return pa.RecordBatch.from_arrays(columns, names)
+        return None
+
+    def close(self) -> None:
+        try:
+            for reader in self.readers:
+                if reader is not None:
+                    reader.close()
+        except Exception as e:
+            raise IOError("Failed to close inner readers") from e
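To make the row_offsets / field_offsets mapping concrete, here is a minimal standalone sketch (plain pyarrow, not pypaimon code; the toy batches and field names are invented for illustration) that applies the same assembly loop to three aligned batches, using the offsets from the docstring example above:

import pyarrow as pa

# Three aligned batches, e.g. one per data file, all with the same row count.
batch0 = pa.RecordBatch.from_pydict({"id": [1, 2], "name": ["a", "b"]})
batch1 = pa.RecordBatch.from_pydict({"price": [9.5, 7.0], "qty": [3, 4]})
batch2 = pa.RecordBatch.from_pydict({"color": ["red", "blue"], "size": ["S", "M"]})
batches = [batch0, batch1, batch2]

# Output field i is column field_offsets[i] of batch row_offsets[i],
# mirroring the example values in the docstring.
row_offsets = [0, 2, 0, 1, 2, 1]
field_offsets = [0, 0, 1, 1, 1, 0]

columns, names = [], []
for batch_index, field_index in zip(row_offsets, field_offsets):
    columns.append(batches[batch_index].column(field_index))
    names.append(batches[batch_index].schema.names[field_index])

merged = pa.RecordBatch.from_arrays(columns, names)
print(merged.schema.names)  # ['id', 'color', 'name', 'qty', 'size', 'price']

The resulting batch interleaves columns from all three inputs in exactly the order the offsets dictate.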

paimon-python/pypaimon/read/reader/data_evolution_merge_reader.py

Lines changed: 0 additions & 85 deletions
This file was deleted.

paimon-python/pypaimon/read/reader/data_file_batch_reader.py

Lines changed: 7 additions & 3 deletions
@@ -22,14 +22,15 @@
 from pyarrow import RecordBatch
 
 from pypaimon.read.partition_info import PartitionInfo
+from pypaimon.read.reader.format_blob_reader import FormatBlobReader
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
 from pypaimon.schema.data_types import DataField, PyarrowFieldParser
 from pypaimon.table.special_fields import SpecialFields
 
 
 class DataFileBatchReader(RecordBatchReader):
     """
-    Reads record batch from data files.
+    Reads record batches from data files of different formats.
     """
 
     def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], partition_info: PartitionInfo,
@@ -48,8 +49,11 @@ def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], p
         self.max_sequence_number = max_sequence_number
         self.system_fields = system_fields
 
-    def read_arrow_batch(self) -> Optional[RecordBatch]:
-        record_batch = self.format_reader.read_arrow_batch()
+    def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch]:
+        if isinstance(self.format_reader, FormatBlobReader):
+            record_batch = self.format_reader.read_arrow_batch(start_idx, end_idx)
+        else:
+            record_batch = self.format_reader.read_arrow_batch()
         if record_batch is None:
             return None
 
paimon-python/pypaimon/read/reader/format_blob_reader.py

Lines changed: 13 additions & 3 deletions
@@ -63,7 +63,11 @@ def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
         self._blob_iterator = None
         self._current_batch = None
 
-    def read_arrow_batch(self) -> Optional[RecordBatch]:
+    def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch]:
+        """
+        start_idx: index of the first blob record to read
+        end_idx: end index (exclusive) of the blob records to read
+        """
         if self._blob_iterator is None:
             if self.returned:
                 return None
@@ -73,7 +77,13 @@ def read_arrow_batch(self) -> Optional[RecordBatch]:
                 self.blob_offsets, self._fields[0]
             )
             self._blob_iterator = iter(batch_iterator)
-
+        read_size = self._batch_size
+        if start_idx is not None and end_idx is not None:
+            if self._blob_iterator.current_position >= end_idx:
+                return None
+            if self._blob_iterator.current_position < start_idx:
+                self._blob_iterator.current_position = start_idx
+            read_size = min(end_idx - self._blob_iterator.current_position, self._batch_size)
         # Collect records for this batch
         pydict_data = {name: [] for name in self._fields}
         records_in_batch = 0
@@ -93,7 +103,7 @@ def read_arrow_batch(self) -> Optional[RecordBatch]:
                     pydict_data[field_name].append(blob_data)
 
                     records_in_batch += 1
-                    if records_in_batch >= self._batch_size:
+                    if records_in_batch >= read_size:
                         break
 
         except StopIteration:
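The window logic added above boils down to a small amount of arithmetic. The following standalone sketch (with a hypothetical integer position cursor standing in for the real blob iterator) shows the same clamping: skip forward to start_idx, read nothing at or past end_idx, and cap the batch at whatever remains of the requested range.

def clamp_read_window(position, batch_size, start_idx=None, end_idx=None):
    # Unsharded read: keep the full batch size.
    if start_idx is None or end_idx is None:
        return position, batch_size
    # The shard is exhausted: nothing left to read.
    if position >= end_idx:
        return position, 0
    # Skip records that fall before the shard.
    if position < start_idx:
        position = start_idx
    # Never read past the end of the shard.
    return position, min(end_idx - position, batch_size)

# e.g. a blob file read with batch_size=4096 and shard [100, 150):
print(clamp_read_window(0, 4096, 100, 150))    # (100, 50)
print(clamp_read_window(150, 4096, 100, 150))  # (150, 0)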
Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from pyarrow import RecordBatch
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.read.reader.format_blob_reader import FormatBlobReader
+
+
+class ShardBatchReader(RecordBatchReader):
+    """
+    A reader that reads a subset of rows from a data file.
+    """
+    def __init__(self, reader, start_row, end_row):
+        self.reader = reader
+        self.start_row = start_row
+        self.end_row = end_row
+        self.current_row = 0
+
+    def read_arrow_batch(self) -> Optional[RecordBatch]:
+        # Check if the wrapped reader is backed by a FormatBlobReader (blob type)
+        if isinstance(self.reader.format_reader, FormatBlobReader):
+            # For a blob reader, pass start_idx and end_idx parameters
+            return self.reader.read_arrow_batch(start_idx=self.start_row, end_idx=self.end_row)
+        else:
+            # For a non-blob reader (DataFileBatchReader), use the standard read_arrow_batch
+            batch = self.reader.read_arrow_batch()
+
+            if batch is None:
+                return None
+
+            # Apply row range filtering for non-blob readers
+            batch_begin = self.current_row
+            self.current_row += batch.num_rows
+
+            # Check whether the batch falls within the desired range
+            if self.start_row <= batch_begin < self.current_row <= self.end_row:  # batch is within the desired range
+                return batch
+            elif batch_begin < self.start_row < self.current_row:  # batch starts before the desired range
+                return batch.slice(self.start_row - batch_begin, self.end_row - self.start_row)
+            elif batch_begin < self.end_row < self.current_row:  # batch ends after the desired range
+                return batch.slice(0, self.end_row - batch_begin)
+            else:  # batch is outside the desired range
+                return self.read_arrow_batch()
+
+    def close(self):
+        self.reader.close()
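For the non-blob path, the row-range filtering can be exercised on its own with a brief standalone sketch (plain pyarrow, not pypaimon code; the toy batch is invented): batches fully inside [start_row, end_row) pass through unchanged, boundary batches are sliced, and batches outside the range are skipped.

import pyarrow as pa

def shard_batch(batch, batch_begin, start_row, end_row):
    batch_end = batch_begin + batch.num_rows
    if start_row <= batch_begin and batch_end <= end_row:
        return batch                                    # fully inside the shard
    if batch_begin < start_row < batch_end:
        return batch.slice(start_row - batch_begin,     # starts before the shard
                           min(end_row, batch_end) - start_row)
    if batch_begin < end_row < batch_end:
        return batch.slice(0, end_row - batch_begin)    # ends after the shard
    return None                                         # entirely outside the shard

batch = pa.RecordBatch.from_pydict({"id": list(range(10))})
# A shard covering rows [3, 8) of a 10-row batch that starts at global row 0:
print(shard_batch(batch, 0, 3, 8).column(0).to_pylist())  # [3, 4, 5, 6, 7]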
