Skip to content

Commit c2cd7d1

Browse files
author
bosiew.tian
committed
[core] fix1
1 parent 1e104c6 commit c2cd7d1

File tree

10 files changed

+147
-65
lines changed

10 files changed

+147
-65
lines changed

docs/content/concepts/system-tables.md

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,51 @@ SELECT * FROM my_table$audit_log;
136136
*/
137137
```
138138

139+
#### Reading with Sequence Number
140+
141+
For primary key tables, you can enable the `table-read.sequence-number.enabled` option to include the `_SEQUENCE_NUMBER` field in the output.
142+
143+
{{< tabs "audit-log-sequence-number" >}}
144+
145+
{{< tab "Enable via CREATE TABLE" >}}
146+
```sql
147+
CREATE TABLE my_table (
148+
...
149+
) WITH (
150+
'table-read.sequence-number.enabled' = 'true'
151+
);
152+
```
153+
{{< /tab >}}
154+
155+
{{< tab "Enable via ALTER TABLE" >}}
156+
```sql
157+
ALTER TABLE my_table SET ('table-read.sequence-number.enabled' = 'true');
158+
```
159+
{{< /tab >}}
160+
161+
{{< /tabs >}}
162+
163+
```sql
164+
SELECT * FROM my_table$audit_log;
165+
166+
/*
167+
+------------------+--------------------+-----------------+-----------------+
168+
| rowkind | _SEQUENCE_NUMBER | column_0 | column_1 |
169+
+------------------+--------------------+-----------------+-----------------+
170+
| +I | 0 | ... | ... |
171+
+------------------+--------------------+-----------------+-----------------+
172+
| -U | 0 | ... | ... |
173+
+------------------+--------------------+-----------------+-----------------+
174+
| +U | 1 | ... | ... |
175+
+------------------+--------------------+-----------------+-----------------+
176+
3 rows in set
177+
*/
178+
```
179+
180+
{{< hint info >}}
181+
The `table-read.sequence-number.enabled` option cannot be set via SQL hints.
182+
{{< /hint >}}
183+
139184
### Binlog Table
140185

141186
You can query the binlog through binlog table. In the binlog system table, the update before and update after will be packed in one row.
@@ -158,6 +203,24 @@ SELECT * FROM T$binlog;
158203
*/
159204
```
160205

206+
Similar to the audit_log table, you can also enable `table-read.sequence-number.enabled` to include `_SEQUENCE_NUMBER` in the binlog table output:
207+
208+
```sql
209+
SELECT * FROM T$binlog;
210+
211+
/*
212+
+------------------+--------------------+----------------------+-----------------------+
213+
| rowkind | _SEQUENCE_NUMBER | column_0 | column_1 |
214+
+------------------+--------------------+----------------------+-----------------------+
215+
| +I | 0 | [col_0] | [col_1] |
216+
+------------------+--------------------+----------------------+-----------------------+
217+
| +U | 1 | [col_0_ub, col_0_ua] | [col_1_ub, col_1_ua] |
218+
+------------------+--------------------+----------------------+-----------------------+
219+
| -D | 2 | [col_0] | [col_1] |
220+
+------------------+--------------------+----------------------+-----------------------+
221+
*/
222+
```
223+
161224
### Read-optimized Table
162225

163226
If you require extreme reading performance and can accept reading slightly old data,

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,10 +147,10 @@
147147
<td>Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true.</td>
148148
</tr>
149149
<tr>
150-
<td><h5>changelog-read.sequence-number.enabled</h5></td>
150+
<td><h5>table-read.sequence-number.enabled</h5></td>
151151
<td style="word-wrap: break-word;">false</td>
152152
<td>Boolean</td>
153-
<td>Whether to include _SEQUENCE_NUMBER field in audit_log and binlog system tables. This is only valid for primary key tables.</td>
153+
<td>Whether to include the _SEQUENCE_NUMBER field when reading the audit_log or binlog system tables. This is only valid for primary key tables.</td>
154154
</tr>
155155
<tr>
156156
<td><h5>changelog.num-retained.max</h5></td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -827,13 +827,13 @@ public InlineElement getDescription() {
827827
.withDescription(
828828
"Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true.");
829829

830-
public static final ConfigOption<Boolean> CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED =
831-
key("changelog-read.sequence-number.enabled")
830+
public static final ConfigOption<Boolean> TABLE_READ_SEQUENCE_NUMBER_ENABLED =
831+
key("table-read.sequence-number.enabled")
832832
.booleanType()
833833
.defaultValue(false)
834834
.withDescription(
835-
"Whether to include _SEQUENCE_NUMBER field in audit_log and binlog system tables. "
836-
+ "This is only valid for primary key tables.");
835+
"Whether to include the _SEQUENCE_NUMBER field when reading the audit_log or binlog "
836+
+ "system tables. This is only valid for primary key tables.");
837837

838838
public static final ConfigOption<String> SEQUENCE_FIELD =
839839
key("sequence.field")
@@ -2729,8 +2729,8 @@ public List<String> changelogRowDeduplicateIgnoreFields() {
27292729
.orElse(Collections.emptyList());
27302730
}
27312731

2732-
public boolean changelogReadSequenceNumberEnabled() {
2733-
return options.get(CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED);
2732+
public boolean tableReadSequenceNumberEnabled() {
2733+
return options.get(TABLE_READ_SEQUENCE_NUMBER_ENABLED);
27342734
}
27352735

27362736
public boolean scanPlanSortPartition() {

paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -723,12 +723,12 @@ public static void validateChainTable(TableSchema schema, CoreOptions options) {
723723

724724
private static void validateChangelogReadSequenceNumber(
725725
TableSchema schema, CoreOptions options) {
726-
if (options.changelogReadSequenceNumberEnabled()) {
726+
if (options.tableReadSequenceNumberEnabled()) {
727727
checkArgument(
728728
!schema.primaryKeys().isEmpty(),
729729
"Cannot enable '%s' for non-primary-key table. "
730730
+ "Sequence number is only available for primary key tables.",
731-
CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key());
731+
CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key());
732732
}
733733
}
734734
}

paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,11 @@
1818

1919
package org.apache.paimon.table.source;
2020

21-
import org.apache.paimon.CoreOptions;
2221
import org.apache.paimon.KeyValue;
2322
import org.apache.paimon.data.GenericRow;
2423
import org.apache.paimon.data.InternalRow;
2524
import org.apache.paimon.data.JoinedRow;
2625
import org.apache.paimon.reader.RecordReader;
27-
import org.apache.paimon.table.system.AuditLogTable;
2826

2927
import java.io.IOException;
3028
import java.util.HashMap;
@@ -33,8 +31,9 @@
3331
/** A {@link RecordReader.RecordIterator} mapping a {@link KeyValue} to its value. */
3432
public class ValueContentRowDataRecordIterator extends ResetRowKindRecordIterator {
3533

36-
private final boolean includeSequenceNumber;
37-
private final boolean auditLogEnabled;
34+
public static final String KEY_VALUE_SEQUENCE_NUMBER_ENABLED =
35+
"key-value.sequence_number.enabled";
36+
private final boolean keyValueSequenceNumberEnabled;
3837

3938
public ValueContentRowDataRecordIterator(RecordReader.RecordIterator<KeyValue> kvIterator) {
4039
this(kvIterator, new HashMap<>(1));
@@ -43,11 +42,9 @@ public ValueContentRowDataRecordIterator(RecordReader.RecordIterator<KeyValue> k
4342
public ValueContentRowDataRecordIterator(
4443
RecordReader.RecordIterator<KeyValue> kvIterator, Map<String, String> schemaOptions) {
4544
super(kvIterator);
46-
this.includeSequenceNumber =
47-
CoreOptions.fromMap(schemaOptions).changelogReadSequenceNumberEnabled();
48-
this.auditLogEnabled =
45+
this.keyValueSequenceNumberEnabled =
4946
Boolean.parseBoolean(
50-
schemaOptions.getOrDefault(AuditLogTable.AUDIT_LOG_ENABLED, "false"));
47+
schemaOptions.getOrDefault(KEY_VALUE_SEQUENCE_NUMBER_ENABLED, "false"));
5148
}
5249

5350
@Override
@@ -60,7 +57,7 @@ public InternalRow next() throws IOException {
6057
InternalRow rowData = kv.value();
6158
rowData.setRowKind(kv.valueKind());
6259

63-
if (includeSequenceNumber && auditLogEnabled) {
60+
if (keyValueSequenceNumberEnabled) {
6461
JoinedRow joinedRow = new JoinedRow();
6562
GenericRow systemFieldsRow = new GenericRow(1);
6663
systemFieldsRow.setField(0, kv.sequenceNumber());

paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -81,40 +81,44 @@
8181
import java.util.Optional;
8282
import java.util.stream.Collectors;
8383

84+
import static org.apache.paimon.CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED;
8485
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
86+
import static org.apache.paimon.table.source.ValueContentRowDataRecordIterator.KEY_VALUE_SEQUENCE_NUMBER_ENABLED;
8587

8688
/** A {@link Table} for reading audit log of table. */
8789
public class AuditLogTable implements DataTable, ReadonlyTable {
8890

8991
public static final String AUDIT_LOG = "audit_log";
90-
public static final String AUDIT_LOG_ENABLED = "audit.log.enabled";
9192

9293
protected final FileStoreTable wrapped;
9394

94-
/** Number of special fields (rowkind, and optionally _SEQUENCE_NUMBER). */
95-
protected final int specialFieldCount;
95+
protected final List<DataField> specialFields;
9696

9797
public AuditLogTable(FileStoreTable wrapped) {
9898
this.wrapped = wrapped;
99-
this.wrapped.schema().options().put(AUDIT_LOG_ENABLED, "true");
100-
this.specialFieldCount =
101-
coreOptions().changelogReadSequenceNumberEnabled()
102-
&& !wrapped.primaryKeys().isEmpty()
103-
? 2
104-
: 1;
99+
this.specialFields = new ArrayList<>();
100+
specialFields.add(SpecialFields.ROW_KIND);
101+
102+
boolean includeSequenceNumber =
103+
CoreOptions.fromMap(wrapped.options()).tableReadSequenceNumberEnabled();
104+
105+
if (includeSequenceNumber) {
106+
this.wrapped.options().put(KEY_VALUE_SEQUENCE_NUMBER_ENABLED, "true");
107+
specialFields.add(SpecialFields.SEQUENCE_NUMBER);
108+
}
105109
}
106110

107111
/** Creates a PredicateReplaceVisitor that adjusts field indices by systemFieldCount. */
108112
private PredicateReplaceVisitor createPredicateConverter() {
109113
return p -> {
110-
if (p.index() < specialFieldCount) {
114+
if (p.index() < specialFields.size()) {
111115
return Optional.empty();
112116
}
113117
return Optional.of(
114118
new LeafPredicate(
115119
p.function(),
116120
p.type(),
117-
p.index() - specialFieldCount,
121+
p.index() - specialFields.size(),
118122
p.fieldName(),
119123
p.literals()));
120124
};
@@ -152,11 +156,7 @@ public String name() {
152156

153157
@Override
154158
public RowType rowType() {
155-
List<DataField> fields = new ArrayList<>();
156-
fields.add(SpecialFields.ROW_KIND);
157-
if (specialFieldCount > 1) {
158-
fields.add(SpecialFields.SEQUENCE_NUMBER);
159-
}
159+
List<DataField> fields = new ArrayList<>(specialFields);
160160
fields.addAll(wrapped.rowType().getFields());
161161
return new RowType(fields);
162162
}
@@ -243,6 +243,11 @@ public InnerTableRead newRead() {
243243

244244
@Override
245245
public Table copy(Map<String, String> dynamicOptions) {
246+
if (Boolean.parseBoolean(
247+
dynamicOptions.getOrDefault(TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(), "false"))) {
248+
throw new UnsupportedOperationException(
249+
"table-read.sequence-number.enabled is not supported by hint.");
250+
}
246251
return new AuditLogTable(wrapped.copy(dynamicOptions));
247252
}
248253

@@ -648,8 +653,8 @@ class AuditLogRead implements InnerTableRead {
648653

649654
// Special index for rowkind field
650655
protected static final int ROW_KIND_INDEX = -1;
651-
// Special index for _SEQUENCE_NUMBER field
652-
protected static final int SEQUENCE_NUMBER_INDEX = -2;
656+
// _SEQUENCE_NUMBER is at index 0 by setting: KEY_VALUE_SEQUENCE_NUMBER_ENABLED
657+
protected static final int SEQUENCE_NUMBER_INDEX = 0;
653658

654659
protected final InnerTableRead dataRead;
655660

@@ -663,13 +668,13 @@ protected AuditLogRead(InnerTableRead dataRead) {
663668
/** Default projection, add system fields (rowkind, and optionally _SEQUENCE_NUMBER). */
664669
private int[] defaultProjection() {
665670
int dataFieldCount = wrapped.rowType().getFieldCount();
666-
int[] projection = new int[dataFieldCount + specialFieldCount];
671+
int[] projection = new int[dataFieldCount + specialFields.size()];
667672
projection[0] = ROW_KIND_INDEX;
668-
if (specialFieldCount > 1) {
673+
if (specialFields.contains(SpecialFields.SEQUENCE_NUMBER)) {
669674
projection[1] = SEQUENCE_NUMBER_INDEX;
670675
}
671676
for (int i = 0; i < dataFieldCount; i++) {
672-
projection[specialFieldCount + i] = i + specialFieldCount - 1;
677+
projection[specialFields.size() + i] = i + specialFields.size() - 1;
673678
}
674679
return projection;
675680
}
@@ -687,7 +692,7 @@ private int[] buildProjection(RowType readType) {
687692
} else if (fieldName.equals(SpecialFields.SEQUENCE_NUMBER.name())) {
688693
projection[i] = SEQUENCE_NUMBER_INDEX;
689694
} else {
690-
projection[i] = dataFieldIndex + specialFieldCount - 1;
695+
projection[i] = dataFieldIndex + specialFields.size() - 1;
691696
dataFieldIndex++;
692697
}
693698
}
@@ -764,11 +769,6 @@ public boolean isNullAt(int pos) {
764769

765770
@Override
766771
public long getLong(int pos) {
767-
int index = indexMapping[pos];
768-
if (index == AuditLogRead.SEQUENCE_NUMBER_INDEX) {
769-
// _SEQUENCE_NUMBER is at index 0 in bottom output
770-
return row.getLong(0);
771-
}
772772
return super.getLong(pos);
773773
}
774774

paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.Map;
4242
import java.util.stream.IntStream;
4343

44+
import static org.apache.paimon.CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED;
4445
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
4546

4647
/**
@@ -67,11 +68,7 @@ public String name() {
6768

6869
@Override
6970
public RowType rowType() {
70-
List<DataField> fields = new ArrayList<>();
71-
fields.add(SpecialFields.ROW_KIND);
72-
if (specialFieldCount > 1) {
73-
fields.add(SpecialFields.SEQUENCE_NUMBER);
74-
}
71+
List<DataField> fields = new ArrayList<>(specialFields);
7572
for (DataField field : wrapped.rowType().getFields()) {
7673
// convert to nullable
7774
fields.add(field.newType(new ArrayType(field.type().nullable())));
@@ -86,6 +83,11 @@ public InnerTableRead newRead() {
8683

8784
@Override
8885
public Table copy(Map<String, String> dynamicOptions) {
86+
if (Boolean.parseBoolean(
87+
dynamicOptions.getOrDefault(TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(), "false"))) {
88+
throw new UnsupportedOperationException(
89+
"table-read.sequence-number.enabled is not supported by hint.");
90+
}
8991
return new BinlogTable(wrapped.copy(dynamicOptions));
9092
}
9193

@@ -120,7 +122,7 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
120122
// When sequence number is enabled, the underlying data layout is:
121123
// [_SEQUENCE_NUMBER, pk, pt, col1, ...]
122124
// We need to offset the field index to skip the sequence number field.
123-
int offset = specialFieldCount - 1;
125+
int offset = specialFields.size() - 1;
124126
InternalRow.FieldGetter[] fieldGetters =
125127
IntStream.range(0, wrappedReadType.getFieldCount())
126128
.mapToObj(
@@ -151,7 +153,7 @@ private InternalRow convertToArray(
151153
@Nullable InternalRow row2,
152154
InternalRow.FieldGetter[] fieldGetters) {
153155
// seqOffset is 1 if sequence number is enabled, 0 otherwise
154-
int seqOffset = specialFieldCount - 1;
156+
int seqOffset = specialFields.size() - 1;
155157
GenericRow row = new GenericRow(fieldGetters.length + seqOffset);
156158

157159
// Copy sequence number if enabled (it's at index 0 in input row)

paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public void testReadSequenceNumberWithAlterTable() throws Exception {
8181
catalog.alterTable(
8282
identifier(tableName),
8383
SchemaChange.setOption(
84-
CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"),
84+
CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"),
8585
false);
8686

8787
// Re-fetch the audit_log table to get updated schema
@@ -113,7 +113,7 @@ private AuditLogTable createAuditLogTable(String tableName, boolean enableSequen
113113
.option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
114114
.option("bucket", "1");
115115
if (enableSequenceNumber) {
116-
schemaBuilder.option(CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true");
116+
schemaBuilder.option(CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(), "true");
117117
}
118118

119119
TableSchema tableSchema =

paimon-core/src/test/java/org/apache/paimon/table/system/BinlogTableTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public void testReadSequenceNumberWithAlterTable() throws Exception {
8383
catalog.alterTable(
8484
identifier(tableName),
8585
SchemaChange.setOption(
86-
CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"),
86+
CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"),
8787
false);
8888

8989
// Re-fetch the binlog table to get updated schema
@@ -115,7 +115,7 @@ private BinlogTable createBinlogTable(String tableName, boolean enableSequenceNu
115115
.option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
116116
.option("bucket", "1");
117117
if (enableSequenceNumber) {
118-
schemaBuilder.option(CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true");
118+
schemaBuilder.option(CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(), "true");
119119
}
120120

121121
TableSchema tableSchema =

0 commit comments

Comments
 (0)