|
30 | 30 | import com.dtstack.flinkx.restore.FormatState; |
31 | 31 | import com.dtstack.flinkx.util.ExceptionUtil; |
32 | 32 | import com.dtstack.flinkx.util.GsonUtil; |
| 33 | +import com.google.common.collect.Maps; |
33 | 34 | import com.google.gson.JsonSyntaxException; |
| 35 | +import org.apache.commons.collections.CollectionUtils; |
34 | 36 | import org.apache.commons.collections.MapUtils; |
35 | 37 | import org.apache.commons.math3.util.Pair; |
36 | 38 | import org.apache.flink.types.Row; |
37 | 39 | import org.apache.hadoop.conf.Configuration; |
38 | 40 |
|
39 | 41 | import java.io.IOException; |
40 | 42 | import java.io.Serializable; |
41 | | -import java.util.HashMap; |
42 | | -import java.util.Iterator; |
43 | | -import java.util.List; |
44 | | -import java.util.Map; |
| 43 | +import java.util.*; |
45 | 44 |
|
46 | 45 | /** |
47 | 46 | * @author toutian |
@@ -194,10 +193,25 @@ public void writeRecord(Row row) throws IOException { |
194 | 193 | event = (Map) tempObj; |
195 | 194 | } else if (tempObj instanceof String) { |
196 | 195 | try { |
197 | | - event = GsonUtil.GSON.fromJson((String) tempObj, GsonUtil.gsonMapTypeToken); |
| 196 | + List<String> columns = tableInfos.entrySet().iterator().next().getValue().getColumns(); |
| 197 | + //如果写入的hive表字段包含column message,并且tempObj不符合简单json定义【以{开头 以}结尾】,同时不以message开头 则直接将tempObj作为value |
| 198 | + if(CollectionUtils.isNotEmpty(columns) && columns.size()==1 && columns.get(0).equals("message")){ |
| 199 | + String data = (String) tempObj; |
| 200 | + if(!data.startsWith("{") || !data.endsWith("}") || !data.substring(1,data.length()-1).trim().startsWith("\"message\":")){ |
| 201 | + event = Collections.singletonMap("message",data); |
| 202 | + } |
| 203 | + }else{ |
| 204 | + event = GsonUtil.GSON.fromJson((String) tempObj, GsonUtil.gsonMapTypeToken); |
| 205 | + } |
198 | 206 | }catch (JsonSyntaxException e){ |
| 207 | + |
| 208 | + List<String> columns = tableInfos.entrySet().iterator().next().getValue().getColumns(); |
| 209 | + //如果hive没有message column,那么认为是脏数据 打印日志 |
| 210 | + if(CollectionUtils.isNotEmpty(columns) && !columns.contains("message")){ |
| 211 | + LOG.warn("bad json string:【{}】", tempObj); |
| 212 | + } |
199 | 213 | // is not a json string |
200 | | - LOG.warn("bad json string:【{}】", tempObj); |
| 214 | + event = Collections.singletonMap("message",tempObj); |
201 | 215 | } |
202 | 216 | } |
203 | 217 | } |
|
0 commit comments