Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ protected void createRollupReplica() throws AlterCancelException {
totalReplicaNum += tablet.getReplicas().size();
}
}
MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>(totalReplicaNum);
MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>();
OlapTable tbl;
try {
tbl = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2773,7 +2773,7 @@ public void updatePartitionProperties(Database db, String tableName, String part
}

int totalTaskNum = beIdToTabletIdWithHash.keySet().size();
MarkedCountDownLatch<Long, Set<Pair<Long, Integer>>> countDownLatch = new MarkedCountDownLatch<>(totalTaskNum);
MarkedCountDownLatch<Long, Set<Pair<Long, Integer>>> countDownLatch = new MarkedCountDownLatch<>();
AgentBatchTask batchTask = new AgentBatchTask();
for (Map.Entry<Long, Set<Pair<Long, Integer>>> kv : beIdToTabletIdWithHash.entrySet()) {
countDownLatch.addMark(kv.getKey(), kv.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ protected void createShadowIndexReplica() throws AlterCancelException {
totalReplicaNum += tablet.getReplicas().size();
}
}
MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<>(totalReplicaNum);
MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<>();

OlapTable tbl;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,7 @@ protected void doCreateReplicas() {
.stream()
.mapToInt(AgentBatchTask::getTaskNum)
.sum();
createReplicaTasksLatch = new MarkedCountDownLatch<>(numBatchTasks);
createReplicaTasksLatch = new MarkedCountDownLatch<>();
if (numBatchTasks > 0) {
LOG.info("begin to send create replica tasks to BE for restore. total {} tasks. {}",
numBatchTasks, this);
Expand Down Expand Up @@ -1069,7 +1069,7 @@ protected void waitingAllReplicasCreated() {
try {
if (!createReplicaTasksLatch.await(0, TimeUnit.SECONDS)) {
LOG.info("waiting {} create replica tasks for restore to finish. {}",
createReplicaTasksLatch.getCount(), this);
createReplicaTasksLatch.getMarkCount(), this);
long createReplicasTimeOut = DbUtil.getCreateReplicasTimeoutMs(createReplicaTasksLatch.getMarkCount());
long tryCreateTime = System.currentTimeMillis() - createReplicasTimeStamp;
if (tryCreateTime > createReplicasTimeOut) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ protected void runAfterCatalogReady() {
// no need to get tablet stat if backend is not alive
List<Backend> aliveBackends = backends.values().stream().filter(Backend::isAlive)
.collect(Collectors.toList());
updateTabletStatsLatch = new MarkedCountDownLatch<>(aliveBackends.size());
updateTabletStatsLatch = new MarkedCountDownLatch<>();
aliveBackends.forEach(backend -> {
updateTabletStatsLatch.addMark(backend.getId(), backend);
executor.submit(() -> {
Expand Down Expand Up @@ -322,7 +322,7 @@ public void waitForTabletStatUpdate() {
try {
if (!updateTabletStatsLatch.await(600, TimeUnit.SECONDS)) {
LOG.info("timeout waiting {} update tablet stats tasks finish after {} seconds.",
updateTabletStatsLatch.getCount(), 600);
updateTabletStatsLatch.getMarkCount(), 600);
ok = false;
}
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1302,9 +1302,7 @@ private void sendCalcDeleteBitmaptask(long dbId, long transactionId,
stopWatch.start();
boolean res = false;
try {
int totalTaskNum = backendToPartitionInfos.size();
MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>(
totalTaskNum);
MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>();
AgentBatchTask batchTask = new AgentBatchTask();

long signature = getTxnLastSignature(dbId, transactionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,73 +24,133 @@
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class MarkedCountDownLatch<K, V> extends CountDownLatch {
public class MarkedCountDownLatch<K, V> {

private final Object lock = new Object();

private Multimap<K, V> marks;
private Multimap<K, V> failedMarks;
private Status st = Status.OK;
private int markCount = 0;
private int markCount;
private CountDownLatch downLatch;


public MarkedCountDownLatch(int count) {
super(count);
this.markCount = count;
public MarkedCountDownLatch() {
marks = HashMultimap.create();
failedMarks = HashMultimap.create();
markCount = 0;
downLatch = null;
}

public int getMarkCount() {
return markCount;
synchronized (lock) {
return markCount;
}
}

public synchronized void addMark(K key, V value) {
marks.put(key, value);
public void addMark(K key, V value) {
synchronized (lock) {
if (downLatch != null) {
throw new IllegalStateException("downLatch must initialize after mark.");
}
marks.put(key, value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里如果 downLatch != null,需要报错:我们不能支持在 await 之后还变更 mark 的数量。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK,这里可以增加一个判断。之所以保留这个行为,主要是为了兼容之前的写法:使用者如果事先就知道目标的 count 是多少,可以调用带 count 参数的构造函数,在构建 MarkedCountDownLatch 时就初始化 downLatch。如果可以不兼容这种情况,那么确实需要在这里限制这种行为。

markCount++;
}
}

public synchronized boolean markedCountDown(K key, V value) {
if (marks.remove(key, value)) {
super.countDown();
return true;
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
synchronized (lock) {
if (downLatch == null) {
this.downLatch = new CountDownLatch(markCount);
}
}
return false;
return downLatch.await(timeout, unit);
}

public synchronized boolean markedCountDownWithStatus(K key, V value, Status status) {
// update status first before countDown.
// so that the waiting thread will get the correct status.
if (st.ok()) {
st = status;
public void await() throws InterruptedException {
synchronized (lock) {
if (downLatch == null) {
this.downLatch = new CountDownLatch(markCount);
}
}
downLatch.await();
}

// Since marks are used to determine whether a task is completed, we should not remove
// a mark if the task has failed rather than finished. To maintain the idempotency of
// this method, we store failed marks in a separate map.
//
// Search `getLeftMarks` for details.
if (!failedMarks.containsEntry(key, value)) {
failedMarks.put(key, value);
super.countDown();
return true;
public boolean markedCountDown(K key, V value) {
synchronized (lock) {
if (downLatch == null) {
throw new IllegalStateException("downLatch is not initialize checkout usage is valid.");
}
if (marks.remove(key, value)) {
markCount--;
downLatch.countDown();
return true;
}
return false;
}
return false;
}

public synchronized List<Entry<K, V>> getLeftMarks() {
return Lists.newArrayList(marks.entries());
public boolean markedCountDownWithStatus(K key, V value, Status status) {
// update status first before countDown.
// so that the waiting thread will get the correct status.
synchronized (lock) {
if (downLatch == null) {
throw new IllegalStateException("downLatch is not initialize checkout usage is valid.");
}

if (st.ok()) {
st = status;
}

// Since marks are used to determine whether a task is completed, we should not remove
// a mark if the task has failed rather than finished. To maintain the idempotency of
// this method, we store failed marks in a separate map.
//
// Search `getLeftMarks` for details.
if (!failedMarks.containsEntry(key, value)) {
failedMarks.put(key, value);
marks.remove(key, value);
markCount--;
downLatch.countDown();
return true;
}
return false;
}
}

public synchronized Status getStatus() {
return st;
public void countDownToZero(Status status) {
synchronized (lock) {
if (downLatch == null) {
throw new IllegalStateException("downLatch is not initialize checkout usage is valid.");
}
// update status first before countDown.
// so that the waiting thread will get the correct status.
if (st.ok()) {
st = status;
}
while (downLatch.getCount() > 0) {
markCount--;
downLatch.countDown();
}

//clear up the marks list
marks.clear();
}
}

public synchronized void countDownToZero(Status status) {
// update status first before countDown.
// so that the waiting thread will get the correct status.
if (st.ok()) {
st = status;
public List<Entry<K, V>> getLeftMarks() {
synchronized (lock) {
return Lists.newArrayList(marks.entries());
}
while (getCount() > 0) {
super.countDown();
}

public Status getStatus() {
synchronized (lock) {
return st;
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -2093,7 +2093,7 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa
KeysType keysType = indexMeta.getKeysType();
List<Index> indexes = indexId == tbl.getBaseIndexId() ? tbl.getCopiedIndexes() : null;
int totalTaskNum = index.getTablets().size() * totalReplicaNum;
MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>(totalTaskNum);
MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>();
AgentBatchTask batchTask = new AgentBatchTask();
List<String> rowStoreColumns = tbl.getTableProperty().getCopiedRowStoreColumns();
for (Tablet tablet : index.getTablets()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.common.ConfigBase;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Status;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.common.proc.ProcService;
Expand Down Expand Up @@ -365,7 +366,7 @@ private List<List<String>> handleConfigurationInfo(List<Pair<String, Integer>> h
// The configuration information returned by each node is a List<List<String>> type,
// configInfoTotal is used to store the configuration information of all nodes.
List<List<List<String>>> configInfoTotal = Lists.newArrayList();
MarkedCountDownLatch<String, Integer> configRequestDoneSignal = new MarkedCountDownLatch<>(hostPorts.size());
MarkedCountDownLatch<String, Integer> configRequestDoneSignal = new MarkedCountDownLatch<>();
for (int i = 0; i < hostPorts.size(); ++i) {
configInfoTotal.add(Lists.newArrayList());

Expand Down Expand Up @@ -437,6 +438,7 @@ public HttpConfigInfoTask(String url, Pair<String, Integer> hostPort, String aut
@Override
public void run() {
String configInfo;
String markKey = NetUtils.getHostPortInAccessibleFormat(hostPort.first, hostPort.second);
try {
configInfo = HttpUtils.doGet(url,
ImmutableMap.<String, String>builder().put(AUTHORIZATION, authorization).build());
Expand All @@ -447,11 +449,10 @@ public void run() {
addConfig(conf);
}
}
configRequestDoneSignal.markedCountDown(NetUtils
.getHostPortInAccessibleFormat(hostPort.first, hostPort.second), -1);
configRequestDoneSignal.markedCountDown(markKey, -1);
} catch (Exception e) {
LOG.warn("get config from {}:{} failed.", hostPort.first, hostPort.second, e);
configRequestDoneSignal.countDown();
configRequestDoneSignal.markedCountDownWithStatus(markKey, -1, Status.CANCELLED);
}
}

Expand Down Expand Up @@ -815,9 +816,7 @@ private List<Map<String, String>> handleBeSetConfig(List<NodeConfigs> nodeConfig
List<Map<String, String>> failedTotal) {
initHttpExecutor();

int configNum = nodeConfigList.stream().mapToInt(e -> e.getConfigs(true).size() + e.getConfigs(false).size())
.sum();
MarkedCountDownLatch<String, Integer> beSetConfigCountDownSignal = new MarkedCountDownLatch<>(configNum);
MarkedCountDownLatch<String, Integer> beSetConfigCountDownSignal = new MarkedCountDownLatch<>();
for (NodeConfigs nodeConfigs : nodeConfigList) {
submitBeSetConfigTask(nodeConfigs, true, authorization, beSetConfigCountDownSignal, failedTotal);
submitBeSetConfigTask(nodeConfigs, false, authorization, beSetConfigCountDownSignal, failedTotal);
Expand Down Expand Up @@ -905,6 +904,7 @@ public HttpSetConfigTask(String url, Pair<String, Integer> hostPort, String auth

@Override
public void run() {
String markKey = concatNodeConfig(hostPort.first, hostPort.second, configName, configValue);
try {
String response = HttpUtils.doPost(url,
ImmutableMap.<String, String>builder().put(AUTHORIZATION, authorization).build(), null);
Expand All @@ -915,13 +915,12 @@ public void run() {
.getHostPortInAccessibleFormat(hostPort.first, hostPort.second),
jsonObject.get("msg").getAsString(), failed);
}
beSetConfigDoneSignal.markedCountDown(
concatNodeConfig(hostPort.first, hostPort.second, configName, configValue), -1);
beSetConfigDoneSignal.markedCountDown(markKey, -1);
} catch (Exception e) {
LOG.warn("set be:{} config:{} failed.", NetUtils
.getHostPortInAccessibleFormat(hostPort.first, hostPort.second),
configName + "=" + configValue, e);
beSetConfigDoneSignal.countDown();
beSetConfigDoneSignal.markedCountDownWithStatus(markKey, -1, Status.CANCELLED);
}
}
}
Expand Down
Loading
Loading