
Commit 1a15c87

dujieyanghuaiGit authored and committed
[feat-880][hdfs]support orc writer when fullcolname size greater than column size
1 parent 1237996 commit 1a15c87

File tree

1 file changed: 32 additions, 5 deletions


chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsOrcOutputFormat.java

Lines changed: 32 additions & 5 deletions
@@ -75,6 +75,8 @@ public class HdfsOrcOutputFormat extends BaseHdfsOutputFormat {
     private FileOutputFormat outputFormat;
     private JobConf jobConf;
 
+    protected int[] colIndices;
+
     /** Initialize the object size calculator */
     protected void initRowSizeCalculator() {
         rowSizeCalculator = RowSizeCalculator.getRowSizeCalculator();
@@ -108,17 +110,16 @@ protected void openSource() {
         }
         FileOutputFormat.setOutputCompressorClass(jobConf, codecClass);
 
-        int size = hdfsConf.getColumn().size();
+        int size = hdfsConf.getFullColumnType().size();
         decimalColInfo = Maps.newHashMapWithExpectedSize(size);
         List<ObjectInspector> structFieldObjectInspectors = new ArrayList<>();
         for (int i = 0; i < size; i++) {
-            FieldConf fieldConf = hdfsConf.getColumn().get(i);
-            String columnType = fieldConf.getType();
+            String columnType = hdfsConf.getFullColumnType().get(i);
 
             if (ColumnTypeUtil.isDecimalType(columnType)) {
                 ColumnTypeUtil.DecimalInfo decimalInfo =
                         ColumnTypeUtil.getDecimalInfo(columnType, ORC_DEFAULT_DECIMAL_INFO);
-                decimalColInfo.put(fieldConf.getName(), decimalInfo);
+                decimalColInfo.put(hdfsConf.getFullColumnName().get(i), decimalInfo);
             }
             ColumnType type = ColumnType.getType(columnType);
             structFieldObjectInspectors.add(HdfsUtil.columnTypeToObjectInspetor(type));
@@ -135,6 +136,22 @@ protected void openSource() {
         this.inspector =
                 ObjectInspectorFactory.getStandardStructObjectInspector(
                         fullColumnNameList, structFieldObjectInspectors);
+
+        colIndices = new int[hdfsConf.getFullColumnName().size()];
+        for (int i = 0; i < hdfsConf.getFullColumnName().size(); ++i) {
+            int j = 0;
+            for (; j < hdfsConf.getColumn().size(); ++j) {
+                if (hdfsConf.getFullColumnName()
+                        .get(i)
+                        .equalsIgnoreCase(hdfsConf.getColumn().get(j).getName())) {
+                    colIndices[i] = j;
+                    break;
+                }
+            }
+            if (j == hdfsConf.getColumn().size()) {
+                colIndices[i] = -1;
+            }
+        }
     }
 
     @Override
@@ -204,8 +221,18 @@ public void writeSingleRecordToFile(RowData rowData) throws WriteRecordException
         }
 
         try {
+            List<Object> recordList = new ArrayList<>();
+            for (int i = 0; i < hdfsConf.getFullColumnName().size(); ++i) {
+                int colIndex = colIndices[i];
+                if (colIndex == -1) {
+                    recordList.add(null);
+                } else {
+                    recordList.add(data[colIndex]);
+                }
+            }
+
             this.recordWriter.write(
-                    NullWritable.get(), this.orcSerde.serialize(data, this.inspector));
+                    NullWritable.get(), this.orcSerde.serialize(recordList, this.inspector));
             rowsOfCurrentBlock++;
             lastRow = rowData;
         } catch (IOException e) {
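The pattern this commit introduces is easy to isolate: build a name-to-index mapping once when the writer opens (case-insensitive match of each full-schema column name against the configured columns, -1 when absent), then assemble each outgoing record by array lookup, padding unmatched schema columns with null. Below is a minimal, self-contained sketch of that logic under simplified assumptions: the class and method names (ColIndexSketch, buildColIndices, assembleRecord) and the sample column lists are illustrative stand-ins, not the connector's actual types, which read the names from HdfsConf and FieldConf.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ColIndexSketch {

    // For every name in the full ORC schema, find the position of the
    // matching configured column (case-insensitive), or -1 if absent.
    static int[] buildColIndices(List<String> fullColumnNames, List<String> configuredColumns) {
        int[] colIndices = new int[fullColumnNames.size()];
        for (int i = 0; i < fullColumnNames.size(); ++i) {
            colIndices[i] = -1; // default: no configured source for this schema column
            for (int j = 0; j < configuredColumns.size(); ++j) {
                if (fullColumnNames.get(i).equalsIgnoreCase(configuredColumns.get(j))) {
                    colIndices[i] = j;
                    break;
                }
            }
        }
        return colIndices;
    }

    // Assemble one record in full-schema order; absent columns become null.
    static List<Object> assembleRecord(Object[] data, int[] colIndices) {
        List<Object> record = new ArrayList<>(colIndices.length);
        for (int colIndex : colIndices) {
            record.add(colIndex == -1 ? null : data[colIndex]);
        }
        return record;
    }

    public static void main(String[] args) {
        // Hypothetical sample: the full ORC schema has four columns,
        // but only two are configured for the write.
        List<String> full = Arrays.asList("id", "name", "age", "created_at");
        List<String> configured = Arrays.asList("ID", "name");
        int[] indices = buildColIndices(full, configured);
        Object[] data = {1L, "alice"};
        System.out.println(Arrays.toString(indices));      // [0, 1, -1, -1]
        System.out.println(assembleRecord(data, indices)); // [1, alice, null, null]
    }
}

Because the name scan runs once in openSource(), the per-record cost in writeSingleRecordToFile() stays a flat array lookup per column, and columns absent from the configuration still occupy their slot in the ORC struct, keeping each serialized record aligned with the full column list the inspector was built from.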
