diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 439b0423651151..c6fa7804d34314 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -212,7 +212,7 @@ protected void createRollupReplica() throws AlterCancelException { totalReplicaNum += tablet.getReplicas().size(); } } - MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch(totalReplicaNum); + MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch(); OlapTable tbl; try { tbl = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 4e042d822f054a..fd2d2f58882db2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -2773,7 +2773,7 @@ public void updatePartitionProperties(Database db, String tableName, String part } int totalTaskNum = beIdToTabletIdWithHash.keySet().size(); - MarkedCountDownLatch>> countDownLatch = new MarkedCountDownLatch<>(totalTaskNum); + MarkedCountDownLatch>> countDownLatch = new MarkedCountDownLatch<>(); AgentBatchTask batchTask = new AgentBatchTask(); for (Map.Entry>> kv : beIdToTabletIdWithHash.entrySet()) { countDownLatch.addMark(kv.getKey(), kv.getValue()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index a235a84f100ef0..89e17bd7d9a547 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -261,7 +261,7 @@ protected void createShadowIndexReplica() throws AlterCancelException { totalReplicaNum += tablet.getReplicas().size(); } } - MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch<>(totalReplicaNum); + MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch<>(); OlapTable tbl; try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index a7c19ddafecd3a..e851d2a976ae06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1041,7 +1041,7 @@ protected void doCreateReplicas() { .stream() .mapToInt(AgentBatchTask::getTaskNum) .sum(); - createReplicaTasksLatch = new MarkedCountDownLatch<>(numBatchTasks); + createReplicaTasksLatch = new MarkedCountDownLatch<>(); if (numBatchTasks > 0) { LOG.info("begin to send create replica tasks to BE for restore. total {} tasks. {}", numBatchTasks, this); @@ -1069,7 +1069,7 @@ protected void waitingAllReplicasCreated() { try { if (!createReplicaTasksLatch.await(0, TimeUnit.SECONDS)) { LOG.info("waiting {} create replica tasks for restore to finish. {}", - createReplicaTasksLatch.getCount(), this); + createReplicaTasksLatch.getMarkCount(), this); long createReplicasTimeOut = DbUtil.getCreateReplicasTimeoutMs(createReplicaTasksLatch.getMarkCount()); long tryCreateTime = System.currentTimeMillis() - createReplicasTimeStamp; if (tryCreateTime > createReplicasTimeOut) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index e56d9b7a6617f4..34019a86863565 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -75,7 +75,7 @@ protected void runAfterCatalogReady() { // no need to get tablet stat if backend is not alive List aliveBackends = backends.values().stream().filter(Backend::isAlive) .collect(Collectors.toList()); - updateTabletStatsLatch = new MarkedCountDownLatch<>(aliveBackends.size()); + updateTabletStatsLatch = new MarkedCountDownLatch<>(); aliveBackends.forEach(backend -> { updateTabletStatsLatch.addMark(backend.getId(), backend); executor.submit(() -> { @@ -322,7 +322,7 @@ public void waitForTabletStatUpdate() { try { if (!updateTabletStatsLatch.await(600, TimeUnit.SECONDS)) { LOG.info("timeout waiting {} update tablet stats tasks finish after {} seconds.", - updateTabletStatsLatch.getCount(), 600); + updateTabletStatsLatch.getMarkCount(), 600); ok = false; } } catch (InterruptedException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index f06278329be422..c8efe2a3f90073 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -1302,9 +1302,7 @@ private void sendCalcDeleteBitmaptask(long dbId, long transactionId, stopWatch.start(); boolean res = false; try { - int totalTaskNum = backendToPartitionInfos.size(); - MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch( - totalTaskNum); + MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch(); AgentBatchTask batchTask = new AgentBatchTask(); long signature = getTxnLastSignature(dbId, transactionId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java b/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java index 0eecbc43b1d340..af30bd2ff53c56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java @@ -24,73 +24,133 @@ import java.util.List; import java.util.Map.Entry; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; -public class MarkedCountDownLatch extends CountDownLatch { +public class MarkedCountDownLatch { + + private final Object lock = new Object(); private Multimap marks; private Multimap failedMarks; private Status st = Status.OK; - private int markCount = 0; + private int markCount; + private CountDownLatch downLatch; + - public MarkedCountDownLatch(int count) { - super(count); - this.markCount = count; + public MarkedCountDownLatch() { marks = HashMultimap.create(); failedMarks = HashMultimap.create(); + markCount = 0; + downLatch = null; } public int getMarkCount() { - return markCount; + synchronized (lock) { + return markCount; + } } - public synchronized void addMark(K key, V value) { - marks.put(key, value); + public void addMark(K key, V value) { + synchronized (lock) { + if (downLatch != null) { + throw new IllegalStateException("downLatch must initialize after mark."); + } + marks.put(key, value); + markCount++; + } } - public synchronized boolean markedCountDown(K key, V value) { - if (marks.remove(key, value)) { - super.countDown(); - return true; + public boolean await(long timeout, TimeUnit unit) throws InterruptedException { + synchronized (lock) { + if (downLatch == null) { + this.downLatch = new CountDownLatch(markCount); + } } - return false; + return downLatch.await(timeout, unit); } - public synchronized boolean markedCountDownWithStatus(K key, V value, Status status) { - // update status first before countDown. - // so that the waiting thread will get the correct status. - if (st.ok()) { - st = status; + public void await() throws InterruptedException { + synchronized (lock) { + if (downLatch == null) { + this.downLatch = new CountDownLatch(markCount); + } } + downLatch.await(); + } - // Since marks are used to determine whether a task is completed, we should not remove - // a mark if the task has failed rather than finished. To maintain the idempotency of - // this method, we store failed marks in a separate map. - // - // Search `getLeftMarks` for details. - if (!failedMarks.containsEntry(key, value)) { - failedMarks.put(key, value); - super.countDown(); - return true; + public boolean markedCountDown(K key, V value) { + synchronized (lock) { + if (downLatch == null) { + throw new IllegalStateException("downLatch is not initialize checkout usage is valid."); + } + if (marks.remove(key, value)) { + markCount--; + downLatch.countDown(); + return true; + } + return false; } - return false; } - public synchronized List> getLeftMarks() { - return Lists.newArrayList(marks.entries()); + public boolean markedCountDownWithStatus(K key, V value, Status status) { + // update status first before countDown. + // so that the waiting thread will get the correct status. + synchronized (lock) { + if (downLatch == null) { + throw new IllegalStateException("downLatch is not initialize checkout usage is valid."); + } + + if (st.ok()) { + st = status; + } + + // Since marks are used to determine whether a task is completed, we should not remove + // a mark if the task has failed rather than finished. To maintain the idempotency of + // this method, we store failed marks in a separate map. + // + // Search `getLeftMarks` for details. + if (!failedMarks.containsEntry(key, value)) { + failedMarks.put(key, value); + marks.remove(key, value); + markCount--; + downLatch.countDown(); + return true; + } + return false; + } } - public synchronized Status getStatus() { - return st; + public void countDownToZero(Status status) { + synchronized (lock) { + if (downLatch == null) { + throw new IllegalStateException("downLatch is not initialize checkout usage is valid."); + } + // update status first before countDown. + // so that the waiting thread will get the correct status. + if (st.ok()) { + st = status; + } + while (downLatch.getCount() > 0) { + markCount--; + downLatch.countDown(); + } + + //clear up the marks list + marks.clear(); + } } - public synchronized void countDownToZero(Status status) { - // update status first before countDown. - // so that the waiting thread will get the correct status. - if (st.ok()) { - st = status; + public List> getLeftMarks() { + synchronized (lock) { + return Lists.newArrayList(marks.entries()); } - while (getCount() > 0) { - super.countDown(); + } + + public Status getStatus() { + synchronized (lock) { + return st; } } + + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 6302dbc9d18b4e..3c26605fd48ddb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -2093,7 +2093,7 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa KeysType keysType = indexMeta.getKeysType(); List indexes = indexId == tbl.getBaseIndexId() ? tbl.getCopiedIndexes() : null; int totalTaskNum = index.getTablets().size() * totalReplicaNum; - MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch(totalTaskNum); + MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch(); AgentBatchTask batchTask = new AgentBatchTask(); List rowStoreColumns = tbl.getTableProperty().getCopiedRowStoreColumns(); for (Tablet tablet : index.getTablets()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java index 4067864dc30219..5fc39029f2f49a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/NodeAction.java @@ -24,6 +24,7 @@ import org.apache.doris.common.ConfigBase; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.Pair; +import org.apache.doris.common.Status; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.proc.ProcResult; import org.apache.doris.common.proc.ProcService; @@ -365,7 +366,7 @@ private List> handleConfigurationInfo(List> h // The configuration information returned by each node is a List> type, // configInfoTotal is used to store the configuration information of all nodes. List>> configInfoTotal = Lists.newArrayList(); - MarkedCountDownLatch configRequestDoneSignal = new MarkedCountDownLatch<>(hostPorts.size()); + MarkedCountDownLatch configRequestDoneSignal = new MarkedCountDownLatch<>(); for (int i = 0; i < hostPorts.size(); ++i) { configInfoTotal.add(Lists.newArrayList()); @@ -437,6 +438,7 @@ public HttpConfigInfoTask(String url, Pair hostPort, String aut @Override public void run() { String configInfo; + String markKey = NetUtils.getHostPortInAccessibleFormat(hostPort.first, hostPort.second); try { configInfo = HttpUtils.doGet(url, ImmutableMap.builder().put(AUTHORIZATION, authorization).build()); @@ -447,11 +449,10 @@ public void run() { addConfig(conf); } } - configRequestDoneSignal.markedCountDown(NetUtils - .getHostPortInAccessibleFormat(hostPort.first, hostPort.second), -1); + configRequestDoneSignal.markedCountDown(markKey, -1); } catch (Exception e) { LOG.warn("get config from {}:{} failed.", hostPort.first, hostPort.second, e); - configRequestDoneSignal.countDown(); + configRequestDoneSignal.markedCountDownWithStatus(markKey, -1, Status.CANCELLED); } } @@ -815,9 +816,7 @@ private List> handleBeSetConfig(List nodeConfig List> failedTotal) { initHttpExecutor(); - int configNum = nodeConfigList.stream().mapToInt(e -> e.getConfigs(true).size() + e.getConfigs(false).size()) - .sum(); - MarkedCountDownLatch beSetConfigCountDownSignal = new MarkedCountDownLatch<>(configNum); + MarkedCountDownLatch beSetConfigCountDownSignal = new MarkedCountDownLatch<>(); for (NodeConfigs nodeConfigs : nodeConfigList) { submitBeSetConfigTask(nodeConfigs, true, authorization, beSetConfigCountDownSignal, failedTotal); submitBeSetConfigTask(nodeConfigs, false, authorization, beSetConfigCountDownSignal, failedTotal); @@ -905,6 +904,7 @@ public HttpSetConfigTask(String url, Pair hostPort, String auth @Override public void run() { + String markKey = concatNodeConfig(hostPort.first, hostPort.second, configName, configValue); try { String response = HttpUtils.doPost(url, ImmutableMap.builder().put(AUTHORIZATION, authorization).build(), null); @@ -915,13 +915,12 @@ public void run() { .getHostPortInAccessibleFormat(hostPort.first, hostPort.second), jsonObject.get("msg").getAsString(), failed); } - beSetConfigDoneSignal.markedCountDown( - concatNodeConfig(hostPort.first, hostPort.second, configName, configValue), -1); + beSetConfigDoneSignal.markedCountDown(markKey, -1); } catch (Exception e) { LOG.warn("set be:{} config:{} failed.", NetUtils .getHostPortInAccessibleFormat(hostPort.first, hostPort.second), configName + "=" + configValue, e); - beSetConfigDoneSignal.countDown(); + beSetConfigDoneSignal.markedCountDownWithStatus(markKey, -1, Status.CANCELLED); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java index 2f619a4cc53a89..78d06697509b16 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java @@ -352,6 +352,11 @@ public void dispatch() throws Exception { for (Replica replica : tablet.getReplicas()) { long replicaId = replica.getId(); long backendId = replica.getBackendId(); + //if backend has been dropped or not alive it will skip dispatch agent task + if (!Env.getCurrentSystemInfo().checkBackendAlive(backendId)) { + continue; + } + countDownLatch.addMark(backendId, tabletId); // create push task for each replica @@ -412,31 +417,6 @@ public void await() throws Exception { errMsg = "unfinished replicas [BackendId=TabletId]: " + Joiner.on(", ").join(subList); } LOG.warn(errMsg); - checkAndUpdateQuorum(); - switch (state) { - case UN_QUORUM: - LOG.warn("delete job timeout: transactionId {}, timeout {}, {}", - transactionId, timeoutMs, errMsg); - throw new UserException(String.format("delete job timeout, timeout(ms):%s, msg:%s", timeoutMs, errMsg)); - case QUORUM_FINISHED: - case FINISHED: - long nowQuorumTimeMs = System.currentTimeMillis(); - long endQuorumTimeoutMs = nowQuorumTimeMs + timeoutMs / 2; - // if job's state is quorum_finished then wait for a period of time and commit it. - while (state == DeleteState.QUORUM_FINISHED - && endQuorumTimeoutMs > nowQuorumTimeMs) { - checkAndUpdateQuorum(); - Thread.sleep(1000); - nowQuorumTimeMs = System.currentTimeMillis(); - if (LOG.isDebugEnabled()) { - LOG.debug("wait for quorum finished delete job: {}, txn id: {}", - id, transactionId); - } - } - break; - default: - throw new IllegalStateException("wrong delete job state: " + state.name()); - } } protected List generateTabletCommitInfos() { @@ -579,12 +559,11 @@ public DeleteJob buildWithNereids(BuildParams params) { DeleteJob deleteJob = ConnectContext.get() != null && ConnectContext.get().isTxnModel() ? new TxnDeleteJob(jobId, -1, label, partitionReplicaNum, deleteInfo) : new DeleteJob(jobId, -1, label, partitionReplicaNum, deleteInfo); - long replicaNum = partitions.stream().mapToLong(Partition::getAllReplicaCount).sum(); deleteJob.setPartitions(partitions); deleteJob.setDeleteConditions(params.getDeleteConditions()); deleteJob.setTargetDb(params.getDb()); deleteJob.setTargetTbl(params.getTable()); - deleteJob.setCountDownLatch(new MarkedCountDownLatch<>((int) replicaNum)); + deleteJob.setCountDownLatch(new MarkedCountDownLatch<>()); ConnectContext connectContext = ConnectContext.get(); if (connectContext != null) { deleteJob.setTimeoutS(connectContext.getExecTimeoutS()); @@ -616,12 +595,11 @@ public DeleteJob buildWith(BuildParams params) throws Exception { DeleteJob deleteJob = ConnectContext.get() != null && ConnectContext.get().isTxnModel() ? new TxnDeleteJob(jobId, -1, label, partitionReplicaNum, deleteInfo) : new DeleteJob(jobId, -1, label, partitionReplicaNum, deleteInfo); - long replicaNum = partitions.stream().mapToLong(Partition::getAllReplicaCount).sum(); deleteJob.setPartitions(partitions); deleteJob.setDeleteConditions(params.getDeleteConditions()); deleteJob.setTargetDb(params.getDb()); deleteJob.setTargetTbl(params.getTable()); - deleteJob.setCountDownLatch(new MarkedCountDownLatch<>((int) replicaNum)); + deleteJob.setCountDownLatch(new MarkedCountDownLatch<>()); ConnectContext connectContext = ConnectContext.get(); if (connectContext != null) { deleteJob.setTimeoutS(connectContext.getExecTimeoutS()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AdminCopyTabletCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AdminCopyTabletCommand.java index 45f1d138e9eff5..c3f467a2822c80 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AdminCopyTabletCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AdminCopyTabletCommand.java @@ -193,7 +193,7 @@ private ShowResultSet handleCopyTablet() throws AnalysisException { tabletMeta.getTableId(), tabletMeta.getPartitionId(), tabletMeta.getIndexId(), tabletId, version, 0, getExpirationMinutes() * 60 * 1000, false); task.setIsCopyTabletTask(true); - MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch(1); + MarkedCountDownLatch countDownLatch = new MarkedCountDownLatch(); countDownLatch.addMark(backendId, tabletId); task.setCountDownLatch(countDownLatch); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index f6c929b6526544..95ca79eaa32670 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -891,7 +891,7 @@ protected void sendPipelineCtx() throws Exception { } // end for fragments // Init the mark done in order to track the finished state of the query - fragmentsDoneLatch = new MarkedCountDownLatch<>(backendFragments.size()); + fragmentsDoneLatch = new MarkedCountDownLatch<>(); for (Pair pair : backendFragments) { fragmentsDoneLatch.addMark(pair.first.asInt(), pair.second); } @@ -1365,7 +1365,7 @@ protected void computeFragmentExecParams() throws Exception { } // Init instancesDoneLatch, it will be used to track if the instances has finished for insert stmt - instancesDoneLatch = new MarkedCountDownLatch<>(instanceIds.size()); + instancesDoneLatch = new MarkedCountDownLatch<>(); for (TUniqueId instanceId : instanceIds) { instancesDoneLatch.addMark(instanceId, -1L /* value is meaningless */); } @@ -2556,9 +2556,9 @@ private boolean checkBackendState() { public boolean isDone() { if (fragmentsDoneLatch != null) { - return fragmentsDoneLatch.getCount() == 0; + return fragmentsDoneLatch.getMarkCount() == 0; } else { - return instancesDoneLatch.getCount() == 0; + return instancesDoneLatch.getMarkCount() == 0; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java index 39442516dcc871..4eddc449bf17d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java @@ -85,7 +85,7 @@ public LoadProcessor(CoordinatorContext coordinatorContext, long jobId) { @Override protected void afterSetPipelineExecutionTask(PipelineExecutionTask pipelineExecutionTask) { Map backendFragmentTasks = this.backendFragmentTasks.get(); - MarkedCountDownLatch latch = new MarkedCountDownLatch<>(backendFragmentTasks.size()); + MarkedCountDownLatch latch = new MarkedCountDownLatch<>(); for (BackendFragmentId backendFragmentId : backendFragmentTasks.keySet()) { latch.addMark(backendFragmentId.fragmentId, backendFragmentId.backendId); } @@ -107,7 +107,7 @@ protected void afterSetPipelineExecutionTask(PipelineExecutionTask pipelineExecu this.topFragmentTasks = topFragmentTasks; // only wait top fragments - MarkedCountDownLatch topFragmentLatch = new MarkedCountDownLatch<>(topFragmentTasks.size()); + MarkedCountDownLatch topFragmentLatch = new MarkedCountDownLatch<>(); for (SingleFragmentPipelineTask topFragmentTask : topFragmentTasks) { topFragmentLatch.addMark(topFragmentTask.getFragmentId(), topFragmentTask.getBackend().getId()); } @@ -125,7 +125,7 @@ public void cancel(Status cancelReason) { } public boolean isDone() { - return latch.map(l -> l.getCount() == 0).orElse(false); + return latch.map(l -> l.getMarkCount() == 0).orElse(false); } public boolean join(int timeoutS) { @@ -243,7 +243,7 @@ protected void doProcessReportExecStatus(TReportExecStatusParams params, SingleF if (topFragmentId == params.getFragmentId()) { MarkedCountDownLatch topFragmentLatch = this.topFragmentLatch.get(); topFragmentLatch.markedCountDown(params.getFragmentId(), params.getBackendId()); - if (topFragmentLatch.getCount() == 0) { + if (topFragmentLatch.getMarkCount() == 0) { tryFinishSchedule(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java index 3d2feec11deb3f..6e2fffff967b5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java @@ -54,7 +54,7 @@ public void countDownLatch(long backendId, long transactionId) { if (latch.markedCountDown(backendId, transactionId)) { if (LOG.isDebugEnabled()) { LOG.debug("CalcDeleteBitmapTask current latch count: {}, backend: {}, transactionId:{}", - latch.getCount(), backendId, transactionId); + latch.getMarkCount(), backendId, transactionId); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java index c049b997aea74d..f0fffe2558eed3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java @@ -233,7 +233,7 @@ public void countDownLatch(long backendId, long tabletId) { if (latch.markedCountDown(backendId, tabletId)) { if (LOG.isDebugEnabled()) { LOG.debug("CreateReplicaTask current latch count: {}, backend: {}, tablet:{}", - latch.getCount(), backendId, tabletId); + latch.getMarkCount(), backendId, tabletId); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java index cb06f4d27f6df0..8c0b34a34698a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java @@ -205,8 +205,8 @@ public void countDownLatch(long backendId, long tabletId) { if (this.latch != null) { if (latch.markedCountDown(backendId, tabletId)) { if (LOG.isDebugEnabled()) { - LOG.debug("pushTask current latch count: {}. backend: {}, tablet:{}", latch.getCount(), backendId, - tabletId); + LOG.debug("pushTask current latch count: {}. backend: {}, tablet:{}", latch.getMarkCount(), + backendId, tabletId); } } } @@ -219,7 +219,7 @@ public void countDownLatchWithStatus(long backendId, long tabletId, Status st) { if (latch.markedCountDownWithStatus(backendId, tabletId, st)) { if (LOG.isDebugEnabled()) { LOG.debug("pushTask current latch count with status: {}. backend: {}, tablet:{}, st::{}", - latch.getCount(), backendId, tabletId, st); + latch.getMarkCount(), backendId, tabletId, st); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java index 7d4c6a3d022cda..8e377a1b56c08f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java @@ -103,7 +103,7 @@ public void countDownLatch(long backendId, Set> tablets) { if (latch.markedCountDown(backendId, tablets)) { if (LOG.isDebugEnabled()) { LOG.debug("UpdateTabletMetaInfoTask current latch count: {}, backend: {}, tablets:{}", - latch.getCount(), backendId, tablets); + latch.getMarkCount(), backendId, tablets); } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/MarkedCountDownLatchTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/MarkedCountDownLatchTest.java new file mode 100644 index 00000000000000..d6308da8889a33 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/common/MarkedCountDownLatchTest.java @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + + +public class MarkedCountDownLatchTest { + + + @Test + @SuppressWarnings("unchecked") + public void testNormal() throws Exception { + + final MarkedCountDownLatch markedCountDownLatch = new MarkedCountDownLatch(); + markedCountDownLatch.addMark("k1", "v1"); + markedCountDownLatch.addMark("k2", "v2"); + markedCountDownLatch.addMark("k3", "v3"); + Assert.assertEquals(3, markedCountDownLatch.getMarkCount()); + + Thread t = new Thread(() -> { + int i = 1; + while (i < 4) { + try { + markedCountDownLatch.markedCountDown("k" + i, "v" + i); + } catch (Exception e) { + if (e instanceof IllegalStateException) { + continue; + } + throw e; + } + Assert.assertEquals(3 - i, markedCountDownLatch.getMarkCount()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + i++; + } + }); + t.start(); + + long startTime = System.currentTimeMillis(); + markedCountDownLatch.await(10, TimeUnit.SECONDS); + long endTime = System.currentTimeMillis(); + Assert.assertEquals(0, markedCountDownLatch.getMarkCount()); + Assert.assertTrue(endTime - startTime < 10 * 1000L); + } + + @Test + @SuppressWarnings("unchecked") + public void testCountWithStatus() throws Exception { + final MarkedCountDownLatch markedCountDownLatch = new MarkedCountDownLatch(); + markedCountDownLatch.addMark("k1", "v1"); + markedCountDownLatch.addMark("k2", "v2"); + markedCountDownLatch.addMark("k3", "v3"); + Assert.assertEquals(3, markedCountDownLatch.getMarkCount()); + + Thread t = new Thread(() -> { + int i = 1; + while (i < 4) { + try { + markedCountDownLatch.markedCountDownWithStatus("k" + i, "v" + i, Status.FINISHED); + } catch (Exception e) { + if (e instanceof IllegalStateException) { + continue; + } + throw e; + } + Assert.assertEquals(3 - i, markedCountDownLatch.getMarkCount()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + i++; + } + }); + t.start(); + + long startTime = System.currentTimeMillis(); + markedCountDownLatch.await(10, TimeUnit.SECONDS); + long endTime = System.currentTimeMillis(); + Assert.assertEquals(0, markedCountDownLatch.getMarkCount()); + Assert.assertTrue(endTime - startTime < 10 * 1000L); + Assert.assertEquals(markedCountDownLatch.getStatus(), Status.FINISHED); + } + + @Test + @SuppressWarnings("unchecked") + public void testTimeout() throws Exception { + + final MarkedCountDownLatch markedCountDownLatch = new MarkedCountDownLatch(); + markedCountDownLatch.addMark("k1", "v1"); + markedCountDownLatch.addMark("k2", "v2"); + markedCountDownLatch.addMark("k3", "v3"); + Assert.assertEquals(3, markedCountDownLatch.getMarkCount()); + + Thread t = new Thread(() -> { + int i = 1; + while (i < 3) { + try { + markedCountDownLatch.markedCountDown("k" + i, "v" + i); + } catch (Exception e) { + if (e instanceof IllegalStateException) { + continue; + } + throw e; + } + Assert.assertEquals( 3 - i, markedCountDownLatch.getMarkCount()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + i++; + } + }); + t.start(); + + long startTime = System.currentTimeMillis(); + markedCountDownLatch.await(10, TimeUnit.SECONDS); + long endTime = System.currentTimeMillis(); + Assert.assertEquals(1, markedCountDownLatch.getMarkCount()); + Assert.assertTrue(endTime - startTime >= 10 * 1000L); + } + + @Test(expected = IllegalStateException.class) + @SuppressWarnings("unchecked") + public void testNotWaitException() { + + final MarkedCountDownLatch markedCountDownLatch = new MarkedCountDownLatch(); + markedCountDownLatch.addMark("k1", "v1"); + markedCountDownLatch.addMark("k2", "v2"); + markedCountDownLatch.addMark("k3", "v3"); + Assert.assertEquals(3, markedCountDownLatch.getMarkCount()); + + //call markedCountDown before wait method will throw IllegalStateException + markedCountDownLatch.markedCountDown("k1", "v1"); + } + + + @Test + @SuppressWarnings("unchecked") + public void testCountDownToZero() throws Exception { + final MarkedCountDownLatch markedCountDownLatch = new MarkedCountDownLatch(); + markedCountDownLatch.addMark("k1", "v1"); + markedCountDownLatch.addMark("k2", "v2"); + markedCountDownLatch.addMark("k3", "v3"); + Assert.assertEquals(3, markedCountDownLatch.getMarkCount()); + + Thread t = new Thread(() -> { + while (true) { + try { + markedCountDownLatch.countDownToZero(Status.CANCELLED); + } catch (Exception e) { + if (e instanceof IllegalStateException) { + continue; + } + throw e; + } + Assert.assertEquals(0, markedCountDownLatch.getMarkCount()); + break; + } + + }); + t.start(); + + long startTime = System.currentTimeMillis(); + markedCountDownLatch.await(10, TimeUnit.SECONDS); + long endTime = System.currentTimeMillis(); + Assert.assertEquals(0, markedCountDownLatch.getMarkCount()); + Assert.assertTrue(endTime - startTime < 10 * 1000L); + Assert.assertEquals(markedCountDownLatch.getStatus(), Status.CANCELLED); + } + + +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java index 799e665039bd31..215d691b430757 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java @@ -81,7 +81,7 @@ public class AgentTaskTest { private long storageDictPageSize = 262144L; private List columns; - private MarkedCountDownLatch latch = new MarkedCountDownLatch(3); + private MarkedCountDownLatch latch = new MarkedCountDownLatch(); private Range range1; private Range range2; @@ -93,8 +93,9 @@ public class AgentTaskTest { private AgentTask storageMediaMigrationTask; @Before - public void setUp() throws AnalysisException { + public void setUp() throws AnalysisException, InterruptedException { MetricRepo.init(); + agentBatchTask = new AgentBatchTask(); columns = new LinkedList(); @@ -112,6 +113,7 @@ public void setUp() throws AnalysisException { // create tasks Map objectPool = new HashMap(); // create + createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1, version, KeysType.AGG_KEYS, storageType, TStorageMedium.SSD, columns, null, 0, latch, null, false, TTabletType.TABLET_TYPE_DISK, null,