Skip to content

Commit 9945de6

Browse files
authored
HDDS-11667. Validating DatanodeID on any request to the datanode (#7418)
1 parent fc6a2ea commit 9945de6

File tree

8 files changed

+269
-28
lines changed

8 files changed

+269
-28
lines changed

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Collections;
2323
import java.util.EnumSet;
2424
import java.util.List;
25+
import java.util.Objects;
2526
import java.util.Set;
2627
import java.util.UUID;
2728

@@ -647,6 +648,20 @@ public boolean equals(Object obj) {
647648
uuid.equals(((DatanodeDetails) obj).uuid);
648649
}
649650

651+
652+
/**
653+
* Checks whether the hostname, ipAddress and ports of the 2 nodes are the same.
654+
* @param datanodeDetails dnDetails object to compare with.
655+
* @return true if the values match otherwise false.
656+
*/
657+
public boolean compareNodeValues(DatanodeDetails datanodeDetails) {
658+
if (this == datanodeDetails || super.equals(datanodeDetails)) {
659+
return true;
660+
}
661+
return Objects.equals(ipAddress, datanodeDetails.ipAddress)
662+
&& Objects.equals(hostName, datanodeDetails.hostName) && Objects.equals(ports, datanodeDetails.ports);
663+
}
664+
650665
@Override
651666
public int hashCode() {
652667
return uuid.hashCode();

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,11 @@ public List<DatanodeDetails> getNodesInOrder() {
330330
}
331331

332332
void reportDatanode(DatanodeDetails dn) throws IOException {
333-
if (nodeStatus.get(dn) == null) {
333+
// This is a workaround for the case where a datanode restarted, reinitializing its dnId, but it still reports the
334+
// same set of pipelines it was part of. The pipeline report should be accepted for this anomalous condition.
335+
// We rely on StaleNodeHandler in closing this pipeline eventually.
336+
if (dn == null || (nodeStatus.get(dn) == null
337+
&& nodeStatus.keySet().stream().noneMatch(node -> node.compareNodeValues(dn)))) {
334338
throw new IOException(
335339
String.format("Datanode=%s not part of pipeline=%s", dn, id));
336340
}

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.InputStream;
2323
import java.io.OutputStream;
2424

25+
import com.google.common.annotations.VisibleForTesting;
2526
import org.apache.hadoop.hdds.conf.ConfigurationSource;
2627
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
2728
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
@@ -93,7 +94,8 @@ public abstract StateMachine.DataChannel getStreamDataChannel(
9394
*
9495
* @return datanode Id
9596
*/
96-
protected String getDatanodeId() {
97+
@VisibleForTesting
98+
public String getDatanodeId() {
9799
return datanodeId;
98100
}
99101

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.hadoop.ozone.HddsDatanodeService;
6666
import org.apache.hadoop.ozone.OzoneConfigKeys;
6767
import org.apache.hadoop.ozone.common.utils.BufferUtils;
68+
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
6869
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
6970
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
7071
import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel;
@@ -78,6 +79,7 @@
7879
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
7980
import org.apache.ratis.protocol.Message;
8081
import org.apache.ratis.protocol.RaftClientRequest;
82+
import org.apache.ratis.protocol.RaftGroup;
8183
import org.apache.ratis.protocol.RaftGroupId;
8284
import org.apache.ratis.protocol.RaftGroupMemberId;
8385
import org.apache.ratis.protocol.RaftPeer;
@@ -202,6 +204,7 @@ long getStartTime() {
202204
private final boolean waitOnBothFollowers;
203205
private final HddsDatanodeService datanodeService;
204206
private static Semaphore semaphore = new Semaphore(1);
207+
private final AtomicBoolean peersValidated;
205208

206209
/**
207210
* CSM metrics.
@@ -252,6 +255,7 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI
252255
HDDS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
253256
applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
254257
stateMachineHealthy = new AtomicBoolean(true);
258+
this.peersValidated = new AtomicBoolean(false);
255259

256260
ThreadFactory threadFactory = new ThreadFactoryBuilder()
257261
.setNameFormat(
@@ -265,6 +269,19 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI
265269

266270
}
267271

272+
private void validatePeers() throws IOException {
273+
if (this.peersValidated.get()) {
274+
return;
275+
}
276+
final RaftGroup group = ratisServer.getServerDivision(getGroupId()).getGroup();
277+
final RaftPeerId selfId = ratisServer.getServer().getId();
278+
if (group.getPeer(selfId) == null) {
279+
throw new StorageContainerException("Current datanode " + selfId + " is not a member of " + group,
280+
ContainerProtos.Result.INVALID_CONFIG);
281+
}
282+
peersValidated.set(true);
283+
}
284+
268285
@Override
269286
public StateMachineStorage getStateMachineStorage() {
270287
return storage;
@@ -962,6 +979,11 @@ private CompletableFuture<ContainerCommandResponseProto> applyTransaction(
962979
final CheckedSupplier<ContainerCommandResponseProto, Exception> task
963980
= () -> {
964981
try {
982+
try {
983+
this.validatePeers();
984+
} catch (StorageContainerException e) {
985+
return ContainerUtils.logAndReturnError(LOG, e, request);
986+
}
965987
long timeNow = Time.monotonicNowNanos();
966988
long queueingDelay = timeNow - context.getStartTime();
967989
metrics.recordQueueingDelay(request.getCmdType(), queueingDelay);

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_NON_EMPTY_CONTAINER;
102102
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_OPEN_CONTAINER;
103103
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.GET_SMALL_FILE_ERROR;
104+
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_ARGUMENT;
104105
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_CONTAINER_STATE;
105106
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION;
106107
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
@@ -242,6 +243,15 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler,
242243
ContainerCommandRequestProto request, KeyValueContainer kvContainer,
243244
DispatcherContext dispatcherContext) {
244245
Type cmdType = request.getCmdType();
246+
// Validate the request has been made to the correct datanode with the node id matching.
247+
if (kvContainer != null) {
248+
try {
249+
handler.validateRequestDatanodeId(kvContainer.getContainerData().getReplicaIndex(),
250+
request.getDatanodeUuid());
251+
} catch (StorageContainerException e) {
252+
return ContainerUtils.logAndReturnError(LOG, e, request);
253+
}
254+
}
245255

246256
switch (cmdType) {
247257
case CreateContainer:
@@ -353,6 +363,13 @@ ContainerCommandResponseProto handleCreateContainer(
353363
" already exists", null, CONTAINER_ALREADY_EXISTS), request);
354364
}
355365

366+
try {
367+
this.validateRequestDatanodeId(request.getCreateContainer().hasReplicaIndex() ?
368+
request.getCreateContainer().getReplicaIndex() : null, request.getDatanodeUuid());
369+
} catch (StorageContainerException e) {
370+
return ContainerUtils.logAndReturnError(LOG, e, request);
371+
}
372+
356373
long containerID = request.getContainerID();
357374
State containerState = request.getCreateContainer().getState();
358375

@@ -1532,4 +1549,22 @@ public static FaultInjector getInjector() {
15321549
public static void setInjector(FaultInjector instance) {
15331550
injector = instance;
15341551
}
1552+
1553+
/**
1554+
* Verify that the request's datanode UUID matches this datanode's id. This validates only for EC containers i.e.
1555+
* containerReplicaIdx should be > 0.
1556+
*
1557+
* @param containerReplicaIdx replicaIndex for the container command.
1558+
* @param requestDatanodeUUID datanode UUID taken from the request.
1559+
* @throws StorageContainerException if the request's datanode UUID does not match this datanode.
1560+
*/
1561+
private boolean validateRequestDatanodeId(Integer containerReplicaIdx, String requestDatanodeUUID)
1562+
throws StorageContainerException {
1563+
if (containerReplicaIdx != null && containerReplicaIdx > 0 && !requestDatanodeUUID.equals(this.getDatanodeId())) {
1564+
throw new StorageContainerException(
1565+
String.format("Request is trying to write to node with uuid : %s but the current nodeId is: %s .",
1566+
requestDatanodeUUID, this.getDatanodeId()), INVALID_ARGUMENT);
1567+
}
1568+
return true;
1569+
}
15351570
}

hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.junit.jupiter.api.Test;
6969
import org.junit.jupiter.api.Timeout;
7070
import org.junit.jupiter.api.io.TempDir;
71+
import org.mockito.Mockito;
7172

7273
import static org.mockito.Mockito.doCallRealMethod;
7374
import static org.mockito.Mockito.mock;
@@ -131,7 +132,13 @@ public void testHandlerCommandHandling() throws Exception {
131132
.build();
132133

133134
KeyValueContainer container = mock(KeyValueContainer.class);
134-
135+
KeyValueContainerData containerData = mock(KeyValueContainerData.class);
136+
Mockito.when(container.getContainerData()).thenReturn(containerData);
137+
Mockito.when(containerData.getReplicaIndex()).thenReturn(1);
138+
ContainerProtos.ContainerCommandResponseProto responseProto = KeyValueHandler.dispatchRequest(handler,
139+
createContainerRequest, container, null);
140+
assertEquals(ContainerProtos.Result.INVALID_ARGUMENT, responseProto.getResult());
141+
Mockito.when(handler.getDatanodeId()).thenReturn(DATANODE_UUID);
135142
KeyValueHandler
136143
.dispatchRequest(handler, createContainerRequest, container, null);
137144
verify(handler, times(0)).handleListBlock(

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.File;
2121
import java.io.IOException;
22+
import java.io.UncheckedIOException;
2223
import java.nio.charset.StandardCharsets;
2324
import java.nio.file.Path;
2425
import java.time.Duration;
@@ -32,6 +33,7 @@
3233
import java.util.concurrent.TimeUnit;
3334
import java.util.concurrent.atomic.AtomicInteger;
3435

36+
import org.apache.commons.io.FileUtils;
3537
import org.apache.commons.lang3.ArrayUtils;
3638
import org.apache.hadoop.fs.FileUtil;
3739
import org.apache.hadoop.hdds.HddsUtils;
@@ -40,6 +42,7 @@
4042
import org.apache.hadoop.hdds.client.ReplicationType;
4143
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
4244
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
45+
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
4346
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
4447
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
4548
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
@@ -50,6 +53,7 @@
5053
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
5154
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
5255
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
56+
import org.apache.hadoop.hdds.utils.HddsServerUtil;
5357
import org.apache.hadoop.hdds.utils.IOUtils;
5458
import org.apache.hadoop.ozone.HddsDatanodeService;
5559
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -264,6 +268,57 @@ public void testContainerStateMachineCloseOnMissingPipeline()
264268
key.close();
265269
}
266270

271+
272+
@Test
273+
public void testContainerStateMachineRestartWithDNChangePipeline()
274+
throws Exception {
275+
try (OzoneOutputStream key = objectStore.getVolume(volumeName).getBucket(bucketName)
276+
.createKey("testDNRestart", 1024, ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS,
277+
ReplicationFactor.THREE), new HashMap<>())) {
278+
key.write("ratis".getBytes(UTF_8));
279+
key.flush();
280+
281+
KeyOutputStream groupOutputStream = (KeyOutputStream) key.
282+
getOutputStream();
283+
List<OmKeyLocationInfo> locationInfoList =
284+
groupOutputStream.getLocationInfoList();
285+
assertEquals(1, locationInfoList.size());
286+
287+
OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
288+
Pipeline pipeline = omKeyLocationInfo.getPipeline();
289+
List<HddsDatanodeService> datanodes =
290+
new ArrayList<>(TestHelper.getDatanodeServices(cluster,
291+
pipeline));
292+
293+
DatanodeDetails dn = datanodes.get(0).getDatanodeDetails();
294+
295+
// Delete all data volumes.
296+
cluster.getHddsDatanode(dn).getDatanodeStateMachine().getContainer().getVolumeSet().getVolumesList()
297+
.stream().forEach(v -> {
298+
try {
299+
FileUtils.deleteDirectory(v.getStorageDir());
300+
} catch (IOException e) {
301+
throw new RuntimeException(e);
302+
}
303+
});
304+
305+
// Delete datanode.id datanodeIdFile.
306+
File datanodeIdFile = new File(HddsServerUtil.getDatanodeIdFilePath(cluster.getHddsDatanode(dn).getConf()));
307+
boolean deleted = datanodeIdFile.delete();
308+
assertTrue(deleted);
309+
cluster.restartHddsDatanode(dn, false);
310+
GenericTestUtils.waitFor(() -> {
311+
try {
312+
key.write("ratis".getBytes(UTF_8));
313+
key.flush();
314+
return groupOutputStream.getLocationInfoList().size() > 1;
315+
} catch (IOException e) {
316+
throw new UncheckedIOException(e);
317+
}
318+
}, 1000, 30000);
319+
}
320+
}
321+
267322
@Test
268323
public void testContainerStateMachineFailures() throws Exception {
269324
OzoneOutputStream key =

0 commit comments

Comments
 (0)