Skip to content

Commit 7661114

Browse files
author
gituser
committed
Merge branch '1.8_release_3.10.x' into 1.8_release_4.0.x
2 parents 6854ed5 + 9106f10 commit 7661114

File tree

3 files changed

+45
-31
lines changed

3 files changed

+45
-31
lines changed

flinkx-postgresql/flinkx-postgresql-writer/src/main/java/com/dtstack/flinkx/postgresql/format/PostgresqlOutputFormat.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -106,17 +106,23 @@ protected void writeSingleRecordInternal(Row row) throws WriteRecordException {
106106
if(rowData==null){
107107
sb.append(DEFAULT_NULL_DELIM);
108108
}else{
109-
String data = String.valueOf(rowData);
110-
if(data.contains("\\")){
111-
data= data.replaceAll("\\\\","\\\\\\\\");
112-
}
113-
sb.append(data);
109+
sb.append(rowData);
114110
}
115111
if(index != lastIndex){
116112
sb.append(DEFAULT_FIELD_DELIM);
117113
}
118114
}
119115
String rowVal = sb.toString();
116+
if(rowVal.contains("\\")){
117+
rowVal= rowVal.replaceAll("\\\\","\\\\\\\\");
118+
}
119+
if(rowVal.contains("\r")){
120+
rowVal= rowVal.replaceAll("\r","\\\\r");
121+
}
122+
123+
if(rowVal.contains("\n")){
124+
rowVal= rowVal.replaceAll("\n","\\\\n");
125+
}
120126
ByteArrayInputStream bi = new ByteArrayInputStream(rowVal.getBytes(StandardCharsets.UTF_8));
121127
copyManager.copyIn(copySql, bi);
122128
} catch (Exception e) {
@@ -155,23 +161,31 @@ protected void writeMultipleRecordsInternal() throws Exception {
155161
StringBuilder sb = new StringBuilder(128);
156162
for (Row row : rows) {
157163
int lastIndex = row.getArity() - 1;
164+
StringBuilder tempBuilder = new StringBuilder(128);
158165
for (int index =0; index < row.getArity(); index++) {
159166
Object rowData = getField(row, index);
160167
if(rowData==null){
161-
sb.append(DEFAULT_NULL_DELIM);
168+
tempBuilder.append(DEFAULT_NULL_DELIM);
162169
}else{
163-
String data = String.valueOf(rowData);
164-
if(data.contains("\\")){
165-
data= data.replaceAll("\\\\","\\\\\\\\");
166-
}
167-
sb.append(data);
170+
tempBuilder.append(rowData);
168171
}
169172
if(index != lastIndex){
170-
sb.append(DEFAULT_FIELD_DELIM);
173+
tempBuilder.append(DEFAULT_FIELD_DELIM);
171174
}
172175
}
176+
// \r \n \ 等特殊字符串需要转义
177+
String tempData = tempBuilder.toString();
178+
if(tempData.contains("\\")){
179+
tempData= tempData.replaceAll("\\\\","\\\\\\\\");
180+
}
181+
if(tempData.contains("\r")){
182+
tempData= tempData.replaceAll("\r","\\\\r");
183+
}
173184

174-
sb.append(LINE_DELIMITER);
185+
if(tempData.contains("\n")){
186+
tempData= tempData.replaceAll("\n","\\\\n");
187+
}
188+
sb.append(tempData).append(LINE_DELIMITER);
175189
}
176190

177191
String rowVal = sb.toString();

flinkx-sqlservercdc/flinkx-sqlservercdc-core/src/main/java/com/dtstack/flinkx/sqlservercdc/SqlServerCdcUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class SqlServerCdcUtil {
6464

6565
public static void changeDatabase(Connection conn, String databaseName) throws SQLException {
6666
try (Statement statement = conn.createStatement()) {
67-
statement.execute(" use " + databaseName);
67+
statement.execute(" use " + "\""+databaseName+"\"");
6868
}
6969
}
7070

flinkx-sqlservercdc/flinkx-sqlservercdc-reader/src/main/java/com/dtstack/flinkx/sqlservercdc/listener/SqlServerCdcListener.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -146,37 +146,37 @@ private void readData(ChangeTablePointer[] changeTables) throws Exception{
146146
continue;
147147
}
148148

149+
Object[] dataPrev = null;
149150
TableId tableId = changeTable.getSourceTableId();
150151
if (operation == SqlserverCdcEnum.UPDATE_BEFORE.code) {
152+
dataPrev = tableWithSmallestLsn.getData();
151153
if (!tableWithSmallestLsn.next() || tableWithSmallestLsn.getOperation() != SqlserverCdcEnum.UPDATE_AFTER.code) {
152154
throw new IllegalStateException("The update before event at " + tableWithSmallestLsn.getChangePosition() + " for table " + tableId + " was not followed by after event");
153155
}
154156
}
155157

156158
Object[] data = tableWithSmallestLsn.getData();
157-
Object[] dataNext;
158-
if(operation == SqlserverCdcEnum.UPDATE_BEFORE.code){
159-
dataNext = tableWithSmallestLsn.getData();
160-
}else if(operation == SqlserverCdcEnum.DELETE.code){
161-
dataNext = data;
162-
data = new Object[dataNext.length];
163-
}else{
164-
dataNext = new Object[data.length];
159+
160+
if(operation == SqlserverCdcEnum.DELETE.code){
161+
dataPrev = data;
162+
data = new Object[dataPrev.length];
163+
}else if (operation !=SqlserverCdcEnum.UPDATE_BEFORE.code){
164+
dataPrev = new Object[data.length];
165165
}
166166

167-
Map<String, Object> map = buildResult(changeTable, tableId, data, dataNext, operation, tableWithSmallestLsn);
167+
Map<String, Object> map = buildResult(changeTable, tableId, data, dataPrev, operation, tableWithSmallestLsn);
168168
format.processEvent(map);
169169
format.setLogPosition(tableWithSmallestLsn.getChangePosition());
170170
tableWithSmallestLsn.next();
171171
}
172172
}
173173

174174
private Map<String, Object> buildResult(ChangeTable changeTable,
175-
TableId tableId,
176-
Object[] data,
177-
Object[] dataNext,
178-
int operation,
179-
ChangeTablePointer tableWithSmallestLsn) throws Exception{
175+
TableId tableId,
176+
Object[] data,
177+
Object[] dataPrev,
178+
int operation,
179+
ChangeTablePointer tableWithSmallestLsn) throws Exception{
180180
Map<String, Object> map = new LinkedHashMap<>();
181181
map.put("type", SqlserverCdcEnum.getEnum(operation).name.split("_")[0]);
182182
map.put("schema", tableId.getSchemaName());
@@ -186,7 +186,7 @@ private Map<String, Object> buildResult(ChangeTable changeTable,
186186
if(pavingData){
187187
int i = 0;
188188
for (String column : changeTable.getColumnList()) {
189-
map.put("before_" + column, SqlServerCdcUtil.clobToString(dataNext[i]));
189+
map.put("before_" + column, SqlServerCdcUtil.clobToString(dataPrev[i]));
190190
map.put("after_" + column, SqlServerCdcUtil.clobToString(data[i]));
191191
i++;
192192
}
@@ -195,8 +195,8 @@ private Map<String, Object> buildResult(ChangeTable changeTable,
195195
Map<String, Object> after = new LinkedHashMap<>();
196196
int i = 0;
197197
for (String column : changeTable.getColumnList()) {
198-
before.put(column, SqlServerCdcUtil.clobToString(data[i]));
199-
after.put(column, SqlServerCdcUtil.clobToString(dataNext[i]));
198+
before.put(column, SqlServerCdcUtil.clobToString(dataPrev[i]));
199+
after.put(column, SqlServerCdcUtil.clobToString(data[i]));
200200
i++;
201201
}
202202
map.put("before", before);

0 commit comments

Comments
 (0)