diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 3b9d7d2831ba..ef0ab9de24f6
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -101,6 +101,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -142,6 +143,7 @@
 import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
 import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING;
 import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
+import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -248,6 +250,33 @@ public void testStatsModePerLevel(String format) throws Exception {
         file = ((DataSplit) table.newScan().plan().splits().get(0)).dataFiles().get(0);
         assertThat(file.level()).isEqualTo(5);
         assertThat(file.valueStats().maxValues().getFieldCount()).isGreaterThan(4);
+
+        if (file.valueStatsCols() == null) {
+            int expectedFieldCount = table.schema().fields().size();
+            int actualFieldCount = file.valueStats().minValues().getFieldCount();
+            assertThat(actualFieldCount)
+                    .as(
+                            "When value_stats_cols is null, value_stats field count should match table.fields count. "
+                                    + "This ensures value_stats does NOT contain system fields.")
+                    .isEqualTo(expectedFieldCount);
+        } else {
+            for (String fieldName : Objects.requireNonNull(file.valueStatsCols())) {
+                boolean isSystemField =
+                        fieldName.startsWith(KEY_FIELD_PREFIX)
+                                || SpecialFields.isSystemField(fieldName);
+                assertThat(isSystemField)
+                        .as("value_stats_cols should NOT contain system field: " + fieldName)
+                        .isFalse();
+            }
+            assertThat(file.valueStats().minValues().getFieldCount())
+                    .as("value_stats field count should match value_stats_cols size")
+                    .isEqualTo(Objects.requireNonNull(file.valueStatsCols()).size());
+        }
+
+        assertThat(file.valueStats().minValues().getFieldCount())
+                .isEqualTo(file.valueStats().maxValues().getFieldCount());
+        assertThat(file.valueStats().nullCounts().size())
+                .isEqualTo(file.valueStats().minValues().getFieldCount());
     }
 
     @Test
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index 92a275585ccc..81a7a5baf9b9 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -471,6 +471,81 @@ def test_value_stats_cols_param(self):
             test_name="specific_case"
         )
 
+        schema_with_stats = Schema.from_pyarrow_schema(pa_schema, options={'metadata.stats-mode': 'full'})
+        catalog.create_table("test_db.test_value_stats_cols_schema_match", schema_with_stats, False)
+        table_with_stats = catalog.get_table("test_db.test_value_stats_cols_schema_match")
+        self._test_append_only_schema_match_case(table_with_stats, pa_schema)
+
+    def test_primary_key_value_stats_excludes_system_fields(self):
+        catalog = CatalogFactory.create({
+            "warehouse": self.warehouse
+        })
+        catalog.create_database("test_db_system_fields", True)
+
+        pk_pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('name', pa.string()),
+            ('price', pa.float64()),
+        ])
+        pk_schema = Schema.from_pyarrow_schema(
+            pk_pa_schema,
+            primary_keys=['id'],
+            options={'metadata.stats-mode': 'full', 'bucket': '2'}
+        )
+        catalog.create_table("test_db_system_fields.test_pk_value_stats_system_fields", pk_schema, False)
+        pk_table = catalog.get_table("test_db_system_fields.test_pk_value_stats_system_fields")
+
+        pk_test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'name': ['Alice', 'Bob', 'Charlie'],
+            'price': [10.5, 20.3, 30.7],
+        }, schema=pk_pa_schema)
+
+        pk_write_builder = pk_table.new_batch_write_builder()
+        pk_writer = pk_write_builder.new_write()
+        pk_writer.write_arrow(pk_test_data)
+        pk_commit_messages = pk_writer.prepare_commit()
+        pk_commit = pk_write_builder.new_commit()
+        pk_commit.commit(pk_commit_messages)
+        pk_writer.close()
+
+        pk_read_builder = pk_table.new_read_builder()
+        pk_table_scan = pk_read_builder.new_scan()
+        latest_snapshot = SnapshotManager(pk_table).get_latest_snapshot()
+        pk_manifest_files = pk_table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+        pk_manifest_entries = pk_table_scan.starting_scanner.manifest_file_manager.read(
+            pk_manifest_files[0].file_name,
+            lambda row: pk_table_scan.starting_scanner._filter_manifest_entry(row),
+            False
+        )
+
+        self.assertGreater(len(pk_manifest_entries), 0, "Should have at least one manifest entry")
+        pk_file_meta = pk_manifest_entries[0].file
+
+        pk_table_field_names = {f.name for f in pk_table.fields}
+        system_fields = {'_KEY_id', '_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID'}
+        pk_table_has_system_fields = bool(pk_table_field_names & system_fields)
+        self.assertFalse(pk_table_has_system_fields,
+                         f"table.fields should NOT contain system fields, but got: {pk_table_field_names}")
+
+        if pk_file_meta.value_stats_cols is None:
+            pk_value_stats_fields = pk_table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields(
+                {'_VALUE_STATS_COLS': None},
+                pk_table.fields
+            )
+            expected_count = len(pk_value_stats_fields)
+            actual_count = pk_file_meta.value_stats.min_values.arity
+            self.assertEqual(actual_count, expected_count,
+                             f"Field count mismatch: value_stats has {actual_count} fields, "
+                             f"but table.fields has {expected_count} fields. "
+                             f"This indicates value_stats contains system fields that are not in table.fields.")
+        else:
+            for field_name in pk_file_meta.value_stats_cols:
+                is_system_field = (field_name.startswith('_KEY_') or
+                                   field_name in ['_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID'])
+                self.assertFalse(is_system_field,
+                                 f"value_stats_cols should not contain system field: {field_name}")
+
     def test_types(self):
         data_fields = [
             DataField(0, "f0", AtomicType('TINYINT'), 'desc'),
@@ -700,6 +775,63 @@ def _test_value_stats_cols_case(self, manifest_manager, table, value_stats_cols,
         self.assertEqual(read_entry.file.value_stats.null_counts,
                          null_counts)
 
+    def _test_append_only_schema_match_case(self, table, pa_schema):
+        """Test that for append-only tables, data.schema matches table.fields.
+
+        This verifies the assumption in data_writer.py that for append-only tables,
+        PyarrowFieldParser.to_paimon_schema(data.schema) should have the same fields
+        as self.table.fields (same count and same field names).
+        """
+        self.assertFalse(table.is_primary_key_table,
+                         "Table should be append-only (no primary keys)")
+
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'name': ['Alice', 'Bob', 'Charlie'],
+            'price': [10.5, 20.3, 30.7],
+            'category': ['A', 'B', 'C'],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Verify that data.schema (converted to paimon schema) matches table.fields
+        data_fields_from_schema = PyarrowFieldParser.to_paimon_schema(test_data.schema)
+        table_fields = table.fields
+
+        # Verify field count matches
+        self.assertEqual(len(data_fields_from_schema), len(table_fields),
+                         f"Field count mismatch: data.schema has {len(data_fields_from_schema)} fields, "
+                         f"but table.fields has {len(table_fields)} fields")
+
+        # Verify field names match (order may differ, but names should match)
+        data_field_names = {field.name for field in data_fields_from_schema}
+        table_field_names = {field.name for field in table_fields}
+        self.assertEqual(data_field_names, table_field_names,
+                         f"Field names mismatch: data.schema has {data_field_names}, "
+                         f"but table.fields has {table_field_names}")
+
+        # Read manifest to verify value_stats_cols is None (all fields included)
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        latest_snapshot = SnapshotManager(table).get_latest_snapshot()
+        manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = table_scan.starting_scanner.manifest_file_manager.read(
+            manifest_files[0].file_name,
+            lambda row: table_scan.starting_scanner._filter_manifest_entry(row),
+            False
+        )
+
+        if len(manifest_entries) > 0:
+            file_meta = manifest_entries[0].file
+            self.assertIsNone(file_meta.value_stats_cols,
+                              "value_stats_cols should be None when all table fields are included")
+
     def test_split_target_size(self):
         """Test source.split.target-size configuration effect on split generation."""
         from pypaimon.common.options.core_options import CoreOptions
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 73609ed91282..38dd58c15e88 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -192,8 +192,11 @@ def _write_data_to_file(self, data: pa.Table):
 
         # key stats & value stats
         value_stats_enabled = self.options.metadata_stats_enabled()
-        stats_fields = PyarrowFieldParser.to_paimon_schema(data.schema) if value_stats_enabled\
-            else self.table.trimmed_primary_keys_fields
+        if value_stats_enabled:
+            stats_fields = self.table.fields if self.table.is_primary_key_table \
+                else PyarrowFieldParser.to_paimon_schema(data.schema)
+        else:
+            stats_fields = self.table.trimmed_primary_keys_fields
         column_stats = {
             field.name: self._get_column_stats(data, field.name)
             for field in stats_fields
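
The behavioural core of this patch is the stats_fields selection in data_writer.py: with stats enabled, a primary-key table now takes its stats fields from the table schema instead of from the written batch, whose schema for primary-key tables can carry system columns such as _KEY_<pk>, _SEQUENCE_NUMBER and _VALUE_KIND. The sketch below restates that rule outside pypaimon; every name in it is illustrative only and not part of the pypaimon API.

# Illustrative sketch of the selection rule in the data_writer.py hunk above.
# None of these names are pypaimon API; they only mirror the decision logic.
SYSTEM_FIELDS = {'_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID'}


def is_system_field(name):
    # Same check as the tests above: key columns plus the fixed system columns.
    return name.startswith('_KEY_') or name in SYSTEM_FIELDS


def select_stats_fields(table_fields, batch_fields, is_primary_key_table,
                        trimmed_primary_key_fields, stats_enabled):
    # Mirrors the new if/else in _write_data_to_file.
    if not stats_enabled:
        return list(trimmed_primary_key_fields)
    if is_primary_key_table:
        # Use the table schema so value_stats never records system fields.
        return list(table_fields)
    # Append-only tables: the batch schema matches table.fields, so it is safe.
    return list(batch_fields)


if __name__ == '__main__':
    table_fields = ['id', 'name', 'price']
    batch_fields = ['_KEY_id', '_SEQUENCE_NUMBER', '_VALUE_KIND', 'id', 'name', 'price']
    chosen = select_stats_fields(table_fields, batch_fields, True, ['id'], True)
    assert not any(is_system_field(f) for f in chosen)
    print(chosen)  # ['id', 'name', 'price']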