Skip to content

Commit b3147ef

Browse files
committed
[TASK-2293] use column id instead of position in Partial Update
1 parent ec43289 commit b3147ef

File tree

9 files changed

+214
-3
lines changed

9 files changed

+214
-3
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.client.write.WriteRecord;
2222
import org.apache.fluss.client.write.WriterClient;
2323
import org.apache.fluss.metadata.KvFormat;
24+
import org.apache.fluss.metadata.Schema;
2425
import org.apache.fluss.metadata.TableInfo;
2526
import org.apache.fluss.metadata.TablePath;
2627
import org.apache.fluss.row.BinaryRow;
@@ -79,7 +80,9 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
7980
tableInfo.getSchema().getAutoIncrementColumnNames(),
8081
partialUpdateColumns);
8182

82-
this.targetColumns = partialUpdateColumns;
83+
// Convert positions to field IDs for schema evolution safety.
84+
// The server will convert back to positions using the latest schema.
85+
this.targetColumns = toFieldIds(partialUpdateColumns, tableInfo.getSchema());
8386
// encode primary key using physical primary key
8487
this.primaryKeyEncoder =
8588
KeyEncoder.ofPrimaryKeyEncoder(
@@ -217,6 +220,19 @@ public CompletableFuture<DeleteResult> delete(InternalRow row) {
217220
return sendWithResult(record, DeleteResult::new);
218221
}
219222

223+
private static @Nullable int[] toFieldIds(
224+
@Nullable int[] positions, Schema schema) {
225+
if (positions == null) {
226+
return null;
227+
}
228+
List<Integer> columnIds = schema.getColumnIds();
229+
int[] fieldIds = new int[positions.length];
230+
for (int i = 0; i < positions.length; i++) {
231+
fieldIds[i] = columnIds.get(positions[i]);
232+
}
233+
return fieldIds;
234+
}
235+
220236
private BinaryRow encodeRow(InternalRow row) {
221237
if (kvFormat == KvFormat.INDEXED && row instanceof IndexedRow) {
222238
return (IndexedRow) row;

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ public static PutKvRequest makePutKvRequest(
167167
}
168168
if (targetColumns != null) {
169169
request.setTargetColumns(targetColumns);
170+
request.setTargetColumnsByIds(true);
170171
}
171172
// Set mergeMode in the request - this is the proper way to pass mergeMode to server
172173
request.setAggMode(mergeMode.getProtoValue());

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,8 @@ message PutKvRequest {
239239
// Aggregation mode for this request (see AGG_MODE_* constants above)
240240
// 0 = AGGREGATE (default), 1 = OVERWRITE, 2 = LOCAL_AGGREGATE (not yet supported)
241241
optional int32 agg_mode = 6;
242+
// If true, target_columns are interpreted as field IDs; if false or absent, as field positions.
243+
optional bool target_columns_by_ids = 7;
242244
}
243245

244246
message PutKvResponse {

fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
import org.apache.fluss.server.replica.fetcher.InitialFetchStatus;
105105
import org.apache.fluss.server.replica.fetcher.ReplicaFetcherManager;
106106
import org.apache.fluss.server.utils.FatalErrorHandler;
107+
import org.apache.fluss.server.utils.ServerRpcMessageUtils;
107108
import org.apache.fluss.server.zk.ZooKeeperClient;
108109
import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
109110
import org.apache.fluss.utils.FileUtils;
@@ -565,6 +566,7 @@ public void putRecordsToKv(
565566
int requiredAcks,
566567
Map<TableBucket, KvRecordBatch> entriesPerBucket,
567568
@Nullable int[] targetColumns,
569+
boolean targetColumnsByIds,
568570
MergeMode mergeMode,
569571
short apiVersion,
570572
Consumer<List<PutKvResultForBucket>> responseCallback) {
@@ -574,7 +576,13 @@ public void putRecordsToKv(
574576

575577
long startTime = System.currentTimeMillis();
576578
Map<TableBucket, PutKvResultForBucket> kvPutResult =
577-
putToLocalKv(entriesPerBucket, targetColumns, mergeMode, requiredAcks, apiVersion);
579+
putToLocalKv(
580+
entriesPerBucket,
581+
targetColumns,
582+
targetColumnsByIds,
583+
mergeMode,
584+
requiredAcks,
585+
apiVersion);
578586
LOG.debug(
579587
"Put records to local kv storage and wait generate cdc log in {} ms",
580588
System.currentTimeMillis() - startTime);
@@ -730,6 +738,7 @@ public void lookups(
730738
requiredAcks,
731739
produceEntryData,
732740
schema.getPrimaryKeyIndexes(),
741+
false,
733742
MergeMode.DEFAULT,
734743
apiVersion,
735744
(result) ->
@@ -1201,6 +1210,7 @@ private Map<TableBucket, ProduceLogResultForBucket> appendToLocalLog(
12011210
private Map<TableBucket, PutKvResultForBucket> putToLocalKv(
12021211
Map<TableBucket, KvRecordBatch> entriesPerBucket,
12031212
@Nullable int[] targetColumns,
1213+
boolean targetColumnsByIds,
12041214
MergeMode mergeMode,
12051215
int requiredAcks,
12061216
short apiVersion) {
@@ -1212,11 +1222,18 @@ private Map<TableBucket, PutKvResultForBucket> putToLocalKv(
12121222
LOG.trace("Put records to local kv tablet for table bucket {}", tb);
12131223
Replica replica = getReplicaOrException(tb);
12141224
validateClientVersionForPkTable(apiVersion, replica.getTableInfo());
1225+
// Convert field IDs to column positions if the client sent IDs
1226+
int[] resolvedTargetColumns = targetColumns;
1227+
if (targetColumnsByIds && targetColumns != null) {
1228+
resolvedTargetColumns =
1229+
ServerRpcMessageUtils.convertFieldIdsToPositions(
1230+
targetColumns, replica.getTableInfo().getSchema());
1231+
}
12151232
tableMetrics = replica.tableMetrics();
12161233
tableMetrics.totalPutKvRequests().inc();
12171234
LogAppendInfo appendInfo =
12181235
replica.putRecordsToLeader(
1219-
entry.getValue(), targetColumns, mergeMode, requiredAcks);
1236+
entry.getValue(), resolvedTargetColumns, mergeMode, requiredAcks);
12201237
LOG.trace(
12211238
"Written to local kv for {}, and the cdc log beginning at offset {} and ending at offset {}",
12221239
tb,

fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getStopReplicaData;
109109
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getTargetColumns;
110110
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getUpdateMetadataRequestData;
111+
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.isTargetColumnsByIds;
111112
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeFetchLogResponse;
112113
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeInitWriterResponse;
113114
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeLimitScanResponse;
@@ -236,6 +237,7 @@ public CompletableFuture<PutKvResponse> putKv(PutKvRequest request) {
236237
request.getAcks(),
237238
putKvData,
238239
getTargetColumns(request),
240+
isTargetColumnsByIds(request),
239241
mergeMode,
240242
currentSession().getApiVersion(),
241243
bucketResponse -> response.complete(makePutKvResponse(bucketResponse)));

fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.fluss.metadata.PartitionSpec;
3434
import org.apache.fluss.metadata.PhysicalTablePath;
3535
import org.apache.fluss.metadata.ResolvedPartitionSpec;
36+
import org.apache.fluss.metadata.Schema;
3637
import org.apache.fluss.metadata.TableBucket;
3738
import org.apache.fluss.metadata.TableChange;
3839
import org.apache.fluss.metadata.TableDescriptor;
@@ -1033,6 +1034,29 @@ public static Map<TableBucket, List<byte[]>> toPrefixLookupData(
10331034
return targetColumns.length == 0 ? null : targetColumns;
10341035
}
10351036

1037+
public static boolean isTargetColumnsByIds(PutKvRequest putKvRequest) {
1038+
return putKvRequest.hasTargetColumnsByIds() && putKvRequest.isTargetColumnsByIds();
1039+
}
1040+
1041+
/** Converts field IDs to column positions using the schema's column ID mapping. */
1042+
public static int[] convertFieldIdsToPositions(int[] fieldIds, Schema schema) {
1043+
List<Integer> columnIds = schema.getColumnIds();
1044+
Map<Integer, Integer> idToPosition = new HashMap<>();
1045+
for (int i = 0; i < columnIds.size(); i++) {
1046+
idToPosition.put(columnIds.get(i), i);
1047+
}
1048+
int[] positions = new int[fieldIds.length];
1049+
for (int i = 0; i < fieldIds.length; i++) {
1050+
Integer pos = idToPosition.get(fieldIds[i]);
1051+
if (pos == null) {
1052+
throw new IllegalArgumentException(
1053+
"Target column field ID " + fieldIds[i] + " not found in schema");
1054+
}
1055+
positions[i] = pos;
1056+
}
1057+
return positions;
1058+
}
1059+
10361060
public static PutKvResponse makePutKvResponse(Collection<PutKvResultForBucket> kvPutResult) {
10371061
PutKvResponse putKvResponse = new PutKvResponse();
10381062
List<PbPutKvRespForBucket> putKvRespForBucketList = new ArrayList<>();

fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -916,6 +916,78 @@ void testBecomeLeaderOrFollowerWithOneTabletServerOffline() throws Exception {
916916
assertThat(result.get(0).getError().error()).isEqualTo(Errors.NONE);
917917
}
918918

919+
@Test
920+
void testPutKvWithPartialUpdate() throws Exception {
921+
// Schema: a (PK), b (nullable), c (nullable), d (nullable)
922+
TablePath tablePath = TablePath.of("test_db_1", "test_partial_update_t1");
923+
Schema schema =
924+
Schema.newBuilder()
925+
.column("a", DataTypes.INT())
926+
.column("b", DataTypes.STRING())
927+
.column("c", DataTypes.STRING())
928+
.column("d", DataTypes.BOOLEAN())
929+
.primaryKey("a")
930+
.build();
931+
RowType rowType = schema.getRowType();
932+
RowType pkType = DataTypes.ROW(new DataField("a", DataTypes.INT()));
933+
TableDescriptor descriptor =
934+
TableDescriptor.builder().schema(schema).distributedBy(1, "a").build();
935+
long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, descriptor);
936+
TableBucket tb = new TableBucket(tableId, 0);
937+
938+
FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
939+
940+
int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
941+
TabletServerGateway leaderGateWay =
942+
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader);
943+
944+
// 1. Insert a full row: {1, "val_b", "val_c", true}
945+
List<Tuple2<Object[], Object[]>> fullRow =
946+
Collections.singletonList(
947+
Tuple2.of(new Object[] {1}, new Object[] {1, "val_b", "val_c", true}));
948+
assertPutKvResponse(
949+
leaderGateWay
950+
.putKv(
951+
newPutKvRequest(
952+
tableId, 0, 1, genKvRecordBatch(pkType, rowType, fullRow)))
953+
.get());
954+
955+
// 2. Partial update: target columns a,b (indices/ids [0,1]), update b to "new_b"
956+
List<Tuple2<Object[], Object[]>> partialRow =
957+
Collections.singletonList(
958+
Tuple2.of(new Object[] {1}, new Object[] {1, "new_b", null, null}));
959+
assertPutKvResponse(
960+
leaderGateWay
961+
.putKv(
962+
newPutKvRequest(
963+
tableId,
964+
0,
965+
1,
966+
genKvRecordBatch(pkType, rowType, partialRow),
967+
new int[] {0, 1},
968+
true))
969+
.get());
970+
971+
// 3. Lookup and verify: b updated, c and d preserved
972+
CompactedKeyEncoder keyEncoder = new CompactedKeyEncoder(rowType, new int[] {0});
973+
byte[] keyBytes = keyEncoder.encodeKey(row(new Object[] {1}));
974+
TestingSchemaGetter schemaGetter = new TestingSchemaGetter(DEFAULT_SCHEMA_ID, schema);
975+
ValueDecoder valueDecoder = new ValueDecoder(schemaGetter, KvFormat.COMPACTED);
976+
977+
PbLookupRespForBucket lookupResp =
978+
leaderGateWay
979+
.lookup(newLookupRequest(tableId, 0, keyBytes))
980+
.get()
981+
.getBucketsRespAt(0);
982+
assertThat(lookupResp.hasErrorCode()).isFalse();
983+
InternalRow resultRow =
984+
valueDecoder.decodeValue(lookupResp.getValuesList().get(0).getValues()).row;
985+
assertThat(resultRow.getInt(0)).isEqualTo(1);
986+
assertThat(resultRow.getString(1).toString()).isEqualTo("new_b");
987+
assertThat(resultRow.getString(2).toString()).isEqualTo("val_c");
988+
assertThat(resultRow.getBoolean(3)).isTrue();
989+
}
990+
919991
private static void assertPutKvResponse(PutKvResponse putKvResponse) {
920992
assertThat(putKvResponse.getBucketsRespsCount()).isEqualTo(1);
921993
PbPutKvRespForBucket putKvRespForBucket = putKvResponse.getBucketsRespsList().get(0);

fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,22 @@ public static ProduceLogRequest newProduceLogRequest(
209209

210210
public static PutKvRequest newPutKvRequest(
211211
long tableId, int bucketId, int acks, KvRecordBatch kvRecordBatch) {
212+
return newPutKvRequest(tableId, bucketId, acks, kvRecordBatch, null, false);
213+
}
214+
215+
public static PutKvRequest newPutKvRequest(
216+
long tableId,
217+
int bucketId,
218+
int acks,
219+
KvRecordBatch kvRecordBatch,
220+
@Nullable int[] targetColumns,
221+
boolean targetColumnsByIds) {
212222
PutKvRequest putKvRequest = new PutKvRequest();
213223
putKvRequest.setTableId(tableId).setAcks(acks).setTimeoutMs(10000);
224+
if (targetColumns != null) {
225+
putKvRequest.setTargetColumns(targetColumns);
226+
putKvRequest.setTargetColumnsByIds(targetColumnsByIds);
227+
}
214228
PbPutKvReqForBucket pbPutKvReqForBucket = new PbPutKvReqForBucket();
215229
pbPutKvReqForBucket.setBucketId(bucketId);
216230
if (kvRecordBatch instanceof DefaultKvRecordBatch) {
fluss-server/src/test/java/org/apache/fluss/server/utils/ServerRpcMessageUtilsTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.server.utils;
19+
20+
import org.apache.fluss.metadata.Schema;
21+
import org.apache.fluss.types.DataTypes;
22+
23+
import org.junit.jupiter.api.Test;
24+
25+
import java.util.Arrays;
26+
27+
import static org.assertj.core.api.Assertions.assertThat;
28+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
29+
30+
/** Tests for {@link ServerRpcMessageUtils}. */
31+
class ServerRpcMessageUtilsTest {
32+
33+
@Test
34+
void testConvertFieldIdsToPositions() {
35+
// Schema with ID gaps: a(id=0, pos=0), b(id=2, pos=1), c(id=3, pos=2)
36+
// Simulates a schema after a column (id=1) was dropped.
37+
Schema schema =
38+
Schema.newBuilder()
39+
.fromColumns(
40+
Arrays.asList(
41+
new Schema.Column("a", DataTypes.INT(), null, 0),
42+
new Schema.Column("b", DataTypes.STRING(), null, 2),
43+
new Schema.Column("c", DataTypes.STRING(), null, 3)))
44+
.primaryKey("a")
45+
.build();
46+
47+
// Field IDs [0, 2] should map to positions [0, 1]
48+
assertThat(ServerRpcMessageUtils.convertFieldIdsToPositions(new int[] {0, 2}, schema))
49+
.isEqualTo(new int[] {0, 1});
50+
51+
// Field IDs [0, 3] should map to positions [0, 2]
52+
assertThat(ServerRpcMessageUtils.convertFieldIdsToPositions(new int[] {0, 3}, schema))
53+
.isEqualTo(new int[] {0, 2});
54+
55+
// Non-existent field ID should throw
56+
assertThatThrownBy(
57+
() ->
58+
ServerRpcMessageUtils.convertFieldIdsToPositions(
59+
new int[] {0, 99}, schema))
60+
.isInstanceOf(IllegalArgumentException.class)
61+
.hasMessageContaining("Target column field ID 99 not found in schema");
62+
}
63+
}

0 commit comments

Comments
 (0)