Skip to content

Commit 4f4b360

Browse files
authored
[IOTDB-2780] Config node ratis consensus protocol implementation (#5347)
1 parent 6349cc5 commit 4f4b360

File tree

22 files changed

+455
-109
lines changed

22 files changed

+455
-109
lines changed

confignode/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@
5555
<artifactId>iotdb-consensus</artifactId>
5656
<version>${project.version}</version>
5757
</dependency>
58+
<dependency>
59+
<groupId>io.dropwizard.metrics</groupId>
60+
<artifactId>metrics-core</artifactId>
61+
<version>3.2.5</version>
62+
</dependency>
5863
<dependency>
5964
<groupId>junit</groupId>
6065
<artifactId>junit</artifactId>

confignode/src/assembly/resources/conf/iotdb-confignode.properties

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,7 @@ config_node_rpc_port=22277
3333
# Datatype: int
3434
# config_node_internal_port=22278
3535

36-
# used for building the ConfigNode consensus group
37-
# all config node address and internal port
38-
# every node should have the same config_node_address_lists
39-
# Datatype: String
40-
# config_node_address_lists=host0:22278,host1:22278,host2:22278
36+
# this feature is under development, set this as false before it is done.
4137
# Datatype: boolean
4238
# rpc_thrift_compression_enable=false
4339

@@ -57,6 +53,23 @@ config_node_rpc_port=22277
5753
# Datatype: int
5854
# thrift_init_buffer_size=1024
5955

56+
####################
57+
### consensus protocol configuration
58+
####################
59+
60+
# ConfigNodeGroup consensus protocol type
61+
# These consensus protocol are currently supported:
62+
# 1. standalone(No protocol, only supports stand-alone machine)
63+
# 2. ratis(Raft protocol)
64+
# Datatype: String
65+
# consensus_type=standalone
66+
67+
# Used for building the ConfigNode consensus group
68+
# all config node address and internal port, use comma to distinguish
69+
# every node should have the same config_node_address_lists
70+
# Datatype: String
71+
# config_node_group_address_list=0.0.0.0:22278,0.0.0.0:22280,0.0.0.0:22282
72+
6073
####################
6174
### DeviceGroup Configuration
6275
####################
@@ -81,9 +94,9 @@ config_node_rpc_port=22277
8194
####################
8295

8396
# system dir
84-
# If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/data/system).
97+
# If this property is unset, system will save the data in the default relative path directory under the confignode folder(i.e., %CONFIGNODE_HOME%/data/system).
8598
# If it is absolute, system will save the data in exact location it points to.
86-
# If it is relative, system will save the data in the relative path directory it indicates under the IoTDB folder.
99+
# If it is relative, system will save the data in the relative path directory it indicates under the confignode folder.
87100
# For windows platform
88101
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative.
89102
# system_dir=data\\system
@@ -93,9 +106,9 @@ config_node_rpc_port=22277
93106

94107

95108
# data dirs
96-
# If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/data/data).
109+
# If this property is unset, system will save the data in the default relative path directory under the confignode folder(i.e., %CONFIGNODE_HOME%/data/data).
97110
# If it is absolute, system will save the data in exact location it points to.
98-
# If it is relative, system will save the data in the relative path directory it indicates under the IoTDB folder.
111+
# If it is relative, system will save the data in the relative path directory it indicates under the confignode folder.
99112
# Note: If data_dir is assigned an empty string(i.e.,zero-size), it will be handled as a relative path.
100113
# For windows platform
101114
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative.
@@ -104,6 +117,19 @@ config_node_rpc_port=22277
104117
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
105118
# data_dirs=data/data
106119

120+
121+
# consensus dir
122+
# If this property is unset, system will save the data in the default relative path directory under the confignode folder(i.e., %CONFIGNODE_HOME%/data/consensus).
123+
# If it is absolute, system will save the data in exact location it points to.
124+
# If it is relative, system will save the data in the relative path directory it indicates under the confignode folder.
125+
# Note: If data_dir is assigned an empty string(i.e.,zero-size), it will be handled as a relative path.
126+
# For windows platform
127+
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative.
128+
# consensus_dir=data\\consensus
129+
# For Linux platform
130+
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
131+
# consensus_dir=data/consensus
132+
107133
####################
108134
### Region Configuration
109135
####################

confignode/src/assembly/resources/conf/logback.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,6 @@
135135
<appender-ref ref="FILEALL"/>
136136
<appender-ref ref="stdout"/>
137137
</root>
138-
<logger level="info" name="org.apache.iotdb.confignode.service"/>
139-
<logger level="info" name="org.apache.iotdb.confignode.conf"/>
138+
<logger level="info" name="org.apache.iotdb.confignode"/>
139+
<!-- <logger level="info" name="org.apache.ratis"/> -->
140140
</configuration>

confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.iotdb.confignode.conf;
2020

2121
import org.apache.iotdb.commons.conf.IoTDBConstant;
22+
import org.apache.iotdb.confignode.consensus.ConsensusType;
23+
import org.apache.iotdb.consensus.common.Endpoint;
2224
import org.apache.iotdb.rpc.RpcUtils;
2325

2426
import java.io.File;
@@ -34,8 +36,11 @@ public class ConfigNodeConf {
3436
/** used for communication between data node and data node */
3537
private int internalPort = 22278;
3638

37-
/** every node should have the same config_node_address_lists */
38-
private String addressLists;
39+
/** ConfigNodeGroup consensus protocol */
40+
private ConsensusType consensusType = ConsensusType.STANDALONE;
41+
42+
/** Used for building the ConfigNode consensus group */
43+
private Endpoint[] configNodeGroupAddressList = null;
3944

4045
/** Number of DeviceGroups per StorageGroup */
4146
private int deviceGroupCount = 10000;
@@ -70,6 +75,10 @@ public class ConfigNodeConf {
7075
ConfigNodeConstant.DATA_DIR + File.separator + ConfigNodeConstant.DATA_DIR
7176
};
7277

78+
/** Consensus directory, storage consensus protocol logs */
79+
private String consensusDir =
80+
ConfigNodeConstant.DATA_DIR + File.separator + ConfigNodeConstant.CONSENSUS_FOLDER;
81+
7382
private int regionReplicaCount = 3;
7483
private int schemaRegionCount = 1;
7584
private int dataRegionCount = 1;
@@ -87,6 +96,7 @@ private void formulateFolders() {
8796
for (int i = 0; i < dataDirs.length; i++) {
8897
dataDirs[i] = addHomeDir(dataDirs[i]);
8998
}
99+
consensusDir = addHomeDir(consensusDir);
90100
}
91101

92102
private String addHomeDir(String dir) {
@@ -181,12 +191,28 @@ public void setInternalPort(int internalPort) {
181191
this.internalPort = internalPort;
182192
}
183193

184-
public String getAddressLists() {
185-
return addressLists;
194+
public String getConsensusDir() {
195+
return consensusDir;
196+
}
197+
198+
public void setConsensusDir(String consensusDir) {
199+
this.consensusDir = consensusDir;
200+
}
201+
202+
public ConsensusType getConsensusType() {
203+
return consensusType;
204+
}
205+
206+
public void setConsensusType(ConsensusType consensusType) {
207+
this.consensusType = consensusType;
208+
}
209+
210+
public Endpoint[] getConfigNodeGroupAddressList() {
211+
return configNodeGroupAddressList;
186212
}
187213

188-
public void setAddressLists(String addressLists) {
189-
this.addressLists = addressLists;
214+
public void setConfigNodeGroupAddressList(Endpoint[] configNodeGroupAddressList) {
215+
this.configNodeGroupAddressList = configNodeGroupAddressList;
190216
}
191217

192218
public int getThriftServerAwaitTimeForStopService() {

confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class ConfigNodeConstant {
4141

4242
public static final String DATA_DIR = "data";
4343
public static final String CONF_DIR = "conf";
44+
public static final String CONSENSUS_FOLDER = "consensus";
4445

4546
public static final int MIN_SUPPORTED_JDK_VERSION = 8;
4647

confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
*/
1919
package org.apache.iotdb.confignode.conf;
2020

21+
import org.apache.iotdb.confignode.consensus.ConsensusType;
22+
import org.apache.iotdb.consensus.common.Endpoint;
23+
2124
import org.slf4j.Logger;
2225
import org.slf4j.LoggerFactory;
2326

@@ -49,7 +52,7 @@ public ConfigNodeConf getConf() {
4952
public URL getPropsUrl() {
5053
// Check if a config-directory was specified first.
5154
String urlString = System.getProperty(ConfigNodeConstant.CONFIGNODE_CONF, null);
52-
// If it wasn't, check if a home directory was provided (This usually contains a config)
55+
// If it wasn't, check if a home directory was provided
5356
if (urlString == null) {
5457
urlString = System.getProperty(ConfigNodeConstant.CONFIGNODE_HOME, null);
5558
if (urlString != null) {
@@ -60,15 +63,9 @@ public URL getPropsUrl() {
6063
+ File.separatorChar
6164
+ ConfigNodeConstant.CONF_NAME;
6265
} else {
63-
// If this too wasn't provided, try to find a default config in the root of the classpath.
64-
URL uri = ConfigNodeConf.class.getResource("/" + ConfigNodeConstant.CONF_NAME);
65-
if (uri != null) {
66-
return uri;
67-
}
68-
LOGGER.warn(
69-
"Cannot find IOTDB_HOME or IOTDB_CONF environment variable when loading "
70-
+ "config file {}, use default configuration",
71-
ConfigNodeConstant.CONF_NAME);
66+
// When start ConfigNode with the script, the environment variables CONFIGNODE_CONF
67+
// and CONFIGNODE_HOME will be set. But we didn't set these two in developer mode.
68+
// Thus, just return null and use default Configuration in developer mode.
7269
return null;
7370
}
7471
}
@@ -125,8 +122,9 @@ private void loadProps() {
125122
properties.getProperty(
126123
"config_node_internal_port", String.valueOf(conf.getInternalPort()))));
127124

128-
conf.setAddressLists(
129-
properties.getProperty("config_node_address_lists", conf.getAddressLists()));
125+
conf.setConsensusType(
126+
ConsensusType.getConsensusType(
127+
properties.getProperty("consensus_type", String.valueOf(conf.getConsensusType()))));
130128

131129
conf.setRpcAdvancedCompressionEnable(
132130
Boolean.parseBoolean(
@@ -160,6 +158,8 @@ private void loadProps() {
160158

161159
conf.setDataDirs(properties.getProperty("data_dirs", conf.getDataDirs()[0]).split(","));
162160

161+
conf.setConsensusDir(properties.getProperty("consensus_dir", conf.getConsensusDir()));
162+
163163
conf.setRegionReplicaCount(
164164
Integer.parseInt(
165165
properties.getProperty(
@@ -175,6 +175,24 @@ private void loadProps() {
175175
properties.getProperty(
176176
"data_region_count", String.valueOf(conf.getDataRegionCount()))));
177177

178+
String addresses = properties.getProperty("config_node_group_address_list", null);
179+
if (addresses != null) {
180+
String[] addressList = addresses.split(",");
181+
Endpoint[] endpointList = new Endpoint[addressList.length];
182+
for (int i = 0; i < addressList.length; i++) {
183+
String[] ipPort = addressList[i].split(":");
184+
if (ipPort.length != 2) {
185+
throw new IOException(
186+
String.format(
187+
"Parsing parameter config_node_group_address_list error. "
188+
+ "The %d-th address must format to ip:port, but currently is %s",
189+
i, addressList[i]));
190+
}
191+
endpointList[i] = new Endpoint(ipPort[0], Integer.parseInt(ipPort[1]));
192+
}
193+
conf.setConfigNodeGroupAddressList(endpointList);
194+
}
195+
178196
} catch (IOException e) {
179197
LOGGER.warn("Couldn't load ConfigNode conf file, use default config", e);
180198
} finally {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iotdb.confignode.consensus;
20+
21+
import java.util.Arrays;
22+
23+
public enum ConsensusType {
24+
STANDALONE("standalone"),
25+
RATIS("ratis");
26+
27+
private final String typeName;
28+
29+
ConsensusType(String typeName) {
30+
this.typeName = typeName;
31+
}
32+
33+
public String getTypeName() {
34+
return typeName;
35+
}
36+
37+
@Override
38+
public String toString() {
39+
return typeName;
40+
}
41+
42+
public static ConsensusType getConsensusType(String typeName) {
43+
for (ConsensusType type : ConsensusType.values()) {
44+
if (type.getTypeName().equals(typeName)) {
45+
return type;
46+
}
47+
}
48+
49+
throw new IllegalArgumentException(
50+
String.format(
51+
"Unknown consensus type, found: %s expected: %s",
52+
typeName, Arrays.toString(ConsensusType.values())));
53+
}
54+
}

confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,11 @@ public class PartitionRegionStateMachine implements IStateMachine {
3838

3939
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionRegionStateMachine.class);
4040

41-
private final PlanExecutor executor = new PlanExecutor();
41+
private final PlanExecutor executor;
42+
43+
public PartitionRegionStateMachine() {
44+
this.executor = new PlanExecutor();
45+
}
4246

4347
@Override
4448
public TSStatus write(IConsensusRequest request) {
@@ -74,7 +78,14 @@ protected TSStatus write(PhysicalPlan plan) {
7478
@Override
7579
public DataSet read(IConsensusRequest request) {
7680
PhysicalPlan plan;
77-
if (request instanceof PhysicalPlan) {
81+
if (request instanceof ByteBufferConsensusRequest) {
82+
try {
83+
plan = PhysicalPlan.Factory.create(((ByteBufferConsensusRequest) request).getContent());
84+
} catch (IOException e) {
85+
LOGGER.error("Deserialization error for write plan : {}", request);
86+
return null;
87+
}
88+
} else if (request instanceof PhysicalPlan) {
7889
plan = (PhysicalPlan) request;
7990
} else {
8091
LOGGER.error("Unexpected read plan : {}", request);

0 commit comments

Comments
 (0)