Skip to content

Commit e8c76a6

Browse files
committed
[TASK-2293] use column id instead of position in Partial Update
1 parent ec43289 commit e8c76a6

File tree

14 files changed

+268
-29
lines changed

14 files changed

+268
-29
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.client.write.WriteRecord;
2222
import org.apache.fluss.client.write.WriterClient;
2323
import org.apache.fluss.metadata.KvFormat;
24+
import org.apache.fluss.metadata.Schema;
2425
import org.apache.fluss.metadata.TableInfo;
2526
import org.apache.fluss.metadata.TablePath;
2627
import org.apache.fluss.row.BinaryRow;
@@ -79,7 +80,9 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
7980
tableInfo.getSchema().getAutoIncrementColumnNames(),
8081
partialUpdateColumns);
8182

82-
this.targetColumns = partialUpdateColumns;
83+
// Convert positions to field IDs for schema evolution safety.
84+
// The server will convert back to positions using the latest schema.
85+
this.targetColumns = toFieldIds(partialUpdateColumns, tableInfo.getSchema());
8386
// encode primary key using physical primary key
8487
this.primaryKeyEncoder =
8588
KeyEncoder.ofPrimaryKeyEncoder(
@@ -217,6 +220,18 @@ public CompletableFuture<DeleteResult> delete(InternalRow row) {
217220
return sendWithResult(record, DeleteResult::new);
218221
}
219222

223+
/**
 * Translates column positions into the corresponding column (field) IDs of the given schema.
 *
 * <p>The result has the same length and ordering as {@code positions}: element {@code i} is the
 * entry of {@code schema.getColumnIds()} found at index {@code positions[i]}.
 *
 * @param positions column positions to translate, or {@code null}
 * @param schema schema whose {@code getColumnIds()} list supplies the ID for each position
 * @return the field IDs in the same order as {@code positions}, or {@code null} if {@code
 *     positions} is {@code null}
 */
private static @Nullable int[] toFieldIds(@Nullable int[] positions, Schema schema) {
224+
// A null input is passed through unchanged as null.
if (positions == null) {
225+
return null;
226+
}
227+
List<Integer> columnIds = schema.getColumnIds();
228+
int[] fieldIds = new int[positions.length];
229+
for (int i = 0; i < positions.length; i++) {
230+
// NOTE(review): assumes every position is a valid index into columnIds; no bounds check
// is done here, so an out-of-range position would fail inside List#get. Confirm that
// callers validate the positions beforehand.
fieldIds[i] = columnIds.get(positions[i]);
231+
}
232+
return fieldIds;
233+
}
234+
220235
private BinaryRow encodeRow(InternalRow row) {
221236
if (kvFormat == KvFormat.INDEXED && row instanceof IndexedRow) {
222237
return (IndexedRow) row;

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ public static PutKvRequest makePutKvRequest(
167167
}
168168
if (targetColumns != null) {
169169
request.setTargetColumns(targetColumns);
170+
request.setTargetColumnsByIds(true);
170171
}
171172
// Set mergeMode in the request - this is the proper way to pass mergeMode to server
172173
request.setAggMode(mergeMode.getProtoValue());

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,8 @@ message PutKvRequest {
239239
// Aggregation mode for this request (see AGG_MODE_* constants above)
240240
// 0 = AGGREGATE (default), 1 = OVERWRITE, 2 = LOCAL_AGGREGATE (not yet supported)
241241
optional int32 agg_mode = 6;
242+
// If true, target_columns are interpreted as field IDs; if false or absent, as field positions.
243+
optional bool target_columns_by_ids = 7;
242244
}
243245

244246
message PutKvResponse {

fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.apache.fluss.server.log.LogTablet;
6868
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
6969
import org.apache.fluss.server.utils.FatalErrorHandler;
70+
import org.apache.fluss.server.utils.ServerRpcMessageUtils;
7071
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
7172
import org.apache.fluss.types.RowType;
7273
import org.apache.fluss.utils.BytesUtils;
@@ -291,7 +292,7 @@ public long getFlushedLogOffset() {
291292
*/
292293
public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] targetColumns)
293294
throws Exception {
294-
return putAsLeader(kvRecords, targetColumns, MergeMode.DEFAULT);
295+
return putAsLeader(kvRecords, targetColumns, false, MergeMode.DEFAULT);
295296
}
296297

297298
/**
@@ -316,7 +317,10 @@ public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] target
316317
* @param mergeMode the merge mode (DEFAULT or OVERWRITE)
317318
*/
318319
public LogAppendInfo putAsLeader(
319-
KvRecordBatch kvRecords, @Nullable int[] targetColumns, MergeMode mergeMode)
320+
KvRecordBatch kvRecords,
321+
@Nullable int[] targetColumns,
322+
boolean targetColumnsByIds,
323+
MergeMode mergeMode)
320324
throws Exception {
321325
return inWriteLock(
322326
kvLock,
@@ -328,23 +332,32 @@ public LogAppendInfo putAsLeader(
328332
short latestSchemaId = (short) schemaInfo.getSchemaId();
329333
validateSchemaId(kvRecords.schemaId(), latestSchemaId);
330334

335+
// Convert field IDs to column positions if the client sent IDs.
336+
// Must use the latest schema for conversion (not the original
337+
// tableInfo schema) because columns may have been added/dropped.
338+
int[] resolvedTargetColumns = targetColumns;
339+
if (targetColumnsByIds && targetColumns != null) {
340+
resolvedTargetColumns =
341+
ServerRpcMessageUtils.convertFieldIdsToPositions(
342+
targetColumns, latestSchema);
343+
}
344+
331345
AutoIncrementUpdater currentAutoIncrementUpdater =
332346
autoIncrementManager.getUpdaterForSchema(kvFormat, latestSchemaId);
333347

334348
// Validate targetColumns doesn't contain auto-increment column
335-
currentAutoIncrementUpdater.validateTargetColumns(targetColumns);
349+
currentAutoIncrementUpdater.validateTargetColumns(resolvedTargetColumns);
336350

337351
// Determine the row merger based on mergeMode:
338352
// - DEFAULT: Use the configured merge engine (rowMerger)
339353
// - OVERWRITE: Bypass merge engine, use pre-created overwriteRowMerger
340354
// to directly replace values (for undo recovery scenarios)
341-
// We only support ADD COLUMN, so targetColumns is fine to be used directly.
342355
RowMerger currentMerger =
343356
(mergeMode == MergeMode.OVERWRITE)
344357
? overwriteRowMerger.configureTargetColumns(
345-
targetColumns, latestSchemaId, latestSchema)
358+
resolvedTargetColumns, latestSchemaId, latestSchema)
346359
: rowMerger.configureTargetColumns(
347-
targetColumns, latestSchemaId, latestSchema);
360+
resolvedTargetColumns, latestSchemaId, latestSchema);
348361

349362
RowType latestRowType = latestSchema.getRowType();
350363
WalBuilder walBuilder = createWalBuilder(latestSchemaId, latestRowType);

fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -949,6 +949,7 @@ public LogAppendInfo appendRecordsToFollower(MemoryLogRecords memoryLogRecords)
949949
public LogAppendInfo putRecordsToLeader(
950950
KvRecordBatch kvRecords,
951951
@Nullable int[] targetColumns,
952+
boolean targetColumnsByIds,
952953
MergeMode mergeMode,
953954
int requiredAcks)
954955
throws Exception {
@@ -968,7 +969,9 @@ public LogAppendInfo putRecordsToLeader(
968969
kv, "KvTablet for the replica to put kv records shouldn't be null.");
969970
LogAppendInfo logAppendInfo;
970971
try {
971-
logAppendInfo = kv.putAsLeader(kvRecords, targetColumns, mergeMode);
972+
logAppendInfo =
973+
kv.putAsLeader(
974+
kvRecords, targetColumns, targetColumnsByIds, mergeMode);
972975
} catch (IOException e) {
973976
LOG.error("Error while putting records to {}", tableBucket, e);
974977
fatalErrorHandler.onFatalError(e);

fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,7 @@ public void putRecordsToKv(
565565
int requiredAcks,
566566
Map<TableBucket, KvRecordBatch> entriesPerBucket,
567567
@Nullable int[] targetColumns,
568+
boolean targetColumnsByIds,
568569
MergeMode mergeMode,
569570
short apiVersion,
570571
Consumer<List<PutKvResultForBucket>> responseCallback) {
@@ -574,7 +575,13 @@ public void putRecordsToKv(
574575

575576
long startTime = System.currentTimeMillis();
576577
Map<TableBucket, PutKvResultForBucket> kvPutResult =
577-
putToLocalKv(entriesPerBucket, targetColumns, mergeMode, requiredAcks, apiVersion);
578+
putToLocalKv(
579+
entriesPerBucket,
580+
targetColumns,
581+
targetColumnsByIds,
582+
mergeMode,
583+
requiredAcks,
584+
apiVersion);
578585
LOG.debug(
579586
"Put records to local kv storage and wait generate cdc log in {} ms",
580587
System.currentTimeMillis() - startTime);
@@ -730,6 +737,7 @@ public void lookups(
730737
requiredAcks,
731738
produceEntryData,
732739
schema.getPrimaryKeyIndexes(),
740+
false,
733741
MergeMode.DEFAULT,
734742
apiVersion,
735743
(result) ->
@@ -1201,6 +1209,7 @@ private Map<TableBucket, ProduceLogResultForBucket> appendToLocalLog(
12011209
private Map<TableBucket, PutKvResultForBucket> putToLocalKv(
12021210
Map<TableBucket, KvRecordBatch> entriesPerBucket,
12031211
@Nullable int[] targetColumns,
1212+
boolean targetColumnsByIds,
12041213
MergeMode mergeMode,
12051214
int requiredAcks,
12061215
short apiVersion) {
@@ -1216,7 +1225,11 @@ private Map<TableBucket, PutKvResultForBucket> putToLocalKv(
12161225
tableMetrics.totalPutKvRequests().inc();
12171226
LogAppendInfo appendInfo =
12181227
replica.putRecordsToLeader(
1219-
entry.getValue(), targetColumns, mergeMode, requiredAcks);
1228+
entry.getValue(),
1229+
targetColumns,
1230+
targetColumnsByIds,
1231+
mergeMode,
1232+
requiredAcks);
12201233
LOG.trace(
12211234
"Written to local kv for {}, and the cdc log beginning at offset {} and ending at offset {}",
12221235
tb,

fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getStopReplicaData;
109109
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getTargetColumns;
110110
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getUpdateMetadataRequestData;
111+
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.isTargetColumnsByIds;
111112
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeFetchLogResponse;
112113
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeInitWriterResponse;
113114
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeLimitScanResponse;
@@ -236,6 +237,7 @@ public CompletableFuture<PutKvResponse> putKv(PutKvRequest request) {
236237
request.getAcks(),
237238
putKvData,
238239
getTargetColumns(request),
240+
isTargetColumnsByIds(request),
239241
mergeMode,
240242
currentSession().getApiVersion(),
241243
bucketResponse -> response.complete(makePutKvResponse(bucketResponse)));

fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.fluss.metadata.PartitionSpec;
3434
import org.apache.fluss.metadata.PhysicalTablePath;
3535
import org.apache.fluss.metadata.ResolvedPartitionSpec;
36+
import org.apache.fluss.metadata.Schema;
3637
import org.apache.fluss.metadata.TableBucket;
3738
import org.apache.fluss.metadata.TableChange;
3839
import org.apache.fluss.metadata.TableDescriptor;
@@ -1033,6 +1034,29 @@ public static Map<TableBucket, List<byte[]>> toPrefixLookupData(
10331034
return targetColumns.length == 0 ? null : targetColumns;
10341035
}
10351036

1037+
/**
 * Tells whether the request marks its target columns as field IDs.
 *
 * @param putKvRequest the request to inspect
 * @return {@code true} only if the {@code target_columns_by_ids} field is present on the request
 *     and set to {@code true}; {@code false} if it is absent or set to {@code false}
 */
public static boolean isTargetColumnsByIds(PutKvRequest putKvRequest) {
1038+
// Check presence first so an unset optional field is treated as false.
return putKvRequest.hasTargetColumnsByIds() && putKvRequest.isTargetColumnsByIds();
1039+
}
1040+
1041+
/**
 * Converts field IDs to column positions using the schema's column ID mapping.
 *
 * <p>The position of a field ID is its index within {@code schema.getColumnIds()}. The returned
 * array has the same length and ordering as {@code fieldIds}.
 *
 * @param fieldIds the field IDs to resolve
 * @param schema the schema whose {@code getColumnIds()} list defines the ID-to-position mapping
 * @return the column positions, one per entry of {@code fieldIds}, in the same order
 * @throws IllegalArgumentException if a field ID is not contained in {@code
 *     schema.getColumnIds()}
 */
1042+
public static int[] convertFieldIdsToPositions(int[] fieldIds, Schema schema) {
1043+
List<Integer> columnIds = schema.getColumnIds();
1044+
// Reverse index: column ID -> index of that ID within columnIds.
Map<Integer, Integer> idToPosition = new HashMap<>();
1045+
for (int i = 0; i < columnIds.size(); i++) {
1046+
idToPosition.put(columnIds.get(i), i);
1047+
}
1048+
int[] positions = new int[fieldIds.length];
1049+
for (int i = 0; i < fieldIds.length; i++) {
1050+
Integer pos = idToPosition.get(fieldIds[i]);
1051+
// An ID missing from the schema is rejected rather than skipped.
if (pos == null) {
1052+
throw new IllegalArgumentException(
1053+
"Target column field ID " + fieldIds[i] + " not found in schema");
1054+
}
1055+
positions[i] = pos;
1056+
}
1057+
return positions;
1058+
}
1059+
10361060
public static PutKvResponse makePutKvResponse(Collection<PutKvResultForBucket> kvPutResult) {
10371061
PutKvResponse putKvResponse = new PutKvResponse();
10381062
List<PbPutKvRespForBucket> putKvRespForBucketList = new ArrayList<>();

0 commit comments

Comments
 (0)