Skip to content

Commit 243a6d2

Browse files
xirang@dtstack.com and tiezhu
authored and committed
[feat-4789][influxdb]
Fix data precision issues and an issue where the InfluxDB Java client swallowed exceptions
1 parent 3a6583c commit 243a6d2

File tree

2 files changed

+23
-7
lines changed

2 files changed

+23
-7
lines changed

flinkx-connectors/flinkx-connector-influxdb/src/main/java/com/dtstack/flinkx/connector/influxdb/converter/InfluxdbColumnConverter.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,11 +210,11 @@ protected ISerializationConverter<Point.Builder> createExternalConverter(Logical
210210
};
211211
case FLOAT:
212212
return (val, index, builder) -> {
213-
builder.addField(fieldNameList.get(index), val.getFloat(index));
213+
builder.addField(fieldNameList.get(index), val.getDouble(index));
214214
};
215215
case INTEGER:
216216
return (val, index, builder) -> {
217-
builder.addField(fieldNameList.get(index), val.getInt(index));
217+
builder.addField(fieldNameList.get(index), val.getLong(index));
218218
};
219219
case BOOLEAN:
220220
return (val, index, builder) -> {
@@ -236,7 +236,8 @@ private boolean specicalField(
236236
builder.time(value.getLong(index), precision);
237237
return true;
238238
} else if (CollectionUtils.isNotEmpty(tags) && tags.contains(fieldName)) {
239-
builder.tag(fieldName, ((ColumnRowData) value).getField(index).asString());
239+
if (!value.isNullAt(index))
240+
builder.tag(fieldName, ((ColumnRowData) value).getField(index).asString());
240241
return true;
241242
}
242243
}

flinkx-connectors/flinkx-connector-influxdb/src/main/java/com/dtstack/flinkx/connector/influxdb/sink/InfluxdbOutputFormat.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.dtstack.flinkx.connector.influxdb.enums.TimePrecisionEnums;
88
import com.dtstack.flinkx.sink.format.BaseRichOutputFormat;
99
import com.dtstack.flinkx.throwable.WriteRecordException;
10+
import com.dtstack.flinkx.util.ExceptionUtil;
1011
import com.dtstack.flinkx.util.TableUtil;
1112

1213
import org.apache.flink.table.data.RowData;
@@ -24,6 +25,7 @@
2425

2526
import java.io.IOException;
2627
import java.util.ArrayList;
28+
import java.util.Iterator;
2729
import java.util.List;
2830
import java.util.concurrent.TimeUnit;
2931
import java.util.stream.Collectors;
@@ -70,7 +72,7 @@ protected void writeMultipleRecordsInternal() throws Exception {
7072
pointList.add(builder.build());
7173
}
7274
batchPoints.points(pointList);
73-
influxDB.writeWithRetry(batchPoints.build());
75+
influxDB.write(batchPoints.build());
7476
}
7577

7678
@Override
@@ -118,9 +120,22 @@ private void establishConnnection() {
118120
if (!StringUtils.isNullOrWhitespaceOnly(rp)) influxDB.setRetentionPolicy(rp);
119121
if (enableBatch) {
120122
BatchOptions options = BatchOptions.DEFAULTS;
121-
options.precision(precision);
122-
options.bufferLimit(sinkConfig.getBufferLimit());
123-
options.flushDuration(sinkConfig.getFlushDuration());
123+
options =
124+
options.exceptionHandler(
125+
(iterable, e) -> {
126+
Iterator<Point> iterator = iterable.iterator();
127+
while (iterator.hasNext()) {
128+
dirtyManager.collect(iterator.next(), e, null);
129+
}
130+
if (LOG.isTraceEnabled()) {
131+
LOG.trace(
132+
"write data error, e = {}",
133+
ExceptionUtil.getErrorMessage(e));
134+
}
135+
})
136+
.precision(precision)
137+
.bufferLimit(sinkConfig.getBufferLimit())
138+
.flushDuration(sinkConfig.getFlushDuration());
124139
influxDB.enableBatch(options);
125140
}
126141
}

0 commit comments

Comments (0)