Skip to content

Commit 4ed46aa

Browse files
authored
[python] Add totalRecordCount and deltaRecordCount in Snapshot (apache#6141)
1 parent e2e471e commit 4ed46aa

File tree

2 files changed

+56
-0
lines changed

2 files changed

+56
-0
lines changed

paimon-python/pypaimon/tests/writer_test.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,22 @@ def test_writer(self):
7373
self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/bucket-0"))
7474
self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/manifest/*.avro")), 2)
7575
self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/bucket-0/*.parquet")), 1)
76+
77+
with open(self.warehouse + '/test_db.db/test_table/snapshot/snapshot-1', 'r', encoding='utf-8') as file:
78+
content = ''.join(file.readlines())
79+
self.assertTrue(content.__contains__('\"totalRecordCount\": 3'))
80+
self.assertTrue(content.__contains__('\"deltaRecordCount\": 3'))
81+
82+
write_builder = table.new_batch_write_builder()
83+
table_write = write_builder.new_write()
84+
table_commit = write_builder.new_commit()
85+
table_write.write_arrow(expect)
86+
commit_messages = table_write.prepare_commit()
87+
table_commit.commit(commit_messages)
88+
table_write.close()
89+
table_commit.close()
90+
91+
with open(self.warehouse + '/test_db.db/test_table/snapshot/snapshot-2', 'r', encoding='utf-8') as file:
92+
content = ''.join(file.readlines())
93+
self.assertTrue(content.__contains__('\"totalRecordCount\": 6'))
94+
self.assertTrue(content.__contains__('\"deltaRecordCount\": 3'))

paimon-python/pypaimon/write/file_store_commit.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,19 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int):
5858
new_manifest_files = self.manifest_file_manager.write(commit_messages)
5959
if not new_manifest_files:
6060
return
61+
6162
latest_snapshot = self.snapshot_manager.get_latest_snapshot()
63+
6264
existing_manifest_files = []
65+
record_count_add = self._generate_record_count_add(commit_messages)
66+
total_record_count = record_count_add
67+
6368
if latest_snapshot:
6469
existing_manifest_files = self.manifest_list_manager.read_all_manifest_files(latest_snapshot)
70+
previous_record_count = latest_snapshot.total_record_count
71+
if previous_record_count:
72+
total_record_count += previous_record_count
73+
6574
new_manifest_files.extend(existing_manifest_files)
6675
manifest_list = self.manifest_list_manager.write(new_manifest_files)
6776

@@ -72,6 +81,8 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int):
7281
schema_id=0,
7382
base_manifest_list=manifest_list,
7483
delta_manifest_list=manifest_list,
84+
total_record_count=total_record_count,
85+
delta_record_count=record_count_add,
7586
commit_user=self.commit_user,
7687
commit_identifier=commit_identifier,
7788
commit_kind="APPEND",
@@ -100,13 +111,17 @@ def overwrite(self, partition, commit_messages: List[CommitMessage], commit_iden
100111
# In overwrite mode, we don't merge with existing manifests
101112
manifest_list = self.manifest_list_manager.write(new_manifest_files)
102113

114+
record_count_add = self._generate_record_count_add(commit_messages)
115+
103116
new_snapshot_id = self._generate_snapshot_id()
104117
snapshot_data = Snapshot(
105118
version=3,
106119
id=new_snapshot_id,
107120
schema_id=0,
108121
base_manifest_list=manifest_list,
109122
delta_manifest_list=manifest_list,
123+
total_record_count=record_count_add,
124+
delta_record_count=record_count_add,
110125
commit_user=self.commit_user,
111126
commit_identifier=commit_identifier,
112127
commit_kind="OVERWRITE",
@@ -234,3 +249,25 @@ def _generate_partition_statistics(self, commit_messages: List[CommitMessage]) -
234249
)
235250
for stats in partition_stats.values()
236251
]
252+
253+
def _generate_record_count_add(self, commit_messages: List["CommitMessage"]) -> int:
    """
    Count the records added by the given commit messages.

    This method follows the Java implementation pattern from
    org.apache.paimon.manifest.ManifestEntry.recordCountAdd().

    Args:
        commit_messages: Commit messages whose new files are counted.

    Returns:
        Sum of ``row_count`` over every file returned by each message's
        ``new_files()``; 0 when there are no messages or no new files.
    """
    # Only newly added files contribute; each file's row_count is taken as-is.
    return sum(
        file_meta.row_count
        for message in commit_messages
        for file_meta in message.new_files()
    )

0 commit comments

Comments
 (0)