Skip to content

Commit 3a6583c

Browse files
xirang@dtstack.comtiezhu
authored andcommitted
[feat-4789][influxdb] nep fix
1 parent f0298c6 commit 3a6583c

File tree

5 files changed

+27
-25
lines changed

5 files changed

+27
-25
lines changed

flinkx-connectors/flinkx-connector-influxdb/src/main/java/com/dtstack/flinkx/connector/influxdb/conf/InfluxdbSinkConfig.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.dtstack.flinkx.sink.WriteMode;
44

55
import java.util.List;
6+
import java.util.Locale;
67

78
/** @Author xirang @Company Dtstack @Date: 2022/3/14 6:00 PM */
89
public class InfluxdbSinkConfig extends InfluxdbConfig {
@@ -16,7 +17,7 @@ public class InfluxdbSinkConfig extends InfluxdbConfig {
1617
/** tags of the measurement */
1718
private List<String> tags;
1819

19-
private int batchSize = 10000;
20+
private int bufferLimit = 10000;
2021

2122
/** flush duration (ms) */
2223
private int flushDuration = 1000;
@@ -75,8 +76,20 @@ public WriteMode getWriteMode() {
7576
return writeMode;
7677
}
7778

78-
public void setWriteMode(WriteMode writeMode) {
79-
this.writeMode = writeMode;
79+
public void setWriteMode(String writeMode) {
80+
switch (writeMode.toLowerCase(Locale.ENGLISH)) {
81+
case "insert":
82+
this.writeMode = WriteMode.INSERT;
83+
break;
84+
case "update":
85+
this.writeMode = WriteMode.UPDATE;
86+
break;
87+
case "upsert":
88+
this.writeMode = WriteMode.UPSERT;
89+
break;
90+
default:
91+
this.writeMode = WriteMode.APPEND;
92+
}
8093
}
8194

8295
public List<String> getTags() {
@@ -87,14 +100,12 @@ public void setTags(List<String> tags) {
87100
this.tags = tags;
88101
}
89102

90-
@Override
91-
public int getBatchSize() {
92-
return batchSize;
103+
public int getBufferLimit() {
104+
return bufferLimit;
93105
}
94106

95-
@Override
96-
public void setBatchSize(int batchSize) {
97-
this.batchSize = batchSize;
107+
public void setBufferLimit(int bufferLimit) {
108+
this.bufferLimit = bufferLimit;
98109
}
99110

100111
public String getPrecision() {

flinkx-connectors/flinkx-connector-influxdb/src/main/java/com/dtstack/flinkx/connector/influxdb/converter/InfluxdbColumnConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public InfluxdbColumnConverter(
114114
protected ISerializationConverter<Point.Builder> wrapIntoNullableExternalConverter(
115115
ISerializationConverter<Point.Builder> converter, LogicalType type) {
116116
return (val, index, builder) -> {
117-
if (val == null) {
117+
if (val == null || val.isNullAt(index)) {
118118
return;
119119
} else {
120120
converter.serialize(val, index, builder);

flinkx-connectors/flinkx-connector-influxdb/src/main/java/com/dtstack/flinkx/connector/influxdb/sink/InfluxdbOutputFormat.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
import okhttp3.OkHttpClient;
1717
import org.influxdb.BatchOptions;
1818
import org.influxdb.InfluxDB;
19-
import org.influxdb.InfluxDBFactory;
2019
import org.influxdb.dto.BatchPoints;
2120
import org.influxdb.dto.Point;
21+
import org.influxdb.impl.InfluxDBImpl;
2222
import org.slf4j.Logger;
2323
import org.slf4j.LoggerFactory;
2424

@@ -88,13 +88,7 @@ protected void openInternal(int taskNumber, int numTasks) throws IOException {
8888
columnNameList, columnTypeList, InfluxdbRawTypeConverter::apply);
8989
setRowConverter(
9090
new InfluxdbColumnConverter(
91-
rowType,
92-
sinkConfig,
93-
measurement,
94-
columnNameList,
95-
tags,
96-
timestamp,
97-
precision));
91+
rowType, sinkConfig, columnNameList, tags, timestamp, precision));
9892
}
9993

10094
@Override
@@ -111,7 +105,7 @@ private void establishConnnection() {
111105
.connectTimeout(15000, TimeUnit.MILLISECONDS)
112106
.readTimeout(sinkConfig.getWriteTimeout(), TimeUnit.SECONDS);
113107
influxDB =
114-
InfluxDBFactory.connect(
108+
new InfluxDBImpl(
115109
sinkConfig.getUrl().get(0),
116110
StringUtils.isNullOrWhitespaceOnly(sinkConfig.getUsername())
117111
? null
@@ -125,7 +119,7 @@ private void establishConnnection() {
125119
if (enableBatch) {
126120
BatchOptions options = BatchOptions.DEFAULTS;
127121
options.precision(precision);
128-
options.bufferLimit(sinkConfig.getBatchSize());
122+
options.bufferLimit(sinkConfig.getBufferLimit());
129123
options.flushDuration(sinkConfig.getFlushDuration());
130124
influxDB.enableBatch(options);
131125
}

flinkx-connectors/flinkx-connector-influxdb/src/main/java/com/dtstack/flinkx/connector/influxdb/sink/InfluxdbOutputFormatBuilder.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import com.dtstack.flinkx.sink.format.BaseRichOutputFormatBuilder;
55

66
import org.apache.flink.util.CollectionUtil;
7-
import org.apache.flink.util.StringUtils;
87

98
/** @Author xirang @Company Dtstack @Date: 2022/3/14 2:57 PM */
109
public class InfluxdbOutputFormatBuilder extends BaseRichOutputFormatBuilder {
@@ -25,10 +24,6 @@ protected void checkFormat() {
2524
InfluxdbSinkConfig sinkConfig = format.getSinkConfig();
2625
StringBuilder sb = new StringBuilder(256);
2726
if (CollectionUtil.isNullOrEmpty(sinkConfig.getUrl())) sb.append("No url supplied;\n");
28-
if (StringUtils.isNullOrWhitespaceOnly(sinkConfig.getUsername()))
29-
sb.append("No username supplied;\n");
30-
if (StringUtils.isNullOrWhitespaceOnly(sinkConfig.getPassword()))
31-
sb.append("No password supplied;\n");
3227
if (!"insert".equalsIgnoreCase(sinkConfig.getWriteMode().getMode()))
3328
sb.append("Only support insert write mode;\n");
3429
if (sb.length() > 0) {

flinkx-connectors/flinkx-connector-influxdb/src/main/java/com/dtstack/flinkx/connector/influxdb/sink/InfluxdbSinkFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ public InfluxdbSinkFactory(SyncConf syncConf) {
2727
Gson gson = new GsonBuilder().create();
2828
GsonUtil.setTypeAdapter(gson);
2929
this.influxdbConfig = gson.fromJson(gson.toJson(parameter), InfluxdbSinkConfig.class);
30+
Object writeMode = parameter.get("writeMode");
31+
influxdbConfig.setWriteMode(writeMode == null ? null : writeMode.toString());
3032
influxdbConfig.setColumn(syncConf.getWriter().getFieldList());
3133
super.initFlinkxCommonConf(influxdbConfig);
3234
}

0 commit comments

Comments
 (0)