Skip to content

Commit d9a3505

Browse files
author
gituser
committed
Merge branch 'hotfix_1.8_3.10.x_30425' into 1.8_release_3.10.x
2 parents f95f0d9 + 3432930 commit d9a3505

File tree

1 file changed

+22
-4
lines changed

1 file changed

+22
-4
lines changed

flinkx-postgresql/flinkx-postgresql-writer/src/main/java/com/dtstack/flinkx/postgresql/format/PostgresqlOutputFormat.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,22 @@ protected void writeSingleRecordInternal(Row row) throws WriteRecordException {
100100
int index = 0;
101101
try {
102102
StringBuilder sb = new StringBuilder();
103+
int lastIndex = row.getArity() - 1;
103104
for (; index < row.getArity(); index++) {
104105
Object rowData = getField(row, index);
105-
sb.append(rowData)
106-
.append(DEFAULT_FIELD_DELIM);
106+
if(rowData==null){
107+
sb.append(DEFAULT_NULL_DELIM);
108+
}else{
109+
String data = String.valueOf(rowData);
110+
if(data.contains("\\")){
111+
data= data.replaceAll("\\\\","\\\\\\\\");
112+
}
113+
sb.append(data);
114+
}
115+
if(index != lastIndex){
116+
sb.append(DEFAULT_FIELD_DELIM);
117+
}
107118
}
108-
109119
String rowVal = sb.toString();
110120
ByteArrayInputStream bi = new ByteArrayInputStream(rowVal.getBytes(StandardCharsets.UTF_8));
111121
copyManager.copyIn(copySql, bi);
@@ -147,7 +157,15 @@ protected void writeMultipleRecordsInternal() throws Exception {
147157
int lastIndex = row.getArity() - 1;
148158
for (int index =0; index < row.getArity(); index++) {
149159
Object rowData = getField(row, index);
150-
sb.append(rowData==null ? DEFAULT_NULL_DELIM : rowData);
160+
if(rowData==null){
161+
sb.append(DEFAULT_NULL_DELIM);
162+
}else{
163+
String data = String.valueOf(rowData);
164+
if(data.contains("\\")){
165+
data= data.replaceAll("\\\\","\\\\\\\\");
166+
}
167+
sb.append(data);
168+
}
151169
if(index != lastIndex){
152170
sb.append(DEFAULT_FIELD_DELIM);
153171
}

0 commit comments

Comments
 (0)