Skip to content

Commit c47f133

Browse files
committed
[feat-857][converter]optimize RowSizeCalculator
1 parent c225de3 commit c47f133

File tree

55 files changed

+308
-102
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+308
-102
lines changed

chunjun-connectors/chunjun-connector-binlog/src/main/java/com/dtstack/chunjun/connector/binlog/inputformat/BinlogInputFormatBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,12 @@ public void setBinlogConf(BinlogConf binlogConf) {
6060
}
6161

6262
public void setRowConverter(AbstractCDCRowConverter rowConverter) {
63+
setRowConverter(rowConverter, false);
64+
}
65+
66+
public void setRowConverter(AbstractCDCRowConverter rowConverter, boolean useAbstractColumn) {
6367
this.format.setRowConverter(rowConverter);
68+
format.setUseAbstractColumn(useAbstractColumn);
6469
}
6570

6671
@Override

chunjun-connectors/chunjun-connector-binlog/src/main/java/com/dtstack/chunjun/connector/binlog/source/BinlogSourceFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public DataStream<RowData> createSource() {
7070
: TimestampFormat.ISO_8601;
7171
rowConverter = new BinlogRowConverter(rowType, format);
7272
}
73-
builder.setRowConverter(rowConverter);
73+
builder.setRowConverter(rowConverter, useAbstractBaseColumn);
7474
return createInput(builder.finish());
7575
}
7676

chunjun-connectors/chunjun-connector-cassandra/src/main/java/com/dtstack/chunjun/connector/cassandra/sink/CassandraSinkFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ public DataStreamSink<RowData> createSink(DataStream<RowData> dataSet) {
6666
List<FieldConf> fieldConfList = sinkConf.getColumn();
6767

6868
final RowType rowType = TableUtil.createRowType(fieldConfList, getRawTypeConverter());
69-
builder.setRowConverter(new CassandraColumnConverter(rowType, fieldConfList));
69+
builder.setRowConverter(
70+
new CassandraColumnConverter(rowType, fieldConfList), useAbstractBaseColumn);
7071

7172
return createOutput(dataSet, builder.finish());
7273
}

chunjun-connectors/chunjun-connector-cassandra/src/main/java/com/dtstack/chunjun/connector/cassandra/source/CassandraSourceFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ public DataStream<RowData> createSource() {
7171
fieldConfList.forEach(fieldConf -> columnNameList.add(fieldConf.getName()));
7272

7373
final RowType rowType = TableUtil.createRowType(fieldConfList, getRawTypeConverter());
74-
builder.setRowConverter(new CassandraRowConverter(rowType, columnNameList));
74+
builder.setRowConverter(
75+
new CassandraRowConverter(rowType, columnNameList), useAbstractBaseColumn);
7576

7677
return createInput(builder.finish());
7778
}

chunjun-connectors/chunjun-connector-elasticsearch7/src/main/java/com/dtstack/chunjun/connector/elasticsearch7/sink/Elasticsearch7SinkFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public DataStreamSink<RowData> createSink(DataStream<RowData> dataSet) {
9191
} else {
9292
rowConverter = new ElasticsearchRowConverter(rowType);
9393
}
94-
builder.setRowConverter(rowConverter);
94+
builder.setRowConverter(rowConverter, useAbstractBaseColumn);
9595
return createOutput(dataSet, builder.finish());
9696
}
9797

chunjun-connectors/chunjun-connector-elasticsearch7/src/main/java/com/dtstack/chunjun/connector/elasticsearch7/source/Elasticsearch7SourceFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public DataStream<RowData> createSource() {
7676
} else {
7777
rowConverter = new ElasticsearchRowConverter(rowType);
7878
}
79-
builder.setRowConverter(rowConverter);
79+
builder.setRowConverter(rowConverter, useAbstractBaseColumn);
8080
return createInput(builder.finish());
8181
}
8282

chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/sink/EmqxSinkFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public DataStreamSink<RowData> createSink(DataStream<RowData> dataSet) {
6060
}
6161
EmqxOutputFormatBuilder builder = new EmqxOutputFormatBuilder();
6262
builder.setEmqxConf(emqxConf);
63-
builder.setRowConverter(new EmqxColumnConverter(emqxConf));
63+
builder.setRowConverter(new EmqxColumnConverter(emqxConf), useAbstractBaseColumn);
6464
return createOutput(dataSet, builder.finish());
6565
}
6666
}

chunjun-connectors/chunjun-connector-emqx/src/main/java/com/dtstack/chunjun/connector/emqx/source/EmqxSourceFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public DataStream<RowData> createSource() {
6060
}
6161
EmqxInputFormatBuilder builder = new EmqxInputFormatBuilder();
6262
builder.setEmqxConf(emqxConf);
63-
builder.setRowConverter(new EmqxColumnConverter(emqxConf));
63+
builder.setRowConverter(new EmqxColumnConverter(emqxConf), useAbstractBaseColumn);
6464
return createInput(builder.finish());
6565
}
6666
}

chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/converter/FtpColumnConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ protected ISerializationConverter<List<String>> wrapIntoNullableExternalConverte
112112
ISerializationConverter serializationConverter, FieldConf fieldConf) {
113113
return (rowData, index, list) -> {
114114
if (rowData == null || rowData.isNullAt(index)) {
115-
list.set(index, null);
115+
list.add(index, null);
116116
} else {
117117
serializationConverter.serialize(rowData, index, list);
118118
}

chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/sink/FtpSinkFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public DataStreamSink<RowData> createSink(DataStream<RowData> dataSet) {
8383
ftpConfig.setColumn(fieldConfList);
8484
final RowType rowType =
8585
TableUtil.createRowType(ftpConfig.getColumn(), getRawTypeConverter());
86-
builder.setRowConverter(new FtpColumnConverter(rowType, ftpConfig));
86+
builder.setRowConverter(new FtpColumnConverter(rowType, ftpConfig), useAbstractBaseColumn);
8787

8888
return createOutput(dataSet, builder.finish());
8989
}

0 commit comments

Comments
 (0)