Skip to content

Commit 77cb7fc

Browse files
author
gituser
committed
Merge branch 'hotfix_1.8_3.10.x_29953' into 1.8_release_3.10.x
2 parents 1d81f8a + ed4b6ab commit 77cb7fc

File tree

2 files changed

+18
-18
lines changed

2 files changed

+18
-18
lines changed

flinkx-sqlservercdc/flinkx-sqlservercdc-core/src/main/java/com/dtstack/flinkx/sqlservercdc/SqlServerCdcUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class SqlServerCdcUtil {
6464

6565
public static void changeDatabase(Connection conn, String databaseName) throws SQLException {
6666
try (Statement statement = conn.createStatement()) {
67-
statement.execute(" use " + databaseName);
67+
statement.execute(" use " + "\""+databaseName+"\"");
6868
}
6969
}
7070

flinkx-sqlservercdc/flinkx-sqlservercdc-reader/src/main/java/com/dtstack/flinkx/sqlservercdc/listener/SqlServerCdcListener.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -146,37 +146,37 @@ private void readData(ChangeTablePointer[] changeTables) throws Exception{
146146
continue;
147147
}
148148

149+
Object[] dataPrev = null;
149150
TableId tableId = changeTable.getSourceTableId();
150151
if (operation == SqlserverCdcEnum.UPDATE_BEFORE.code) {
152+
dataPrev = tableWithSmallestLsn.getData();
151153
if (!tableWithSmallestLsn.next() || tableWithSmallestLsn.getOperation() != SqlserverCdcEnum.UPDATE_AFTER.code) {
152154
throw new IllegalStateException("The update before event at " + tableWithSmallestLsn.getChangePosition() + " for table " + tableId + " was not followed by after event");
153155
}
154156
}
155157

156158
Object[] data = tableWithSmallestLsn.getData();
157-
Object[] dataNext;
158-
if(operation == SqlserverCdcEnum.UPDATE_BEFORE.code){
159-
dataNext = tableWithSmallestLsn.getData();
160-
}else if(operation == SqlserverCdcEnum.DELETE.code){
161-
dataNext = data;
162-
data = new Object[dataNext.length];
163-
}else{
164-
dataNext = new Object[data.length];
159+
160+
if(operation == SqlserverCdcEnum.DELETE.code){
161+
dataPrev = data;
162+
data = new Object[dataPrev.length];
163+
}else if (operation !=SqlserverCdcEnum.UPDATE_BEFORE.code){
164+
dataPrev = new Object[data.length];
165165
}
166166

167-
Map<String, Object> map = buildResult(changeTable, tableId, data, dataNext, operation, tableWithSmallestLsn);
167+
Map<String, Object> map = buildResult(changeTable, tableId, data, dataPrev, operation, tableWithSmallestLsn);
168168
format.processEvent(map);
169169
format.setLogPosition(tableWithSmallestLsn.getChangePosition());
170170
tableWithSmallestLsn.next();
171171
}
172172
}
173173

174174
private Map<String, Object> buildResult(ChangeTable changeTable,
175-
TableId tableId,
176-
Object[] data,
177-
Object[] dataNext,
178-
int operation,
179-
ChangeTablePointer tableWithSmallestLsn) throws Exception{
175+
TableId tableId,
176+
Object[] data,
177+
Object[] dataPrev,
178+
int operation,
179+
ChangeTablePointer tableWithSmallestLsn) throws Exception{
180180
Map<String, Object> map = new LinkedHashMap<>();
181181
map.put("type", SqlserverCdcEnum.getEnum(operation).name.split("_")[0]);
182182
map.put("schema", tableId.getSchemaName());
@@ -186,7 +186,7 @@ private Map<String, Object> buildResult(ChangeTable changeTable,
186186
if(pavingData){
187187
int i = 0;
188188
for (String column : changeTable.getColumnList()) {
189-
map.put("before_" + column, SqlServerCdcUtil.clobToString(dataNext[i]));
189+
map.put("before_" + column, SqlServerCdcUtil.clobToString(dataPrev[i]));
190190
map.put("after_" + column, SqlServerCdcUtil.clobToString(data[i]));
191191
i++;
192192
}
@@ -195,8 +195,8 @@ private Map<String, Object> buildResult(ChangeTable changeTable,
195195
Map<String, Object> after = new LinkedHashMap<>();
196196
int i = 0;
197197
for (String column : changeTable.getColumnList()) {
198-
before.put(column, SqlServerCdcUtil.clobToString(data[i]));
199-
after.put(column, SqlServerCdcUtil.clobToString(dataNext[i]));
198+
before.put(column, SqlServerCdcUtil.clobToString(dataPrev[i]));
199+
after.put(column, SqlServerCdcUtil.clobToString(data[i]));
200200
i++;
201201
}
202202
map.put("before", before);

0 commit comments

Comments
 (0)