Skip to content

Commit 13b1b4d

Browse files
shitouFlechazoW
authored andcommitted
[hotfix-870][doris] add flush retry and sleep timeout.
1 parent acf0e55 commit 13b1b4d

File tree

7 files changed

+73
-18
lines changed

7 files changed

+73
-18
lines changed

chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConf.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,24 @@ public class DorisConf extends JdbcConf {
5252

5353
/** * default value is 3 */
5454
private Integer maxRetries = 3;
55+
/** retry load sleep timeout* */
56+
private long waitRetryMills = 18000;
57+
5558
/** 是否配置了NameMapping, true, RowData中将携带名称匹配后的数据库和表名, sink端配置的database和table失效* */
5659
private boolean nameMapped;
5760

5861
private LoadConf loadConf;
5962

6063
private Properties loadProperties;
6164

65+
public long getWaitRetryMills() {
66+
return waitRetryMills;
67+
}
68+
69+
public void setWaitRetryMills(long waitRetryMills) {
70+
this.waitRetryMills = waitRetryMills;
71+
}
72+
6273
public String getDatabase() {
6374
return database;
6475
}

chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConfBuilder.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,16 @@ public DorisConfBuilder setFlushIntervalMills(long flushIntervalMills) {
8888
return this;
8989
}
9090

91+
public DorisConfBuilder setMaxRetries(int maxRetries) {
92+
this.dorisConf.setMaxRetries(maxRetries);
93+
return this;
94+
}
95+
96+
public DorisConfBuilder setWaitRetryMills(long waitRetryMills) {
97+
this.dorisConf.setWaitRetryMills(waitRetryMills);
98+
return this;
99+
}
100+
91101
public DorisConf build() {
92102
StringJoiner errorMessage = new StringJoiner("\n");
93103

chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisKeys.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public final class DorisKeys {
3636

3737
public static final String FLUSH_INTERNAL_MS_KEY = "flushIntervalMills";
3838

39+
public static final String WAITRETRIES_MS_KEY = "waitRetryMills";
40+
3941
public static final String MAX_RETRIES_KEY = "maxRetries";
4042

4143
public static final String DATABASE_KEY = "database";

chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisLoadClient.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.dtstack.chunjun.connector.doris.rest;
2222

2323
import com.dtstack.chunjun.conf.FieldConf;
24+
import com.dtstack.chunjun.connector.doris.DorisUtil;
2425
import com.dtstack.chunjun.connector.doris.options.DorisConf;
2526
import com.dtstack.chunjun.converter.AbstractRowConverter;
2627
import com.dtstack.chunjun.element.ColumnRowData;
@@ -36,7 +37,6 @@
3637

3738
import javax.annotation.Nonnull;
3839

39-
import java.io.IOException;
4040
import java.io.Serializable;
4141
import java.util.Arrays;
4242
import java.util.HashMap;
@@ -63,8 +63,6 @@ public class DorisLoadClient implements Serializable {
6363
private final Set<String> metaHeader =
6464
Stream.of("schema", "table", "type", "opTime", "ts", "scn")
6565
.collect(Collectors.toCollection(HashSet::new));
66-
67-
private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load?";
6866
private static final String KEY_SCHEMA = "schema";
6967
private static final String KEY_TABLE = "table";
7068
private static final String KEY_POINT = ".";
@@ -73,16 +71,13 @@ public class DorisLoadClient implements Serializable {
7371

7472
private final DorisStreamLoad dorisStreamLoad;
7573
private final boolean nameMapped;
76-
private String hostPort;
7774
private final DorisConf conf;
7875

79-
public DorisLoadClient(DorisStreamLoad dorisStreamLoad, DorisConf conf, String hostPort) {
76+
public DorisLoadClient(DorisStreamLoad dorisStreamLoad, DorisConf conf) {
8077
this.dorisStreamLoad = dorisStreamLoad;
81-
this.hostPort = hostPort;
8278
this.conf = conf;
8379
this.nameMapped = conf.isNameMapped();
8480
}
85-
8681
/**
8782
* Each time a RowData is processed, a Carrier is obtained and then returned.
8883
*
@@ -218,13 +213,15 @@ private void processWithGenericRowData(
218213
* @param carrier data carrier
219214
* @throws WriteRecordException
220215
*/
221-
public void flush(Carrier carrier) throws WriteRecordException {
216+
public void flush(final Carrier carrier) throws WriteRecordException {
222217
try {
223-
dorisStreamLoad.load(
218+
DorisUtil.doRetry(
219+
dorisStreamLoad::load,
220+
dorisStreamLoad::replaceBackend,
224221
carrier,
225-
String.format(
226-
LOAD_URL_PATTERN, hostPort, carrier.getDatabase(), carrier.getTable()));
227-
} catch (IOException e) {
222+
conf.getMaxRetries(),
223+
conf.getWaitRetryMills());
224+
} catch (Exception e) {
228225
String errorMessage = "write record failed.";
229226
throw new WriteRecordException(errorMessage, e, -1, carrier.toString());
230227
}

chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisStreamLoad.java

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,14 @@ public class DorisStreamLoad implements Serializable {
6363
private static final ObjectMapper OM = new ObjectMapper();
6464
private static final List<String> DORIS_SUCCESS_STATUS =
6565
new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));
66-
private final String authEncoding;
67-
private final Properties streamLoadProp;
66+
private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load?";
67+
private String authEncoding;
68+
private Properties streamLoadProp;
69+
private String hostPort;
70+
private DorisConf options;
6871

6972
public DorisStreamLoad(DorisConf options) {
73+
this.options = options;
7074
this.authEncoding =
7175
Base64.getEncoder()
7276
.encodeToString(
@@ -75,6 +79,14 @@ public DorisStreamLoad(DorisConf options) {
7579
this.streamLoadProp = options.getLoadProperties();
7680
}
7781

82+
public void setHostPort(String hostPort) {
83+
this.hostPort = hostPort;
84+
}
85+
86+
public void setOptions(DorisConf options) {
87+
this.options = options;
88+
}
89+
7890
/**
7991
* Generate Http Put request.
8092
*
@@ -138,15 +150,33 @@ public String toString() {
138150
}
139151
}
140152

153+
public void replaceBackend() throws IOException {
154+
String backend = getBackend();
155+
this.setHostPort(backend);
156+
LOG.info("replace backend node to {}", backend);
157+
}
158+
159+
private String getBackend() throws IOException {
160+
try {
161+
// get be url from fe
162+
return FeRestService.randomBackend(options);
163+
} catch (IOException e) {
164+
LOG.error("get backends info fail");
165+
throw new IOException(e);
166+
}
167+
}
168+
141169
/**
142170
* Doris load data via stream.
143171
*
144172
* @param carrier data carrier.
145-
* @param loadUrlStr doris load url.
146173
* @throws IOException io exception.
147174
*/
148-
public void load(Carrier carrier, String loadUrlStr) throws IOException {
175+
public void load(Carrier carrier) throws IOException {
149176
List<String> columnNames = carrier.getColumns();
177+
String loadUrlStr =
178+
String.format(
179+
LOAD_URL_PATTERN, hostPort, carrier.getDatabase(), carrier.getTable());
150180
String json = OM.writeValueAsString(carrier.getInsertContent());
151181
String mergeConditions = carrier.getDeleteContent();
152182
LoadResponse loadResponse = loadBatch(columnNames, json, mergeConditions, loadUrlStr);
@@ -196,7 +226,7 @@ private String generateLabel() {
196226
String formatDate = sdf.format(new Date());
197227
label =
198228
String.format(
199-
"flinkx_connector_%s_%s",
229+
"chunjun_connector_%s_%s",
200230
formatDate, UUID.randomUUID().toString().replaceAll("-", ""));
201231
}
202232
return label;

chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormat.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ private String getBackend() throws IOException {
7070
@Override
7171
public void open(int taskNumber, int numTasks) throws IOException {
7272
DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(options);
73-
client = new DorisLoadClient(dorisStreamLoad, options, getBackend());
73+
dorisStreamLoad.replaceBackend();
74+
client = new DorisLoadClient(dorisStreamLoad, options);
7475
super.open(taskNumber, numTasks);
7576
}
7677

chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisSinkFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.FLUSH_INTERNAL_MS_KEY;
5454
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.LOAD_OPTIONS_KEY;
5555
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.LOAD_PROPERTIES_KEY;
56+
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.MAX_RETRIES_KEY;
5657
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.PASSWORD_KEY;
5758
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.REQUEST_BATCH_SIZE_KEY;
5859
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.REQUEST_CONNECT_TIMEOUT_MS_KEY;
@@ -62,6 +63,7 @@
6263
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.REQUEST_TABLET_SIZE_KEY;
6364
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.TABLE_KEY;
6465
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.USER_NAME_KEY;
66+
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.WAITRETRIES_MS_KEY;
6567
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.WRITE_MODE_KEY;
6668

6769
/**
@@ -145,6 +147,8 @@ public DorisSinkFactory(SyncConf syncConf) {
145147
.setUsername(parameter.getStringVal(USER_NAME_KEY))
146148
.setBatchSize(parameter.getIntVal(BATCH_SIZE_KEY, 1000))
147149
.setFlushIntervalMills(parameter.getLongVal(FLUSH_INTERNAL_MS_KEY, 10000L))
150+
.setMaxRetries(parameter.getIntVal(MAX_RETRIES_KEY, 1))
151+
.setWaitRetryMills(parameter.getLongVal(WAITRETRIES_MS_KEY, 18000L))
148152
.build();
149153
options.setColumn(syncConf.getWriter().getFieldList());
150154
super.initCommonConf(options);

0 commit comments

Comments
 (0)