Skip to content

Commit d06c910

Browse files
author
bosiew.tian
committed
[core] fix1
1 parent 1e104c6 commit d06c910

File tree

9 files changed

+76
-59
lines changed

9 files changed

+76
-59
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@
147147
<td>Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true.</td>
148148
</tr>
149149
<tr>
150-
<td><h5>changelog-read.sequence-number.enabled</h5></td>
150+
<td><h5>auditlog-read.sequence-number.enabled</h5></td>
151151
<td style="word-wrap: break-word;">false</td>
152152
<td>Boolean</td>
153153
<td>Whether to include _SEQUENCE_NUMBER field in audit_log and binlog system tables. This is only valid for primary key tables.</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -827,8 +827,8 @@ public InlineElement getDescription() {
827827
.withDescription(
828828
"Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true.");
829829

830-
public static final ConfigOption<Boolean> CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED =
831-
key("changelog-read.sequence-number.enabled")
830+
public static final ConfigOption<Boolean> AUDITLOG_READ_SEQUENCE_NUMBER_ENABLED =
831+
key("auditlog-read.sequence-number.enabled")
832832
.booleanType()
833833
.defaultValue(false)
834834
.withDescription(
@@ -2730,7 +2730,7 @@ public List<String> changelogRowDeduplicateIgnoreFields() {
27302730
}
27312731

27322732
public boolean changelogReadSequenceNumberEnabled() {
2733-
return options.get(CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED);
2733+
return options.get(AUDITLOG_READ_SEQUENCE_NUMBER_ENABLED);
27342734
}
27352735

27362736
public boolean scanPlanSortPartition() {

paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -728,7 +728,7 @@ private static void validateChangelogReadSequenceNumber(
728728
!schema.primaryKeys().isEmpty(),
729729
"Cannot enable '%s' for non-primary-key table. "
730730
+ "Sequence number is only available for primary key tables.",
731-
CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key());
731+
CoreOptions.AUDITLOG_READ_SEQUENCE_NUMBER_ENABLED.key());
732732
}
733733
}
734734
}

paimon-core/src/main/java/org/apache/paimon/table/source/ValueContentRowDataRecordIterator.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.paimon.table.source;
2020

21-
import org.apache.paimon.CoreOptions;
2221
import org.apache.paimon.KeyValue;
2322
import org.apache.paimon.data.GenericRow;
2423
import org.apache.paimon.data.InternalRow;
@@ -33,8 +32,7 @@
3332
/** A {@link RecordReader.RecordIterator} mapping a {@link KeyValue} to its value. */
3433
public class ValueContentRowDataRecordIterator extends ResetRowKindRecordIterator {
3534

36-
private final boolean includeSequenceNumber;
37-
private final boolean auditLogEnabled;
35+
private final boolean addSequenceNumberFirst;
3836

3937
public ValueContentRowDataRecordIterator(RecordReader.RecordIterator<KeyValue> kvIterator) {
4038
this(kvIterator, new HashMap<>(1));
@@ -43,11 +41,10 @@ public ValueContentRowDataRecordIterator(RecordReader.RecordIterator<KeyValue> k
4341
public ValueContentRowDataRecordIterator(
4442
RecordReader.RecordIterator<KeyValue> kvIterator, Map<String, String> schemaOptions) {
4543
super(kvIterator);
46-
this.includeSequenceNumber =
47-
CoreOptions.fromMap(schemaOptions).changelogReadSequenceNumberEnabled();
48-
this.auditLogEnabled =
44+
this.addSequenceNumberFirst =
4945
Boolean.parseBoolean(
50-
schemaOptions.getOrDefault(AuditLogTable.AUDIT_LOG_ENABLED, "false"));
46+
schemaOptions.getOrDefault(
47+
AuditLogTable.AUDIT_ADD_SEQUENCE_NUMBER_FIRST, "false"));
5148
}
5249

5350
@Override
@@ -60,7 +57,7 @@ public InternalRow next() throws IOException {
6057
InternalRow rowData = kv.value();
6158
rowData.setRowKind(kv.valueKind());
6259

63-
if (includeSequenceNumber && auditLogEnabled) {
60+
if (addSequenceNumberFirst) {
6461
JoinedRow joinedRow = new JoinedRow();
6562
GenericRow systemFieldsRow = new GenericRow(1);
6663
systemFieldsRow.setField(0, kv.sequenceNumber());

paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -81,40 +81,44 @@
8181
import java.util.Optional;
8282
import java.util.stream.Collectors;
8383

84+
import static org.apache.paimon.CoreOptions.AUDITLOG_READ_SEQUENCE_NUMBER_ENABLED;
8485
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
8586

8687
/** A {@link Table} for reading audit log of table. */
8788
public class AuditLogTable implements DataTable, ReadonlyTable {
8889

8990
public static final String AUDIT_LOG = "audit_log";
90-
public static final String AUDIT_LOG_ENABLED = "audit.log.enabled";
91+
public static final String AUDIT_ADD_SEQUENCE_NUMBER_FIRST = "audit.add.sequence_number.first";
9192

9293
protected final FileStoreTable wrapped;
9394

94-
/** Number of special fields (rowkind, and optionally _SEQUENCE_NUMBER). */
95-
protected final int specialFieldCount;
95+
protected final List<DataField> specialFields;
9696

9797
public AuditLogTable(FileStoreTable wrapped) {
9898
this.wrapped = wrapped;
99-
this.wrapped.schema().options().put(AUDIT_LOG_ENABLED, "true");
100-
this.specialFieldCount =
101-
coreOptions().changelogReadSequenceNumberEnabled()
102-
&& !wrapped.primaryKeys().isEmpty()
103-
? 2
104-
: 1;
99+
this.specialFields = new ArrayList<>();
100+
specialFields.add(SpecialFields.ROW_KIND);
101+
102+
boolean includeSequenceNumber =
103+
CoreOptions.fromMap(wrapped.options()).changelogReadSequenceNumberEnabled();
104+
105+
if (includeSequenceNumber) {
106+
this.wrapped.options().put(AUDIT_ADD_SEQUENCE_NUMBER_FIRST, "true");
107+
specialFields.add(SpecialFields.SEQUENCE_NUMBER);
108+
}
105109
}
106110

107111
/** Creates a PredicateReplaceVisitor that adjusts field indices by systemFieldCount. */
108112
private PredicateReplaceVisitor createPredicateConverter() {
109113
return p -> {
110-
if (p.index() < specialFieldCount) {
114+
if (p.index() < specialFields.size()) {
111115
return Optional.empty();
112116
}
113117
return Optional.of(
114118
new LeafPredicate(
115119
p.function(),
116120
p.type(),
117-
p.index() - specialFieldCount,
121+
p.index() - specialFields.size(),
118122
p.fieldName(),
119123
p.literals()));
120124
};
@@ -152,11 +156,7 @@ public String name() {
152156

153157
@Override
154158
public RowType rowType() {
155-
List<DataField> fields = new ArrayList<>();
156-
fields.add(SpecialFields.ROW_KIND);
157-
if (specialFieldCount > 1) {
158-
fields.add(SpecialFields.SEQUENCE_NUMBER);
159-
}
159+
List<DataField> fields = new ArrayList<>(specialFields);
160160
fields.addAll(wrapped.rowType().getFields());
161161
return new RowType(fields);
162162
}
@@ -243,6 +243,12 @@ public InnerTableRead newRead() {
243243

244244
@Override
245245
public Table copy(Map<String, String> dynamicOptions) {
246+
if (Boolean.parseBoolean(
247+
dynamicOptions.getOrDefault(
248+
AUDITLOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "false"))) {
249+
throw new UnsupportedOperationException(
250+
"auditlog-read.sequence-number.enabled is not supported by hint.");
251+
}
246252
return new AuditLogTable(wrapped.copy(dynamicOptions));
247253
}
248254

@@ -648,8 +654,8 @@ class AuditLogRead implements InnerTableRead {
648654

649655
// Special index for rowkind field
650656
protected static final int ROW_KIND_INDEX = -1;
651-
// Special index for _SEQUENCE_NUMBER field
652-
protected static final int SEQUENCE_NUMBER_INDEX = -2;
657+
// _SEQUENCE_NUMBER is at index 0 by setting: OPERATION_ADD_SEQUENCE_NUMBER_FIRST
658+
protected static final int SEQUENCE_NUMBER_INDEX = 0;
653659

654660
protected final InnerTableRead dataRead;
655661

@@ -663,13 +669,13 @@ protected AuditLogRead(InnerTableRead dataRead) {
663669
/** Default projection, add system fields (rowkind, and optionally _SEQUENCE_NUMBER). */
664670
private int[] defaultProjection() {
665671
int dataFieldCount = wrapped.rowType().getFieldCount();
666-
int[] projection = new int[dataFieldCount + specialFieldCount];
672+
int[] projection = new int[dataFieldCount + specialFields.size()];
667673
projection[0] = ROW_KIND_INDEX;
668-
if (specialFieldCount > 1) {
674+
if (specialFields.contains(SpecialFields.SEQUENCE_NUMBER)) {
669675
projection[1] = SEQUENCE_NUMBER_INDEX;
670676
}
671677
for (int i = 0; i < dataFieldCount; i++) {
672-
projection[specialFieldCount + i] = i + specialFieldCount - 1;
678+
projection[specialFields.size() + i] = i + specialFields.size() - 1;
673679
}
674680
return projection;
675681
}
@@ -687,7 +693,7 @@ private int[] buildProjection(RowType readType) {
687693
} else if (fieldName.equals(SpecialFields.SEQUENCE_NUMBER.name())) {
688694
projection[i] = SEQUENCE_NUMBER_INDEX;
689695
} else {
690-
projection[i] = dataFieldIndex + specialFieldCount - 1;
696+
projection[i] = dataFieldIndex + specialFields.size() - 1;
691697
dataFieldIndex++;
692698
}
693699
}
@@ -764,11 +770,6 @@ public boolean isNullAt(int pos) {
764770

765771
@Override
766772
public long getLong(int pos) {
767-
int index = indexMapping[pos];
768-
if (index == AuditLogRead.SEQUENCE_NUMBER_INDEX) {
769-
// _SEQUENCE_NUMBER is at index 0 in bottom output
770-
return row.getLong(0);
771-
}
772773
return super.getLong(pos);
773774
}
774775

paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.Map;
4242
import java.util.stream.IntStream;
4343

44+
import static org.apache.paimon.CoreOptions.AUDITLOG_READ_SEQUENCE_NUMBER_ENABLED;
4445
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
4546

4647
/**
@@ -67,11 +68,7 @@ public String name() {
6768

6869
@Override
6970
public RowType rowType() {
70-
List<DataField> fields = new ArrayList<>();
71-
fields.add(SpecialFields.ROW_KIND);
72-
if (specialFieldCount > 1) {
73-
fields.add(SpecialFields.SEQUENCE_NUMBER);
74-
}
71+
List<DataField> fields = new ArrayList<>(specialFields);
7572
for (DataField field : wrapped.rowType().getFields()) {
7673
// convert to nullable
7774
fields.add(field.newType(new ArrayType(field.type().nullable())));
@@ -86,6 +83,12 @@ public InnerTableRead newRead() {
8683

8784
@Override
8885
public Table copy(Map<String, String> dynamicOptions) {
86+
if (Boolean.parseBoolean(
87+
dynamicOptions.getOrDefault(
88+
AUDITLOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "false"))) {
89+
throw new UnsupportedOperationException(
90+
"auditlog-read.sequence-number.enabled is not supported by hint.");
91+
}
8992
return new BinlogTable(wrapped.copy(dynamicOptions));
9093
}
9194

@@ -120,7 +123,7 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
120123
// When sequence number is enabled, the underlying data layout is:
121124
// [_SEQUENCE_NUMBER, pk, pt, col1, ...]
122125
// We need to offset the field index to skip the sequence number field.
123-
int offset = specialFieldCount - 1;
126+
int offset = specialFields.size() - 1;
124127
InternalRow.FieldGetter[] fieldGetters =
125128
IntStream.range(0, wrappedReadType.getFieldCount())
126129
.mapToObj(
@@ -151,7 +154,7 @@ private InternalRow convertToArray(
151154
@Nullable InternalRow row2,
152155
InternalRow.FieldGetter[] fieldGetters) {
153156
// seqOffset is 1 if sequence number is enabled, 0 otherwise
154-
int seqOffset = specialFieldCount - 1;
157+
int seqOffset = specialFields.size() - 1;
155158
GenericRow row = new GenericRow(fieldGetters.length + seqOffset);
156159

157160
// Copy sequence number if enabled (it's at index 0 in input row)

paimon-core/src/test/java/org/apache/paimon/table/system/AuditLogTableTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public void testReadSequenceNumberWithAlterTable() throws Exception {
8181
catalog.alterTable(
8282
identifier(tableName),
8383
SchemaChange.setOption(
84-
CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"),
84+
CoreOptions.AUDITLOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"),
8585
false);
8686

8787
// Re-fetch the audit_log table to get updated schema
@@ -113,7 +113,7 @@ private AuditLogTable createAuditLogTable(String tableName, boolean enableSequen
113113
.option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
114114
.option("bucket", "1");
115115
if (enableSequenceNumber) {
116-
schemaBuilder.option(CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true");
116+
schemaBuilder.option(CoreOptions.AUDITLOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true");
117117
}
118118

119119
TableSchema tableSchema =

paimon-core/src/test/java/org/apache/paimon/table/system/BinlogTableTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public void testReadSequenceNumberWithAlterTable() throws Exception {
8383
catalog.alterTable(
8484
identifier(tableName),
8585
SchemaChange.setOption(
86-
CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"),
86+
CoreOptions.AUDITLOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true"),
8787
false);
8888

8989
// Re-fetch the binlog table to get updated schema
@@ -115,7 +115,7 @@ private BinlogTable createBinlogTable(String tableName, boolean enableSequenceNu
115115
.option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
116116
.option("bucket", "1");
117117
if (enableSequenceNumber) {
118-
schemaBuilder.option(CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true");
118+
schemaBuilder.option(CoreOptions.AUDITLOG_READ_SEQUENCE_NUMBER_ENABLED.key(), "true");
119119
}
120120

121121
TableSchema tableSchema =

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
5757
import static org.assertj.core.api.Assertions.assertThat;
5858
import static org.assertj.core.api.Assertions.assertThatThrownBy;
59+
import static org.junit.jupiter.api.Assertions.assertThrows;
5960

6061
/** ITCase for batch file store. */
6162
public class BatchFileStoreITCase extends CatalogITCaseBase {
@@ -1055,10 +1056,25 @@ public void testBinlogTableWithProjection() {
10551056

10561057
@Test
10571058
public void testAuditLogTableWithSequenceNumberEnabled() {
1058-
// Create primary key table with changelog-read.sequence-number.enabled option
1059+
// Creating an append-only table (no primary key) with
1060+
// auditlog-read.sequence-number.enabled option should throw an exception
1061+
assertThrows(
1062+
RuntimeException.class,
1063+
() ->
1064+
sql(
1065+
"CREATE TABLE test_table_seq (a int, b int, c AS a + b) "
1066+
+ "WITH ('auditlog-read.sequence-number.enabled'='true')"));
1067+
assertThrows(
1068+
RuntimeException.class,
1069+
() ->
1070+
sql(
1071+
"SELECT * FROM `test_table_dyn$audit_log`"
1072+
+ " /*+ OPTIONS('auditlog-read.sequence-number.enabled' = 'true') */"));
1073+
1074+
// Create primary key table with auditlog-read.sequence-number.enabled option
10591075
sql(
10601076
"CREATE TABLE test_table_seq (a int PRIMARY KEY NOT ENFORCED, b int, c AS a + b) "
1061-
+ "WITH ('changelog-read.sequence-number.enabled'='true');");
1077+
+ "WITH ('auditlog-read.sequence-number.enabled'='true');");
10621078
sql("INSERT INTO test_table_seq VALUES (1, 2)");
10631079
sql("INSERT INTO test_table_seq VALUES (3, 4)");
10641080

@@ -1086,8 +1102,8 @@ public void testAuditLogTableWithSequenceNumberAlterTable() {
10861102
sql("INSERT INTO test_table_dyn VALUES (1, 2)");
10871103
sql("INSERT INTO test_table_dyn VALUES (3, 4)");
10881104

1089-
// Add changelog-read.sequence-number.enabled option via ALTER TABLE
1090-
sql("ALTER TABLE test_table_dyn SET ('changelog-read.sequence-number.enabled'='true')");
1105+
// Add auditlog-read.sequence-number.enabled option via ALTER TABLE
1106+
sql("ALTER TABLE test_table_dyn SET ('auditlog-read.sequence-number.enabled'='true')");
10911107

10921108
// Test SELECT * includes _SEQUENCE_NUMBER (same as
10931109
// testAuditLogTableWithSequenceNumberEnabled)
@@ -1105,10 +1121,10 @@ public void testAuditLogTableWithSequenceNumberAlterTable() {
11051121

11061122
@Test
11071123
public void testBinlogTableWithSequenceNumberEnabled() {
1108-
// Create primary key table with changelog-read.sequence-number.enabled option
1124+
// Create primary key table with auditlog-read.sequence-number.enabled option
11091125
sql(
11101126
"CREATE TABLE test_table_seq (a int PRIMARY KEY NOT ENFORCED, b int) "
1111-
+ "WITH ('changelog-read.sequence-number.enabled'='true');");
1127+
+ "WITH ('auditlog-read.sequence-number.enabled'='true');");
11121128
sql("INSERT INTO test_table_seq VALUES (1, 2)");
11131129
sql("INSERT INTO test_table_seq VALUES (3, 4)");
11141130

@@ -1135,8 +1151,8 @@ public void testBinlogTableWithSequenceNumberAlterTable() {
11351151
sql("INSERT INTO test_table_dyn VALUES (1, 2)");
11361152
sql("INSERT INTO test_table_dyn VALUES (3, 4)");
11371153

1138-
// Add changelog-read.sequence-number.enabled option via ALTER TABLE
1139-
sql("ALTER TABLE test_table_dyn SET ('changelog-read.sequence-number.enabled'='true')");
1154+
// Add auditlog-read.sequence-number.enabled option via ALTER TABLE
1155+
sql("ALTER TABLE test_table_dyn SET ('auditlog-read.sequence-number.enabled'='true')");
11401156

11411157
// Test SELECT * includes _SEQUENCE_NUMBER (same as
11421158
// testBinlogTableWithSequenceNumberEnabled)

0 commit comments

Comments
 (0)