Skip to content

Commit c396fd8

Browse files
authored
Weak Read Consistency Level Refactor & Print InternalAddress and port in explain analyze
1 parent 42b8ef5 commit c396fd8

File tree

4 files changed

+46
-14
lines changed

4 files changed

+46
-14
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
2525
import org.apache.iotdb.commons.conf.CommonDescriptor;
2626
import org.apache.iotdb.commons.conf.IoTDBConstant;
27+
import org.apache.iotdb.commons.enums.ReadConsistencyLevel;
2728
import org.apache.iotdb.commons.utils.FileUtils;
2829
import org.apache.iotdb.consensus.ConsensusFactory;
2930
import org.apache.iotdb.db.audit.AuditLogOperation;
@@ -975,7 +976,7 @@ public class IoTDBConfig {
975976
private long detailContainerMinDegradeMemoryInBytes = 1024 * 1024L;
976977
private int schemaThreadCount = 5;
977978

978-
private String readConsistencyLevel = "strong";
979+
private ReadConsistencyLevel readConsistencyLevel = ReadConsistencyLevel.STRONG;
979980

980981
/** Maximum execution time of a DriverTask */
981982
private int driverTaskExecutionTimeSliceInMs = 200;
@@ -3304,12 +3305,16 @@ public void setSchemaThreadCount(int schemaThreadCount) {
33043305
this.schemaThreadCount = schemaThreadCount;
33053306
}
33063307

3307-
public String getReadConsistencyLevel() {
3308+
public ReadConsistencyLevel getReadConsistencyLevel() {
33083309
return readConsistencyLevel;
33093310
}
33103311

33113312
public void setReadConsistencyLevel(String readConsistencyLevel) {
3312-
this.readConsistencyLevel = readConsistencyLevel;
3313+
if ("weak".equalsIgnoreCase(readConsistencyLevel)) {
3314+
this.readConsistencyLevel = ReadConsistencyLevel.WEAK;
3315+
} else {
3316+
this.readConsistencyLevel = ReadConsistencyLevel.STRONG;
3317+
}
33133318
}
33143319

33153320
public int getDriverTaskExecutionTimeSliceInMs() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.queryengine.execution.fragment;
2121

2222
import org.apache.iotdb.commons.utils.FileUtils;
23+
import org.apache.iotdb.db.conf.IoTDBConfig;
2324
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2425
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
2526
import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
@@ -53,6 +54,7 @@
5354
public class FragmentInstanceExecution {
5455

5556
private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceExecution.class);
57+
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
5658
private final FragmentInstanceId instanceId;
5759
private final FragmentInstanceContext context;
5860

@@ -152,7 +154,7 @@ private boolean fillFragmentInstanceStatistics(
152154
return false;
153155
}
154156
statistics.setDataRegion(((DataRegion) context.getDataRegion()).getDataRegionId());
155-
statistics.setIp(IoTDBDescriptor.getInstance().getConfig().getAddressAndPort().ip);
157+
statistics.setIp(CONFIG.getInternalAddress() + ":" + CONFIG.getInternalPort());
156158
statistics.setStartTimeInMS(context.getStartTime());
157159
statistics.setEndTimeInMS(
158160
context.isEndTimeUpdate() ? context.getEndTime() : System.currentTimeMillis());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
2323
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
2424
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
25+
import org.apache.iotdb.commons.enums.ReadConsistencyLevel;
2526
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
2627
import org.apache.iotdb.commons.partition.QueryExecutor;
2728
import org.apache.iotdb.commons.partition.StorageExecutor;
29+
import org.apache.iotdb.db.conf.IoTDBConfig;
2830
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2931
import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints;
3032
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -33,7 +35,6 @@
3335
import org.apache.iotdb.db.queryengine.plan.planner.exceptions.ReplicaSetUnreachableException;
3436
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
3537
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
36-
import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableModelQueryFragmentPlanner;
3738
import org.apache.iotdb.rpc.TSStatusCode;
3839

3940
import org.apache.commons.collections4.CollectionUtils;
@@ -45,24 +46,27 @@
4546
import java.util.LinkedList;
4647
import java.util.List;
4748
import java.util.Map;
48-
import java.util.function.Function;
4949
import java.util.function.Supplier;
50+
import java.util.function.UnaryOperator;
5051

5152
public abstract class AbstractFragmentParallelPlanner implements IFragmentParallelPlaner {
5253
private static final Logger LOGGER =
53-
LoggerFactory.getLogger(TableModelQueryFragmentPlanner.class);
54+
LoggerFactory.getLogger(AbstractFragmentParallelPlanner.class);
55+
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
56+
private final ReadConsistencyLevel readConsistencyLevel;
5457

55-
protected MPPQueryContext queryContext;
58+
protected final MPPQueryContext queryContext;
5659

5760
protected AbstractFragmentParallelPlanner(MPPQueryContext queryContext) {
5861
this.queryContext = queryContext;
62+
this.readConsistencyLevel = CONFIG.getReadConsistencyLevel();
5963
}
6064

6165
protected void selectExecutorAndHost(
6266
PlanFragment fragment,
6367
FragmentInstance fragmentInstance,
6468
Supplier<TRegionReplicaSet> replicaSetProvider,
65-
Function<TRegionReplicaSet, TRegionReplicaSet> validator,
69+
UnaryOperator<TRegionReplicaSet> validator,
6670
Map<TDataNodeLocation, List<FragmentInstance>> dataNodeFIMap) {
6771
// Get the target region for origin PlanFragment, then its instance will be distributed one
6872
// of them.
@@ -112,11 +116,7 @@ protected TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplica
112116
throw new IllegalArgumentException(
113117
String.format("regionReplicaSet is invalid: %s", regionReplicaSet));
114118
}
115-
String readConsistencyLevel =
116-
IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel();
117-
// TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel as static variable or
118-
// enums
119-
boolean selectRandomDataNode = "weak".equals(readConsistencyLevel);
119+
boolean selectRandomDataNode = ReadConsistencyLevel.WEAK == this.readConsistencyLevel;
120120

121121
// When planning fragment onto specific DataNode, the DataNode whose endPoint is in
122122
// black list won't be considered because it may have connection issue now.
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.commons.enums;
21+
22+
public enum ReadConsistencyLevel {
23+
STRONG,
24+
WEAK
25+
}

0 commit comments

Comments
 (0)