3737import org .apache .paimon .options .CatalogOptions ;
3838import org .apache .paimon .options .Options ;
3939import org .apache .paimon .partition .Partition ;
40+ import org .apache .paimon .partition .PartitionStatistics ;
4041import org .apache .paimon .rest .auth .AuthProvider ;
4142import org .apache .paimon .rest .auth .RESTAuthParameter ;
4243import org .apache .paimon .rest .requests .AlterDatabaseRequest ;
@@ -562,16 +563,8 @@ private MockResponse commitTableHandle(Identifier identifier, String data) throw
562563 if (!tableMetadataStore .containsKey (identifier .getFullName ())) {
563564 throw new Catalog .TableNotExistException (identifier );
564565 }
565- FileStoreTable table = getFileTable (identifier );
566- RenamingSnapshotCommit commit =
567- new RenamingSnapshotCommit (table .snapshotManager (), Lock .empty ());
568- String branchName = identifier .getBranchName ();
569- if (branchName == null ) {
570- branchName = "main" ;
571- }
572566 boolean success =
573- commit .commit (requestBody .getSnapshot (), branchName , Collections .emptyList ());
574- commitSnapshot (identifier , requestBody .getSnapshot (), null );
567+ commitSnapshot (identifier , requestBody .getSnapshot (), requestBody .getStatistics ());
575568 CommitTableResponse response = new CommitTableResponse (success );
576569 return mockResponse (response , 200 );
577570 }
@@ -1262,7 +1255,7 @@ protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
12621255 }
12631256
12641257 private boolean commitSnapshot (
1265- Identifier identifier , Snapshot snapshot , List <Partition > statistics )
1258+ Identifier identifier , Snapshot snapshot , List <PartitionStatistics > statistics )
12661259 throws Catalog .TableNotExistException {
12671260 FileStoreTable table = getFileTable (identifier );
12681261 RenamingSnapshotCommit commit =
@@ -1273,6 +1266,7 @@ private boolean commitSnapshot(
12731266 }
12741267 try {
12751268 boolean success = commit .commit (snapshot , branchName , Collections .emptyList ());
1269+ // update snapshot and stats
12761270 tableSnapshotStore .compute (
12771271 identifier .getFullName (),
12781272 (k , old ) -> {
@@ -1281,12 +1275,12 @@ private boolean commitSnapshot(
12811275 long fileCount = 0 ;
12821276 long lastFileCreationTime = 0 ;
12831277 if (statistics != null ) {
1284- for (Partition partition : statistics ) {
1285- recordCount += partition .recordCount ();
1286- fileSizeInBytes += partition .fileSizeInBytes ();
1287- fileCount += partition .fileCount ();
1288- if (partition .lastFileCreationTime () > lastFileCreationTime ) {
1289- lastFileCreationTime = partition .lastFileCreationTime ();
1278+ for (PartitionStatistics stats : statistics ) {
1279+ recordCount += stats .recordCount ();
1280+ fileSizeInBytes += stats .fileSizeInBytes ();
1281+ fileCount += stats .fileCount ();
1282+ if (stats .lastFileCreationTime () > lastFileCreationTime ) {
1283+ lastFileCreationTime = stats .lastFileCreationTime ();
12901284 }
12911285 }
12921286 }
@@ -1305,6 +1299,82 @@ private boolean commitSnapshot(
13051299 lastFileCreationTime ,
13061300 fileSizeInBytes );
13071301 });
1302+ // upsert partitions stats
1303+ if (!tablePartitionsStore .containsKey (identifier .getFullName ())) {
1304+ if (statistics != null ) {
1305+ List <Partition > newPartitions =
1306+ statistics .stream ()
1307+ .map (
1308+ stats ->
1309+ new Partition (
1310+ stats .spec (),
1311+ stats .recordCount (),
1312+ stats .fileSizeInBytes (),
1313+ stats .fileCount (),
1314+ stats .lastFileCreationTime (),
1315+ false ))
1316+ .collect (Collectors .toList ());
1317+ tablePartitionsStore .put (identifier .getFullName (), newPartitions );
1318+ }
1319+ } else {
1320+ tablePartitionsStore .compute (
1321+ identifier .getFullName (),
1322+ (k , oldPartitions ) -> {
1323+ if (oldPartitions == null || statistics == null ) {
1324+ return oldPartitions ;
1325+ }
1326+ Map <Map <String , String >, PartitionStatistics > partitionStatisticsMap =
1327+ statistics .stream ()
1328+ .collect (
1329+ Collectors .toMap (
1330+ PartitionStatistics ::spec ,
1331+ y -> y ,
1332+ (a , b ) -> a ));
1333+ List <Partition > updatedPartitions =
1334+ oldPartitions .stream ()
1335+ .map (
1336+ oldPartition -> {
1337+ PartitionStatistics stats =
1338+ partitionStatisticsMap .get (
1339+ oldPartition .spec ());
1340+ if (stats == null ) {
1341+ return oldPartition ; // 如果没有新的统计信息,保持原样
1342+ }
1343+ return new Partition (
1344+ oldPartition .spec (),
1345+ oldPartition .recordCount ()
1346+ + stats .recordCount (),
1347+ oldPartition .fileSizeInBytes ()
1348+ + stats .fileSizeInBytes (),
1349+ oldPartition .fileCount ()
1350+ + stats .fileCount (),
1351+ Math .max (
1352+ oldPartition
1353+ .lastFileCreationTime (),
1354+ stats
1355+ .lastFileCreationTime ()),
1356+ oldPartition .done ());
1357+ })
1358+ .collect (Collectors .toList ());
1359+ return updatedPartitions ;
1360+ });
1361+ }
1362+ // clean up partitions
1363+ tablePartitionsStore
1364+ .entrySet ()
1365+ .removeIf (
1366+ entry -> {
1367+ List <Partition > partitions = entry .getValue ();
1368+ if (partitions == null ) {
1369+ return true ;
1370+ }
1371+ partitions .removeIf (
1372+ partition ->
1373+ partition .fileSizeInBytes () <= 0
1374+ && partition .fileCount () <= 0
1375+ && partition .recordCount () <= 0 );
1376+ return partitions .isEmpty ();
1377+ });
13081378 return success ;
13091379 } catch (Exception e ) {
13101380 throw new RuntimeException (e );
0 commit comments