Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -142,6 +143,7 @@
import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING;
import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -248,6 +250,33 @@ public void testStatsModePerLevel(String format) throws Exception {
file = ((DataSplit) table.newScan().plan().splits().get(0)).dataFiles().get(0);
assertThat(file.level()).isEqualTo(5);
assertThat(file.valueStats().maxValues().getFieldCount()).isGreaterThan(4);

if (file.valueStatsCols() == null) {
int expectedFieldCount = table.schema().fields().size();
int actualFieldCount = file.valueStats().minValues().getFieldCount();
assertThat(actualFieldCount)
.as(
"When value_stats_cols is null, value_stats field count should match table.fields count. "
+ "This ensures value_stats does NOT contain system fields.")
.isEqualTo(expectedFieldCount);
} else {
for (String fieldName : Objects.requireNonNull(file.valueStatsCols())) {
boolean isSystemField =
fieldName.startsWith(KEY_FIELD_PREFIX)
|| SpecialFields.isSystemField(fieldName);
assertThat(isSystemField)
.as("value_stats_cols should NOT contain system field: " + fieldName)
.isFalse();
}
assertThat(file.valueStats().minValues().getFieldCount())
.as("value_stats field count should match value_stats_cols size")
.isEqualTo(Objects.requireNonNull(file.valueStatsCols()).size());
}

assertThat(file.valueStats().minValues().getFieldCount())
.isEqualTo(file.valueStats().maxValues().getFieldCount());
assertThat(file.valueStats().nullCounts().size())
.isEqualTo(file.valueStats().minValues().getFieldCount());
}

@Test
Expand Down
132 changes: 132 additions & 0 deletions paimon-python/pypaimon/tests/reader_base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,81 @@ def test_value_stats_cols_param(self):
test_name="specific_case"
)

schema_with_stats = Schema.from_pyarrow_schema(pa_schema, options={'metadata.stats-mode': 'full'})
catalog.create_table("test_db.test_value_stats_cols_schema_match", schema_with_stats, False)
table_with_stats = catalog.get_table("test_db.test_value_stats_cols_schema_match")
self._test_append_only_schema_match_case(table_with_stats, pa_schema)

def test_primary_key_value_stats_excludes_system_fields(self):
catalog = CatalogFactory.create({
"warehouse": self.warehouse
})
catalog.create_database("test_db_system_fields", True)

pk_pa_schema = pa.schema([
('id', pa.int64()),
('name', pa.string()),
('price', pa.float64()),
])
pk_schema = Schema.from_pyarrow_schema(
pk_pa_schema,
primary_keys=['id'],
options={'metadata.stats-mode': 'full', 'bucket': '2'}
)
catalog.create_table("test_db_system_fields.test_pk_value_stats_system_fields", pk_schema, False)
pk_table = catalog.get_table("test_db_system_fields.test_pk_value_stats_system_fields")

pk_test_data = pa.Table.from_pydict({
'id': [1, 2, 3],
'name': ['Alice', 'Bob', 'Charlie'],
'price': [10.5, 20.3, 30.7],
}, schema=pk_pa_schema)

pk_write_builder = pk_table.new_batch_write_builder()
pk_writer = pk_write_builder.new_write()
pk_writer.write_arrow(pk_test_data)
pk_commit_messages = pk_writer.prepare_commit()
pk_commit = pk_write_builder.new_commit()
pk_commit.commit(pk_commit_messages)
pk_writer.close()

pk_read_builder = pk_table.new_read_builder()
pk_table_scan = pk_read_builder.new_scan()
latest_snapshot = SnapshotManager(pk_table).get_latest_snapshot()
pk_manifest_files = pk_table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
pk_manifest_entries = pk_table_scan.starting_scanner.manifest_file_manager.read(
pk_manifest_files[0].file_name,
lambda row: pk_table_scan.starting_scanner._filter_manifest_entry(row),
False
)

self.assertGreater(len(pk_manifest_entries), 0, "Should have at least one manifest entry")
pk_file_meta = pk_manifest_entries[0].file

pk_table_field_names = {f.name for f in pk_table.fields}
system_fields = {'_KEY_id', '_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID'}
pk_table_has_system_fields = bool(pk_table_field_names & system_fields)
self.assertFalse(pk_table_has_system_fields,
f"table.fields should NOT contain system fields, but got: {pk_table_field_names}")

if pk_file_meta.value_stats_cols is None:
pk_value_stats_fields = pk_table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields(
{'_VALUE_STATS_COLS': None},
pk_table.fields
)
expected_count = len(pk_value_stats_fields)
actual_count = pk_file_meta.value_stats.min_values.arity
self.assertEqual(actual_count, expected_count,
f"Field count mismatch: value_stats has {actual_count} fields, "
f"but table.fields has {expected_count} fields. "
f"This indicates value_stats contains system fields that are not in table.fields.")
else:
for field_name in pk_file_meta.value_stats_cols:
is_system_field = (field_name.startswith('_KEY_') or
field_name in ['_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID'])
self.assertFalse(is_system_field,
f"value_stats_cols should not contain system field: {field_name}")

def test_types(self):
data_fields = [
DataField(0, "f0", AtomicType('TINYINT'), 'desc'),
Expand Down Expand Up @@ -700,6 +775,63 @@ def _test_value_stats_cols_case(self, manifest_manager, table, value_stats_cols,

self.assertEqual(read_entry.file.value_stats.null_counts, null_counts)

def _test_append_only_schema_match_case(self, table, pa_schema):
"""Test that for append-only tables, data.schema matches table.fields.

This verifies the assumption in data_writer.py that for append-only tables,
PyarrowFieldParser.to_paimon_schema(data.schema) should have the same fields
as self.table.fields (same count and same field names).
"""
self.assertFalse(table.is_primary_key_table,
"Table should be append-only (no primary keys)")

test_data = pa.Table.from_pydict({
'id': [1, 2, 3],
'name': ['Alice', 'Bob', 'Charlie'],
'price': [10.5, 20.3, 30.7],
'category': ['A', 'B', 'C'],
}, schema=pa_schema)

write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()
writer.write_arrow(test_data)
commit_messages = writer.prepare_commit()
commit = write_builder.new_commit()
commit.commit(commit_messages)
writer.close()

# Verify that data.schema (converted to paimon schema) matches table.fields
data_fields_from_schema = PyarrowFieldParser.to_paimon_schema(test_data.schema)
table_fields = table.fields

# Verify field count matches
self.assertEqual(len(data_fields_from_schema), len(table_fields),
f"Field count mismatch: data.schema has {len(data_fields_from_schema)} fields, "
f"but table.fields has {len(table_fields)} fields")

# Verify field names match (order may differ, but names should match)
data_field_names = {field.name for field in data_fields_from_schema}
table_field_names = {field.name for field in table_fields}
self.assertEqual(data_field_names, table_field_names,
f"Field names mismatch: data.schema has {data_field_names}, "
f"but table.fields has {table_field_names}")

# Read manifest to verify value_stats_cols is None (all fields included)
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
latest_snapshot = SnapshotManager(table).get_latest_snapshot()
manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
manifest_entries = table_scan.starting_scanner.manifest_file_manager.read(
manifest_files[0].file_name,
lambda row: table_scan.starting_scanner._filter_manifest_entry(row),
False
)

if len(manifest_entries) > 0:
file_meta = manifest_entries[0].file
self.assertIsNone(file_meta.value_stats_cols,
"value_stats_cols should be None when all table fields are included")

def test_split_target_size(self):
"""Test source.split.target-size configuration effect on split generation."""
from pypaimon.common.options.core_options import CoreOptions
Expand Down
7 changes: 5 additions & 2 deletions paimon-python/pypaimon/write/writer/data_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,11 @@ def _write_data_to_file(self, data: pa.Table):

# key stats & value stats
value_stats_enabled = self.options.metadata_stats_enabled()
stats_fields = PyarrowFieldParser.to_paimon_schema(data.schema) if value_stats_enabled\
else self.table.trimmed_primary_keys_fields
if value_stats_enabled:
stats_fields = self.table.fields if self.table.is_primary_key_table \
else PyarrowFieldParser.to_paimon_schema(data.schema)
else:
stats_fields = self.table.trimmed_primary_keys_fields
column_stats = {
field.name: self._get_column_stats(data, field.name)
for field in stats_fields
Expand Down
Loading