Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/paimon-python-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ jobs:
python -m pip install -q pyroaring readerwriterlock==1.0.9 'fsspec==2021.10.1' 'cachetools==4.2.4' 'ossfs==2021.8.0' pyarrow==6.0.1 pandas==1.1.5 'polars==0.9.12' 'fastavro==1.4.7' zstandard==0.19.0 dataclasses==0.8.0 flake8 pytest py4j==0.10.9.9 requests parameterized==0.8.1 2>&1 >/dev/null
else
python -m pip install --upgrade pip
python -m pip install -q pyroaring readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 pylance==0.39.0 flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests parameterized==0.9.0 2>&1 >/dev/null
python -m pip install -q portion pyroaring readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 pylance==0.39.0 flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests parameterized==0.9.0 2>&1 >/dev/null
fi
- name: Run lint-python.sh
shell: bash
Expand Down
4 changes: 2 additions & 2 deletions paimon-python/dev/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ pandas>=1.1,<2; python_version < "3.7"
pandas>=1.3,<3; python_version >= "3.7" and python_version < "3.9"
pandas>=1.5,<3; python_version >= "3.9"
polars>=0.9,<1; python_version<"3.8"
polars>=1,<2; python_version=="3.8"
polars>=1,<2; python_version>"3.8"
polars>=1,<2; python_version>="3.8"
portion; python_version>="3.9"
pyarrow>=6,<7; python_version < "3.8"
pyarrow>=16,<20; python_version >= "3.8" and python_version < "3.13"
pyarrow>=16,<20; python_version >= "3.13"
Expand Down
5 changes: 5 additions & 0 deletions paimon-python/pypaimon/common/options/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,8 @@ class CatalogOptions:
HTTP_USER_AGENT_HEADER = ConfigOptions.key(
"header.HTTP_USER_AGENT").string_type().no_default_value().with_description("HTTP User Agent header")
BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2 ** 31 - 1


class DataTypeOptions:
    """Numeric limits shared by data type handling code."""

    # Lower bound of a signed 64-bit integer: -2^63.
    LONG_MIN_VALUE = -(2 ** 63)
    # Upper bound of a signed 64-bit integer: 2^63 - 1.
    LONG_MAX_VALUE = 2 ** 63 - 1
78 changes: 78 additions & 0 deletions paimon-python/pypaimon/common/range.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional

import portion


class Range:
    """
    A closed range ``[start, end]`` backed by a ``portion`` interval.

    This class wraps a portion interval to provide range operations like
    intersection checking for row ID ranges. Two ranges are equal (and hash
    alike) when their ``start`` and ``end`` bounds are equal.
    """

    def __init__(self, start, end):
        """
        Create a closed range [start, end].

        Args:
            start: The start value of the range (inclusive)
            end: The end value of the range (inclusive)
        """
        self.start = start
        self.end = end
        # Create a closed interval [start, end]
        self._interval = portion.closed(start, end)

    @staticmethod
    def intersection(range1: 'Range', range2: 'Range') -> Optional['Range']:
        """
        Calculate the intersection of two ranges.

        Args:
            range1: The first Range object
            range2: The second Range object

        Returns:
            A new Range object representing the intersection, or None if
            either argument is None or the ranges don't overlap
        """
        if range1 is None or range2 is None:
            return None

        overlap = range1._interval & range2._interval
        if overlap.empty:
            return None

        # Both operands are single closed intervals, so a non-empty overlap is
        # itself one closed interval and its outer bounds describe it fully.
        return Range(overlap.lower, overlap.upper)

    def __eq__(self, other):
        # Compare by bounds; defer to the other operand for foreign types.
        if not isinstance(other, Range):
            return NotImplemented
        return (self.start, self.end) == (other.start, other.end)

    def __hash__(self):
        # Kept consistent with __eq__ so equal ranges collapse in sets/dicts.
        return hash((self.start, self.end))

    def __repr__(self):
        return f"Range({self.start}, {self.end})"

    def __str__(self):
        return f"[{self.start}, {self.end}]"
4 changes: 4 additions & 0 deletions paimon-python/pypaimon/manifest/manifest_list_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ def read(self, manifest_list_name: str) -> List[ManifestFileMeta]:
num_deleted_files=record['_NUM_DELETED_FILES'],
partition_stats=partition_stats,
schema_id=record['_SCHEMA_ID'],
min_row_id=record['_MIN_ROW_ID'],
max_row_id=record['_MAX_ROW_ID'],
)
manifest_files.append(manifest_file_meta)

Expand All @@ -99,6 +101,8 @@ def write(self, file_name, manifest_file_metas: List[ManifestFileMeta]):
"_NULL_COUNTS": meta.partition_stats.null_counts,
},
"_SCHEMA_ID": meta.schema_id,
"_MIN_ROW_ID": meta.min_row_id,
"_MAX_ROW_ID": meta.max_row_id,
}
avro_records.append(avro_record)

Expand Down
4 changes: 4 additions & 0 deletions paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class ManifestFileMeta:
num_deleted_files: int
partition_stats: SimpleStats
schema_id: int
min_row_id: int = None
max_row_id: int = None


MANIFEST_FILE_META_SCHEMA = {
Expand All @@ -43,5 +45,7 @@ class ManifestFileMeta:
{"name": "_NUM_DELETED_FILES", "type": "long"},
{"name": "_PARTITION_STATS", "type": PARTITION_STATS_SCHEMA},
{"name": "_SCHEMA_ID", "type": "long"},
{"name": "_MIN_ROW_ID", "type": ["null", "long"]},
{"name": "_MAX_ROW_ID", "type": ["null", "long"]},
]
}
49 changes: 45 additions & 4 deletions paimon-python/pypaimon/read/scanner/full_starting_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from collections import defaultdict
from typing import Callable, List, Optional, Dict, Set

from pypaimon.common.range import Range
from pypaimon.common.predicate import Predicate
from pypaimon.table.source.deletion_file import DeletionFile
from pypaimon.table.row.generic_row import GenericRow
Expand Down Expand Up @@ -65,6 +66,7 @@ def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int]):
self.number_of_para_subtasks = None
self.start_row_of_this_subtask = None
self.end_row_of_this_subtask = None
self.row_ranges: List[Range] = None

self.only_read_real_buckets = True if options.bucket() == BucketMode.POSTPONE_BUCKET.value else False
self.data_evolution = options.data_evolution_enabled()
Expand Down Expand Up @@ -129,7 +131,7 @@ def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'FullStart
self.number_of_para_subtasks = number_of_para_subtasks
return self

def with_row_range(self, start_row, end_row) -> 'FullStartingScanner':
def with_row_shard(self, start_row, end_row) -> 'FullStartingScanner':
if start_row >= end_row:
raise Exception("start_row must be less than end_row")
if self.idx_of_this_subtask is not None:
Expand All @@ -138,6 +140,13 @@ def with_row_range(self, start_row, end_row) -> 'FullStartingScanner':
self.end_row_of_this_subtask = end_row
return self

def with_row_ranges(self, row_ranges) -> 'FullStartingScanner':
"""
Filter manifest files by row id ranges.
"""
self.row_ranges = row_ranges
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You introduced the ranges, so please remove start_row_of_this_subtask and end_row_of_this_subtask in this class.

return self

def _append_only_filter_by_row_range(self, partitioned_files: defaultdict,
start_row: int,
end_row: int) -> (defaultdict, int, int):
Expand Down Expand Up @@ -340,10 +349,42 @@ def _apply_push_down_limit(self, splits: List[Split]) -> List[Split]:

def _filter_manifest_file(self, file: ManifestFileMeta) -> bool:
if not self.partition_key_predicate:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extract a method: _filter_manifest_by_partition_predicate.

return self._filter_manifest_by_row_ranges(file)
if not self.partition_key_predicate.test_by_simple_stats(
file.partition_stats,
file.num_added_files + file.num_deleted_files):
return False
else:
return self._filter_manifest_by_row_ranges(file)

def _filter_manifest_by_row_ranges(self, manifest: ManifestFileMeta) -> bool:
"""
Filter manifest file by row ranges.

Args:
manifest: The manifest file metadata to filter

Returns:
True if the manifest should be included, False otherwise
"""
if self.row_ranges is None:
return True

min_row_id = manifest.min_row_id
max_row_id = manifest.max_row_id

if min_row_id is None or max_row_id is None:
return True
return self.partition_key_predicate.test_by_simple_stats(
file.partition_stats,
file.num_added_files + file.num_deleted_files)

# Create a Range object for the manifest's row range
manifest_row_range = Range(min_row_id, max_row_id)

# Check if manifest range intersects with any expected range
for row_range in self.row_ranges:
if Range.intersection(row_range, manifest_row_range) is not None:
return True

return False

def _filter_manifest_entry(self, entry: ManifestEntry) -> bool:
if self.only_read_real_buckets and entry.bucket < 0:
Expand Down
16 changes: 16 additions & 0 deletions paimon-python/pypaimon/read/scanner/starting_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,19 @@ class StartingScanner(ABC):
@abstractmethod
def scan(self) -> Plan:
"""Plan the files to read."""

def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'StartingScanner':
    """
    Filter file entries according to the id of the task.

    The base implementation applies no filtering; scanners that support
    sharding override it.

    Returns:
        This scanner, so calls can be chained.
    """
    return self

def with_row_shard(self, start_row, end_row) -> 'StartingScanner':
    """
    Filter file entries by row idx range. The row idx corresponds to the row position of the
    file in all file entries in table scan's partitioned_files.

    The base implementation applies no filtering; scanners that support
    row sharding override it.

    Returns:
        This scanner, so calls can be chained.
    """
    return self

def with_row_ranges(self, row_ranges) -> 'StartingScanner':
    """
    Filter manifest files by row id ranges.

    The base implementation applies no filtering; scanners that support
    row id range pruning override it.

    Returns:
        This scanner, so calls can be chained.
    """
    return self
12 changes: 6 additions & 6 deletions paimon-python/pypaimon/read/table_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan
self.starting_scanner.with_shard(idx_of_this_subtask, number_of_para_subtasks)
return self

def with_row_range(self, start_row, end_row) -> 'TableScan':
"""
Filter file entries by row range. The row_id corresponds to the row position of the
file in all file entries in table scan's partitioned_files.
"""
self.starting_scanner.with_row_range(start_row, end_row)
def with_row_shard(self, start_row, end_row) -> 'TableScan':
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep with_row_range.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok.

self.starting_scanner.with_row_shard(start_row, end_row)
return self

def with_row_ranges(self, row_ranges) -> 'TableScan':
    """
    Hand the given row id ranges to the underlying starting scanner.

    Returns:
        This table scan, so calls can be chained.
    """
    scanner = self.starting_scanner
    scanner.with_row_ranges(row_ranges)
    return self
8 changes: 4 additions & 4 deletions paimon-python/pypaimon/tests/blob_table_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2373,7 +2373,7 @@ def test_blob_large_data_volume_with_shard(self):
actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
self.assertEqual(actual, expected)

def test_data_blob_writer_with_row_range(self):
def test_data_blob_writer_with_row_shard(self):
"""Test DataBlobWriter with mixed data types in blob column."""

# Create schema with blob column
Expand All @@ -2390,8 +2390,8 @@ def test_data_blob_writer_with_row_range(self):
'data-evolution.enabled': 'true'
}
)
self.catalog.create_table('test_db.with_row_range_test', schema, False)
table = self.catalog.get_table('test_db.with_row_range_test')
self.catalog.create_table('test_db.with_row_shard_test', schema, False)
table = self.catalog.get_table('test_db.with_row_shard_test')

# Use proper table API to create writer
write_builder = table.new_batch_write_builder()
Expand Down Expand Up @@ -2425,7 +2425,7 @@ def test_data_blob_writer_with_row_range(self):

# Read data back using table API
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan().with_row_range(2, 4)
table_scan = read_builder.new_scan().with_row_shard(2, 4)
table_read = read_builder.new_read()
splits = table_scan.plan().splits()
result = table_read.to_arrow(splits)
Expand Down
Loading
Loading