Skip to content

Commit 610b587

Browse files
committed
【logminer优化,增加睡眠时间,减少无效查询】【30802】
1 parent bc654f1 commit 610b587

File tree

2 files changed

+21
-3
lines changed

2 files changed

+21
-3
lines changed

flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/format/LogMinerConnection.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public class LogMinerConnection {
103103

104104
private long lastQueryTime;
105105

106-
private static final long QUERY_LOG_INTERVAL = 1000;
106+
private static final long QUERY_LOG_INTERVAL = 10000;
107107

108108
private boolean logMinerStarted = false;
109109

@@ -151,6 +151,19 @@ public void disConnect() throws SQLException{
151151
public void startOrUpdateLogMiner(Long startScn) {
152152
String startSql = null;
153153
try {
154+
// 防止没有数据更新的时候频繁查询数据库,限定查询的最小时间间隔 QUERY_LOG_INTERVAL
155+
if (lastQueryTime > 0) {
156+
long time = System.currentTimeMillis() - lastQueryTime;
157+
if (time < QUERY_LOG_INTERVAL) {
158+
try {
159+
Thread.sleep(QUERY_LOG_INTERVAL-time);
160+
} catch (InterruptedException e) {
161+
LOG.warn("", e);
162+
}
163+
}
164+
}
165+
lastQueryTime = System.currentTimeMillis();
166+
154167
if (logMinerConfig.getSupportAutoAddLog()) {
155168
startSql = oracleVersion == 10 ? SqlUtil.SQL_START_LOG_MINER_AUTO_ADD_LOG_10 : SqlUtil.SQL_START_LOG_MINER_AUTO_ADD_LOG;
156169
} else {

flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/format/LogMinerListener.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,13 @@
2828

2929
import java.util.Collections;
3030
import java.util.Map;
31-
import java.util.concurrent.*;
31+
import java.util.concurrent.BlockingQueue;
32+
import java.util.concurrent.ExecutorService;
33+
import java.util.concurrent.LinkedBlockingQueue;
34+
import java.util.concurrent.SynchronousQueue;
35+
import java.util.concurrent.ThreadFactory;
36+
import java.util.concurrent.ThreadPoolExecutor;
37+
import java.util.concurrent.TimeUnit;
3238

3339
/**
3440
* @author jiangbo
@@ -99,7 +105,6 @@ public void run() {
99105
logMinerConnection.closeStmt();
100106
logMinerConnection.startOrUpdateLogMiner(positionManager.getPosition());
101107
logMinerConnection.queryData(positionManager.getPosition());
102-
103108
LOG.debug("Update log and continue read:{}", positionManager.getPosition());
104109
}
105110
} catch (Exception e) {

0 commit comments

Comments
 (0)