Skip to content

Commit 1c21978

Browse files
dujieFlechazoW
authored andcommitted
[hotfix-53323][logminer]Data is lost due to log group switchover during online log loading
1 parent 68fa900 commit 1c21978

File tree

2 files changed

+37
-7
lines changed

2 files changed

+37
-7
lines changed

flinkx-connectors/flinkx-connector-oraclelogminer/src/main/java/com/dtstack/flinkx/connector/oraclelogminer/listener/LogMinerConnection.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.commons.collections.CollectionUtils;
3838
import org.apache.commons.lang.time.DateFormatUtils;
3939
import org.apache.commons.lang3.StringUtils;
40+
import org.apache.commons.lang3.tuple.Pair;
4041
import org.slf4j.Logger;
4142
import org.slf4j.LoggerFactory;
4243

@@ -510,7 +511,7 @@ private void closeResources(ResultSet rs, Statement stmt, Connection conn) {
510511
}
511512

512513
/** 根据leftScn 以及加载的日志大小限制 获取可加载的scn范围 以及此范围对应的日志文件 */
513-
protected BigInteger getEndScn(BigInteger startScn, List<LogFile> logFiles)
514+
protected Pair<BigInteger, Boolean> getEndScn(BigInteger startScn, List<LogFile> logFiles)
514515
throws SQLException {
515516

516517
List<LogFile> logFileLists = new ArrayList<>();
@@ -557,6 +558,7 @@ protected BigInteger getEndScn(BigInteger startScn, List<LogFile> logFiles)
557558
.collect(Collectors.toList())));
558559

559560
BigInteger endScn = startScn;
561+
Boolean loadRedoLog = false;
560562

561563
long fileSize = 0L;
562564
Collection<List<LogFile>> values = map.values();
@@ -598,10 +600,16 @@ protected BigInteger getEndScn(BigInteger startScn, List<LogFile> logFiles)
598600
// 解决logminer偶尔丢失数据问题,读取online日志的时候,需要将rightScn置为当前SCN
599601
endScn = getCurrentScn();
600602
logFiles = logFileLists;
603+
// 如果加载了online日志 则loadRedoLog为true
604+
loadRedoLog = true;
601605
}
602606

603-
LOG.info("getEndScn success,startScn:{},endScn:{}", startScn, endScn);
604-
return endScn;
607+
LOG.info(
608+
"getEndScn success,startScn:{},endScn:{}, loadRedoLog:{}",
609+
startScn,
610+
endScn,
611+
loadRedoLog);
612+
return Pair.of(endScn, loadRedoLog);
605613
}
606614

607615
/** 获取logminer加载的日志文件 */

flinkx-connectors/flinkx-connector-oraclelogminer/src/main/java/com/dtstack/flinkx/connector/oraclelogminer/listener/LogMinerHelper.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2626
import org.apache.commons.codec.DecoderException;
2727
import org.apache.commons.collections.CollectionUtils;
28+
import org.apache.commons.lang3.tuple.Pair;
2829
import org.slf4j.Logger;
2930
import org.slf4j.LoggerFactory;
3031

@@ -57,6 +58,11 @@ public class LogMinerHelper {
5758
private final BigInteger step = new BigInteger("3000");
5859
private BigInteger startScn;
5960
private BigInteger endScn;
61+
// 是否加载了online实时日志
62+
private Boolean loadRedo = false;
63+
64+
// 最后一条数据的位点
65+
private BigInteger currentSinkPosition;
6066
/** 当前正在读取的connection索引 * */
6167
private int currentIndex;
6268
/** 当前正在读取的connection * */
@@ -151,15 +157,26 @@ private void preLoad() throws SQLException {
151157
// 按照加载日志文件大小限制,根据endScn作为起点找到对应的一组加载范围
152158
BigInteger currentStartScn = Objects.nonNull(this.endScn) ? this.endScn : startScn;
153159

154-
BigInteger endScn =
160+
// 如果加载了redo日志,则起点不能是上一次记载的日志的结束位点,而是上次消费的最后一条数据的位点
161+
if (loadRedo) {
162+
// 需要加1 因为logminer查找数据是左闭右开,如果不加1 会导致最后一条数据重新消费
163+
currentStartScn = currentSinkPosition.add(BigInteger.ONE);
164+
}
165+
166+
Pair<BigInteger, Boolean> endScn =
155167
logMinerConnection.getEndScn(currentStartScn, new ArrayList<>(32));
156-
logMinerConnection.startOrUpdateLogMiner(currentStartScn, endScn);
168+
logMinerConnection.startOrUpdateLogMiner(currentStartScn, endScn.getLeft());
157169
// 读取v$logmnr_contents 数据由线程池加载
158170
loadData(logMinerConnection, logMinerSelectSql);
159-
this.endScn = endScn;
171+
this.endScn = endScn.getLeft();
172+
this.loadRedo = endScn.getRight();
160173
if (Objects.isNull(currentConnection)) {
161174
updateCurrentConnection(logMinerConnection);
162175
}
176+
// 如果已经加载了redoLog就不需要多线程加载了
177+
if (endScn.getRight()) {
178+
break;
179+
}
163180
} else {
164181
break;
165182
}
@@ -327,10 +344,15 @@ public LogMinerConnection chooseConnection() {
327344
}
328345

329346
public QueueData getQueueData() {
330-
return activeConnectionList.get(currentIndex).next();
347+
QueueData next = activeConnectionList.get(currentIndex).next();
348+
if (BigInteger.ZERO.compareTo(next.getScn()) != 0) {
349+
this.currentSinkPosition = next.getScn();
350+
}
351+
return next;
331352
}
332353

333354
public void setStartScn(BigInteger startScn) {
334355
this.startScn = startScn;
356+
this.currentSinkPosition = this.startScn;
335357
}
336358
}

0 commit comments

Comments
 (0)