diff --git a/docs/content/concepts/system-tables.md b/docs/content/concepts/system-tables.md
index fd412f3fa50c..d7f76abfac19 100644
--- a/docs/content/concepts/system-tables.md
+++ b/docs/content/concepts/system-tables.md
@@ -136,6 +136,51 @@ SELECT * FROM my_table$audit_log;
*/
```
+#### Reading with Sequence Number
+
+For primary key tables, you can enable the `table-read.sequence-number.enabled` option to include the `_SEQUENCE_NUMBER` field in the output.
+
+{{< tabs "audit-log-sequence-number" >}}
+
+{{< tab "Enable via CREATE TABLE" >}}
+```sql
+CREATE TABLE my_table (
+ ...
+) WITH (
+ 'table-read.sequence-number.enabled' = 'true'
+);
+```
+{{< /tab >}}
+
+{{< tab "Enable via ALTER TABLE" >}}
+```sql
+ALTER TABLE my_table SET ('table-read.sequence-number.enabled' = 'true');
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+```sql
+SELECT * FROM my_table$audit_log;
+
+/*
++------------------+--------------------+-----------------+-----------------+
+| rowkind | _SEQUENCE_NUMBER | column_0 | column_1 |
++------------------+--------------------+-----------------+-----------------+
+| +I | 0 | ... | ... |
++------------------+--------------------+-----------------+-----------------+
+| -U | 0 | ... | ... |
++------------------+--------------------+-----------------+-----------------+
+| +U | 1 | ... | ... |
++------------------+--------------------+-----------------+-----------------+
+3 rows in set
+*/
+```
+
+{{< hint info >}}
+The `table-read.sequence-number.enabled` option cannot be set via SQL hints.
+{{< /hint >}}
+
### Binlog Table
You can query the binlog through binlog table. In the binlog system table, the update before and update after will be packed in one row.
@@ -158,6 +203,24 @@ SELECT * FROM T$binlog;
*/
```
+Similar to the audit_log table, you can also enable `table-read.sequence-number.enabled` to include `_SEQUENCE_NUMBER` in the binlog table output:
+
+```sql
+SELECT * FROM T$binlog;
+
+/*
++------------------+--------------------+----------------------+-----------------------+
+| rowkind | _SEQUENCE_NUMBER | column_0 | column_1 |
++------------------+--------------------+----------------------+-----------------------+
+| +I | 0 | [col_0] | [col_1] |
++------------------+--------------------+----------------------+-----------------------+
+| +U | 1 | [col_0_ub, col_0_ua] | [col_1_ub, col_1_ua] |
++------------------+--------------------+----------------------+-----------------------+
+| -D | 2 | [col_0] | [col_1] |
++------------------+--------------------+----------------------+-----------------------+
+*/
+```
+
### Read-optimized Table
If you require extreme reading performance and can accept reading slightly old data,
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index f4e0a6d1cd43..a2aacaf550f6 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -146,6 +146,12 @@
String |
Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true. |
+
+ table-read.sequence-number.enabled |
+ false |
+ Boolean |
+ Whether to include the _SEQUENCE_NUMBER field when reading the audit_log or binlog system tables. This is only valid for primary key tables. |
+
changelog.num-retained.max |
(none) |
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index ade2ac9c3741..53e9a298f711 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -827,6 +827,14 @@ public InlineElement getDescription() {
.withDescription(
"Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true.");
+ public static final ConfigOption TABLE_READ_SEQUENCE_NUMBER_ENABLED =
+ key("table-read.sequence-number.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to include the _SEQUENCE_NUMBER field when reading the audit_log or binlog "
+ + "system tables. This is only valid for primary key tables.");
+
public static final ConfigOption SEQUENCE_FIELD =
key("sequence.field")
.stringType()
@@ -2721,6 +2729,10 @@ public List changelogRowDeduplicateIgnoreFields() {
.orElse(Collections.emptyList());
}
+ public boolean tableReadSequenceNumberEnabled() {
+ return options.get(TABLE_READ_SEQUENCE_NUMBER_ENABLED);
+ }
+
public boolean scanPlanSortPartition() {
return options.get(SCAN_PLAN_SORT_PARTITION);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 1378ecc58651..5d1e46c1dad0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -245,6 +245,8 @@ public static void validateTableSchema(TableSchema schema) {
validateIncrementalClustering(schema, options);
validateChainTable(schema, options);
+
+ validateChangelogReadSequenceNumber(schema, options);
}
public static void validateFallbackBranch(SchemaManager schemaManager, TableSchema schema) {
@@ -718,4 +720,15 @@ public static void validateChainTable(TableSchema schema, CoreOptions options) {
"Partition timestamp formatter is required for chain table.");
}
}
+
+ private static void validateChangelogReadSequenceNumber(
+ TableSchema schema, CoreOptions options) {
+ if (options.tableReadSequenceNumberEnabled()) {
+ checkArgument(
+ !schema.primaryKeys().isEmpty(),
+ "Cannot enable '%s' for non-primary-key table. "
+ + "Sequence number is only available for primary key tables.",
+ CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key());
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index fde63285a42d..da694671433f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -43,6 +43,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.function.Supplier;
/**
@@ -161,14 +162,17 @@ public RecordReader reader(Split split) throws IOException {
throw new RuntimeException("Should not happen.");
}
- public static RecordReader unwrap(RecordReader reader) {
+ public static RecordReader unwrap(
+ RecordReader reader, Map schemaOptions) {
return new RecordReader() {
@Nullable
@Override
public RecordIterator readBatch() throws IOException {
RecordIterator batch = reader.readBatch();
- return batch == null ? null : new ValueContentRowDataRecordIterator(batch);
+ return batch == null
+ ? null
+ : new ValueContentRowDataRecordIterator(batch, schemaOptions);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java
index adcf504a5d38..bd9d97046a93 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java
@@ -19,16 +19,32 @@
package org.apache.paimon.table.source;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.reader.RecordReader;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
/** A {@link RecordReader.RecordIterator} mapping a {@link KeyValue} to its value. */
public class ValueContentRowDataRecordIterator extends ResetRowKindRecordIterator {
+ public static final String KEY_VALUE_SEQUENCE_NUMBER_ENABLED =
+ "key-value.sequence_number.enabled";
+ private final boolean keyValueSequenceNumberEnabled;
+
public ValueContentRowDataRecordIterator(RecordReader.RecordIterator kvIterator) {
+ this(kvIterator, new HashMap<>(1));
+ }
+
+ public ValueContentRowDataRecordIterator(
+ RecordReader.RecordIterator kvIterator, Map schemaOptions) {
super(kvIterator);
+ this.keyValueSequenceNumberEnabled =
+ Boolean.parseBoolean(
+ schemaOptions.getOrDefault(KEY_VALUE_SEQUENCE_NUMBER_ENABLED, "false"));
}
@Override
@@ -40,6 +56,15 @@ public InternalRow next() throws IOException {
InternalRow rowData = kv.value();
rowData.setRowKind(kv.valueKind());
+
+ if (keyValueSequenceNumberEnabled) {
+ JoinedRow joinedRow = new JoinedRow();
+ GenericRow systemFieldsRow = new GenericRow(1);
+ systemFieldsRow.setField(0, kv.sequenceNumber());
+ joinedRow.replace(systemFieldsRow, rowData);
+ joinedRow.setRowKind(kv.valueKind());
+ return joinedRow;
+ }
return rowData;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
index ee3d84b9544f..abe2f1ced20d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalChangelogReadProvider.java
@@ -75,7 +75,7 @@ private SplitRead create(Supplier supplier) {
dataSplit.dataFiles(),
dataSplit.deletionFiles().orElse(null),
false));
- return unwrap(reader);
+ return unwrap(reader, read.tableSchema().options());
};
return SplitRead.convert(read, convertedFactory);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
index ec4408fb9737..7655bd3b35f6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
@@ -117,7 +117,7 @@ public RecordReader createReader(Split s) throws IOException {
ProjectedRow.from(readType, mergeRead.tableSchema().logicalRowType());
reader = reader.transform(kv -> kv.replaceValue(projectedRow.replaceRow(kv.value())));
}
- return KeyValueTableRead.unwrap(reader);
+ return KeyValueTableRead.unwrap(reader, mergeRead.tableSchema().options());
}
private static RecordReader readDiff(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
index d69209ef4e66..da9692a8d4a0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/MergeFileSplitReadProvider.java
@@ -49,7 +49,8 @@ public MergeFileSplitReadProvider(
private SplitRead create(Supplier supplier) {
final MergeFileSplitRead read = supplier.get().withReadKeyType(RowType.of());
- return SplitRead.convert(read, split -> unwrap(read.createReader(split)));
+ return SplitRead.convert(
+ read, split -> unwrap(read.createReader(split), read.tableSchema().options()));
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index d590e1eb5536..7d0d17ebbddf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -81,31 +81,47 @@
import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
+import static org.apache.paimon.table.source.ValueContentRowDataRecordIterator.KEY_VALUE_SEQUENCE_NUMBER_ENABLED;
/** A {@link Table} for reading audit log of table. */
public class AuditLogTable implements DataTable, ReadonlyTable {
public static final String AUDIT_LOG = "audit_log";
- public static final PredicateReplaceVisitor PREDICATE_CONVERTER =
- p -> {
- if (p.index() == 0) {
- return Optional.empty();
- }
- return Optional.of(
- new LeafPredicate(
- p.function(),
- p.type(),
- p.index() - 1,
- p.fieldName(),
- p.literals()));
- };
+ protected final FileStoreTable wrapped;
- private final FileStoreTable wrapped;
+ protected final List specialFields;
public AuditLogTable(FileStoreTable wrapped) {
this.wrapped = wrapped;
+ this.specialFields = new ArrayList<>();
+ specialFields.add(SpecialFields.ROW_KIND);
+
+ boolean includeSequenceNumber =
+ CoreOptions.fromMap(wrapped.options()).tableReadSequenceNumberEnabled();
+
+ if (includeSequenceNumber) {
+ this.wrapped.options().put(KEY_VALUE_SEQUENCE_NUMBER_ENABLED, "true");
+ specialFields.add(SpecialFields.SEQUENCE_NUMBER);
+ }
+ }
+
+ /** Creates a PredicateReplaceVisitor that adjusts field indices by systemFieldCount. */
+ private PredicateReplaceVisitor createPredicateConverter() {
+ return p -> {
+ if (p.index() < specialFields.size()) {
+ return Optional.empty();
+ }
+ return Optional.of(
+ new LeafPredicate(
+ p.function(),
+ p.type(),
+ p.index() - specialFields.size(),
+ p.fieldName(),
+ p.literals()));
+ };
}
@Override
@@ -140,8 +156,7 @@ public String name() {
@Override
public RowType rowType() {
- List fields = new ArrayList<>();
- fields.add(SpecialFields.ROW_KIND);
+ List fields = new ArrayList<>(specialFields);
fields.addAll(wrapped.rowType().getFields());
return new RowType(fields);
}
@@ -228,6 +243,11 @@ public InnerTableRead newRead() {
@Override
public Table copy(Map dynamicOptions) {
+ if (Boolean.parseBoolean(
+ dynamicOptions.getOrDefault(TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(), "false"))) {
+ throw new UnsupportedOperationException(
+ "table-read.sequence-number.enabled is not supported by hint.");
+ }
return new AuditLogTable(wrapped.copy(dynamicOptions));
}
@@ -238,9 +258,10 @@ public FileIO fileIO() {
/** Push down predicate to dataScan and dataRead. */
private Optional convert(Predicate predicate) {
+ PredicateReplaceVisitor converter = createPredicateConverter();
List result =
PredicateBuilder.splitAnd(predicate).stream()
- .map(p -> p.visit(PREDICATE_CONVERTER))
+ .map(p -> p.visit(converter))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
@@ -630,6 +651,11 @@ public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubta
class AuditLogRead implements InnerTableRead {
+ // Special index for rowkind field
+ protected static final int ROW_KIND_INDEX = -1;
+ // _SEQUENCE_NUMBER is at index 0 by setting: KEY_VALUE_SEQUENCE_NUMBER_ENABLED
+ protected static final int SEQUENCE_NUMBER_INDEX = 0;
+
protected final InnerTableRead dataRead;
protected int[] readProjection;
@@ -639,52 +665,61 @@ protected AuditLogRead(InnerTableRead dataRead) {
this.readProjection = defaultProjection();
}
- /** Default projection, just add row kind to the first. */
+ /** Default projection, add system fields (rowkind, and optionally _SEQUENCE_NUMBER). */
private int[] defaultProjection() {
int dataFieldCount = wrapped.rowType().getFieldCount();
- int[] projection = new int[dataFieldCount + 1];
- projection[0] = -1;
+ int[] projection = new int[dataFieldCount + specialFields.size()];
+ projection[0] = ROW_KIND_INDEX;
+ if (specialFields.contains(SpecialFields.SEQUENCE_NUMBER)) {
+ projection[1] = SEQUENCE_NUMBER_INDEX;
+ }
for (int i = 0; i < dataFieldCount; i++) {
- projection[i + 1] = i;
+ projection[specialFields.size() + i] = i + specialFields.size() - 1;
}
return projection;
}
- @Override
- public InnerTableRead withFilter(Predicate predicate) {
- convert(predicate).ifPresent(dataRead::withFilter);
- return this;
- }
-
- @Override
- public InnerTableRead withReadType(RowType readType) {
- // data projection to push down to dataRead
- List dataReadFields = new ArrayList<>();
-
- // read projection to handle record returned by dataRead
+ /** Build projection array from readType. */
+ private int[] buildProjection(RowType readType) {
List fields = readType.getFields();
- int[] readProjection = new int[fields.size()];
+ int[] projection = new int[fields.size()];
+ int dataFieldIndex = 0;
- boolean rowKindAppeared = false;
for (int i = 0; i < fields.size(); i++) {
String fieldName = fields.get(i).name();
if (fieldName.equals(SpecialFields.ROW_KIND.name())) {
- rowKindAppeared = true;
- readProjection[i] = -1;
+ projection[i] = ROW_KIND_INDEX;
+ } else if (fieldName.equals(SpecialFields.SEQUENCE_NUMBER.name())) {
+ projection[i] = SEQUENCE_NUMBER_INDEX;
} else {
- dataReadFields.add(fields.get(i));
- // There is no row kind field. Keep it as it is
- // Row kind field has occurred, and the following fields are offset by 1
- // position
- readProjection[i] = rowKindAppeared ? i - 1 : i;
+ projection[i] = dataFieldIndex + specialFields.size() - 1;
+ dataFieldIndex++;
}
}
+ return projection;
+ }
+
+ @Override
+ public InnerTableRead withFilter(Predicate predicate) {
+ convert(predicate).ifPresent(dataRead::withFilter);
+ return this;
+ }
- this.readProjection = readProjection;
- dataRead.withReadType(new RowType(readType.isNullable(), dataReadFields));
+ @Override
+ public InnerTableRead withReadType(RowType readType) {
+ this.readProjection = buildProjection(readType);
+ List dataFields = extractDataFields(readType);
+ dataRead.withReadType(new RowType(readType.isNullable(), dataFields));
return this;
}
+ /** Extract data fields (non-system fields) from readType. */
+ private List extractDataFields(RowType readType) {
+ return readType.getFields().stream()
+ .filter(f -> !SpecialFields.isSystemField(f.name()))
+ .collect(Collectors.toList());
+ }
+
@Override
public TableRead withIOManager(IOManager ioManager) {
this.dataRead.withIOManager(ioManager);
@@ -701,7 +736,10 @@ private InternalRow convertRow(InternalRow data) {
}
}
- /** A {@link ProjectedRow} which returns row kind when mapping index is negative. */
+ /**
+ * A {@link ProjectedRow} which returns row kind and sequence number when mapping index is
+ * negative.
+ */
static class AuditLogRow extends ProjectedRow {
AuditLogRow(int[] indexMapping, InternalRow row) {
@@ -723,15 +761,21 @@ public void setRowKind(RowKind kind) {
@Override
public boolean isNullAt(int pos) {
if (indexMapping[pos] < 0) {
- // row kind is always not null
+ // row kind and sequence num are always not null
return false;
}
return super.isNullAt(pos);
}
+ @Override
+ public long getLong(int pos) {
+ return super.getLong(pos);
+ }
+
@Override
public BinaryString getString(int pos) {
- if (indexMapping[pos] < 0) {
+ int index = indexMapping[pos];
+ if (index == AuditLogRead.ROW_KIND_INDEX) {
return BinaryString.fromString(row.getRowKind().shortString());
}
return super.getString(pos);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
index cc8d1621a3fa..5c1268f96304 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java
@@ -41,6 +41,7 @@
import java.util.Map;
import java.util.stream.IntStream;
+import static org.apache.paimon.CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
/**
@@ -56,11 +57,8 @@ public class BinlogTable extends AuditLogTable {
public static final String BINLOG = "binlog";
- private final FileStoreTable wrapped;
-
public BinlogTable(FileStoreTable wrapped) {
super(wrapped);
- this.wrapped = wrapped;
}
@Override
@@ -70,8 +68,7 @@ public String name() {
@Override
public RowType rowType() {
- List fields = new ArrayList<>();
- fields.add(SpecialFields.ROW_KIND);
+ List fields = new ArrayList<>(specialFields);
for (DataField field : wrapped.rowType().getFields()) {
// convert to nullable
fields.add(field.newType(new ArrayType(field.type().nullable())));
@@ -86,6 +83,11 @@ public InnerTableRead newRead() {
@Override
public Table copy(Map dynamicOptions) {
+ if (Boolean.parseBoolean(
+ dynamicOptions.getOrDefault(TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(), "false"))) {
+ throw new UnsupportedOperationException(
+ "table-read.sequence-number.enabled is not supported by hint.");
+ }
return new BinlogTable(wrapped.copy(dynamicOptions));
}
@@ -102,7 +104,7 @@ public InnerTableRead withReadType(RowType readType) {
List fields = new ArrayList<>();
List wrappedReadFields = new ArrayList<>();
for (DataField field : readType.getFields()) {
- if (field.name().equals(SpecialFields.ROW_KIND.name())) {
+ if (SpecialFields.isSystemField(field.name())) {
fields.add(field);
} else {
DataField origin = field.newType(((ArrayType) field.type()).getElementType());
@@ -117,12 +119,16 @@ public InnerTableRead withReadType(RowType readType) {
@Override
public RecordReader createReader(Split split) throws IOException {
DataSplit dataSplit = (DataSplit) split;
+ // When sequence number is enabled, the underlying data layout is:
+ // [_SEQUENCE_NUMBER, pk, pt, col1, ...]
+ // We need to offset the field index to skip the sequence number field.
+ int offset = specialFields.size() - 1;
InternalRow.FieldGetter[] fieldGetters =
IntStream.range(0, wrappedReadType.getFieldCount())
.mapToObj(
i ->
InternalRow.createFieldGetter(
- wrappedReadType.getTypeAt(i), i))
+ wrappedReadType.getTypeAt(i), i + offset))
.toArray(InternalRow.FieldGetter[]::new);
if (dataSplit.isStreaming()) {
@@ -146,15 +152,23 @@ private InternalRow convertToArray(
InternalRow row1,
@Nullable InternalRow row2,
InternalRow.FieldGetter[] fieldGetters) {
- GenericRow row = new GenericRow(row1.getFieldCount());
- for (int i = 0; i < row1.getFieldCount(); i++) {
+ // seqOffset is 1 if sequence number is enabled, 0 otherwise
+ int seqOffset = specialFields.size() - 1;
+ GenericRow row = new GenericRow(fieldGetters.length + seqOffset);
+
+ // Copy sequence number if enabled (it's at index 0 in input row)
+ if (seqOffset > 0) {
+ row.setField(0, row1.getLong(0));
+ }
+
+ for (int i = 0; i < fieldGetters.length; i++) {
Object o1 = fieldGetters[i].getFieldOrNull(row1);
Object o2;
if (row2 != null) {
o2 = fieldGetters[i].getFieldOrNull(row2);
- row.setField(i, new GenericArray(new Object[] {o1, o2}));
+ row.setField(i + seqOffset, new GenericArray(new Object[] {o1, o2}));
} else {
- row.setField(i, new GenericArray(new Object[] {o1}));
+ row.setField(i + seqOffset, new GenericArray(new Object[] {o1}));
}
}
// If no row2 provided, then follow the row1 kind.
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
index 41facd1d837f..cbba947bcb9b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java
@@ -27,6 +27,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
@@ -36,7 +37,6 @@
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
@@ -48,15 +48,62 @@
/** Unit tests for {@link AuditLogTable}. */
public class AuditLogTableTest extends TableTestBase {
- private static final String tableName = "MyTable";
- private AuditLogTable auditLogTable;
+ @Test
+ public void testReadAuditLogFromLatest() throws Exception {
+ AuditLogTable auditLogTable = createAuditLogTable("audit_table", false);
+ assertThat(auditLogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "pk", "pt", "col1");
+ List expectRow = getExpectedResult();
+ List result = read(auditLogTable);
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+ }
+
+ @Test
+ public void testReadSequenceNumberWithTableOption() throws Exception {
+ AuditLogTable auditLogTable = createAuditLogTable("audit_table_with_seq", true);
+ assertThat(auditLogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", "col1");
+
+ List result = read(auditLogTable);
+ List expectRow = getExpectedResultWithSequenceNumber();
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+ }
+
+ @Test
+ public void testReadSequenceNumberWithAlterTable() throws Exception {
+ String tableName = "audit_table_alter_seq";
+ // Create table without sequence-number option
+ AuditLogTable auditLogTable = createAuditLogTable(tableName, false);
+ assertThat(auditLogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "pk", "pt", "col1");
+
+ // Add sequence-number option via alterTable
+ catalog.alterTable(
+ identifier(tableName),
+ SchemaChange.setOption(
+ CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"),
+ false);
- @BeforeEach
- public void before() throws Exception {
+ // Re-fetch the audit_log table to get updated schema
+ Identifier auditLogTableId =
+ identifier(tableName + SYSTEM_TABLE_SPLITTER + AuditLogTable.AUDIT_LOG);
+ AuditLogTable updatedAuditLogTable = (AuditLogTable) catalog.getTable(auditLogTableId);
+
+ // Verify schema now includes _SEQUENCE_NUMBER
+ assertThat(updatedAuditLogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", "col1");
+
+ List result = read(updatedAuditLogTable);
+ List expectRow = getExpectedResultWithSequenceNumber();
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+ }
+
+ private AuditLogTable createAuditLogTable(String tableName, boolean enableSequenceNumber)
+ throws Exception {
Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, tableName));
FileIO fileIO = LocalFileIO.create();
- Schema schema =
+ Schema.Builder schemaBuilder =
Schema.newBuilder()
.column("pk", DataTypes.INT())
.column("pt", DataTypes.INT())
@@ -64,44 +111,54 @@ public void before() throws Exception {
.partitionKeys("pt")
.primaryKey("pk", "pt")
.option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
- .option("bucket", "1")
- .build();
+ .option("bucket", "1");
+ if (enableSequenceNumber) {
+ schemaBuilder.option(CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(), "true");
+ }
TableSchema tableSchema =
- SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), schema);
+ SchemaUtils.forceCommit(
+ new SchemaManager(fileIO, tablePath), schemaBuilder.build());
FileStoreTable table =
FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema);
- Identifier filesTableId =
+
+ writeTestData(table);
+
+ Identifier auditLogTableId =
identifier(tableName + SYSTEM_TABLE_SPLITTER + AuditLogTable.AUDIT_LOG);
- auditLogTable = (AuditLogTable) catalog.getTable(filesTableId);
+ return (AuditLogTable) catalog.getTable(auditLogTableId);
+ }
+ private void writeTestData(FileStoreTable table) throws Exception {
write(table, GenericRow.ofKind(RowKind.INSERT, 1, 1, 1));
write(table, GenericRow.ofKind(RowKind.DELETE, 1, 1, 1));
write(table, GenericRow.ofKind(RowKind.INSERT, 1, 2, 5));
write(table, GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 2, 5));
- write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 4, 6));
+ write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 2, 6));
write(table, GenericRow.ofKind(RowKind.INSERT, 2, 3, 1));
}
- @Test
- public void testReadAuditLogFromLatest() throws Exception {
- List expectRow = getExpectedResult();
- List result = read(auditLogTable);
- assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
- }
-
private List getExpectedResult() {
List expectedRow = new ArrayList<>();
expectedRow.add(
GenericRow.of(BinaryString.fromString(RowKind.DELETE.shortString()), 1, 1, 1));
expectedRow.add(
GenericRow.of(
- BinaryString.fromString(RowKind.UPDATE_BEFORE.shortString()), 1, 2, 5));
+ BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 1, 2, 6));
+ expectedRow.add(
+ GenericRow.of(BinaryString.fromString(RowKind.INSERT.shortString()), 2, 3, 1));
+ return expectedRow;
+ }
+
+ private List getExpectedResultWithSequenceNumber() {
+ List expectedRow = new ArrayList<>();
+ expectedRow.add(
+ GenericRow.of(BinaryString.fromString(RowKind.DELETE.shortString()), 1L, 1, 1, 1));
expectedRow.add(
GenericRow.of(
- BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 1, 4, 6));
+ BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 2L, 1, 2, 6));
expectedRow.add(
- GenericRow.of(BinaryString.fromString(RowKind.INSERT.shortString()), 2, 3, 1));
+ GenericRow.of(BinaryString.fromString(RowKind.INSERT.shortString()), 0L, 2, 3, 1));
return expectedRow;
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/BinlogTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/BinlogTableTest.java
new file mode 100644
index 000000000000..6f187fbffd98
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/BinlogTableTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link BinlogTable}. */
+public class BinlogTableTest extends TableTestBase {
+
+ @Test
+ public void testReadBinlogFromLatest() throws Exception {
+ BinlogTable binlogTable = createBinlogTable("binlog_table", false);
+ assertThat(binlogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "pk", "pt", "col1");
+
+ List result = read(binlogTable);
+ List expectRow = getExpectedResult();
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+ }
+
+ @Test
+ public void testReadSequenceNumberWithTableOption() throws Exception {
+ BinlogTable binlogTable = createBinlogTable("binlog_table_with_seq", true);
+ assertThat(binlogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", "col1");
+
+ List result = read(binlogTable);
+ List expectRow = getExpectedResultWithSequenceNumber();
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+ }
+
+ @Test
+ public void testReadSequenceNumberWithAlterTable() throws Exception {
+ String tableName = "binlog_table_alter_seq";
+ // Create table without sequence-number option
+ BinlogTable binlogTable = createBinlogTable(tableName, false);
+ assertThat(binlogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "pk", "pt", "col1");
+
+ // Add sequence-number option via alterTable
+ catalog.alterTable(
+ identifier(tableName),
+ SchemaChange.setOption(
+ CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"),
+ false);
+
+ // Re-fetch the binlog table to get updated schema
+ Identifier binlogTableId =
+ identifier(tableName + SYSTEM_TABLE_SPLITTER + BinlogTable.BINLOG);
+ BinlogTable updatedBinlogTable = (BinlogTable) catalog.getTable(binlogTableId);
+
+ // Verify schema now includes _SEQUENCE_NUMBER
+ assertThat(updatedBinlogTable.rowType().getFieldNames())
+ .containsExactly("rowkind", "_SEQUENCE_NUMBER", "pk", "pt", "col1");
+
+ List result = read(updatedBinlogTable);
+ List expectRow = getExpectedResultWithSequenceNumber();
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectRow);
+ }
+
+ private BinlogTable createBinlogTable(String tableName, boolean enableSequenceNumber)
+ throws Exception {
+ Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, tableName));
+ FileIO fileIO = LocalFileIO.create();
+
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .column("pk", DataTypes.INT())
+ .column("pt", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
+ .option("bucket", "1");
+ if (enableSequenceNumber) {
+ schemaBuilder.option(CoreOptions.TABLE_READ_SEQUENCE_NUMBER_ENABLED.key(), "true");
+ }
+
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(fileIO, tablePath), schemaBuilder.build());
+ FileStoreTable table =
+ FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema);
+
+ writeTestData(table);
+
+ Identifier binlogTableId =
+ identifier(tableName + SYSTEM_TABLE_SPLITTER + BinlogTable.BINLOG);
+ return (BinlogTable) catalog.getTable(binlogTableId);
+ }
+
+ private void writeTestData(FileStoreTable table) throws Exception {
+ write(table, GenericRow.ofKind(RowKind.INSERT, 1, 1, 1));
+ write(table, GenericRow.ofKind(RowKind.DELETE, 1, 1, 1));
+ write(table, GenericRow.ofKind(RowKind.INSERT, 1, 2, 5));
+ write(table, GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 2, 5));
+ write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 2, 6));
+ write(table, GenericRow.ofKind(RowKind.INSERT, 2, 3, 1));
+ }
+
+ private List getExpectedResult() {
+ List expectedRow = new ArrayList<>();
+ expectedRow.add(
+ GenericRow.of(
+ BinaryString.fromString(RowKind.DELETE.shortString()),
+ new GenericArray(new Object[] {1}),
+ new GenericArray(new Object[] {1}),
+ new GenericArray(new Object[] {1})));
+ expectedRow.add(
+ GenericRow.of(
+ BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()),
+ new GenericArray(new Object[] {1}),
+ new GenericArray(new Object[] {2}),
+ new GenericArray(new Object[] {6})));
+ expectedRow.add(
+ GenericRow.of(
+ BinaryString.fromString(RowKind.INSERT.shortString()),
+ new GenericArray(new Object[] {2}),
+ new GenericArray(new Object[] {3}),
+ new GenericArray(new Object[] {1})));
+ return expectedRow;
+ }
+
+ private List getExpectedResultWithSequenceNumber() {
+ List expectedRow = new ArrayList<>();
+ expectedRow.add(
+ GenericRow.of(
+ BinaryString.fromString(RowKind.DELETE.shortString()),
+ 1L,
+ new GenericArray(new Object[] {1}),
+ new GenericArray(new Object[] {1}),
+ new GenericArray(new Object[] {1})));
+ expectedRow.add(
+ GenericRow.of(
+ BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()),
+ 2L,
+ new GenericArray(new Object[] {1}),
+ new GenericArray(new Object[] {2}),
+ new GenericArray(new Object[] {6})));
+ expectedRow.add(
+ GenericRow.of(
+ BinaryString.fromString(RowKind.INSERT.shortString()),
+ 0L,
+ new GenericArray(new Object[] {2}),
+ new GenericArray(new Object[] {3}),
+ new GenericArray(new Object[] {1})));
+ return expectedRow;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
index f955418b6d21..c2a17412fe78 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
@@ -29,14 +29,13 @@
import org.apache.paimon.table.source.AbstractDataTableRead;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.KeyValueTableRead;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.RowType;
import java.io.IOException;
-import static org.apache.paimon.table.source.KeyValueTableRead.unwrap;
-
/** An {@link InnerTableRead} that reads the data changed before and after compaction. */
public class LookupCompactDiffRead extends AbstractDataTableRead {
private final SplitRead fullPhaseMergeRead;
@@ -46,7 +45,11 @@ public LookupCompactDiffRead(MergeFileSplitRead mergeRead, TableSchema schema) {
super(schema);
this.incrementalDiffRead = new IncrementalCompactDiffSplitRead(mergeRead);
this.fullPhaseMergeRead =
- SplitRead.convert(mergeRead, split -> unwrap(mergeRead.createReader(split)));
+ SplitRead.convert(
+ mergeRead,
+ split ->
+ KeyValueTableRead.unwrap(
+ mergeRead.createReader(split), schema.options()));
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 22271c8a7ebe..48fcbbda710e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -56,6 +56,7 @@
import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertThrows;
/** ITCase for batch file store. */
public class BatchFileStoreITCase extends CatalogITCaseBase {
@@ -1053,6 +1054,127 @@ public void testBinlogTableWithProjection() {
.containsExactly(Row.of("+I", new String[] {"A"}));
}
+ @Test
+ public void testAuditLogTableWithSequenceNumberEnabled() {
+ // Creating an append-only table (no primary key) with
+ // table-read.sequence-number.enabled option should throw an exception
+ assertThrows(
+ RuntimeException.class,
+ () ->
+ sql(
+ "CREATE TABLE test_table_err (a int, b int, c AS a + b) "
+ + "WITH ('table-read.sequence-number.enabled'='true')"));
+
+ // Selecting an auditlog table with
+ // table-read.sequence-number.enabled option should throw an exception
+ sql("CREATE TABLE test_table_err2 (a int PRIMARY KEY NOT ENFORCED, b int, c AS a + b);");
+ assertThrows(
+ RuntimeException.class,
+ () ->
+ sql(
+ "SELECT * FROM `test_table_err2$audit_log`"
+ + " /*+ OPTIONS('table-read.sequence-number.enabled' = 'true') */"));
+
+ // Create primary key table with table-read.sequence-number.enabled option
+ sql(
+ "CREATE TABLE test_table_seq (a int PRIMARY KEY NOT ENFORCED, b int, c AS a + b) "
+ + "WITH ('table-read.sequence-number.enabled'='true');");
+ sql("INSERT INTO test_table_seq VALUES (1, 2)");
+ sql("INSERT INTO test_table_seq VALUES (3, 4)");
+
+ // Test SELECT * from original table
+ assertThat(sql("SELECT * FROM `test_table_seq`"))
+ .containsExactlyInAnyOrder(Row.of(1, 2, 3), Row.of(3, 4, 7));
+
+ // Test SELECT * includes _SEQUENCE_NUMBER
+ assertThat(sql("SELECT * FROM `test_table_seq$audit_log`"))
+ .containsExactlyInAnyOrder(Row.of("+I", 0L, 1, 2, 3), Row.of("+I", 1L, 3, 4, 7));
+
+ // Test out-of-order select with _SEQUENCE_NUMBER
+ assertThat(sql("SELECT b, c, _SEQUENCE_NUMBER FROM `test_table_seq$audit_log`"))
+ .containsExactlyInAnyOrder(Row.of(2, 3, 0L), Row.of(4, 7, 1L));
+
+ // Test selecting only _SEQUENCE_NUMBER, rowkind
+ assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM `test_table_seq$audit_log`"))
+ .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I"));
+ }
+
+ @Test
+ public void testAuditLogTableWithSequenceNumberAlterTable() {
+ // Create primary key table without sequence-number option
+ sql("CREATE TABLE test_table_dyn (a int PRIMARY KEY NOT ENFORCED, b int, c AS a + b);");
+ sql("INSERT INTO test_table_dyn VALUES (1, 2)");
+ sql("INSERT INTO test_table_dyn VALUES (3, 4)");
+
+ // Add table-read.sequence-number.enabled option via ALTER TABLE
+ sql("ALTER TABLE test_table_dyn SET ('table-read.sequence-number.enabled'='true')");
+
+ // Test SELECT * includes _SEQUENCE_NUMBER (same as
+ // testAuditLogTableWithSequenceNumberEnabled)
+ assertThat(sql("SELECT * FROM `test_table_dyn$audit_log`"))
+ .containsExactlyInAnyOrder(Row.of("+I", 0L, 1, 2, 3), Row.of("+I", 1L, 3, 4, 7));
+
+ // Test out-of-order select with _SEQUENCE_NUMBER
+ assertThat(sql("SELECT b, c, _SEQUENCE_NUMBER FROM `test_table_dyn$audit_log`"))
+ .containsExactlyInAnyOrder(Row.of(2, 3, 0L), Row.of(4, 7, 1L));
+
+ // Test selecting only _SEQUENCE_NUMBER, rowkind
+ assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM `test_table_dyn$audit_log`"))
+ .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I"));
+ }
+
+ @Test
+ public void testBinlogTableWithSequenceNumberEnabled() {
+ // Create primary key table with table-read.sequence-number.enabled option
+ sql(
+ "CREATE TABLE test_table_seq (a int PRIMARY KEY NOT ENFORCED, b int) "
+ + "WITH ('table-read.sequence-number.enabled'='true');");
+ sql("INSERT INTO test_table_seq VALUES (1, 2)");
+ sql("INSERT INTO test_table_seq VALUES (3, 4)");
+
+ // Test SELECT * includes _SEQUENCE_NUMBER
+ assertThat(sql("SELECT * FROM `test_table_seq$binlog`"))
+ .containsExactlyInAnyOrder(
+ Row.of("+I", 0L, new Integer[] {1}, new Integer[] {2}),
+ Row.of("+I", 1L, new Integer[] {3}, new Integer[] {4}));
+
+ // Test out-of-order select with _SEQUENCE_NUMBER
+ assertThat(sql("SELECT b, _SEQUENCE_NUMBER FROM `test_table_seq$binlog`"))
+ .containsExactlyInAnyOrder(
+ Row.of(new Integer[] {2}, 0L), Row.of(new Integer[] {4}, 1L));
+
+ // Test selecting only _SEQUENCE_NUMBER
+ assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM `test_table_seq$binlog`"))
+ .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I"));
+ }
+
+ @Test
+ public void testBinlogTableWithSequenceNumberAlterTable() {
+ // Create primary key table without sequence-number option
+ sql("CREATE TABLE test_table_dyn (a int PRIMARY KEY NOT ENFORCED, b int);");
+ sql("INSERT INTO test_table_dyn VALUES (1, 2)");
+ sql("INSERT INTO test_table_dyn VALUES (3, 4)");
+
+ // Add table-read.sequence-number.enabled option via ALTER TABLE
+ sql("ALTER TABLE test_table_dyn SET ('table-read.sequence-number.enabled'='true')");
+
+ // Test SELECT * includes _SEQUENCE_NUMBER (same as
+ // testBinlogTableWithSequenceNumberEnabled)
+ assertThat(sql("SELECT * FROM `test_table_dyn$binlog`"))
+ .containsExactlyInAnyOrder(
+ Row.of("+I", 0L, new Integer[] {1}, new Integer[] {2}),
+ Row.of("+I", 1L, new Integer[] {3}, new Integer[] {4}));
+
+ // Test out-of-order select with _SEQUENCE_NUMBER
+ assertThat(sql("SELECT b, _SEQUENCE_NUMBER FROM `test_table_dyn$binlog`"))
+ .containsExactlyInAnyOrder(
+ Row.of(new Integer[] {2}, 0L), Row.of(new Integer[] {4}, 1L));
+
+ // Test selecting only _SEQUENCE_NUMBER
+ assertThat(sql("SELECT _SEQUENCE_NUMBER, rowkind FROM `test_table_dyn$binlog`"))
+ .containsExactlyInAnyOrder(Row.of(0L, "+I"), Row.of(1L, "+I"));
+ }
+
@Test
public void testBatchReadSourceWithSnapshot() {
batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222), (3, 33, 333), (4, 44, 444)");