Commit e2591d1
[python] light refactor for stats collect (#6941)
1 parent 7e21bda commit e2591d1

File tree: 3 files changed, +226 −40 lines

paimon-python/pypaimon/tests/reader_base_test.py

Lines changed: 184 additions & 6 deletions

@@ -546,6 +546,84 @@ def test_primary_key_value_stats_excludes_system_fields(self):
             self.assertFalse(is_system_field,
                              f"value_stats_cols should not contain system field: {field_name}")
 
+    def test_value_stats_empty_when_stats_disabled(self):
+        catalog = CatalogFactory.create({
+            "warehouse": self.warehouse
+        })
+        catalog.create_database("test_db_stats_disabled", True)
+
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('name', pa.string()),
+            ('price', pa.float64()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            primary_keys=['id'],
+            options={'metadata.stats-mode': 'none', 'bucket': '2'}  # Stats disabled
+        )
+        catalog.create_table("test_db_stats_disabled.test_stats_disabled", schema, False)
+        table = catalog.get_table("test_db_stats_disabled.test_stats_disabled")
+
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'name': ['Alice', 'Bob', 'Charlie'],
+            'price': [10.5, 20.3, 30.7],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        latest_snapshot = SnapshotManager(table).get_latest_snapshot()
+        manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = table_scan.starting_scanner.manifest_file_manager.read(
+            manifest_files[0].file_name,
+            lambda row: table_scan.starting_scanner._filter_manifest_entry(row),
+            False
+        )
+
+        self.assertGreater(len(manifest_entries), 0, "Should have at least one manifest entry")
+        file_meta = manifest_entries[0].file
+
+        self.assertEqual(
+            file_meta.value_stats_cols, [],
+            "value_stats_cols should be empty list [] when stats are disabled"
+        )
+
+        self.assertEqual(
+            file_meta.value_stats.min_values.arity, 0,
+            "value_stats.min_values should be empty (arity=0) when stats are disabled"
+        )
+        self.assertEqual(
+            file_meta.value_stats.max_values.arity, 0,
+            "value_stats.max_values should be empty (arity=0) when stats are disabled"
+        )
+        self.assertEqual(
+            len(file_meta.value_stats.null_counts), 0,
+            "value_stats.null_counts should be empty when stats are disabled"
+        )
+
+        empty_stats = SimpleStats.empty_stats()
+        self.assertEqual(
+            file_meta.value_stats.min_values.arity, len(empty_stats.min_values),
+            "value_stats.min_values should be empty (same as SimpleStats.empty_stats()) when stats are disabled"
+        )
+        self.assertEqual(
+            file_meta.value_stats.max_values.arity, len(empty_stats.max_values),
+            "value_stats.max_values should be empty (same as SimpleStats.empty_stats()) when stats are disabled"
+        )
+        self.assertEqual(
+            len(file_meta.value_stats.null_counts), len(empty_stats.null_counts),
+            "value_stats.null_counts should be empty (same as SimpleStats.empty_stats()) when stats are disabled"
+        )
+
     def test_types(self):
         data_fields = [
             DataField(0, "f0", AtomicType('TINYINT'), 'desc'),

@@ -776,20 +854,55 @@ def _test_value_stats_cols_case(self, manifest_manager, table, value_stats_cols,
         self.assertEqual(read_entry.file.value_stats.null_counts, null_counts)
 
     def _test_append_only_schema_match_case(self, table, pa_schema):
-        """Test that for append-only tables, data.schema matches table.fields.
+        from pypaimon.schema.data_types import PyarrowFieldParser
 
-        This verifies the assumption in data_writer.py that for append-only tables,
-        PyarrowFieldParser.to_paimon_schema(data.schema) should have the same fields
-        as self.table.fields (same count and same field names).
-        """
         self.assertFalse(table.is_primary_key_table,
                          "Table should be append-only (no primary keys)")
 
         test_data = pa.Table.from_pydict({
             'id': [1, 2, 3],
             'name': ['Alice', 'Bob', 'Charlie'],
             'price': [10.5, 20.3, 30.7],
-            'category': ['A', 'B', 'C'],
+            'category': ['A', 'B', 'C']
+        }, schema=pa_schema)
+
+        data_fields_from_schema = PyarrowFieldParser.to_paimon_schema(test_data.schema)
+        table_fields = table.fields
+
+        self.assertEqual(
+            len(data_fields_from_schema), len(table_fields),
+            f"Field count mismatch: data.schema has {len(data_fields_from_schema)} fields, "
+            f"but table.fields has {len(table_fields)} fields"
+        )
+
+        data_field_names = {field.name for field in data_fields_from_schema}
+        table_field_names = {field.name for field in table_fields}
+        self.assertEqual(
+            data_field_names, table_field_names,
+            f"Field names mismatch: data.schema has {data_field_names}, "
+            f"but table.fields has {table_field_names}"
+        )
+
+    def test_primary_key_value_stats(self):
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('name', pa.string()),
+            ('price', pa.float64()),
+            ('category', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            primary_keys=['id'],
+            options={'metadata.stats-mode': 'full', 'bucket': '2'}
+        )
+        self.catalog.create_table('default.test_pk_value_stats', schema, False)
+        table = self.catalog.get_table('default.test_pk_value_stats')
+
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3, 4, 5],
+            'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
+            'price': [10.5, 20.3, 30.7, 40.1, 50.9],
+            'category': ['A', 'B', 'C', 'D', 'E']
         }, schema=pa_schema)
 
         write_builder = table.new_batch_write_builder()

@@ -831,6 +944,71 @@ def _test_append_only_schema_match_case(self, table, pa_schema):
         file_meta = manifest_entries[0].file
         self.assertIsNone(file_meta.value_stats_cols,
                           "value_stats_cols should be None when all table fields are included")
+        self.assertGreater(len(manifest_entries), 0, "Should have at least one manifest entry")
+        file_meta = manifest_entries[0].file
+
+        key_stats = file_meta.key_stats
+        self.assertIsNotNone(key_stats, "key_stats should not be None")
+        self.assertGreater(key_stats.min_values.arity, 0, "key_stats should contain key fields")
+        self.assertEqual(key_stats.min_values.arity, 1, "key_stats should contain exactly 1 key field (id)")
+
+        value_stats = file_meta.value_stats
+        self.assertIsNotNone(value_stats, "value_stats should not be None")
+
+        if file_meta.value_stats_cols is None:
+            expected_value_fields = ['name', 'price', 'category']
+            self.assertGreaterEqual(value_stats.min_values.arity, len(expected_value_fields),
+                                    f"value_stats should contain at least {len(expected_value_fields)} value fields")
+        else:
+            self.assertNotIn('id', file_meta.value_stats_cols,
+                             "Key field 'id' should NOT be in value_stats_cols")
+
+            expected_value_fields = ['name', 'price', 'category']
+            self.assertTrue(set(expected_value_fields).issubset(set(file_meta.value_stats_cols)),
+                            f"value_stats_cols should contain value fields: {expected_value_fields}, "
+                            f"but got: {file_meta.value_stats_cols}")
+
+            expected_arity = len(file_meta.value_stats_cols)
+            self.assertEqual(value_stats.min_values.arity, expected_arity,
+                             f"value_stats should contain {expected_arity} fields (matching value_stats_cols), "
+                             f"but got {value_stats.min_values.arity}")
+            self.assertEqual(value_stats.max_values.arity, expected_arity,
+                             f"value_stats should contain {expected_arity} fields (matching value_stats_cols), "
+                             f"but got {value_stats.max_values.arity}")
+            self.assertEqual(len(value_stats.null_counts), expected_arity,
+                             f"value_stats null_counts should have {expected_arity} elements, "
+                             f"but got {len(value_stats.null_counts)}")
+
+            self.assertEqual(value_stats.min_values.arity, len(file_meta.value_stats_cols),
+                             f"value_stats.min_values.arity ({value_stats.min_values.arity}) must match "
+                             f"value_stats_cols length ({len(file_meta.value_stats_cols)})")
+
+            for field_name in file_meta.value_stats_cols:
+                is_system_field = (field_name.startswith('_KEY_') or
+                                   field_name in ['_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID'])
+                self.assertFalse(is_system_field,
+                                 f"value_stats_cols should not contain system field: {field_name}")
+
+            value_stats_fields = table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields(
+                {'_VALUE_STATS_COLS': file_meta.value_stats_cols},
+                table.fields
+            )
+            min_value_stats = GenericRowDeserializer.from_bytes(
+                value_stats.min_values.data,
+                value_stats_fields
+            ).values
+            max_value_stats = GenericRowDeserializer.from_bytes(
+                value_stats.max_values.data,
+                value_stats_fields
+            ).values
+
+            self.assertEqual(len(min_value_stats), 3, "min_value_stats should have 3 values")
+            self.assertEqual(len(max_value_stats), 3, "max_value_stats should have 3 values")
+
+        actual_data = read_builder.new_read().to_arrow(table_scan.plan().splits())
+        self.assertEqual(actual_data.num_rows, 5, "Should have 5 rows")
+        actual_ids = sorted(actual_data.column('id').to_pylist())
+        self.assertEqual(actual_ids, [1, 2, 3, 4, 5], "All IDs should be present")
 
     def test_split_target_size(self):
         """Test source.split.target-size configuration effect on split generation."""

paimon-python/pypaimon/write/writer/data_blob_writer.py

Lines changed: 3 additions & 16 deletions

@@ -276,14 +276,7 @@ def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table,
         # Column stats (only for normal columns)
         metadata_stats_enabled = self.options.metadata_stats_enabled()
         stats_columns = self.normal_columns if metadata_stats_enabled else []
-        column_stats = {
-            field.name: self._get_column_stats(data, field.name)
-            for field in stats_columns
-        }
-
-        min_value_stats = [column_stats[field.name]['min_values'] for field in stats_columns]
-        max_value_stats = [column_stats[field.name]['max_values'] for field in stats_columns]
-        value_null_counts = [column_stats[field.name]['null_counts'] for field in stats_columns]
+        value_stats = self._collect_value_stats(data, stats_columns)
 
         self.sequence_generator.start = self.sequence_generator.current
 
@@ -293,14 +286,8 @@ def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table,
             row_count=data.num_rows,
             min_key=GenericRow([], []),
             max_key=GenericRow([], []),
-            key_stats=SimpleStats(
-                GenericRow([], []),
-                GenericRow([], []),
-                []),
-            value_stats=SimpleStats(
-                GenericRow(min_value_stats, stats_columns),
-                GenericRow(max_value_stats, stats_columns),
-                value_null_counts),
+            key_stats=SimpleStats.empty_stats(),
+            value_stats=value_stats,
             min_sequence_number=-1,
             max_sequence_number=-1,
             schema_id=self.table.table_schema.id,
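
Note: `SimpleStats.empty_stats()` itself is not shown in this commit. Judging from the inline construction it replaces (the removed `key_stats=SimpleStats(GenericRow([], []), GenericRow([], []), [])` above), it plausibly amounts to the following sketch; the classmethod form and signature are assumptions:

    @classmethod
    def empty_stats(cls) -> "SimpleStats":
        # Assumed shape, mirroring the removed inline construction:
        # zero-arity min/max rows and an empty null-count list.
        return cls(GenericRow([], []), GenericRow([], []), [])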

paimon-python/pypaimon/write/writer/data_writer.py

Lines changed: 39 additions & 18 deletions

@@ -201,17 +201,14 @@ def _write_data_to_file(self, data: pa.Table):
             field.name: self._get_column_stats(data, field.name)
             for field in stats_fields
         }
-        data_fields = stats_fields if value_stats_enabled else []
-        min_value_stats = [column_stats[field.name]['min_values'] for field in data_fields]
-        max_value_stats = [column_stats[field.name]['max_values'] for field in data_fields]
-        value_null_counts = [column_stats[field.name]['null_counts'] for field in data_fields]
         key_fields = self.trimmed_primary_keys_fields
-        min_key_stats = [column_stats[field.name]['min_values'] for field in key_fields]
-        max_key_stats = [column_stats[field.name]['max_values'] for field in key_fields]
-        key_null_counts = [column_stats[field.name]['null_counts'] for field in key_fields]
-        if not all(count == 0 for count in key_null_counts):
+        key_stats = self._collect_value_stats(data, key_fields, column_stats)
+        if not all(count == 0 for count in key_stats.null_counts):
             raise RuntimeError("Primary key should not be null")
 
+        value_fields = stats_fields if value_stats_enabled else []
+        value_stats = self._collect_value_stats(data, value_fields, column_stats)
+
         min_seq = self.sequence_generator.start
         max_seq = self.sequence_generator.current
         self.sequence_generator.start = self.sequence_generator.current

@@ -221,16 +218,8 @@ def _write_data_to_file(self, data: pa.Table):
             row_count=data.num_rows,
             min_key=GenericRow(min_key, self.trimmed_primary_keys_fields),
             max_key=GenericRow(max_key, self.trimmed_primary_keys_fields),
-            key_stats=SimpleStats(
-                GenericRow(min_key_stats, self.trimmed_primary_keys_fields),
-                GenericRow(max_key_stats, self.trimmed_primary_keys_fields),
-                key_null_counts,
-            ),
-            value_stats=SimpleStats(
-                GenericRow(min_value_stats, data_fields),
-                GenericRow(max_value_stats, data_fields),
-                value_null_counts,
-            ),
+            key_stats=key_stats,
+            value_stats=value_stats,
             min_sequence_number=min_seq,
             max_sequence_number=max_seq,
             schema_id=self.table.table_schema.id,

@@ -278,6 +267,27 @@ def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int:
 
         return best_split
 
+    def _collect_value_stats(self, data: pa.Table, fields: List,
+                             column_stats: Optional[Dict[str, Dict]] = None) -> SimpleStats:
+        if not fields:
+            return SimpleStats.empty_stats()
+
+        if column_stats is None or not column_stats:
+            column_stats = {
+                field.name: self._get_column_stats(data, field.name)
+                for field in fields
+            }
+
+        min_stats = [column_stats[field.name]['min_values'] for field in fields]
+        max_stats = [column_stats[field.name]['max_values'] for field in fields]
+        null_counts = [column_stats[field.name]['null_counts'] for field in fields]
+
+        return SimpleStats(
+            GenericRow(min_stats, fields),
+            GenericRow(max_stats, fields),
+            null_counts
+        )
+
     @staticmethod
     def _get_column_stats(record_batch: pa.RecordBatch, column_name: str) -> Dict:
         column_array = record_batch.column(column_name)

@@ -287,6 +297,17 @@ def _get_column_stats(record_batch: pa.RecordBatch, column_name: str) -> Dict:
                 "max_values": None,
                 "null_counts": column_array.null_count,
             }
+
+        column_type = column_array.type
+        supports_minmax = not (pa.types.is_nested(column_type) or pa.types.is_map(column_type))
+
+        if not supports_minmax:
+            return {
+                "min_values": None,
+                "max_values": None,
+                "null_counts": column_array.null_count,
+            }
+
         min_values = pc.min(column_array).as_py()
         max_values = pc.max(column_array).as_py()
         null_counts = column_array.null_count
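
Note on the new `_get_column_stats` guard: pyarrow's `pc.min`/`pc.max` aggregations are not defined for nested or map columns, so such columns now record `None` min/max stats instead of raising. A self-contained pyarrow illustration (the example values are mine, not from the commit):

    import pyarrow as pa
    import pyarrow.compute as pc

    # Flat types support min/max aggregation and null counting.
    flat = pa.chunked_array([[1, 2, None]])
    print(pc.min(flat).as_py(), pc.max(flat).as_py(), flat.null_count)  # -> 1 2 1

    # Nested types (lists, structs, maps) do not; the guard detects them up front.
    nested = pa.array([[1, 2], None], type=pa.list_(pa.int64()))
    print(pa.types.is_nested(nested.type))  # -> True, so min/max stay None

Design-wise, routing both key and value stats through the shared `_collect_value_stats` helper means the empty-fields case collapses to `SimpleStats.empty_stats()` in a single place, which is exactly what the stats-disabled test at the top of this commit asserts.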
