Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/main/java/com/actiontech/dble/DbleServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ public void startup() throws Exception {
this.config.testConnection();
LOGGER.info("==========================================Test connection finish==================================");

this.config.createDelayDetectTable();
LOGGER.info("==========================================Create delay detect table finish==================================");

// sync global status
this.config.getAndSyncKeyVariables();
LOGGER.info("=====================================Get And Sync KeyVariables finish=============================");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class PhysicalDbGroup {
Expand All @@ -42,11 +43,15 @@ public class PhysicalDbGroup {
public static final int RW_SPLIT_ALL = 2;
// weight
public static final int WEIGHT = 0;
private final List<PhysicalDbInstance> writeInstanceList;

enum USAGE {
NONE, RW, SHARDING;
}

private final String groupName;
private final DbGroupConfig dbGroupConfig;
private volatile PhysicalDbInstance writeDbInstance;
private final List<PhysicalDbInstance> writeInstanceList;
private Map<String, PhysicalDbInstance> allSourceMap = new HashMap<>();

private final int rwSplitMode;
Expand All @@ -55,8 +60,10 @@ public class PhysicalDbGroup {
private final LocalReadLoadBalancer localReadLoadBalancer = new LocalReadLoadBalancer();
private final ReentrantReadWriteLock adjustLock = new ReentrantReadWriteLock();

private boolean shardingUseless = true;
private boolean rwSplitUseless = true;
//delayDetection
private AtomicLong logicTimestamp = new AtomicLong();

private USAGE usedFor = USAGE.NONE;

public PhysicalDbGroup(String name, DbGroupConfig config, PhysicalDbInstance writeDbInstances, PhysicalDbInstance[] readDbInstances, int rwSplitMode) {
this.groupName = name;
Expand All @@ -66,8 +73,8 @@ public PhysicalDbGroup(String name, DbGroupConfig config, PhysicalDbInstance wri
writeDbInstances.setDbGroup(this);
this.writeDbInstance = writeDbInstances;
this.writeInstanceList = Collections.singletonList(writeDbInstance);
allSourceMap.put(writeDbInstances.getName(), writeDbInstances);

allSourceMap.put(writeDbInstances.getName(), writeDbInstances);
for (PhysicalDbInstance readDbInstance : readDbInstances) {
readDbInstance.setDbGroup(this);
allSourceMap.put(readDbInstance.getName(), readDbInstance);
Expand All @@ -89,6 +96,54 @@ public PhysicalDbGroup(PhysicalDbGroup org) {
writeInstanceList = Collections.singletonList(writeDbInstance);
}

public void init(String reason) {
for (Map.Entry<String, PhysicalDbInstance> entry : allSourceMap.entrySet()) {
entry.getValue().init(reason);
}
}

// only fresh backend connection pool
public void init(List<String> sourceNames, String reason) {
for (String sourceName : sourceNames) {
if (allSourceMap.containsKey(sourceName)) {
allSourceMap.get(sourceName).init(reason, false);
}
}
}

public void stop(String reason) {
stop(reason, false);
}

public void stop(String reason, boolean closeFront) {
for (PhysicalDbInstance dbInstance : allSourceMap.values()) {
dbInstance.stop(reason, closeFront);
}
}

// only fresh backend connection pool
public void stop(List<String> sourceNames, String reason, boolean closeFront) {
for (String sourceName : sourceNames) {
if (allSourceMap.containsKey(sourceName)) {
allSourceMap.get(sourceName).stop(reason, closeFront, false);
}
}

if (closeFront) {
Iterator<PooledConnection> iterator = IOProcessor.BACKENDS_OLD.iterator();
while (iterator.hasNext()) {
PooledConnection con = iterator.next();
if (con instanceof BackendConnection) {
BackendConnection backendCon = (BackendConnection) con;
if (backendCon.getPoolDestroyedTime() != 0 && sourceNames.contains(backendCon.getInstance().getConfig().getInstanceName())) {
backendCon.closeWithFront("old active backend conn will be forced closed by closing front conn");
iterator.remove();
}
}
}
}
}

public String getGroupName() {
return groupName;
}
Expand Down Expand Up @@ -125,88 +180,46 @@ PhysicalDbInstance findDbInstance(BackendConnection exitsCon) {
}

boolean isSlave(PhysicalDbInstance ds) {
return !(writeDbInstance == ds);
return writeDbInstance != ds;
}

public int getRwSplitMode() {
return rwSplitMode;
}

public boolean isUseless() {
return shardingUseless && rwSplitUseless;
return usedFor == USAGE.NONE;
}

public boolean usedForSharding() {
return usedFor == USAGE.SHARDING;
}

public boolean isShardingUseless() {
return shardingUseless;
public boolean usedForRW() {
return usedFor == USAGE.RW;
}

public boolean isRwSplitUseless() {
return rwSplitUseless;
public void setUsedForSharding() {
usedFor = USAGE.SHARDING;
}

public void setShardingUseless(boolean shardingUseless) {
this.shardingUseless = shardingUseless;
public void setUsedForRW() {
usedFor = USAGE.RW;
}

public void setRwSplitUseless(boolean rwSplitUseless) {
this.rwSplitUseless = rwSplitUseless;
public USAGE getUsedFor() {
return usedFor;
}

private boolean checkSlaveSynStatus() {
return (dbGroupConfig.getDelayThreshold() != -1) &&
(dbGroupConfig.isShowSlaveSql());
return ((dbGroupConfig.getDelayThreshold() != -1) && dbGroupConfig.isShowSlaveSql()) ||
dbGroupConfig.isDelayDetection();
}

public PhysicalDbInstance getWriteDbInstance() {
return writeDbInstance;
}

public void init(String reason) {
for (Map.Entry<String, PhysicalDbInstance> entry : allSourceMap.entrySet()) {
entry.getValue().init(reason);
}
}

public void init(List<String> sourceNames, String reason) {
for (String sourceName : sourceNames) {
if (allSourceMap.containsKey(sourceName)) {
allSourceMap.get(sourceName).init(reason, false);
}
}
}

public void stop(String reason) {
stop(reason, false);
}

public void stop(String reason, boolean closeFront) {
for (PhysicalDbInstance dbInstance : allSourceMap.values()) {
dbInstance.stop(reason, closeFront);
}
}

public void stop(List<String> sourceNames, String reason, boolean closeFront) {
for (String sourceName : sourceNames) {
if (allSourceMap.containsKey(sourceName)) {
allSourceMap.get(sourceName).stop(reason, closeFront, false);
}
}

if (closeFront) {
Iterator<PooledConnection> iterator = IOProcessor.BACKENDS_OLD.iterator();
while (iterator.hasNext()) {
PooledConnection con = iterator.next();
if (con instanceof BackendConnection) {
BackendConnection backendCon = (BackendConnection) con;
if (backendCon.getPoolDestroyedTime() != 0 && sourceNames.contains(backendCon.getInstance().getConfig().getInstanceName())) {
backendCon.closeWithFront("old active backend conn will be forced closed by closing front conn");
iterator.remove();
}
}
}
}
}

public Collection<PhysicalDbInstance> getDbInstances(boolean isAll) {
if (!isAll && rwSplitMode == RW_SPLIT_OFF) {
return writeInstanceList;
Expand All @@ -230,18 +243,6 @@ public PhysicalDbInstance[] getReadDbInstances() {
return readSources;
}

/**
* rwsplit user
*
* @param master
* @param writeStatistical
* @return
* @throws IOException
*/
public PhysicalDbInstance rwSelect(Boolean master, Boolean writeStatistical) throws IOException {
return rwSelect(master, writeStatistical, false);
}

/**
* rwsplit user
*
Expand Down Expand Up @@ -546,6 +547,14 @@ public boolean checkInstanceExist(String instanceName) {
return true;
}

public AtomicLong getLogicTimestamp() {
return logicTimestamp;
}

public void setLogicTimestamp(AtomicLong logicTimestamp) {
this.logicTimestamp = logicTimestamp;
}

private void reportHeartbeatError(PhysicalDbInstance ins) throws IOException {
final DbInstanceConfig config = ins.getConfig();
String heartbeatError = "the dbInstance[" + config.getUrl() + "] can't reach. Please check the dbInstance status";
Expand All @@ -565,7 +574,9 @@ public boolean equalsBaseInfo(PhysicalDbGroup pool) {
pool.getDbGroupConfig().getErrorRetryCount() == this.dbGroupConfig.getErrorRetryCount() &&
pool.getDbGroupConfig().getRwSplitMode() == this.dbGroupConfig.getRwSplitMode() &&
pool.getDbGroupConfig().getDelayThreshold() == this.dbGroupConfig.getDelayThreshold() &&
pool.getDbGroupConfig().getDelayPeriodMillis() == this.dbGroupConfig.getDelayPeriodMillis() &&
pool.getDbGroupConfig().getDelayDatabase().equals(this.dbGroupConfig.getDelayDatabase()) &&
pool.getDbGroupConfig().isDisableHA() == this.dbGroupConfig.isDisableHA() &&
pool.getGroupName().equals(this.groupName) && pool.isShardingUseless() == this.isShardingUseless() && pool.isRwSplitUseless() == this.isRwSplitUseless();
pool.getGroupName().equals(this.groupName) && pool.getUsedFor() == this.getUsedFor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@
import com.actiontech.dble.net.factory.MySQLConnectionFactory;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.singleton.Scheduler;
import com.actiontech.dble.singleton.TraceManager;
import com.actiontech.dble.util.StringUtil;
import com.actiontech.dble.util.TimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;

Expand Down Expand Up @@ -54,7 +51,6 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
private final LongAdder writeCount = new LongAdder();

private final AtomicBoolean isInitial = new AtomicBoolean(false);
private AtomicBoolean initHeartbeat = new AtomicBoolean(false);

// connection pool
private ConnectionPool connectionPool;
Expand Down Expand Up @@ -96,24 +92,30 @@ public void init(String reason, boolean isInitHeartbeat) {
return;
}

if (dbGroup.usedForSharding()) {
checkPoolSize();
}

LOGGER.info("init dbInstance[{}]", this.dbGroup.getGroupName() + "." + name);
start(reason, isInitHeartbeat);
}

private void checkPoolSize() {
int size = config.getMinCon();
String[] physicalSchemas = dbGroup.getSchemas();
int initSize = physicalSchemas.length;
if (size < initSize) {
LOGGER.warn("For db instance[{}], minIdle is less than (the count of shardingNodes), so dble will create at least 1 conn for every schema, " +
LOGGER.warn("For db instance[{}], minIdle is less than (the count of shardingNodes/apNodes), so dble will create at least 1 conn for every schema, " +
"minCon size before:{}, now:{}", this.dbGroup.getGroupName() + "." + name, size, initSize);
config.setMinCon(initSize);
}

initSize = Math.max(initSize, config.getMinCon());
size = config.getMaxCon();
if (size < initSize) {
LOGGER.warn("For db instance[{}], maxTotal[{}] is less than the minCon or the count of shardingNodes,change the maxCon into {}", this.dbGroup.getGroupName() + "." + name, size, initSize);
LOGGER.warn("For db instance[{}], maxTotal[{}] is less than the minCon or the count of shardingNodes/apNodes,change the maxCon into {}", this.dbGroup.getGroupName() + "." + name, size, initSize);
config.setMaxCon(initSize);
}

LOGGER.info("init dbInstance[{}]", this.dbGroup.getGroupName() + "." + name);
start(reason, isInitHeartbeat);
}

public void createConnectionSkipPool(String schema, ResponseHandler handler) {
Expand Down Expand Up @@ -363,36 +365,33 @@ private void startHeartbeat() {
LOGGER.info("the instance[{}] is disabled or fake node, skip to start heartbeat.", this.dbGroup.getGroupName() + "." + name);
return;
}
heartbeat.start(heartbeatRecoveryTime);
}

heartbeat.start();
if (initHeartbeat.compareAndSet(false, true)) {

heartbeat.setScheduledFuture(Scheduler.getInstance().getScheduledExecutor().scheduleAtFixedRate(() -> {
if (DbleServer.getInstance().getConfig().isFullyConfigured()) {
if (TimeUtil.currentTimeMillis() < heartbeatRecoveryTime) {
return;
}

heartbeat.heartbeat();
}
}, 0L, config.getPoolConfig().getHeartbeatPeriodMillis(), TimeUnit.MILLISECONDS));
} else {
LOGGER.warn("init dbInstance[{}] heartbeat, but it has been initialized, skip initialization.", heartbeat.getSource().getName());
}
private void stopHeartbeat(String reason) {
heartbeat.stop(reason);
}

public void start(String reason) {
start(reason, true);
}

public void start(String reason, boolean isStartHeartbeat) {
startPool(reason);
if (isStartHeartbeat) {
startHeartbeat();
}
}

private void startPool(String reason) {
if (disabled.get() || fakeNode) {
LOGGER.info("init dbInstance[{}] because {}, but it is disabled or a fakeNode, skip initialization.", this.dbGroup.getGroupName() + "." + name, reason);
return;
}
if ((dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this) && !dbGroup.isUseless()) {
LOGGER.info("start connection pool of physical db instance[{}], due to {}", this.dbGroup.getGroupName() + "." + name, reason);
this.connectionPool.startEvictor();
}
if (isStartHeartbeat) {
startHeartbeat();
}
}

public void stop(String reason, boolean closeFront) {
Expand All @@ -401,17 +400,18 @@ public void stop(String reason, boolean closeFront) {

public void stop(String reason, boolean closeFront, boolean isStopHeartbeat) {
if (isStopHeartbeat) {
final boolean stop = heartbeat.isStop();
heartbeat.stop(reason);
if (!stop) {
initHeartbeat.set(false);
}
stopHeartbeat(reason);
}
stopPool(reason, closeFront);

isInitial.set(false);
}

private void stopPool(String reason, boolean closeFront) {
if (dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this) {
LOGGER.info("stop connection pool of physical db instance[{}], due to {}", this.dbGroup.getGroupName() + "." + name, reason);
connectionPool.stop(reason, closeFront);
}
isInitial.set(false);
}

public void closeAllConnection(String reason) {
Expand Down
Loading
Loading