Skip to content

Commit ed453a8

Browse files
author
bosiew.tian
committed
[core] Support reading sequence_number in AuditLogTable and BinlogTable
1 parent 816a76f commit ed453a8

File tree

21 files changed

+1226
-219
lines changed

21 files changed

+1226
-219
lines changed

paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.paimon.table.source.ChainSplit;
5151
import org.apache.paimon.table.source.DataSplit;
5252
import org.apache.paimon.table.source.DeletionFile;
53+
import org.apache.paimon.table.source.KeyValueSystemFieldsRecordReader;
5354
import org.apache.paimon.table.source.Split;
5455
import org.apache.paimon.types.DataField;
5556
import org.apache.paimon.types.RowType;
@@ -62,6 +63,7 @@
6263
import java.io.IOException;
6364
import java.util.ArrayList;
6465
import java.util.Arrays;
66+
import java.util.Collections;
6567
import java.util.Comparator;
6668
import java.util.List;
6769
import java.util.Set;
@@ -97,6 +99,11 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
9799
@Nullable private int[][] outerProjection;
98100
@Nullable private VariantAccessInfo[] variantAccess;
99101

102+
private List<KeyValueSystemFieldsRecordReader.SystemFieldExtractor> systemFieldExtractors =
103+
Collections.emptyList();
104+
105+
@Nullable private int[] projection = null;
106+
100107
private boolean forceKeepDelete = false;
101108

102109
public MergeFileSplitRead(
@@ -137,18 +144,31 @@ public MergeFileSplitRead withReadKeyType(RowType readKeyType) {
137144
return this;
138145
}
139146

147+
public List<KeyValueSystemFieldsRecordReader.SystemFieldExtractor> getSystemFieldExtractors() {
148+
return systemFieldExtractors;
149+
}
150+
151+
@Nullable
152+
public int[] getProjection() {
153+
return projection;
154+
}
155+
140156
@Override
141157
public MergeFileSplitRead withReadType(RowType readType) {
158+
this.systemFieldExtractors = collectSystemFieldExtractors(readType);
159+
this.projection = createProjection(readType);
160+
142161
// todo: replace projectedFields with readType
143162
RowType tableRowType = tableSchema.logicalRowType();
163+
List<String> fieldNames = tableSchema.fieldNames();
144164
int[][] projectedFields =
145165
Arrays.stream(tableRowType.getFieldIndices(readType.getFieldNames()))
166+
.filter(i -> i >= 0) // Filter out system fields (index = -1)
146167
.mapToObj(i -> new int[] {i})
147168
.toArray(int[][]::new);
148169
int[][] newProjectedFields = projectedFields;
149170
if (sequenceFields.size() > 0) {
150171
// make sure projection contains sequence fields
151-
List<String> fieldNames = tableSchema.fieldNames();
152172
List<String> projectedNames = Projection.of(projectedFields).project(fieldNames);
153173
int[] lackFields =
154174
sequenceFields.stream()
@@ -408,4 +428,68 @@ public UserDefinedSeqComparator createUdsComparator() {
408428
return UserDefinedSeqComparator.create(
409429
readerFactoryBuilder.readValueType(), sequenceFields, sequenceOrder);
410430
}
431+
432+
/**
433+
* Collects system field extractors for the requested read type.
434+
*
435+
* @param readType the requested read type (may contain system fields)
436+
* @return list of extractors for system fields present in readType
437+
*/
438+
private List<KeyValueSystemFieldsRecordReader.SystemFieldExtractor>
439+
collectSystemFieldExtractors(RowType readType) {
440+
if (readType == null) {
441+
return Collections.emptyList();
442+
}
443+
444+
List<KeyValueSystemFieldsRecordReader.SystemFieldExtractor> extractors = new ArrayList<>();
445+
for (String fieldName : readType.getFieldNames()) {
446+
KeyValueSystemFieldsRecordReader.SystemFieldExtractor extractor =
447+
KeyValueSystemFieldsRecordReader.getExtractor(fieldName);
448+
if (extractor != null) {
449+
extractors.add(extractor);
450+
}
451+
}
452+
return extractors;
453+
}
454+
455+
/**
456+
* Creates a projection array to reorder fields from natural order to requested order.
457+
*
458+
* <p>Example: readType = [pt, rowkind, col1], systemFieldExtractors = [rowkind] Natural order:
459+
* [rowkind(0), pt(1), col1(2)] (physical fields pt, col1 in readType order) Requested order:
460+
* [pt, rowkind, col1] Projection: [1, 0, 2]
461+
*
462+
* @param readType the requested read type (may contain system fields)
463+
* @return projection array, or null if fields are already in natural order
464+
*/
465+
@Nullable
466+
private int[] createProjection(RowType readType) {
467+
if (readType == null || systemFieldExtractors.isEmpty()) {
468+
return null;
469+
}
470+
471+
List<String> readFieldNames = readType.getFieldNames();
472+
int[] projection = new int[readFieldNames.size()];
473+
// System fields are first in natural order
474+
int systemIdx = 0;
475+
// Physical fields follow system fields in natural order
476+
int physicalIdx = systemFieldExtractors.size();
477+
boolean needsProjection = false;
478+
479+
for (int i = 0; i < readFieldNames.size(); i++) {
480+
String fieldName = readFieldNames.get(i);
481+
// Check if it's a system field
482+
if (KeyValueSystemFieldsRecordReader.getExtractor(fieldName) != null) {
483+
projection[i] = systemIdx++;
484+
} else {
485+
projection[i] = physicalIdx++;
486+
}
487+
488+
if (projection[i] != i) {
489+
needsProjection = true;
490+
}
491+
}
492+
493+
return needsProjection ? projection : null;
494+
}
411495
}
Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.table.source;
20+
21+
import org.apache.paimon.KeyValue;
22+
import org.apache.paimon.data.BinaryString;
23+
import org.apache.paimon.data.GenericRow;
24+
import org.apache.paimon.data.InternalRow;
25+
import org.apache.paimon.data.JoinedRow;
26+
import org.apache.paimon.reader.RecordReader;
27+
import org.apache.paimon.table.SpecialFields;
28+
import org.apache.paimon.utils.ProjectedRow;
29+
30+
import javax.annotation.Nullable;
31+
32+
import java.io.IOException;
33+
import java.util.HashMap;
34+
import java.util.List;
35+
import java.util.Map;
36+
37+
/**
38+
* A decorator for {@link RecordReader} that injects system fields into the output rows for
39+
* KeyValue-based data sources.
40+
*
41+
* <p>This reader wraps a {@code RecordReader<KeyValue>} and produces {@code
42+
* RecordReader<InternalRow>} with additional system fields extracted from the KeyValue metadata.
43+
*
44+
* <p><b>Naming:</b> This class is specifically designed for KeyValue format data (e.g.,
45+
* MergeFileSplitRead). For InternalRow readers (e.g., RawFileSplitRead), system fields are handled
46+
* differently in {@link org.apache.paimon.io.DataFileRecordReader} using file metadata.
47+
*
48+
* <p><b>Field Ordering:</b> The output schema supports arbitrary field ordering. Internally, fields
49+
* are assembled as [system fields... + physical fields...], then reordered using {@link
50+
* ProjectedRow} to match the requested field order.
51+
*
52+
* <p><b>Performance:</b> Implementation uses {@link JoinedRow} for zero-copy concatenation of
53+
* system fields and physical fields, then {@link ProjectedRow} for zero-copy field reordering.
54+
*/
55+
public class KeyValueSystemFieldsRecordReader implements RecordReader<InternalRow> {
56+
57+
private final RecordReader<KeyValue> wrapped;
58+
private final List<SystemFieldExtractor> systemFieldExtractors;
59+
@Nullable private final int[] projection;
60+
61+
/**
62+
* Creates a KeyValueSystemFieldsRecordReader.
63+
*
64+
* @param wrapped the underlying KeyValue reader
65+
* @param systemFieldExtractors extractors for system fields, in order
66+
* @param projection optional projection to reorder fields. If null, fields are in natural order
67+
* [system fields... + physical fields...]. If provided, projects from the natural order to
68+
* the desired order.
69+
*/
70+
public KeyValueSystemFieldsRecordReader(
71+
RecordReader<KeyValue> wrapped,
72+
List<SystemFieldExtractor> systemFieldExtractors,
73+
@Nullable int[] projection) {
74+
this.wrapped = wrapped;
75+
this.systemFieldExtractors = systemFieldExtractors;
76+
this.projection = projection;
77+
}
78+
79+
@Nullable
80+
@Override
81+
public RecordIterator<InternalRow> readBatch() throws IOException {
82+
RecordIterator<KeyValue> batch = wrapped.readBatch();
83+
if (batch == null) {
84+
return null;
85+
}
86+
return new SystemFieldsRecordIterator(batch);
87+
}
88+
89+
@Override
90+
public void close() throws IOException {
91+
wrapped.close();
92+
}
93+
94+
private class SystemFieldsRecordIterator implements RecordIterator<InternalRow> {
95+
96+
private final RecordIterator<KeyValue> kvIterator;
97+
private final JoinedRow joinedRow;
98+
private final GenericRow systemFieldsRow;
99+
@Nullable private final ProjectedRow projectedRow;
100+
101+
private SystemFieldsRecordIterator(RecordIterator<KeyValue> kvIterator) {
102+
this.kvIterator = kvIterator;
103+
this.joinedRow = new JoinedRow();
104+
this.systemFieldsRow = new GenericRow(systemFieldExtractors.size());
105+
// If projection is provided, use ProjectedRow to reorder fields
106+
this.projectedRow = projection != null ? ProjectedRow.from(projection) : null;
107+
}
108+
109+
@Nullable
110+
@Override
111+
public InternalRow next() throws IOException {
112+
KeyValue kv = kvIterator.next();
113+
if (kv == null) {
114+
return null;
115+
}
116+
117+
InternalRow value = kv.value();
118+
119+
// Extract system fields into the reusable row
120+
for (int i = 0; i < systemFieldExtractors.size(); i++) {
121+
SystemFieldExtractor extractor = systemFieldExtractors.get(i);
122+
Object systemValue = extractor.extract(kv);
123+
systemFieldsRow.setField(i, systemValue);
124+
}
125+
126+
// Join system fields first, then physical fields
127+
// Natural order: [system fields...] + [physical fields...]
128+
joinedRow.replace(systemFieldsRow, value);
129+
joinedRow.setRowKind(kv.valueKind());
130+
131+
// If projection is provided, reorder to match requested order
132+
if (projectedRow != null) {
133+
return projectedRow.replaceRow(joinedRow);
134+
}
135+
return joinedRow;
136+
}
137+
138+
@Override
139+
public void releaseBatch() {
140+
kvIterator.releaseBatch();
141+
}
142+
}
143+
144+
/**
145+
* Wraps a KeyValue reader with system field injection if needed.
146+
*
147+
* @param reader the KeyValue reader
148+
* @param systemFieldExtractors extractors for system fields (empty if no system fields needed)
149+
* @param projection optional projection to reorder fields from natural order [system fields...
150+
* + physical fields...] to desired order
151+
* @return a reader producing InternalRow with system fields, or a simple unwrapped reader if no
152+
* system fields
153+
*/
154+
public static RecordReader<InternalRow> wrap(
155+
RecordReader<KeyValue> reader,
156+
List<SystemFieldExtractor> systemFieldExtractors,
157+
@Nullable int[] projection) {
158+
if (systemFieldExtractors.isEmpty()) {
159+
// No system fields, use the default unwrap logic
160+
return KeyValueTableRead.unwrap(reader);
161+
}
162+
return new KeyValueSystemFieldsRecordReader(reader, systemFieldExtractors, projection);
163+
}
164+
165+
// ========== Internal Extractor Interface ==========
166+
167+
/**
168+
* Internal interface for extracting system fields from {@link KeyValue} objects.
169+
*
170+
* <p>System fields are metadata fields like {@code _SEQUENCE_NUMBER}, {@code _LEVEL}, {@code
171+
* rowkind} that are derived from the KeyValue container itself rather than the stored data.
172+
*
173+
* <p><b>Note:</b> This interface is specifically for KeyValue-based extraction. For InternalRow
174+
* readers (e.g., RawFileSplitRead), system fields are handled differently in {@link
175+
* org.apache.paimon.io.DataFileRecordReader} using file metadata.
176+
*
177+
* <p>All field definitions are sourced from {@link SpecialFields} to maintain consistency
178+
* across the codebase.
179+
*
180+
* <p>Each extractor is stateless and thread-safe.
181+
*/
182+
@FunctionalInterface
183+
public interface SystemFieldExtractor {
184+
185+
/**
186+
* Extracts the system field value from a KeyValue object.
187+
*
188+
* @param kv the KeyValue to extract from
189+
* @return the extracted value, or null if not applicable
190+
*/
191+
@Nullable
192+
Object extract(KeyValue kv);
193+
194+
// ========== Built-in Extractors ==========
195+
196+
/**
197+
* Extractor for {@code _SEQUENCE_NUMBER} system field.
198+
*
199+
* <p>Extracts the sequence number from KeyValue metadata.
200+
*/
201+
SystemFieldExtractor SEQUENCE_NUMBER = kv -> kv.sequenceNumber();
202+
203+
/**
204+
* Extractor for {@code rowkind} system field (used in AuditLogTable).
205+
*
206+
* <p>Extracts the row kind from KeyValue's valueKind.
207+
*/
208+
SystemFieldExtractor ROW_KIND = kv -> BinaryString.fromString(kv.valueKind().shortString());
209+
210+
/**
211+
* Extractor for {@code _LEVEL} system field (LSM tree level).
212+
*
213+
* <p>Note: Currently not extractable from KeyValue at read time. This is a placeholder for
214+
* future implementation where level information would need to be tracked through the read
215+
* path.
216+
*/
217+
SystemFieldExtractor LEVEL = kv -> null; // TODO: Level information needs to be propagated
218+
219+
/**
220+
* Extractor for {@code _ROW_ID} system field.
221+
*
222+
* <p>Note: ROW_ID is typically handled by DataFileRecordReader for InternalRow-based
223+
* readers. This extractor is provided for completeness but may not be used in KeyValue
224+
* scenarios.
225+
*/
226+
SystemFieldExtractor ROW_ID =
227+
kv -> null; // ROW_ID is computed from file metadata, not available in KeyValue
228+
}
229+
230+
// ========== Registry ==========
231+
232+
/** Registry for system field extractors. */
233+
private static final Map<String, SystemFieldExtractor> EXTRACTOR_REGISTRY = new HashMap<>();
234+
235+
static {
236+
// Register all extractors that can be used with KeyValue
237+
EXTRACTOR_REGISTRY.put(
238+
SpecialFields.SEQUENCE_NUMBER.name(), SystemFieldExtractor.SEQUENCE_NUMBER);
239+
EXTRACTOR_REGISTRY.put(SpecialFields.ROW_KIND.name(), SystemFieldExtractor.ROW_KIND);
240+
EXTRACTOR_REGISTRY.put(SpecialFields.LEVEL.name(), SystemFieldExtractor.LEVEL);
241+
EXTRACTOR_REGISTRY.put(SpecialFields.ROW_ID.name(), SystemFieldExtractor.ROW_ID);
242+
}
243+
244+
/**
245+
* Gets an extractor by field name.
246+
*
247+
* @param fieldName the system field name
248+
* @return the extractor, or null if not a registered system field
249+
*/
250+
@Nullable
251+
public static SystemFieldExtractor getExtractor(String fieldName) {
252+
return EXTRACTOR_REGISTRY.get(fieldName);
253+
}
254+
}

0 commit comments

Comments
 (0)