Skip to content

Commit 02c54f5

Browse files
authored
[core] Introduce bucket entries to optimize Spark compact (#4162)
1 parent a2892b1 commit 02c54f5

File tree

18 files changed

+685
-199
lines changed

18 files changed

+685
-199
lines changed

docs/content/maintenance/system-tables.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,22 @@ SELECT * FROM my_table$partitions;
309309
*/
310310
```
311311

312+
### Buckets Table
313+
314+
You can query the per-bucket file statistics of the table (record count, total file size, file count and last update time of each bucket).
315+
316+
```sql
317+
SELECT * FROM my_table$buckets;
318+
319+
/*
320+
+---------------+--------+----------------+--------------------+--------------------+------------------------+
321+
| partition | bucket | record_count | file_size_in_bytes| file_count| last_update_time|
322+
+---------------+--------+----------------+--------------------+--------------------+------------------------+
323+
| [1] | 0 | 1 | 645 | 1 | 2024-06-24 10:25:57.400|
324+
+---------------+--------+----------------+--------------------+--------------------+------------------------+
325+
*/
326+
```
327+
312328
## Global System Table
313329

314330
Global system tables contain statistical information about all the tables that exist in Paimon. For convenience of searching, we create a reference system database called `sys`.
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.manifest;
20+
21+
import org.apache.paimon.annotation.Public;
22+
import org.apache.paimon.data.BinaryRow;
23+
import org.apache.paimon.utils.Pair;
24+
25+
import java.util.Collection;
26+
import java.util.HashMap;
27+
import java.util.Map;
28+
import java.util.Objects;
29+
30+
/** Entry representing a bucket. */
31+
@Public
32+
public class BucketEntry {
33+
34+
private final BinaryRow partition;
35+
private final int bucket;
36+
private final long recordCount;
37+
private final long fileSizeInBytes;
38+
private final long fileCount;
39+
private final long lastFileCreationTime;
40+
41+
public BucketEntry(
42+
BinaryRow partition,
43+
int bucket,
44+
long recordCount,
45+
long fileSizeInBytes,
46+
long fileCount,
47+
long lastFileCreationTime) {
48+
this.partition = partition;
49+
this.bucket = bucket;
50+
this.recordCount = recordCount;
51+
this.fileSizeInBytes = fileSizeInBytes;
52+
this.fileCount = fileCount;
53+
this.lastFileCreationTime = lastFileCreationTime;
54+
}
55+
56+
public BinaryRow partition() {
57+
return partition;
58+
}
59+
60+
public int bucket() {
61+
return bucket;
62+
}
63+
64+
public long recordCount() {
65+
return recordCount;
66+
}
67+
68+
public long fileSizeInBytes() {
69+
return fileSizeInBytes;
70+
}
71+
72+
public long fileCount() {
73+
return fileCount;
74+
}
75+
76+
public long lastFileCreationTime() {
77+
return lastFileCreationTime;
78+
}
79+
80+
public BucketEntry merge(BucketEntry entry) {
81+
return new BucketEntry(
82+
partition,
83+
bucket,
84+
recordCount + entry.recordCount,
85+
fileSizeInBytes + entry.fileSizeInBytes,
86+
fileCount + entry.fileCount,
87+
Math.max(lastFileCreationTime, entry.lastFileCreationTime));
88+
}
89+
90+
public static BucketEntry fromManifestEntry(ManifestEntry entry) {
91+
PartitionEntry partitionEntry = PartitionEntry.fromManifestEntry(entry);
92+
return new BucketEntry(
93+
partitionEntry.partition(),
94+
entry.bucket(),
95+
partitionEntry.recordCount(),
96+
partitionEntry.fileSizeInBytes(),
97+
partitionEntry.fileCount(),
98+
partitionEntry.lastFileCreationTime());
99+
}
100+
101+
public static Collection<BucketEntry> merge(Collection<ManifestEntry> fileEntries) {
102+
Map<Pair<BinaryRow, Integer>, BucketEntry> buckets = new HashMap<>();
103+
for (ManifestEntry entry : fileEntries) {
104+
BucketEntry bucketEntry = fromManifestEntry(entry);
105+
buckets.compute(
106+
Pair.of(entry.partition(), entry.bucket()),
107+
(part, old) -> old == null ? bucketEntry : old.merge(bucketEntry));
108+
}
109+
return buckets.values();
110+
}
111+
112+
public static void merge(
113+
Collection<BucketEntry> from, Map<Pair<BinaryRow, Integer>, BucketEntry> to) {
114+
for (BucketEntry entry : from) {
115+
to.compute(
116+
Pair.of(entry.partition(), entry.bucket),
117+
(part, old) -> old == null ? entry : old.merge(entry));
118+
}
119+
}
120+
121+
@Override
122+
public boolean equals(Object o) {
123+
if (this == o) {
124+
return true;
125+
}
126+
if (o == null || getClass() != o.getClass()) {
127+
return false;
128+
}
129+
BucketEntry that = (BucketEntry) o;
130+
return recordCount == that.recordCount
131+
&& fileSizeInBytes == that.fileSizeInBytes
132+
&& fileCount == that.fileCount
133+
&& lastFileCreationTime == that.lastFileCreationTime
134+
&& bucket == that.bucket
135+
&& Objects.equals(partition, that.partition);
136+
}
137+
138+
@Override
139+
public int hashCode() {
140+
return Objects.hash(
141+
partition, bucket, recordCount, fileSizeInBytes, fileCount, lastFileCreationTime);
142+
}
143+
}

paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.paimon.Snapshot;
2323
import org.apache.paimon.data.BinaryRow;
2424
import org.apache.paimon.data.InternalRow;
25+
import org.apache.paimon.manifest.BucketEntry;
2526
import org.apache.paimon.manifest.FileEntry;
2627
import org.apache.paimon.manifest.ManifestCacheFilter;
2728
import org.apache.paimon.manifest.ManifestEntry;
@@ -55,7 +56,6 @@
5556
import java.util.Map;
5657
import java.util.concurrent.ConcurrentHashMap;
5758
import java.util.concurrent.ConcurrentMap;
58-
import java.util.concurrent.ThreadPoolExecutor;
5959
import java.util.function.Consumer;
6060
import java.util.function.Function;
6161
import java.util.stream.Collectors;
@@ -259,18 +259,28 @@ public List<SimpleFileEntry> readSimpleEntries() {
259259
public List<PartitionEntry> readPartitionEntries() {
260260
List<ManifestFileMeta> manifests = readManifests().getRight();
261261
Map<BinaryRow, PartitionEntry> partitions = new ConcurrentHashMap<>();
262-
// Can be executed in disorder
263-
ThreadPoolExecutor executor = getExecutorService(scanManifestParallelism);
264262
Consumer<ManifestFileMeta> processor =
265263
m ->
266264
PartitionEntry.merge(
267265
PartitionEntry.merge(readManifestFileMeta(m)), partitions);
268-
randomlyOnlyExecute(executor, processor, manifests);
266+
randomlyOnlyExecute(getExecutorService(scanManifestParallelism), processor, manifests);
269267
return partitions.values().stream()
270268
.filter(p -> p.fileCount() > 0)
271269
.collect(Collectors.toList());
272270
}
273271

272+
@Override
273+
public List<BucketEntry> readBucketEntries() {
274+
List<ManifestFileMeta> manifests = readManifests().getRight();
275+
Map<Pair<BinaryRow, Integer>, BucketEntry> buckets = new ConcurrentHashMap<>();
276+
Consumer<ManifestFileMeta> processor =
277+
m -> BucketEntry.merge(BucketEntry.merge(readManifestFileMeta(m)), buckets);
278+
randomlyOnlyExecute(getExecutorService(scanManifestParallelism), processor, manifests);
279+
return buckets.values().stream()
280+
.filter(p -> p.fileCount() > 0)
281+
.collect(Collectors.toList());
282+
}
283+
274284
private Pair<Snapshot, List<ManifestEntry>> doPlan() {
275285
long started = System.nanoTime();
276286
Pair<Snapshot, List<ManifestFileMeta>> snapshotListPair = readManifests();

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.Snapshot;
2222
import org.apache.paimon.data.BinaryRow;
2323
import org.apache.paimon.io.DataFileMeta;
24+
import org.apache.paimon.manifest.BucketEntry;
2425
import org.apache.paimon.manifest.FileKind;
2526
import org.apache.paimon.manifest.ManifestCacheFilter;
2627
import org.apache.paimon.manifest.ManifestEntry;
@@ -98,6 +99,8 @@ default Long totalRecordCount(Snapshot snapshot) {
9899

99100
List<PartitionEntry> readPartitionEntries();
100101

102+
/**
 * Reads the entries of this scan aggregated per bucket.
 *
 * @return one {@link BucketEntry} per (partition, bucket) covered by the scan
 */
List<BucketEntry> readBucketEntries();
103+
101104
default List<BinaryRow> listPartitions() {
102105
return readPartitionEntries().stream()
103106
.map(PartitionEntry::partition)

paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.Snapshot;
2222
import org.apache.paimon.consumer.ConsumerManager;
2323
import org.apache.paimon.data.BinaryRow;
24+
import org.apache.paimon.manifest.BucketEntry;
2425
import org.apache.paimon.manifest.ManifestEntry;
2526
import org.apache.paimon.manifest.PartitionEntry;
2627
import org.apache.paimon.metrics.MetricRegistry;
@@ -88,6 +89,8 @@ public interface SnapshotReader {
8889

8990
List<PartitionEntry> partitionEntries();
9091

92+
/**
 * Returns the per-bucket aggregated entries visible to this reader.
 *
 * @return the list of {@link BucketEntry} instances
 */
List<BucketEntry> bucketEntries();
93+
9194
/** Result plan of this scan. */
9295
interface Plan extends TableScan.Plan {
9396

paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.paimon.index.IndexFileHandler;
2828
import org.apache.paimon.index.IndexFileMeta;
2929
import org.apache.paimon.io.DataFileMeta;
30+
import org.apache.paimon.manifest.BucketEntry;
3031
import org.apache.paimon.manifest.FileKind;
3132
import org.apache.paimon.manifest.ManifestEntry;
3233
import org.apache.paimon.manifest.PartitionEntry;
@@ -335,6 +336,11 @@ public List<PartitionEntry> partitionEntries() {
335336
return scan.readPartitionEntries();
336337
}
337338

339+
@Override
340+
public List<BucketEntry> bucketEntries() {
341+
return scan.readBucketEntries();
342+
}
343+
338344
@Override
339345
public Plan readChanges() {
340346
withMode(ScanMode.DELTA);

paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.paimon.disk.IOManager;
2828
import org.apache.paimon.fs.FileIO;
2929
import org.apache.paimon.fs.Path;
30+
import org.apache.paimon.manifest.BucketEntry;
3031
import org.apache.paimon.manifest.IndexManifestEntry;
3132
import org.apache.paimon.manifest.ManifestEntry;
3233
import org.apache.paimon.manifest.ManifestFileMeta;
@@ -359,6 +360,11 @@ public List<BinaryRow> partitions() {
359360
public List<PartitionEntry> partitionEntries() {
360361
return snapshotReader.partitionEntries();
361362
}
363+
364+
@Override
365+
public List<BucketEntry> bucketEntries() {
366+
return snapshotReader.bucketEntries();
367+
}
362368
}
363369

364370
private class AuditLogBatchScan implements DataTableScan {

0 commit comments

Comments
 (0)