-
Notifications
You must be signed in to change notification settings - Fork 1.2k
[core] Add option to support reading sequence_number in AuditLogTable and BinlogTable #6933
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
8bc109b to
1e104c6
Compare
yunfengzhou-hub
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Left some comments as below.
| <td><h5>changelog-read.sequence-number.enabled</h5></td> | ||
| <td style="word-wrap: break-word;">false</td> | ||
| <td>Boolean</td> | ||
| <td>Whether to include _SEQUENCE_NUMBER field in audit_log and binlog system tables. This is only valid for primary key tables.</td> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The output result of an audit log table can be controlled by incremental-between-scan-mode, where changelog is only one of the options. So we might better not include the word "changelog" in the config option name.
|
|
||
| public AuditLogTable(FileStoreTable wrapped) { | ||
| this.wrapped = wrapped; | ||
| this.wrapped.schema().options().put(AUDIT_LOG_ENABLED, "true"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about organize the two new configurations in the following way?
- User sets config A.
- In
AuditLogTable, the code detects whether config A is set. If so, set config B to true. - In
ValueContentRowDataRecordIterator, the code only checks config B, instead of configA && configB.
The benefit of such structure is that it can decouple system table concepts from ValueContentRowDataRecordIterator. config B need not be related to audit log table.
|
|
||
| private final FileStoreTable wrapped; | ||
| /** Number of special fields (rowkind, and optionally _SEQUENCE_NUMBER). */ | ||
| protected final int specialFieldCount; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about introducing List<SpecialField> specialFields? If so, we will be able to change
fields.add(SpecialFields.ROW_KIND);
if (specialFieldCount > 1) {
fields.add(SpecialFields.SEQUENCE_NUMBER);
}into
fields.add(specialFields);| // _SEQUENCE_NUMBER is at index 0 in bottom output | ||
| return row.getLong(0); | ||
| } | ||
| return super.getLong(pos); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If _SEQUENCE_NUMBER is at index 0, then the other fields should be acquired through pos - 1.
| public long getLong(int pos) { | ||
| int index = indexMapping[pos]; | ||
| if (index == AuditLogRead.SEQUENCE_NUMBER_INDEX) { | ||
| // _SEQUENCE_NUMBER is at index 0 in bottom output |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason behind this comment is located in another class(ValueContentRowDataRecordIterator), and these two classes are mainly associated with AUDIT_LOG_ENABLED, which shows a generic concept not related to sequence number.
Thus it might increase the other developer's burden to understand why "_SEQUENCE_NUMBER is at index 0". We might need to think about how to increase code readability here.
| write(table, GenericRow.ofKind(RowKind.INSERT, 1, 2, 5)); | ||
| write(table, GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 2, 5)); | ||
| write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 4, 6)); | ||
| write(table, GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 2, 6)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change seems unnecessary.
| expectedRow.add( | ||
| GenericRow.of( | ||
| BinaryString.fromString(RowKind.UPDATE_BEFORE.shortString()), 1, 2, 5)); | ||
| BinaryString.fromString(RowKind.UPDATE_AFTER.shortString()), 1, 2, 6)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the original code, the expected result contains 4 rows. Here only three rows are left, missing an UPDATE_BEFORE row.
| // Create primary key table with changelog-read.sequence-number.enabled option | ||
| sql( | ||
| "CREATE TABLE test_table_seq (a int PRIMARY KEY NOT ENFORCED, b int, c AS a + b) " | ||
| + "WITH ('changelog-read.sequence-number.enabled'='true');"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Despite that users are allowed to set changelog-read.sequence-number.enabled when creating or altering table, a more recommended way is to configure it dynamically in the SELECT query through Flink SQL Hints. It should be more suitable for such configurations that are only useful in specific read queries. The usage of SQL hints is like follows
SELECT * FROM `test_table_seq$audit_log`/*+ OPTIONS('changelog-read.sequence-number.enabled' = 'true') */;Let's add test cases for this use case as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's also add some test cases to verify what will happen if changelog-read.sequence-number.enabled is configured on append-only tables.
Purpose
This PR adds support for reading
_SEQUENCE_NUMBERfield inaudit_logandbinlogsystem tables for primary key tables.A new configuration option
changelog-read.sequence-number.enabled(default:false) is introduced to control whether to include the_SEQUENCE_NUMBERfield in the output schema of audit_log and binlog system tables. When enabled, users can access the internal sequence number that Paimon uses for ordering records within primary key tables.Key changes:
CoreOptions.CHANGELOG_READ_SEQUENCE_NUMBER_ENABLEDconfigurationAuditLogTableandBinlogTableto optionally include_SEQUENCE_NUMBERfield in their row typesValueContentRowDataRecordIteratorto output sequence number when enabledTests
AuditLogTableTest- basic auditlog table reading and sequence number with table optionBinlogTableTest- basic binlog table reading and sequence number with table optionBatchFileStoreITCase- Flink integration test for audit_log and binlog reading sequence numberAPI and Format
changelog-read.sequence-number.enabled(Boolean, default:false)audit_logandbinlogsystem tables will have an additional_SEQUENCE_NUMBERcolumn afterrowkindDocumentation
core_configuration.html