
Commit 670deb9

[Fix](routine load) Fix routine load partial update (#59209)
### What problem does this PR solve?

1. Fix `"partial_columns" = "true"` not taking effect in routine load.
2. Add the `"partial_update_new_key_behavior" = "APPEND"/"ERROR"` property to routine load.

doc: apache/doris-website#3211
1 parent faa32ab commit 670deb9
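
For context, a minimal usage sketch of the property this commit adds, in CREATE ROUTINE LOAD syntax (database, table, job, topic, and broker names are illustrative):

    CREATE ROUTINE LOAD example_db.partial_update_job ON example_tbl
    COLUMNS TERMINATED BY ",",
    COLUMNS (id, score)
    PROPERTIES
    (
        "partial_columns" = "true",
        "partial_update_new_key_behavior" = "ERROR"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092",
        "kafka_topic" = "example_topic",
        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );

Per the regression expectations in this commit, "APPEND" (the default) inserts a partial-update row whose key is new, filling unspecified columns with null/default values, while "ERROR" rejects such rows.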

File tree

8 files changed: +510 -5 lines changed

fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java

Lines changed: 10 additions & 1 deletion
@@ -51,6 +51,7 @@
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;

@@ -790,6 +791,14 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
             if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
                 this.isPartialUpdate = BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS));
             }
+            if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+                String policy = jobProperties.get(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY);
+                if ("ERROR".equalsIgnoreCase(policy)) {
+                    this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR;
+                } else {
+                    this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
+                }
+            }
         }
         LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}",
                 this.id, jobProperties, dataSourceProperties);

@@ -962,6 +971,6 @@ public NereidsRoutineLoadTaskInfo toNereidsRoutineLoadTaskInfo() throws UserException
         return new NereidsRoutineLoadTaskInfo(execMemLimit, new HashMap<>(jobProperties), maxBatchIntervalS,
                 partitionNamesInfo, mergeType, deleteCondition, sequenceCol, maxFilterRatio, importColumnDescs,
                 precedingFilter, whereExpr, columnSeparator, lineDelimiter, enclose, escape, sendBatchParallelism,
-                loadToSingleTablet, isPartialUpdate, memtableOnSinkNode);
+                loadToSingleTablet, isPartialUpdate, partialUpdateNewKeyPolicy, memtableOnSinkNode);
     }
 }

fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java

Lines changed: 15 additions & 0 deletions
@@ -68,6 +68,7 @@
 import org.apache.doris.task.LoadTaskInfo;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
 import org.apache.doris.thrift.TPipelineFragmentParams;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.AbstractTxnStateChangeCallback;

@@ -223,6 +224,7 @@ public boolean isFinalState() {
     protected long maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE;

     protected boolean isPartialUpdate = false;
+    protected TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;

     protected String sequenceCol;

@@ -388,6 +390,9 @@ protected void setOptional(CreateRoutineLoadInfo info) throws UserException {
         jobProperties.put(info.PARTIAL_COLUMNS, info.isPartialUpdate() ? "true" : "false");
         if (info.isPartialUpdate()) {
             this.isPartialUpdate = true;
+            this.partialUpdateNewKeyPolicy = info.getPartialUpdateNewKeyPolicy();
+            jobProperties.put(info.PARTIAL_UPDATE_NEW_KEY_POLICY,
+                    this.partialUpdateNewKeyPolicy == TPartialUpdateNewRowPolicy.ERROR ? "ERROR" : "APPEND");
         }
         jobProperties.put(info.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio));

@@ -1869,6 +1874,10 @@ public String jobPropertiesToJsonString() {

         // job properties defined in CreateRoutineLoadStmt
         jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
+        if (isPartialUpdate) {
+            jobProperties.put(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY,
+                    partialUpdateNewKeyPolicy == TPartialUpdateNewRowPolicy.ERROR ? "ERROR" : "APPEND");
+        }
         jobProperties.put(CreateRoutineLoadInfo.MAX_ERROR_NUMBER_PROPERTY, String.valueOf(maxErrorNum));
         jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_INTERVAL_SEC_PROPERTY, String.valueOf(maxBatchIntervalS));
         jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY, String.valueOf(maxBatchRows));

@@ -1921,6 +1930,12 @@ public void gsonPostProcess() throws IOException {
         jobProperties.forEach((k, v) -> {
             if (k.equals(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
                 isPartialUpdate = Boolean.parseBoolean(v);
+            } else if (k.equals(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+                if ("ERROR".equalsIgnoreCase(v)) {
+                    partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR;
+                } else {
+                    partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
+                }
             }
         });
         try {
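
Because jobPropertiesToJsonString() now serializes the new key, the effective setting should also surface in SHOW ROUTINE LOAD output once partial update is enabled; a hypothetical check (job name invented):

    SHOW ROUTINE LOAD FOR example_db.partial_update_job;
    -- the JobProperties JSON is expected to contain:
    --   "partial_columns": "true",
    --   "partial_update_new_key_behavior": "APPEND"   (or "ERROR")

This same string round-trip is what gsonPostProcess() reads back, so the policy survives FE metadata replay.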

fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java

Lines changed: 26 additions & 4 deletions
@@ -28,6 +28,8 @@
 import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
+import org.apache.doris.thrift.TUniqueKeyUpdateMode;

 import com.google.common.base.Strings;

@@ -65,7 +67,8 @@ public class NereidsRoutineLoadTaskInfo implements NereidsLoadTaskInfo {
     protected boolean emptyFieldAsNull;
     protected int sendBatchParallelism;
     protected boolean loadToSingleTablet;
-    protected boolean isPartialUpdate;
+    protected TUniqueKeyUpdateMode uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
+    protected TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
     protected boolean memtableOnSinkNode;
     protected int timeoutSec;

@@ -77,7 +80,8 @@ public NereidsRoutineLoadTaskInfo(long execMemLimit, Map<String, String> jobProperties,
             String sequenceCol, double maxFilterRatio, NereidsImportColumnDescs columnDescs,
             Expression precedingFilter, Expression whereExpr, Separator columnSeparator,
             Separator lineDelimiter, byte enclose, byte escape, int sendBatchParallelism,
-            boolean loadToSingleTablet, boolean isPartialUpdate, boolean memtableOnSinkNode) {
+            boolean loadToSingleTablet, boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy,
+            boolean memtableOnSinkNode) {
         this.execMemLimit = execMemLimit;
         this.jobProperties = jobProperties;
         this.maxBatchIntervalS = maxBatchIntervalS;

@@ -95,7 +99,10 @@ public NereidsRoutineLoadTaskInfo(long execMemLimit, Map<String, String> jobProperties,
         this.escape = escape;
         this.sendBatchParallelism = sendBatchParallelism;
         this.loadToSingleTablet = loadToSingleTablet;
-        this.isPartialUpdate = isPartialUpdate;
+        if (isPartialUpdate) {
+            this.uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+        }
+        this.partialUpdateNewKeyPolicy = partialUpdateNewKeyPolicy;
         this.memtableOnSinkNode = memtableOnSinkNode;
         this.timeoutSec = calTimeoutSec();
     }

@@ -311,7 +318,22 @@ public List<String> getHiddenColumns() {

     @Override
     public boolean isFixedPartialUpdate() {
-        return isPartialUpdate;
+        return uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+    }
+
+    @Override
+    public TUniqueKeyUpdateMode getUniqueKeyUpdateMode() {
+        return uniquekeyUpdateMode;
+    }
+
+    @Override
+    public boolean isFlexiblePartialUpdate() {
+        return uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS;
+    }
+
+    @Override
+    public TPartialUpdateNewRowPolicy getPartialUpdateNewRowPolicy() {
+        return partialUpdateNewKeyPolicy;
     }

     @Override

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java

Lines changed: 30 additions & 0 deletions
@@ -56,6 +56,7 @@
 import org.apache.doris.nereids.util.PlanUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.resource.workloadgroup.WorkloadGroup;
+import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;

 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;

@@ -88,6 +89,7 @@ public class CreateRoutineLoadInfo {
     public static final String EXEC_MEM_LIMIT_PROPERTY = "exec_mem_limit";

     public static final String PARTIAL_COLUMNS = "partial_columns";
+    public static final String PARTIAL_UPDATE_NEW_KEY_POLICY = "partial_update_new_key_behavior";
     public static final String WORKLOAD_GROUP = "workload_group";
     public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
     public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism";

@@ -122,6 +124,7 @@ public class CreateRoutineLoadInfo {
             .add(SEND_BATCH_PARALLELISM)
             .add(LOAD_TO_SINGLE_TABLET)
             .add(PARTIAL_COLUMNS)
+            .add(PARTIAL_UPDATE_NEW_KEY_POLICY)
             .add(WORKLOAD_GROUP)
             .add(FileFormatProperties.PROP_FORMAT)
             .add(JsonFileFormatProperties.PROP_JSON_PATHS)

@@ -166,6 +169,7 @@ public class CreateRoutineLoadInfo {
      * support partial columns load(Only Unique Key Columns)
      */
     private boolean isPartialUpdate = false;
+    private TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;

     private String comment = "";

@@ -195,6 +199,15 @@ public CreateRoutineLoadInfo(LabelNameInfo labelNameInfo, String tableName,
                 .createDataSource(typeName, dataSourceProperties, this.isMultiTable);
         this.mergeType = mergeType;
         this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true");
+        if (this.isPartialUpdate && this.jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+            String policyStr = this.jobProperties.get(PARTIAL_UPDATE_NEW_KEY_POLICY).toUpperCase();
+            if ("APPEND".equals(policyStr)) {
+                this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
+            } else if ("ERROR".equals(policyStr)) {
+                this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR;
+            }
+            // validation will be done in checkJobProperties()
+        }
         if (comment != null) {
             this.comment = comment;
         }

@@ -276,6 +289,10 @@ public boolean isPartialUpdate() {
         return isPartialUpdate;
     }

+    public TPartialUpdateNewRowPolicy getPartialUpdateNewKeyPolicy() {
+        return partialUpdateNewKeyPolicy;
+    }
+
     public String getComment() {
         return comment;
     }

@@ -515,6 +532,19 @@ public void checkJobProperties() throws UserException {
         }
         timezone = TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(TIMEZONE, timezone));

+        // check partial_update_new_key_behavior
+        if (jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+            if (!isPartialUpdate) {
+                throw new AnalysisException(
+                        PARTIAL_UPDATE_NEW_KEY_POLICY + " can only be set when partial_columns is true");
+            }
+            String policy = jobProperties.get(PARTIAL_UPDATE_NEW_KEY_POLICY).toUpperCase();
+            if (!"APPEND".equals(policy) && !"ERROR".equals(policy)) {
+                throw new AnalysisException(
+                        PARTIAL_UPDATE_NEW_KEY_POLICY + " should be one of {'APPEND', 'ERROR'}, but found " + policy);
+            }
+        }
+
         String format = jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv");
         fileFormatProperties = FileFormatProperties.createFileFormatProperties(format);
         fileFormatProperties.analyzeFileFormatProperties(jobProperties, false);
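
The checkJobProperties() addition above defines two rejection paths; a hedged illustration of statements that should now fail analysis (database, table, job, topic, and broker names invented; error strings taken from the code above):

    -- Rejected: the behavior property requires partial update to be enabled
    CREATE ROUTINE LOAD example_db.bad_job1 ON example_tbl
    COLUMNS TERMINATED BY ","
    PROPERTIES ("partial_update_new_key_behavior" = "ERROR")
    FROM KAFKA ("kafka_broker_list" = "broker1:9092", "kafka_topic" = "example_topic");
    -- ERROR: partial_update_new_key_behavior can only be set when partial_columns is true

    -- Rejected: only APPEND and ERROR are accepted values
    CREATE ROUTINE LOAD example_db.bad_job2 ON example_tbl
    COLUMNS TERMINATED BY ","
    PROPERTIES ("partial_columns" = "true", "partial_update_new_key_behavior" = "IGNORE")
    FROM KAFKA ("kafka_broker_list" = "broker1:9092", "kafka_topic" = "example_topic");
    -- ERROR: partial_update_new_key_behavior should be one of {'APPEND', 'ERROR'}, but found IGNORE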
Lines changed: 12 additions & 0 deletions

@@ -0,0 +1,12 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_initial --
+1	alice	100	20
+2	bob	90	21
+3	charlie	80	22
+
+-- !select_after_partial_update --
+1	alice	150	20
+2	bob	95	21
+3	charlie	80	22
+100	\N	100	\N
+
Lines changed: 27 additions & 0 deletions

@@ -0,0 +1,27 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_initial --
+1	1	1	1
+2	2	2	2
+3	3	3	3
+
+-- !select_after_append --
+1	10	1	1
+2	20	2	2
+3	3	3	3
+4	40	\N	\N
+5	50	\N	\N
+
+-- !select_after_error_mode --
+1	1	100	1
+2	2	200	2
+3	3	3	3
+4	4	40	4
+5	5	50	5
+
+-- !select_after_error_rejected --
+1	1	100	1
+2	2	200	2
+3	3	3	3
+4	4	40	4
+5	5	50	5
+
Lines changed: 108 additions & 0 deletions

@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+
+suite("test_routine_load_partial_update", "nonConcurrent") {
+    def kafkaCsvTopic = "test_routine_load_partial_update"
+
+    if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+        def runSql = { String q -> sql q }
+        def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+        def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+        def tableName = "test_routine_load_partial_update"
+        def job = "test_partial_update_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `id` int NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL,
+                `age` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            COMMENT 'test partial update'
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "enable_unique_key_merge_on_write" = "true"
+            );
+        """
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21),
+            (3, 'charlie', 80, 22)
+        """
+
+        qt_select_initial "SELECT * FROM ${tableName} ORDER BY id"
+
+        try {
+            // create routine load with partial_columns=true
+            // only update id and score columns
+            sql """
+                CREATE ROUTINE LOAD ${job} ON ${tableName}
+                COLUMNS TERMINATED BY ",",
+                COLUMNS (id, score)
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "partial_columns" = "true"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaCsvTopic}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // send partial update data to kafka
+            // update score for id=1 from 100 to 150
+            // update score for id=2 from 90 to 95
+            def data = [
+                "1,150",
+                "2,95",
+                "100,100"
+            ]
+
+            data.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            // wait for routine load task to finish
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 3)
+
+            // verify partial update: score should be updated, name and age should remain unchanged
+            qt_select_after_partial_update "SELECT * FROM ${tableName} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job}"
+        }
+    }
+}
