Skip to content

Commit ee9d7d1

Browse files
authored
Fix concurrent modification of non-thread-safe data structures caused by parallel dispatching
1 parent 2113b5a commit ee9d7d1

File tree

2 files changed

+12
-7
lines changed

2 files changed

+12
-7
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,9 @@
3434

3535
import java.time.ZoneId;
3636
import java.util.HashSet;
37-
import java.util.LinkedList;
38-
import java.util.List;
3937
import java.util.Optional;
4038
import java.util.Set;
39+
import java.util.concurrent.ConcurrentHashMap;
4140

4241
/**
4342
* This class is used to record the context of a query including QueryId, query statement, session
@@ -66,7 +65,8 @@ public class MPPQueryContext {
6665
// When some DataNode cannot be connected, its endPoint will be put
6766
// in this list. And the following retry will avoid planning fragment
6867
// onto this node.
69-
private final List<TEndPoint> endPointBlackList;
68+
// When dispatch FI fails, this structure may be modified concurrently
69+
private final Set<TEndPoint> endPointBlackList;
7070

7171
private final TypeProvider typeProvider = new TypeProvider();
7272

@@ -93,7 +93,7 @@ public class MPPQueryContext {
9393

9494
public MPPQueryContext(QueryId queryId) {
9595
this.queryId = queryId;
96-
this.endPointBlackList = new LinkedList<>();
96+
this.endPointBlackList = ConcurrentHashMap.newKeySet();
9797
this.memoryReservationManager =
9898
new NotThreadSafeMemoryReservationManager(queryId, this.getClass().getName());
9999
}
@@ -194,7 +194,7 @@ public void addFailedEndPoint(TEndPoint endPoint) {
194194
this.endPointBlackList.add(endPoint);
195195
}
196196

197-
public List<TEndPoint> getEndPointBlackList() {
197+
public Set<TEndPoint> getEndPointBlackList() {
198198
return endPointBlackList;
199199
}
200200

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,17 @@ private Future<FragInstanceDispatchResult> topologicalParallelDispatchRead(
151151
instances.get(next.getPlanFragment().getIndexInFragmentInstanceList());
152152
futures.add(asyncDispatchOneInstance(next, fragmentInstance, queue));
153153
}
154+
FragInstanceDispatchResult failedResult = null;
154155
for (Future<FragInstanceDispatchResult> future : futures) {
156+
// Make sure all executing tasks are finished to avoid concurrency issues
155157
FragInstanceDispatchResult result = future.get();
156-
if (!result.isSuccessful()) {
157-
return immediateFuture(result);
158+
if (!result.isSuccessful() && failedResult == null) {
159+
failedResult = result;
158160
}
159161
}
162+
if (failedResult != null) {
163+
return immediateFuture(failedResult);
164+
}
160165
} catch (InterruptedException e) {
161166
Thread.currentThread().interrupt();
162167
LOGGER.warn("Interrupted when dispatching read async", e);

0 commit comments

Comments
 (0)