diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index 81a7a5baf9b9..c6223dadcb31 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -546,6 +546,84 @@ def test_primary_key_value_stats_excludes_system_fields(self):
             self.assertFalse(is_system_field,
                              f"value_stats_cols should not contain system field: {field_name}")
 
+    def test_value_stats_empty_when_stats_disabled(self):
+        catalog = CatalogFactory.create({
+            "warehouse": self.warehouse
+        })
+        catalog.create_database("test_db_stats_disabled", True)
+
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('name', pa.string()),
+            ('price', pa.float64()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            primary_keys=['id'],
+            options={'metadata.stats-mode': 'none', 'bucket': '2'}  # Stats disabled
+        )
+        catalog.create_table("test_db_stats_disabled.test_stats_disabled", schema, False)
+        table = catalog.get_table("test_db_stats_disabled.test_stats_disabled")
+
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'name': ['Alice', 'Bob', 'Charlie'],
+            'price': [10.5, 20.3, 30.7],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        latest_snapshot = SnapshotManager(table).get_latest_snapshot()
+        manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = table_scan.starting_scanner.manifest_file_manager.read(
+            manifest_files[0].file_name,
+            lambda row: table_scan.starting_scanner._filter_manifest_entry(row),
+            False
+        )
+
+        self.assertGreater(len(manifest_entries), 0, "Should have at least one manifest entry")
+        file_meta = manifest_entries[0].file
+
+        self.assertEqual(
+            file_meta.value_stats_cols, [],
+            "value_stats_cols should be an empty list ([]) when stats are disabled"
+        )
+
+        self.assertEqual(
+            file_meta.value_stats.min_values.arity, 0,
+            "value_stats.min_values should be empty (arity=0) when stats are disabled"
+        )
+        self.assertEqual(
+            file_meta.value_stats.max_values.arity, 0,
+            "value_stats.max_values should be empty (arity=0) when stats are disabled"
+        )
+        self.assertEqual(
+            len(file_meta.value_stats.null_counts), 0,
+            "value_stats.null_counts should be empty when stats are disabled"
+        )
+
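+        # Cross-check against SimpleStats.empty_stats(); the assumed shape
+        # (matching the hand-built empty stats this PR replaces) is:
+        #     SimpleStats(GenericRow([], []), GenericRow([], []), [])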
"""Test that for append-only tables, data.schema matches table.fields. + from pypaimon.schema.data_types import PyarrowFieldParser - This verifies the assumption in data_writer.py that for append-only tables, - PyarrowFieldParser.to_paimon_schema(data.schema) should have the same fields - as self.table.fields (same count and same field names). - """ self.assertFalse(table.is_primary_key_table, "Table should be append-only (no primary keys)") @@ -789,7 +863,46 @@ def _test_append_only_schema_match_case(self, table, pa_schema): 'id': [1, 2, 3], 'name': ['Alice', 'Bob', 'Charlie'], 'price': [10.5, 20.3, 30.7], - 'category': ['A', 'B', 'C'], + 'category': ['A', 'B', 'C'] + }, schema=pa_schema) + + data_fields_from_schema = PyarrowFieldParser.to_paimon_schema(test_data.schema) + table_fields = table.fields + + self.assertEqual( + len(data_fields_from_schema), len(table_fields), + f"Field count mismatch: data.schema has {len(data_fields_from_schema)} fields, " + f"but table.fields has {len(table_fields)} fields" + ) + + data_field_names = {field.name for field in data_fields_from_schema} + table_field_names = {field.name for field in table_fields} + self.assertEqual( + data_field_names, table_field_names, + f"Field names mismatch: data.schema has {data_field_names}, " + f"but table.fields has {table_field_names}" + ) + + def test_primary_key_value_stats(self): + pa_schema = pa.schema([ + ('id', pa.int64()), + ('name', pa.string()), + ('price', pa.float64()), + ('category', pa.string()) + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + primary_keys=['id'], + options={'metadata.stats-mode': 'full', 'bucket': '2'} + ) + self.catalog.create_table('default.test_pk_value_stats', schema, False) + table = self.catalog.get_table('default.test_pk_value_stats') + + test_data = pa.Table.from_pydict({ + 'id': [1, 2, 3, 4, 5], + 'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'], + 'price': [10.5, 20.3, 30.7, 40.1, 50.9], + 'category': ['A', 'B', 'C', 'D', 'E'] }, schema=pa_schema) write_builder = table.new_batch_write_builder() @@ -831,6 +944,71 @@ def _test_append_only_schema_match_case(self, table, pa_schema): file_meta = manifest_entries[0].file self.assertIsNone(file_meta.value_stats_cols, "value_stats_cols should be None when all table fields are included") + self.assertGreater(len(manifest_entries), 0, "Should have at least one manifest entry") + file_meta = manifest_entries[0].file + + key_stats = file_meta.key_stats + self.assertIsNotNone(key_stats, "key_stats should not be None") + self.assertGreater(key_stats.min_values.arity, 0, "key_stats should contain key fields") + self.assertEqual(key_stats.min_values.arity, 1, "key_stats should contain exactly 1 key field (id)") + + value_stats = file_meta.value_stats + self.assertIsNotNone(value_stats, "value_stats should not be None") + + if file_meta.value_stats_cols is None: + expected_value_fields = ['name', 'price', 'category'] + self.assertGreaterEqual(value_stats.min_values.arity, len(expected_value_fields), + f"value_stats should contain at least {len(expected_value_fields)} value fields") + else: + self.assertNotIn('id', file_meta.value_stats_cols, + "Key field 'id' should NOT be in value_stats_cols") + + expected_value_fields = ['name', 'price', 'category'] + self.assertTrue(set(expected_value_fields).issubset(set(file_meta.value_stats_cols)), + f"value_stats_cols should contain value fields: {expected_value_fields}, " + f"but got: {file_meta.value_stats_cols}") + + expected_arity = len(file_meta.value_stats_cols) + 
+            self.assertEqual(value_stats.min_values.arity, expected_arity,
+                             f"value_stats should contain {expected_arity} fields (matching value_stats_cols), "
+                             f"but got {value_stats.min_values.arity}")
+            self.assertEqual(value_stats.max_values.arity, expected_arity,
+                             f"value_stats should contain {expected_arity} fields (matching value_stats_cols), "
+                             f"but got {value_stats.max_values.arity}")
+            self.assertEqual(len(value_stats.null_counts), expected_arity,
+                             f"value_stats null_counts should have {expected_arity} elements, "
+                             f"but got {len(value_stats.null_counts)}")
+
+            self.assertEqual(value_stats.min_values.arity, len(file_meta.value_stats_cols),
+                             f"value_stats.min_values.arity ({value_stats.min_values.arity}) must match "
+                             f"value_stats_cols length ({len(file_meta.value_stats_cols)})")
+
+            for field_name in file_meta.value_stats_cols:
+                is_system_field = (field_name.startswith('_KEY_') or
+                                   field_name in ['_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID'])
+                self.assertFalse(is_system_field,
+                                 f"value_stats_cols should not contain system field: {field_name}")
+
+            value_stats_fields = table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields(
+                {'_VALUE_STATS_COLS': file_meta.value_stats_cols},
+                table.fields
+            )
+            min_value_stats = GenericRowDeserializer.from_bytes(
+                value_stats.min_values.data,
+                value_stats_fields
+            ).values
+            max_value_stats = GenericRowDeserializer.from_bytes(
+                value_stats.max_values.data,
+                value_stats_fields
+            ).values
+
+            self.assertEqual(len(min_value_stats), 3, "min_value_stats should have 3 values")
+            self.assertEqual(len(max_value_stats), 3, "max_value_stats should have 3 values")
+
+        actual_data = read_builder.new_read().to_arrow(table_scan.plan().splits())
+        self.assertEqual(actual_data.num_rows, 5, "Should have 5 rows")
+        actual_ids = sorted(actual_data.column('id').to_pylist())
+        self.assertEqual(actual_ids, [1, 2, 3, 4, 5], "All IDs should be present")
 
     def test_split_target_size(self):
         """Test source.split.target-size configuration effect on split generation."""
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index eaf2b9483cd7..8cdd7428dcbc 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -276,14 +276,7 @@ def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table,
         # Column stats (only for normal columns)
         metadata_stats_enabled = self.options.metadata_stats_enabled()
         stats_columns = self.normal_columns if metadata_stats_enabled else []
-        column_stats = {
-            field.name: self._get_column_stats(data, field.name)
-            for field in stats_columns
-        }
-
-        min_value_stats = [column_stats[field.name]['min_values'] for field in stats_columns]
-        max_value_stats = [column_stats[field.name]['max_values'] for field in stats_columns]
-        value_null_counts = [column_stats[field.name]['null_counts'] for field in stats_columns]
+        value_stats = self._collect_value_stats(data, stats_columns)
 
         self.sequence_generator.start = self.sequence_generator.current
 
@@ -293,14 +286,8 @@ def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table,
             row_count=data.num_rows,
             min_key=GenericRow([], []),
             max_key=GenericRow([], []),
-            key_stats=SimpleStats(
-                GenericRow([], []),
-                GenericRow([], []),
-                []),
-            value_stats=SimpleStats(
-                GenericRow(min_value_stats, stats_columns),
-                GenericRow(max_value_stats, stats_columns),
-                value_null_counts),
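+            # Blob files carry no key columns (min_key/max_key are empty rows),
+            # so key stats stay empty; value stats come from _collect_value_stats
+            # and are likewise empty when metadata stats are disabled.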
+            key_stats=SimpleStats.empty_stats(),
+            value_stats=value_stats,
             min_sequence_number=-1,
             max_sequence_number=-1,
             schema_id=self.table.table_schema.id,
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 38dd58c15e88..fa5f004b8e18 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -201,17 +201,14 @@ def _write_data_to_file(self, data: pa.Table):
             field.name: self._get_column_stats(data, field.name)
             for field in stats_fields
         }
-        data_fields = stats_fields if value_stats_enabled else []
-        min_value_stats = [column_stats[field.name]['min_values'] for field in data_fields]
-        max_value_stats = [column_stats[field.name]['max_values'] for field in data_fields]
-        value_null_counts = [column_stats[field.name]['null_counts'] for field in data_fields]
         key_fields = self.trimmed_primary_keys_fields
-        min_key_stats = [column_stats[field.name]['min_values'] for field in key_fields]
-        max_key_stats = [column_stats[field.name]['max_values'] for field in key_fields]
-        key_null_counts = [column_stats[field.name]['null_counts'] for field in key_fields]
-        if not all(count == 0 for count in key_null_counts):
+        key_stats = self._collect_value_stats(data, key_fields, column_stats)
+        if not all(count == 0 for count in key_stats.null_counts):
             raise RuntimeError("Primary key should not be null")
 
+        value_fields = stats_fields if value_stats_enabled else []
+        value_stats = self._collect_value_stats(data, value_fields, column_stats)
+
         min_seq = self.sequence_generator.start
         max_seq = self.sequence_generator.current
         self.sequence_generator.start = self.sequence_generator.current
@@ -221,16 +218,8 @@ def _write_data_to_file(self, data: pa.Table):
             row_count=data.num_rows,
             min_key=GenericRow(min_key, self.trimmed_primary_keys_fields),
             max_key=GenericRow(max_key, self.trimmed_primary_keys_fields),
-            key_stats=SimpleStats(
-                GenericRow(min_key_stats, self.trimmed_primary_keys_fields),
-                GenericRow(max_key_stats, self.trimmed_primary_keys_fields),
-                key_null_counts,
-            ),
-            value_stats=SimpleStats(
-                GenericRow(min_value_stats, data_fields),
-                GenericRow(max_value_stats, data_fields),
-                value_null_counts,
-            ),
+            key_stats=key_stats,
+            value_stats=value_stats,
             min_sequence_number=min_seq,
             max_sequence_number=max_seq,
             schema_id=self.table.table_schema.id,
@@ -278,6 +267,27 @@ def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int:
 
         return best_split
 
+    def _collect_value_stats(self, data: pa.Table, fields: List,
+                             column_stats: Optional[Dict[str, Dict]] = None) -> SimpleStats:
+        if not fields:
+            return SimpleStats.empty_stats()
+
+        if not column_stats:
+            column_stats = {
+                field.name: self._get_column_stats(data, field.name)
+                for field in fields
+            }
+
+        min_stats = [column_stats[field.name]['min_values'] for field in fields]
+        max_stats = [column_stats[field.name]['max_values'] for field in fields]
+        null_counts = [column_stats[field.name]['null_counts'] for field in fields]
+
+        return SimpleStats(
+            GenericRow(min_stats, fields),
+            GenericRow(max_stats, fields),
+            null_counts
+        )
+
     @staticmethod
     def _get_column_stats(record_batch: pa.RecordBatch, column_name: str) -> Dict:
         column_array = record_batch.column(column_name)
@@ -287,6 +297,17 @@ def _get_column_stats(record_batch: pa.RecordBatch, column_name: str) -> Dict:
                 "max_values": None,
                 "null_counts": column_array.null_count,
             }
+
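+        # Arrow's min/max compute kernels are not defined for nested types
+        # (structs, lists, maps), so for those record only the null count.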
+        column_type = column_array.type
+        supports_minmax = not (pa.types.is_nested(column_type) or
+                               pa.types.is_map(column_type))
+        if not supports_minmax:
+            return {
+                "min_values": None,
+                "max_values": None,
+                "null_counts": column_array.null_count,
+            }
+
         min_values = pc.min(column_array).as_py()
         max_values = pc.max(column_array).as_py()
         null_counts = column_array.null_count