Skip to content

Commit ac6711e

Browse files
committed
[hotfix-864][influxdb] Influxdb convert the wrong value of 'time'.
1 parent 6db4fd2 commit ac6711e

File tree

2 files changed

+52
-20
lines changed

2 files changed

+52
-20
lines changed

chunjun-connectors/chunjun-connector-influxdb/src/main/java/com/dtstack/chunjun/connector/influxdb/converter/InfluxdbColumnConverter.java

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.dtstack.chunjun.element.column.BytesColumn;
3636
import com.dtstack.chunjun.element.column.NullColumn;
3737
import com.dtstack.chunjun.element.column.StringColumn;
38+
import com.dtstack.chunjun.element.column.TimestampColumn;
3839

3940
import org.apache.flink.table.data.RowData;
4041
import org.apache.flink.table.types.logical.LogicalType;
@@ -57,6 +58,8 @@
5758
public class InfluxdbColumnConverter
5859
extends AbstractRowConverter<Map<String, Object>, RowData, Point.Builder, LogicalType> {
5960

61+
private static final String TIME_KEY = "time";
62+
6063
private String format = "MSGPACK";
6164
private List<String> fieldNameList;
6265
private List<FieldConf> fieldConfList;
@@ -72,7 +75,8 @@ public InfluxdbColumnConverter(
7275
RowType rowType,
7376
ChunJunCommonConf commonConf,
7477
List<String> fieldNameList,
75-
String format) {
78+
String format,
79+
TimeUnit precision) {
7680
super(rowType, commonConf);
7781
for (int i = 0; i < rowType.getFieldCount(); i++) {
7882
toInternalConverters.add(
@@ -85,6 +89,7 @@ public InfluxdbColumnConverter(
8589
this.format = format;
8690
this.fieldConfList = commonConf.getColumn();
8791
this.fieldNameList = fieldNameList;
92+
this.precision = precision;
8893
}
8994

9095
public InfluxdbColumnConverter(
@@ -124,33 +129,49 @@ protected ISerializationConverter<Point.Builder> wrapIntoNullableExternalConvert
124129

125130
@Override
126131
public RowData toInternal(Map<String, Object> input) throws Exception {
127-
132+
int converterIndex = 0;
128133
if (fieldConfList.size() == 1
129134
&& StringUtils.equals(ConstantValue.STAR_SYMBOL, fieldConfList.get(0).getName())) {
130135
ColumnRowData result = new ColumnRowData(fieldNameList.size());
131-
for (int i = 0; i < fieldNameList.size(); i++) {
132-
Object field = input.get(fieldNameList.get(i));
133-
AbstractBaseColumn baseColumn =
134-
(AbstractBaseColumn) toInternalConverters.get(i).deserialize(field);
136+
for (String fieldName : fieldNameList) {
137+
AbstractBaseColumn baseColumn = setValue(input, fieldName, converterIndex);
135138
result.addField(baseColumn);
139+
converterIndex++;
136140
}
137141
return result;
138-
}
139-
140-
ColumnRowData result = new ColumnRowData(fieldConfList.size());
141-
int converterIndex = 0;
142-
for (FieldConf fieldConf : fieldConfList) {
143-
AbstractBaseColumn baseColumn = null;
144-
if (StringUtils.isBlank(fieldConf.getValue())) {
145-
Object field = input.get(fieldConf.getName());
146-
baseColumn =
147-
(AbstractBaseColumn)
148-
toInternalConverters.get(converterIndex).deserialize(field);
142+
} else {
143+
ColumnRowData result = new ColumnRowData(fieldConfList.size());
144+
for (FieldConf fieldConf : fieldConfList) {
145+
String fieldName = fieldConf.getName();
146+
AbstractBaseColumn baseColumn = setValue(input, fieldName, converterIndex);
147+
result.addField(assembleFieldProps(fieldConf, baseColumn));
149148
converterIndex++;
150149
}
151-
result.addField(assembleFieldProps(fieldConf, baseColumn));
150+
return result;
151+
}
152+
}
153+
154+
/**
155+
* Set the value of input into column.
156+
*
157+
* @param input input value.
158+
* @param fieldName field name of input.
159+
* @param index index of converter.
160+
* @return column
161+
* @throws Exception the exception from converter.
162+
*/
163+
private AbstractBaseColumn setValue(Map<String, Object> input, String fieldName, int index)
164+
throws Exception {
165+
AbstractBaseColumn baseColumn;
166+
if (TIME_KEY.equalsIgnoreCase(fieldName)) {
167+
Long timeLong = (Long) input.get(fieldName);
168+
long timeMs = TimeUnit.MILLISECONDS.convert(timeLong, precision);
169+
baseColumn = new TimestampColumn(timeMs);
170+
} else {
171+
Object field = input.get(fieldName);
172+
baseColumn = (AbstractBaseColumn) toInternalConverters.get(index).deserialize(field);
152173
}
153-
return result;
174+
return baseColumn;
154175
}
155176

156177
@Override

chunjun-connectors/chunjun-connector-influxdb/src/main/java/com/dtstack/chunjun/connector/influxdb/source/InfluxdbInputFormat.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.dtstack.chunjun.connector.influxdb.conf.InfluxdbSourceConfig;
2626
import com.dtstack.chunjun.connector.influxdb.converter.InfluxdbColumnConverter;
2727
import com.dtstack.chunjun.connector.influxdb.converter.InfluxdbRawTypeConverter;
28+
import com.dtstack.chunjun.connector.influxdb.enums.TimePrecisionEnums;
2829
import com.dtstack.chunjun.source.format.BaseRichInputFormat;
2930
import com.dtstack.chunjun.throwable.ReadRecordException;
3031
import com.dtstack.chunjun.util.ColumnBuildUtil;
@@ -54,6 +55,7 @@
5455
import java.util.HashMap;
5556
import java.util.List;
5657
import java.util.Map;
58+
import java.util.Objects;
5759
import java.util.Optional;
5860
import java.util.concurrent.BlockingQueue;
5961
import java.util.concurrent.LinkedBlockingQueue;
@@ -74,6 +76,7 @@ public class InfluxdbInputFormat extends BaseRichInputFormat {
7476

7577
private InfluxdbSourceConfig config;
7678
private String queryTemplate;
79+
private TimeUnit precision;
7780
private transient InfluxDB influxDB;
7881
private transient AtomicBoolean hasNext;
7982
private transient BlockingQueue<Map<String, Object>> queue;
@@ -93,6 +96,8 @@ protected void openInternal(InputSplit inputSplit) throws IOException {
9396
LOG.info("subTask[{}] inputSplit = {}.", indexOfSubTask, inputSplit);
9497
this.queue = new LinkedBlockingQueue<>(config.getFetchSize() * 3);
9598
this.hasNext = new AtomicBoolean(true);
99+
this.precision = TimePrecisionEnums.of(config.getEpoch()).getPrecision();
100+
96101
connect();
97102

98103
Pair<List<String>, List<String>> pair = getTableMetadata();
@@ -107,7 +112,8 @@ protected void openInternal(InputSplit inputSplit) throws IOException {
107112

108113
// TODO add InfluxdbRawConverter
109114
setRowConverter(
110-
new InfluxdbColumnConverter(rowType, config, columnNameList, config.getFormat()));
115+
new InfluxdbColumnConverter(
116+
rowType, config, columnNameList, config.getFormat(), precision));
111117

112118
this.queryInfluxQLBuilder = new InfluxdbQuerySqlBuilder(config, columnNameList);
113119
this.queryTemplate = queryInfluxQLBuilder.buildSql();
@@ -133,6 +139,11 @@ protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException
133139
RowData row;
134140
try {
135141
Map<String, Object> data = queue.poll(5, TimeUnit.SECONDS);
142+
143+
if (Objects.isNull(data)) {
144+
return null;
145+
}
146+
136147
row = rowConverter.toInternal(data);
137148
} catch (Exception e) {
138149
throw new ReadRecordException("can not read next record.", e, -1, rowData);

0 commit comments

Comments
 (0)