Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/main/java/com/actiontech/dble/DbleServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ public void startup() throws Exception {
this.config.testConnection();
LOGGER.info("==========================================Test connection finish==================================");

this.config.createDelayDetectTable();
LOGGER.info("==========================================Create delay detect table finish==================================");

// sync global status
this.config.getAndSyncKeyVariables();
LOGGER.info("=====================================Get And Sync KeyVariables finish=============================");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class PhysicalDbGroup {
Expand All @@ -42,11 +43,15 @@ public class PhysicalDbGroup {
public static final int RW_SPLIT_ALL = 2;
// weight
public static final int WEIGHT = 0;
private final List<PhysicalDbInstance> writeInstanceList;

enum USAGE {
NONE, RW, SHARDING;
}

private final String groupName;
private final DbGroupConfig dbGroupConfig;
private volatile PhysicalDbInstance writeDbInstance;
private final List<PhysicalDbInstance> writeInstanceList;
private Map<String, PhysicalDbInstance> allSourceMap = new HashMap<>();

private final int rwSplitMode;
Expand All @@ -55,8 +60,10 @@ public class PhysicalDbGroup {
private final LocalReadLoadBalancer localReadLoadBalancer = new LocalReadLoadBalancer();
private final ReentrantReadWriteLock adjustLock = new ReentrantReadWriteLock();

private boolean shardingUseless = true;
private boolean rwSplitUseless = true;
//delayDetection
private AtomicLong logicTimestamp = new AtomicLong();

private USAGE usedFor = USAGE.NONE;

public PhysicalDbGroup(String name, DbGroupConfig config, PhysicalDbInstance writeDbInstances, PhysicalDbInstance[] readDbInstances, int rwSplitMode) {
this.groupName = name;
Expand All @@ -66,8 +73,8 @@ public PhysicalDbGroup(String name, DbGroupConfig config, PhysicalDbInstance wri
writeDbInstances.setDbGroup(this);
this.writeDbInstance = writeDbInstances;
this.writeInstanceList = Collections.singletonList(writeDbInstance);
allSourceMap.put(writeDbInstances.getName(), writeDbInstances);

allSourceMap.put(writeDbInstances.getName(), writeDbInstances);
for (PhysicalDbInstance readDbInstance : readDbInstances) {
readDbInstance.setDbGroup(this);
allSourceMap.put(readDbInstance.getName(), readDbInstance);
Expand All @@ -89,6 +96,54 @@ public PhysicalDbGroup(PhysicalDbGroup org) {
writeInstanceList = Collections.singletonList(writeDbInstance);
}

public void init(String reason) {
for (Map.Entry<String, PhysicalDbInstance> entry : allSourceMap.entrySet()) {
entry.getValue().init(reason);
}
}

// only fresh backend connection pool
public void init(List<String> sourceNames, String reason) {
for (String sourceName : sourceNames) {
if (allSourceMap.containsKey(sourceName)) {
allSourceMap.get(sourceName).init(reason, false);
}
}
}

public void stop(String reason) {
stop(reason, false);
}

public void stop(String reason, boolean closeFront) {
for (PhysicalDbInstance dbInstance : allSourceMap.values()) {
dbInstance.stop(reason, closeFront);
}
}

// only fresh backend connection pool
public void stop(List<String> sourceNames, String reason, boolean closeFront) {
for (String sourceName : sourceNames) {
if (allSourceMap.containsKey(sourceName)) {
allSourceMap.get(sourceName).stop(reason, closeFront, false);
}
}

if (closeFront) {
Iterator<PooledConnection> iterator = IOProcessor.BACKENDS_OLD.iterator();
while (iterator.hasNext()) {
PooledConnection con = iterator.next();
if (con instanceof BackendConnection) {
BackendConnection backendCon = (BackendConnection) con;
if (backendCon.getPoolDestroyedTime() != 0 && sourceNames.contains(backendCon.getInstance().getConfig().getInstanceName())) {
backendCon.closeWithFront("old active backend conn will be forced closed by closing front conn");
iterator.remove();
}
}
}
}
}

public String getGroupName() {
return groupName;
}
Expand Down Expand Up @@ -125,88 +180,46 @@ PhysicalDbInstance findDbInstance(BackendConnection exitsCon) {
}

boolean isSlave(PhysicalDbInstance ds) {
return !(writeDbInstance == ds);
return writeDbInstance != ds;
}

public int getRwSplitMode() {
return rwSplitMode;
}

public boolean isUseless() {
return shardingUseless && rwSplitUseless;
return usedFor == USAGE.NONE;
}

public boolean usedForSharding() {
return usedFor == USAGE.SHARDING;
}

public boolean isShardingUseless() {
return shardingUseless;
public boolean usedForRW() {
return usedFor == USAGE.RW;
}

public boolean isRwSplitUseless() {
return rwSplitUseless;
public void setUsedForSharding() {
usedFor = USAGE.SHARDING;
}

public void setShardingUseless(boolean shardingUseless) {
this.shardingUseless = shardingUseless;
public void setUsedForRW() {
usedFor = USAGE.RW;
}

public void setRwSplitUseless(boolean rwSplitUseless) {
this.rwSplitUseless = rwSplitUseless;
public USAGE getUsedFor() {
return usedFor;
}

private boolean checkSlaveSynStatus() {
return (dbGroupConfig.getDelayThreshold() != -1) &&
(dbGroupConfig.isShowSlaveSql());
return ((dbGroupConfig.getDelayThreshold() != -1) &&
(dbGroupConfig.isShowSlaveSql())) || dbGroupConfig.isDelayDetection();
}

public PhysicalDbInstance getWriteDbInstance() {
return writeDbInstance;
}

public void init(String reason) {
for (Map.Entry<String, PhysicalDbInstance> entry : allSourceMap.entrySet()) {
entry.getValue().init(reason);
}
}

public void init(List<String> sourceNames, String reason) {
for (String sourceName : sourceNames) {
if (allSourceMap.containsKey(sourceName)) {
allSourceMap.get(sourceName).init(reason, false);
}
}
}

public void stop(String reason) {
stop(reason, false);
}

public void stop(String reason, boolean closeFront) {
for (PhysicalDbInstance dbInstance : allSourceMap.values()) {
dbInstance.stop(reason, closeFront);
}
}

public void stop(List<String> sourceNames, String reason, boolean closeFront) {
for (String sourceName : sourceNames) {
if (allSourceMap.containsKey(sourceName)) {
allSourceMap.get(sourceName).stop(reason, closeFront, false);
}
}

if (closeFront) {
Iterator<PooledConnection> iterator = IOProcessor.BACKENDS_OLD.iterator();
while (iterator.hasNext()) {
PooledConnection con = iterator.next();
if (con instanceof BackendConnection) {
BackendConnection backendCon = (BackendConnection) con;
if (backendCon.getPoolDestroyedTime() != 0 && sourceNames.contains(backendCon.getInstance().getConfig().getInstanceName())) {
backendCon.closeWithFront("old active backend conn will be forced closed by closing front conn");
iterator.remove();
}
}
}
}
}

public Collection<PhysicalDbInstance> getDbInstances(boolean isAll) {
if (!isAll && rwSplitMode == RW_SPLIT_OFF) {
return writeInstanceList;
Expand All @@ -230,18 +243,6 @@ public PhysicalDbInstance[] getReadDbInstances() {
return readSources;
}

/**
* rwsplit user
*
* @param master
* @param writeStatistical
* @return
* @throws IOException
*/
public PhysicalDbInstance rwSelect(Boolean master, Boolean writeStatistical) throws IOException {
return rwSelect(master, writeStatistical, false);
}

/**
* rwsplit user
*
Expand Down Expand Up @@ -546,6 +547,14 @@ public boolean checkInstanceExist(String instanceName) {
return true;
}

public AtomicLong getLogicTimestamp() {
return logicTimestamp;
}

public void setLogicTimestamp(AtomicLong logicTimestamp) {
this.logicTimestamp = logicTimestamp;
}

private void reportHeartbeatError(PhysicalDbInstance ins) throws IOException {
final DbInstanceConfig config = ins.getConfig();
String heartbeatError = "the dbInstance[" + config.getUrl() + "] can't reach. Please check the dbInstance status";
Expand All @@ -566,6 +575,6 @@ public boolean equalsBaseInfo(PhysicalDbGroup pool) {
pool.getDbGroupConfig().getRwSplitMode() == this.dbGroupConfig.getRwSplitMode() &&
pool.getDbGroupConfig().getDelayThreshold() == this.dbGroupConfig.getDelayThreshold() &&
pool.getDbGroupConfig().isDisableHA() == this.dbGroupConfig.isDisableHA() &&
pool.getGroupName().equals(this.groupName) && pool.isShardingUseless() == this.isShardingUseless() && pool.isRwSplitUseless() == this.isRwSplitUseless();
pool.getGroupName().equals(this.groupName) && pool.getUsedFor() == this.getUsedFor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -364,18 +364,8 @@ private void startHeartbeat() {
return;
}

heartbeat.start();
if (initHeartbeat.compareAndSet(false, true)) {

heartbeat.setScheduledFuture(Scheduler.getInstance().getScheduledExecutor().scheduleAtFixedRate(() -> {
if (DbleServer.getInstance().getConfig().isFullyConfigured()) {
if (TimeUtil.currentTimeMillis() < heartbeatRecoveryTime) {
return;
}

heartbeat.heartbeat();
}
}, 0L, config.getPoolConfig().getHeartbeatPeriodMillis(), TimeUnit.MILLISECONDS));
heartbeat.start(heartbeatRecoveryTime);
} else {
LOGGER.warn("init dbInstance[{}] heartbeat, but it has been initialized, skip initialization.", heartbeat.getSource().getName());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.actiontech.dble.backend.delyDetection;

public enum DelayDetectionStatus {
INIT(), OK(), TIMEOUT(), ERROR(), STOP();

@Override
public String toString() {
return super.toString().toLowerCase();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class HeartbeatSQLJob implements ResponseHandler {

public static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatSQLJob.class);

private final String sql;
private volatile String sql;
private final SQLJobHandler jobHandler;
/*
* (null, 0) -> initial
Expand All @@ -37,11 +37,19 @@ public class HeartbeatSQLJob implements ResponseHandler {

public HeartbeatSQLJob(MySQLHeartbeat heartbeat, SQLJobHandler jobHandler) {
super();
this.sql = heartbeat.getHeartbeatSQL();
this.jobHandler = jobHandler;
this.heartbeat = heartbeat;
}

public long getConnectionId() {
final BackendConnection con = this.connectionRef.getReference();
long connId = 0;
if (con != null) {
connId = con.getId();
}
return connId;
}

public void terminate() {
if (connectionRef.compareAndSet(null, null, 0, 2)) {
LOGGER.info("[heartbeat]terminate timeout heartbeat job.");
Expand Down Expand Up @@ -72,6 +80,7 @@ public void connectionAcquired(final BackendConnection conn) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[heartbeat]do heartbeat,conn is " + conn);
}
this.sql = heartbeat.getHeartbeatSQL();
conn.getBackendService().query(sql);
} catch (Exception e) { // (UnsupportedEncodingException e) {
LOGGER.warn("[heartbeat]send heartbeat error", e);
Expand All @@ -89,6 +98,7 @@ public void execute() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[heartbeat]do heartbeat,conn is {}", conn);
}
this.sql = heartbeat.getHeartbeatSQL();
conn.getBackendService().query(sql);
} catch (Exception e) { // (UnsupportedEncodingException e) {
LOGGER.warn("[heartbeat]send heartbeat error", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (C) 2016-2022 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.backend.heartbeat;

import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.sqlengine.OneRawSQLQueryResultHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
* @author mycat
*/
public class MySQLDefaultDetector extends MySQLDetector {
public static final Logger LOGGER = LoggerFactory.getLogger(MySQLDefaultDetector.class);

public MySQLDefaultDetector(MySQLHeartbeat heartbeat) {
super(heartbeat);
String[] fetchCols = {};
this.sqlJob = new HeartbeatSQLJob(heartbeat, new OneRawSQLQueryResultHandler(fetchCols, this));
}

@Override
protected void setStatus(PhysicalDbInstance source, Map<String, String> resultResult) {
// heartbeat.setResult(MySQLHeartbeat.OK_STATUS);
}
}
Loading
Loading