Commit a9df02e

[core] Support chain tbl on batch mode (#6394)
1 parent 184b95a commit a9df02e

File tree: 14 files changed, +1524 −10 lines changed
Lines changed: 148 additions & 0 deletions
@@ -0,0 +1,148 @@
---
title: "Chain Table"
weight: 6
type: docs
aliases:
- /primary-key-table/chain-table.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Chain Table

Chain table is a new capability for primary key tables that transforms how you process incremental data.
Imagine a scenario where you periodically store a full snapshot of data (for example, once a day), even
though only a small portion changes between snapshots. An ODS binlog dump is a typical example of this pattern.

Take a daily binlog dump job as an example: a batch job merges yesterday's full dataset with today's
incremental changes to produce a new full dataset. This approach has two clear drawbacks:
* Full computation: the merge operation processes all data and involves a shuffle, which results in poor performance.
* Full storage: a full copy of the data is stored every day, even though the changed data usually accounts for a very small proportion.

Paimon addresses this problem by directly consuming only the changed data and performing merge-on-read.
In this way, full computation and storage are turned into incremental mode:
* Incremental computation: the daily offline ETL job only needs to consume the changed data of the current day and does not need to merge all data.
* Incremental storage: only the changed data is stored each day, and it is asynchronously compacted periodically (e.g., weekly) to build a global chain table within the lifecycle.

{{< img src="/img/chain-table.png">}}
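The incremental mode above can be sketched in a few lines of Python. This is a hypothetical, simplified model (not Paimon code): one full base snapshot plus small daily deltas, merged only at read time.

```python
# Hypothetical sketch of merge-on-read over incremental data (illustrative
# only, not Paimon's implementation): keep one full snapshot plus small
# daily deltas, and merge them only when reading.

def read_merged(snapshot, deltas):
    """Merge the base snapshot with daily deltas; later days win per key."""
    merged = dict(snapshot)
    for delta in deltas:      # deltas ordered oldest -> newest
        merged.update(delta)  # only changed keys are stored and applied
    return merged

snapshot = {"k1": "v1", "k2": "v2"}        # full data on day 0
deltas = [{"k2": "v2b"}, {"k3": "v3"}]     # changes on day 1 and day 2

print(read_merged(snapshot, deltas))
# {'k1': 'v1', 'k2': 'v2b', 'k3': 'v3'}
```

Storage only grows by the size of each delta, while readers still see a full, up-to-date view.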

On top of a regular table, a chain table introduces snapshot and delta branches to represent full and incremental
data respectively. When writing, you specify the branch to write full or incremental data to. When reading, Paimon
automatically chooses the appropriate strategy based on the read mode: full, incremental, or hybrid.

To enable a chain table, set `chain-table.enabled` to `true` in the table options when creating the
table, and create the snapshot and delta branches as well. Consider an example via Spark SQL:

```sql
CREATE TABLE default.t (
    `t1` string,
    `t2` string,
    `t3` string
) PARTITIONED BY (`date` string)
TBLPROPERTIES (
    'chain-table.enabled' = 'true',
    -- props about primary key table
    'primary-key' = 'date,t1',
    'sequence.field' = 't2',
    'bucket-key' = 't1',
    'bucket' = '2',
    -- props about partition
    'partition.timestamp-pattern' = '$date',
    'partition.timestamp-formatter' = 'yyyyMMdd'
);

CALL sys.create_branch('default.t', 'snapshot');

CALL sys.create_branch('default.t', 'delta');

ALTER TABLE default.t SET tblproperties
('scan.fallback-snapshot-branch' = 'snapshot',
 'scan.fallback-delta-branch' = 'delta');

ALTER TABLE `default`.`t$branch_snapshot` SET tblproperties
('scan.fallback-snapshot-branch' = 'snapshot',
 'scan.fallback-delta-branch' = 'delta');

ALTER TABLE `default`.`t$branch_delta` SET tblproperties
('scan.fallback-snapshot-branch' = 'snapshot',
 'scan.fallback-delta-branch' = 'delta');
```
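The two fallback options set above drive branch selection at read time: main branch first, then the snapshot branch, and finally a chain merge of snapshot and delta branches. A rough Python sketch of that strategy (hypothetical and simplified; the real resolution lives inside Paimon's scan planning, and all names here are illustrative):

```python
# Hypothetical sketch of the fallback read strategy: read the main branch
# first, then the snapshot branch, and finally chain-merge the latest full
# partition with the newer delta partitions up to the queried partition.

def resolve_read(partition, main, snapshot, delta):
    if partition in main:
        return dict(main[partition])      # found in main branch
    if partition in snapshot:
        return dict(snapshot[partition])  # found in snapshot (full) branch
    # chain merge: latest earlier full partition plus newer delta partitions
    base = max((p for p in snapshot if p < partition), default=None)
    rows = dict(snapshot.get(base, {}))
    for p in sorted(delta):
        if (base is None or p > base) and p <= partition:
            rows.update(delta[p])         # delta rows override by primary key
    return rows

snapshot = {"20250810": {"1": ("1", "1")}}  # full write from the example below
delta = {"20250811": {"2": ("1", "1")}}     # incremental write
print(resolve_read("20250811", {}, snapshot, delta))
# {'1': ('1', '1'), '2': ('1', '1')}
```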

Notice that:
- Chain table is only supported for primary key tables, which means you should define `bucket` and `bucket-key` for the table.
- A chain table requires the schema of each branch to be consistent.
- Only Spark is supported for now; Flink support will be added later.
- Chain compaction is not supported for now; it will be added later.

After creating a chain table, you can read and write data in the following ways.

- Full Write: write data to `t$branch_snapshot`.
```sql
insert overwrite `default`.`t$branch_snapshot` partition (date = '20250810')
values ('1', '1', '1');
```

- Incremental Write: write data to `t$branch_delta`.
```sql
insert overwrite `default`.`t$branch_delta` partition (date = '20250811')
values ('2', '1', '1');
```

- Full Query: if the snapshot branch has the full partition, read it directly; otherwise, read in chain merge mode.
```sql
select t1, t2, t3 from default.t where date = '20250811'
```
you will get the following result:
```text
+---+---+---+
| t1| t2| t3|
+---+---+---+
|  1|  1|  1|
|  2|  1|  1|
+---+---+---+
```
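The chain merge above deduplicates rows by primary key, with the sequence field (`t2` in this table) deciding which row survives when a key appears in several branches. A toy Python model of that deduplicate behavior (illustrative only, not Paimon's merge engine):

```python
# Toy model (not Paimon code) of deduplicate merging: for each primary key,
# keep the row with the largest sequence field; rows with distinct keys all
# survive, which is why the full query returns both rows.

def deduplicate(rows, key_field, seq_field):
    latest = {}
    for row in rows:  # rows gathered from the snapshot and delta branches
        k = row[key_field]
        if k not in latest or row[seq_field] >= latest[k][seq_field]:
            latest[k] = row
    return sorted(latest.values(), key=lambda r: r[key_field])

rows = [
    {"t1": "1", "t2": "1", "t3": "1"},  # from the snapshot branch
    {"t1": "2", "t2": "1", "t3": "1"},  # from the delta branch
]
print(deduplicate(rows, "t1", "t2"))
# [{'t1': '1', 't2': '1', 't3': '1'}, {'t1': '2', 't2': '1', 't3': '1'}]
```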

- Incremental Query: read the incremental partition from `t$branch_delta`.
```sql
select t1, t2, t3 from `default`.`t$branch_delta` where date = '20250811'
```
you will get the following result:
```text
+---+---+---+
| t1| t2| t3|
+---+---+---+
|  2|  1|  1|
+---+---+---+
```

- Hybrid Query: read both full and incremental data simultaneously.
```sql
select t1, t2, t3 from default.t where date = '20250811'
union all
select t1, t2, t3 from `default`.`t$branch_delta` where date = '20250811'
```
you will get the following result:
```text
+---+---+---+
| t1| t2| t3|
+---+---+---+
|  1|  1|  1|
|  2|  1|  1|
|  2|  1|  1|
+---+---+---+
```

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 18 additions & 0 deletions
@@ -98,6 +98,12 @@
             <td>MemorySize</td>
             <td>Memory page size for caching.</td>
         </tr>
+        <tr>
+            <td><h5>chain-table.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable the chain table.</td>
+        </tr>
         <tr>
             <td><h5>changelog-file.compression</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -1061,6 +1067,18 @@
             <td>String</td>
             <td>When a batch job queries from a table, if a partition does not exist in the current branch, the reader will try to get this partition from this fallback branch.</td>
         </tr>
+        <tr>
+            <td><h5>scan.fallback-delta-branch</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>When a batch job queries from a chain table, if a partition does not exist in either the main or the snapshot branch, the reader will try to get this partition from the chain snapshot and delta branches together.</td>
+        </tr>
+        <tr>
+            <td><h5>scan.fallback-snapshot-branch</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>When a batch job queries from a chain table, if a partition does not exist in the main branch, the reader will try to get this partition from the chain snapshot branch.</td>
+        </tr>
         <tr>
             <td><h5>scan.file-creation-time-millis</h5></td>
             <td style="word-wrap: break-word;">(none)</td>

docs/static/img/chain-table.png

201 KB

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 34 additions & 0 deletions
@@ -226,6 +226,28 @@ public InlineElement getDescription() {
     public static final ConfigOption<String> BRANCH =
             key("branch").stringType().defaultValue("main").withDescription("Specify branch name.");

+    public static final ConfigOption<Boolean> CHAIN_TABLE_ENABLED =
+            key("chain-table.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to enable the chain table.");
+
+    public static final ConfigOption<String> SCAN_FALLBACK_SNAPSHOT_BRANCH =
+            key("scan.fallback-snapshot-branch")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "When a batch job queries from a chain table, if a partition does not exist in the main branch, "
+                                    + "the reader will try to get this partition from the chain snapshot branch.");
+
+    public static final ConfigOption<String> SCAN_FALLBACK_DELTA_BRANCH =
+            key("scan.fallback-delta-branch")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "When a batch job queries from a chain table, if a partition does not exist in either the main or the snapshot branch, "
+                                    + "the reader will try to get this partition from the chain snapshot and delta branches together.");
+
     public static final String FILE_FORMAT_ORC = "orc";
     public static final String FILE_FORMAT_AVRO = "avro";
     public static final String FILE_FORMAT_PARQUET = "parquet";
@@ -3254,6 +3276,18 @@ public int lookupMergeRecordsThreshold() {
         return options.get(LOOKUP_MERGE_RECORDS_THRESHOLD);
     }

+    public boolean isChainTable() {
+        return options.get(CHAIN_TABLE_ENABLED);
+    }
+
+    public String scanFallbackSnapshotBranch() {
+        return options.get(SCAN_FALLBACK_SNAPSHOT_BRANCH);
+    }
+
+    public String scanFallbackDeltaBranch() {
+        return options.get(SCAN_FALLBACK_DELTA_BRANCH);
+    }
+
     public boolean formatTableImplementationIsPaimon() {
         return options.get(FORMAT_TABLE_IMPLEMENTATION) == FormatTableImplementation.PAIMON;
     }

paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java

Lines changed: 12 additions & 1 deletion
@@ -70,8 +70,19 @@ public ChainKeyValueFileReaderFactory(
         this.chainReadContext = chainReadContext;
         CoreOptions options = new CoreOptions(schema.options());
         this.currentBranch = options.branch();
+        String snapshotBranch = options.scanFallbackSnapshotBranch();
+        String deltaBranch = options.scanFallbackDeltaBranch();
+        SchemaManager snapshotSchemaManager =
+                snapshotBranch.equalsIgnoreCase(currentBranch)
+                        ? schemaManager
+                        : schemaManager.copyWithBranch(snapshotBranch);
+        SchemaManager deltaSchemaManager =
+                deltaBranch.equalsIgnoreCase(currentBranch)
+                        ? schemaManager
+                        : schemaManager.copyWithBranch(deltaBranch);
         this.branchSchemaManagers = new HashMap<>();
-        this.branchSchemaManagers.put(currentBranch, schemaManager);
+        this.branchSchemaManagers.put(snapshotBranch, snapshotSchemaManager);
+        this.branchSchemaManagers.put(deltaBranch, deltaSchemaManager);
     }

     @Override

paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java

Lines changed: 34 additions & 0 deletions
@@ -21,6 +21,7 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.MergeEngine;
+import org.apache.paimon.TableType;
 import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
@@ -39,6 +40,7 @@
 import org.apache.paimon.types.MultisetType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;

 import java.util.ArrayList;
@@ -241,6 +243,8 @@ public static void validateTableSchema(TableSchema schema) {
         validateRowTracking(schema, options);

         validateIncrementalClustering(schema, options);
+
+        validateChainTable(schema, options);
     }

     public static void validateFallbackBranch(SchemaManager schemaManager, TableSchema schema) {
@@ -679,4 +683,34 @@ private static void validateIncrementalClustering(TableSchema schema, CoreOption
                     PRIMARY_KEY.key());
         }
     }
+
+    public static void validateChainTable(TableSchema schema, CoreOptions options) {
+        if (options.isChainTable()) {
+            boolean isPrimaryTbl = schema.primaryKeys() != null && !schema.primaryKeys().isEmpty();
+            boolean isPartitionTbl =
+                    schema.partitionKeys() != null && !schema.partitionKeys().isEmpty();
+            ChangelogProducer changelogProducer = options.changelogProducer();
+            Preconditions.checkArgument(
+                    options.type() == TableType.TABLE, "Chain table must be table type.");
+            Preconditions.checkArgument(isPrimaryTbl, "Primary key is required for chain table.");
+            Preconditions.checkArgument(isPartitionTbl, "Chain table must be partition table.");
+            Preconditions.checkArgument(
+                    options.bucket() > 0, "Bucket number must be greater than 0 for chain table.");
+            Preconditions.checkArgument(
+                    options.sequenceField() != null, "Sequence field is required for chain table.");
+            Preconditions.checkArgument(
+                    options.mergeEngine() == MergeEngine.DEDUPLICATE,
+                    "Merge engine must be deduplicate for chain table.");
+            Preconditions.checkArgument(
+                    changelogProducer == ChangelogProducer.NONE
+                            || changelogProducer == ChangelogProducer.INPUT,
+                    "Changelog producer must be none or input for chain table.");
+            Preconditions.checkArgument(
+                    options.partitionTimestampPattern() != null,
+                    "Partition timestamp pattern is required for chain table.");
+            Preconditions.checkArgument(
+                    options.partitionTimestampFormatter() != null,
+                    "Partition timestamp formatter is required for chain table.");
+        }
+    }
 }
