Skip to content

Commit b9e6efa

Browse files
committed
CDPD-76037. HDDS-11667. Validating DatanodeID on any request to the datanode (apache#7418)
Cherry pick: 9945de6 Conflict files: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java Change-Id: Ic00cd0704504dc4b4b56635ee00639b78ea074fc
1 parent f09dbac commit b9e6efa

File tree

8 files changed

+275
-26
lines changed

8 files changed

+275
-26
lines changed

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,20 @@ public boolean equals(Object obj) {
553553
uuid.equals(((DatanodeDetails) obj).uuid);
554554
}
555555

556+
557+
/**
558+
* Checks whether the hostname, ipAddress and ports of the two nodes are the same.
559+
* @param datanodeDetails dnDetails object to compare with.
560+
* @return true if the values match otherwise false.
561+
*/
562+
public boolean compareNodeValues(DatanodeDetails datanodeDetails) {
563+
if (this == datanodeDetails || super.equals(datanodeDetails)) {
564+
return true;
565+
}
566+
return Objects.equals(ipAddress, datanodeDetails.ipAddress)
567+
&& Objects.equals(hostName, datanodeDetails.hostName) && Objects.equals(ports, datanodeDetails.ports);
568+
}
569+
556570
@Override
557571
public int hashCode() {
558572
return uuid.hashCode();

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,11 @@ public List<DatanodeDetails> getNodesInOrder() {
331331
}
332332

333333
void reportDatanode(DatanodeDetails dn) throws IOException {
334-
if (nodeStatus.get(dn) == null) {
334+
// This is a workaround for the case where a datanode restarted, reinitializing its dnId, but it still reports the
335+
// same set of pipelines it was part of. The pipeline report should be accepted for this anomalous condition.
336+
// We rely on StaleNodeHandler in closing this pipeline eventually.
337+
if (dn == null || (nodeStatus.get(dn) == null
338+
&& nodeStatus.keySet().stream().noneMatch(node -> node.compareNodeValues(dn)))) {
335339
throw new IOException(
336340
String.format("Datanode=%s not part of pipeline=%s", dn, id));
337341
}

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.InputStream;
2323
import java.io.OutputStream;
2424

25+
import com.google.common.annotations.VisibleForTesting;
2526
import org.apache.hadoop.hdds.conf.ConfigurationSource;
2627
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
2728
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
@@ -93,7 +94,8 @@ public abstract StateMachine.DataChannel getStreamDataChannel(
9394
*
9495
* @return datanode Id
9596
*/
96-
protected String getDatanodeId() {
97+
@VisibleForTesting
98+
public String getDatanodeId() {
9799
return datanodeId;
98100
}
99101

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.hadoop.hdds.utils.ResourceCache;
6060
import org.apache.hadoop.ozone.OzoneConfigKeys;
6161
import org.apache.hadoop.ozone.common.utils.BufferUtils;
62+
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
6263
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
6364
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
6465
import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel;
@@ -73,6 +74,7 @@
7374
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
7475
import org.apache.ratis.protocol.Message;
7576
import org.apache.ratis.protocol.RaftClientRequest;
77+
import org.apache.ratis.protocol.RaftGroup;
7678
import org.apache.ratis.protocol.RaftGroupId;
7779
import org.apache.ratis.protocol.RaftGroupMemberId;
7880
import org.apache.ratis.protocol.RaftPeerId;
@@ -179,6 +181,7 @@ synchronized void removeIfEmpty(long containerId) {
179181

180182
private final Semaphore applyTransactionSemaphore;
181183
private final boolean waitOnBothFollowers;
184+
private final AtomicBoolean peersValidated;
182185
/**
183186
* CSM metrics.
184187
*/
@@ -225,6 +228,7 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
225228
DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
226229
applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
227230
stateMachineHealthy = new AtomicBoolean(true);
231+
this.peersValidated = new AtomicBoolean(false);
228232

229233
this.executor = Executors.newFixedThreadPool(numContainerOpExecutors,
230234
new ThreadFactoryBuilder()
@@ -236,6 +240,19 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
236240

237241
}
238242

243+
private void validatePeers() throws IOException {
244+
if (this.peersValidated.get()) {
245+
return;
246+
}
247+
final RaftGroup group = ratisServer.getServerDivision(getGroupId()).getGroup();
248+
final RaftPeerId selfId = ratisServer.getServer().getId();
249+
if (group.getPeer(selfId) == null) {
250+
throw new StorageContainerException("Current datanode " + selfId + " is not a member of " + group,
251+
ContainerProtos.Result.INVALID_CONFIG);
252+
}
253+
peersValidated.set(true);
254+
}
255+
239256
@Override
240257
public StateMachineStorage getStateMachineStorage() {
241258
return storage;
@@ -854,6 +871,11 @@ private CompletableFuture<ContainerCommandResponseProto> applyTransaction(
854871
final CheckedSupplier<ContainerCommandResponseProto, Exception> task
855872
= () -> {
856873
try {
874+
try {
875+
this.validatePeers();
876+
} catch (StorageContainerException e) {
877+
return ContainerUtils.logAndReturnError(LOG, e, request);
878+
}
857879
return dispatchCommand(request, context);
858880
} catch (Exception e) {
859881
exceptionHandler.accept(e);

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_NON_EMPTY_CONTAINER;
101101
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_OPEN_CONTAINER;
102102
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.GET_SMALL_FILE_ERROR;
103+
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_ARGUMENT;
103104
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_CONTAINER_STATE;
104105
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION;
105106
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
@@ -236,6 +237,15 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler,
236237
ContainerCommandRequestProto request, KeyValueContainer kvContainer,
237238
DispatcherContext dispatcherContext) {
238239
Type cmdType = request.getCmdType();
240+
// Validate that the request was made to the correct datanode by checking that the node id matches.
241+
if (kvContainer != null) {
242+
try {
243+
handler.validateRequestDatanodeId(kvContainer.getContainerData().getReplicaIndex(),
244+
request.getDatanodeUuid());
245+
} catch (StorageContainerException e) {
246+
return ContainerUtils.logAndReturnError(LOG, e, request);
247+
}
248+
}
239249

240250
switch (cmdType) {
241251
case CreateContainer:
@@ -343,6 +353,13 @@ ContainerCommandResponseProto handleCreateContainer(
343353
" already exists", null, CONTAINER_ALREADY_EXISTS), request);
344354
}
345355

356+
try {
357+
this.validateRequestDatanodeId(request.getCreateContainer().hasReplicaIndex() ?
358+
request.getCreateContainer().getReplicaIndex() : null, request.getDatanodeUuid());
359+
} catch (StorageContainerException e) {
360+
return ContainerUtils.logAndReturnError(LOG, e, request);
361+
}
362+
346363
long containerID = request.getContainerID();
347364
State containerState = request.getCreateContainer().getState();
348365

@@ -1387,4 +1404,22 @@ public static Logger getLogger() {
13871404
return LOG;
13881405
}
13891406

1407+
1408+
/**
1409+
* Verifies that the request's datanode UUID matches this datanode's id. This validation applies only to EC containers, i.e.
1410+
* containerReplicaIdx should be > 0.
1411+
*
1412+
* @param containerReplicaIdx replicaIndex for the container command.
1413+
* @param requestDatanodeUUID datanode UUID carried by the request.
1414+
* @throws StorageContainerException if the request's datanode UUID does not match this datanode.
1415+
*/
1416+
private boolean validateRequestDatanodeId(Integer containerReplicaIdx, String requestDatanodeUUID)
1417+
throws StorageContainerException {
1418+
if (containerReplicaIdx != null && containerReplicaIdx > 0 && !requestDatanodeUUID.equals(this.getDatanodeId())) {
1419+
throw new StorageContainerException(
1420+
String.format("Request is trying to write to node with uuid : %s but the current nodeId is: %s .",
1421+
requestDatanodeUUID, this.getDatanodeId()), INVALID_ARGUMENT);
1422+
}
1423+
return true;
1424+
}
13901425
}

hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,13 @@ public void testHandlerCommandHandling() throws Exception {
143143
.build();
144144

145145
KeyValueContainer container = Mockito.mock(KeyValueContainer.class);
146-
146+
KeyValueContainerData containerData = Mockito.mock(KeyValueContainerData.class);
147+
Mockito.when(container.getContainerData()).thenReturn(containerData);
148+
Mockito.when(containerData.getReplicaIndex()).thenReturn(1);
149+
ContainerProtos.ContainerCommandResponseProto responseProto = KeyValueHandler.dispatchRequest(handler,
150+
createContainerRequest, container, null);
151+
Assertions.assertEquals(ContainerProtos.Result.INVALID_ARGUMENT, responseProto.getResult());
152+
Mockito.when(handler.getDatanodeId()).thenReturn(DATANODE_UUID);
147153
KeyValueHandler
148154
.dispatchRequest(handler, createContainerRequest, container, null);
149155
Mockito.verify(handler, times(0)).handleListBlock(

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.File;
2121
import java.io.IOException;
22+
import java.io.UncheckedIOException;
2223
import java.nio.charset.StandardCharsets;
2324
import java.nio.file.Path;
2425
import java.time.Duration;
@@ -32,13 +33,16 @@
3233
import java.util.concurrent.TimeUnit;
3334
import java.util.concurrent.atomic.AtomicInteger;
3435

36+
import org.apache.commons.io.FileUtils;
3537
import org.apache.commons.lang3.ArrayUtils;
3638
import org.apache.hadoop.fs.FileUtil;
3739
import org.apache.hadoop.hdds.HddsUtils;
40+
import org.apache.hadoop.hdds.client.ReplicationConfig;
3841
import org.apache.hadoop.hdds.client.ReplicationFactor;
3942
import org.apache.hadoop.hdds.client.ReplicationType;
4043
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
4144
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
45+
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
4246
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
4347
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
4448
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
@@ -49,6 +53,7 @@
4953
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
5054
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
5155
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
56+
import org.apache.hadoop.hdds.utils.HddsServerUtil;
5257
import org.apache.hadoop.hdds.utils.IOUtils;
5358
import org.apache.hadoop.ozone.HddsDatanodeService;
5459
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -75,6 +80,7 @@
7580
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
7681
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
7782
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
83+
import org.apache.ozone.test.GenericTestUtils;
7884
import org.apache.ozone.test.LambdaTestUtils;
7985
import org.apache.ozone.test.tag.Flaky;
8086

@@ -98,6 +104,7 @@
98104
import static org.junit.Assert.assertThat;
99105
import static org.junit.Assert.fail;
100106
import org.junit.jupiter.api.AfterAll;
107+
import org.junit.jupiter.api.Assertions;
101108
import org.junit.jupiter.api.BeforeAll;
102109
import org.junit.jupiter.api.Test;
103110

@@ -228,8 +235,7 @@ public void testContainerStateMachineCloseOnMissingPipeline()
228235
// Test applicable only for RATIS based channel.
229236
return;
230237
}
231-
wc.notifyGroupRemove(RaftGroupId
232-
.valueOf(omKeyLocationInfo.getPipeline().getId().getId()));
238+
wc.notifyGroupRemove(RaftGroupId.valueOf(omKeyLocationInfo.getPipeline().getId().getId()));
233239
SCMCommand<?> command = new CloseContainerCommand(
234240
containerID, omKeyLocationInfo.getPipeline().getId());
235241
command.setTerm(
@@ -252,6 +258,57 @@ public void testContainerStateMachineCloseOnMissingPipeline()
252258
key.close();
253259
}
254260

261+
262+
@Test
263+
public void testContainerStateMachineRestartWithDNChangePipeline()
264+
throws Exception {
265+
try (OzoneOutputStream key = objectStore.getVolume(volumeName).getBucket(bucketName)
266+
.createKey("testDNRestart", 1024, ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS,
267+
ReplicationFactor.THREE), new HashMap<>())) {
268+
key.write("ratis".getBytes(UTF_8));
269+
key.flush();
270+
271+
KeyOutputStream groupOutputStream = (KeyOutputStream) key.
272+
getOutputStream();
273+
List<OmKeyLocationInfo> locationInfoList =
274+
groupOutputStream.getLocationInfoList();
275+
Assert.assertEquals(1, locationInfoList.size());
276+
277+
OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
278+
Pipeline pipeline = omKeyLocationInfo.getPipeline();
279+
List<HddsDatanodeService> datanodes =
280+
new ArrayList<>(TestHelper.getDatanodeServices(cluster,
281+
pipeline));
282+
283+
DatanodeDetails dn = datanodes.get(0).getDatanodeDetails();
284+
285+
// Delete all data volumes.
286+
cluster.getHddsDatanode(dn).getDatanodeStateMachine().getContainer().getVolumeSet().getVolumesList()
287+
.stream().forEach(v -> {
288+
try {
289+
FileUtils.deleteDirectory(v.getStorageDir());
290+
} catch (IOException e) {
291+
throw new RuntimeException(e);
292+
}
293+
});
294+
295+
// Delete datanode.id datanodeIdFile.
296+
File datanodeIdFile = new File(HddsServerUtil.getDatanodeIdFilePath(cluster.getHddsDatanode(dn).getConf()));
297+
boolean deleted = datanodeIdFile.delete();
298+
Assertions.assertTrue(deleted);
299+
cluster.restartHddsDatanode(dn, false);
300+
GenericTestUtils.waitFor(() -> {
301+
try {
302+
key.write("ratis".getBytes(UTF_8));
303+
key.flush();
304+
return groupOutputStream.getLocationInfoList().size() > 1;
305+
} catch (IOException e) {
306+
throw new UncheckedIOException(e);
307+
}
308+
}, 1000, 30000);
309+
}
310+
}
311+
255312
@Test
256313
public void testContainerStateMachineFailures() throws Exception {
257314
OzoneOutputStream key =

0 commit comments

Comments
 (0)