Skip to content

Commit 841f3ce

Browse files
authored
HDDS-13614. Create separate bounded executor for ClosePipelineCommandHandler and CreatePipelineCommandHandlerHandler. (apache#8977)
1 parent db4b436 commit 841f3ce

File tree

3 files changed

+180
-116
lines changed

3 files changed

+180
-116
lines changed

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
import java.util.Map;
2727
import java.util.concurrent.ExecutorService;
2828
import java.util.concurrent.Executors;
29+
import java.util.concurrent.LinkedBlockingQueue;
2930
import java.util.concurrent.ThreadFactory;
31+
import java.util.concurrent.ThreadPoolExecutor;
3032
import java.util.concurrent.TimeUnit;
3133
import java.util.concurrent.atomic.AtomicLong;
3234
import java.util.concurrent.locks.ReadWriteLock;
@@ -96,7 +98,8 @@ public class DatanodeStateMachine implements Closeable {
9698
private static final Logger LOG =
9799
LoggerFactory.getLogger(DatanodeStateMachine.class);
98100
private final ExecutorService executorService;
99-
private final ExecutorService pipelineCommandExecutorService;
101+
private final ExecutorService closePipelineCommandExecutorService;
102+
private final ExecutorService createPipelineCommandExecutorService;
100103
private final ConfigurationSource conf;
101104
private final SCMConnectionManager connectionManager;
102105
private final ECReconstructionCoordinator ecReconstructionCoordinator;
@@ -236,11 +239,24 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService,
236239
// datanode clients.
237240
DNContainerOperationClient dnClient = new DNContainerOperationClient(conf, certClient, secretKeyClient);
238241

239-
ThreadFactory threadFactory = new ThreadFactoryBuilder()
240-
.setNameFormat(threadNamePrefix + "PipelineCommandHandlerThread-%d")
242+
// Create separate bounded executors for pipeline command handlers
243+
ThreadFactory closePipelineThreadFactory = new ThreadFactoryBuilder()
244+
.setNameFormat(threadNamePrefix + "ClosePipelineCommandHandlerThread-%d")
241245
.build();
242-
pipelineCommandExecutorService = Executors
243-
.newSingleThreadExecutor(threadFactory);
246+
closePipelineCommandExecutorService = new ThreadPoolExecutor(
247+
1, 1,
248+
0L, TimeUnit.MILLISECONDS,
249+
new LinkedBlockingQueue<>(dnConf.getCommandQueueLimit()),
250+
closePipelineThreadFactory);
251+
252+
ThreadFactory createPipelineThreadFactory = new ThreadFactoryBuilder()
253+
.setNameFormat(threadNamePrefix + "CreatePipelineCommandHandlerThread-%d")
254+
.build();
255+
createPipelineCommandExecutorService = new ThreadPoolExecutor(
256+
1, 1,
257+
0L, TimeUnit.MILLISECONDS,
258+
new LinkedBlockingQueue<>(dnConf.getCommandQueueLimit()),
259+
createPipelineThreadFactory);
244260

245261
// When we add new handlers just adding a new handler here should do the
246262
// trick.
@@ -257,9 +273,9 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService,
257273
dnConf.getContainerDeleteThreads(), clock,
258274
dnConf.getCommandQueueLimit(), threadNamePrefix))
259275
.addHandler(new ClosePipelineCommandHandler(conf,
260-
pipelineCommandExecutorService))
276+
closePipelineCommandExecutorService))
261277
.addHandler(new CreatePipelineCommandHandler(conf,
262-
pipelineCommandExecutorService))
278+
createPipelineCommandExecutorService))
263279
.addHandler(new SetNodeOperationalStateCommandHandler(conf,
264280
supervisor::nodeStateUpdated))
265281
.addHandler(new FinalizeNewLayoutVersionCommandHandler())
@@ -436,7 +452,8 @@ public void close() throws IOException {
436452
replicationSupervisorMetrics.unRegister();
437453
ecReconstructionMetrics.unRegister();
438454
executorServiceShutdownGraceful(executorService);
439-
executorServiceShutdownGraceful(pipelineCommandExecutorService);
455+
executorServiceShutdownGraceful(closePipelineCommandExecutorService);
456+
executorServiceShutdownGraceful(createPipelineCommandExecutorService);
440457

441458
if (connectionManager != null) {
442459
connectionManager.close();

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java

Lines changed: 85 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,12 @@
1919

2020
import java.io.IOException;
2121
import java.util.Collection;
22+
import java.util.Set;
23+
import java.util.UUID;
2224
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.ConcurrentHashMap;
2326
import java.util.concurrent.Executor;
27+
import java.util.concurrent.RejectedExecutionException;
2428
import java.util.concurrent.atomic.AtomicInteger;
2529
import java.util.concurrent.atomic.AtomicLong;
2630
import java.util.function.BiFunction;
@@ -62,6 +66,7 @@ public class ClosePipelineCommandHandler implements CommandHandler {
6266
private final Executor executor;
6367
private final BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient;
6468
private final MutableRate opsLatencyMs;
69+
private final Set<UUID> pipelinesInProgress;
6570

6671
/**
6772
* Constructs a closePipelineCommand handler.
@@ -82,6 +87,7 @@ public ClosePipelineCommandHandler(
8287
MetricsRegistry registry = new MetricsRegistry(
8388
ClosePipelineCommandHandler.class.getSimpleName());
8489
this.opsLatencyMs = registry.newRate(SCMCommandProto.Type.closePipelineCommand + "Ms");
90+
this.pipelinesInProgress = ConcurrentHashMap.newKeySet();
8591
}
8692

8793
/**
@@ -95,70 +101,88 @@ public ClosePipelineCommandHandler(
95101
@Override
96102
public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
97103
StateContext context, SCMConnectionManager connectionManager) {
98-
queuedCount.incrementAndGet();
99-
CompletableFuture.runAsync(() -> {
100-
invocationCount.incrementAndGet();
101-
final long startTime = Time.monotonicNow();
102-
final DatanodeDetails dn = context.getParent().getDatanodeDetails();
103-
ClosePipelineCommand closePipelineCommand =
104-
(ClosePipelineCommand) command;
105-
final PipelineID pipelineID = closePipelineCommand.getPipelineID();
106-
final HddsProtos.PipelineID pipelineIdProto = pipelineID.getProtobuf();
104+
final ClosePipelineCommand closePipelineCommand = (ClosePipelineCommand) command;
105+
final PipelineID pipelineID = closePipelineCommand.getPipelineID();
106+
final UUID pipelineUUID = pipelineID.getId();
107+
108+
// Check if this pipeline is already being processed
109+
if (!pipelinesInProgress.add(pipelineUUID)) {
110+
LOG.debug("Close Pipeline command for pipeline {} is already in progress, " +
111+
"skipping duplicate command.", pipelineID);
112+
return;
113+
}
114+
115+
try {
116+
queuedCount.incrementAndGet();
117+
CompletableFuture.runAsync(() -> {
118+
invocationCount.incrementAndGet();
119+
final long startTime = Time.monotonicNow();
120+
final DatanodeDetails dn = context.getParent().getDatanodeDetails();
121+
final HddsProtos.PipelineID pipelineIdProto = pipelineID.getProtobuf();
107122

108-
try {
109-
XceiverServerSpi server = ozoneContainer.getWriteChannel();
110-
if (server.isExist(pipelineIdProto)) {
111-
if (server instanceof XceiverServerRatis) {
112-
// TODO: Refactor Ratis logic to XceiverServerRatis
113-
// Propagate the group remove to the other Raft peers in the pipeline
114-
XceiverServerRatis ratisServer = (XceiverServerRatis) server;
115-
final RaftGroupId raftGroupId = RaftGroupId.valueOf(pipelineID.getId());
116-
final boolean shouldDeleteRatisLogDirectory = ratisServer.getShouldDeleteRatisLogDirectory();
117-
// This might throw GroupMismatchException if the Ratis group has been closed by other datanodes
118-
final Collection<RaftPeer> peers = ratisServer.getRaftPeersInPipeline(pipelineID);
119-
// Try to send remove group for the other datanodes first, ignoring GroupMismatchException
120-
// if the Ratis group has been closed in the other datanodes
121-
peers.stream()
122-
.filter(peer -> !peer.getId().equals(ratisServer.getServer().getId()))
123-
.forEach(peer -> {
124-
try (RaftClient client = newRaftClient.apply(peer, ozoneContainer.getTlsClientConfig())) {
125-
client.getGroupManagementApi(peer.getId())
126-
.remove(raftGroupId, shouldDeleteRatisLogDirectory, !shouldDeleteRatisLogDirectory);
127-
} catch (GroupMismatchException ae) {
128-
// ignore silently since this means that the group has been closed by earlier close pipeline
129-
// command in another datanode
130-
LOG.debug("Failed to remove group {} for pipeline {} on peer {} since the group has " +
131-
"been removed by earlier close pipeline command handled in another datanode", raftGroupId,
132-
pipelineID, peer.getId());
133-
} catch (IOException ioe) {
134-
LOG.warn("Failed to remove group {} of pipeline {} on peer {}",
135-
raftGroupId, pipelineID, peer.getId(), ioe);
136-
}
137-
});
123+
try {
124+
XceiverServerSpi server = ozoneContainer.getWriteChannel();
125+
if (server.isExist(pipelineIdProto)) {
126+
if (server instanceof XceiverServerRatis) {
127+
// TODO: Refactor Ratis logic to XceiverServerRatis
128+
// Propagate the group remove to the other Raft peers in the pipeline
129+
XceiverServerRatis ratisServer = (XceiverServerRatis) server;
130+
final RaftGroupId raftGroupId = RaftGroupId.valueOf(pipelineID.getId());
131+
final boolean shouldDeleteRatisLogDirectory = ratisServer.getShouldDeleteRatisLogDirectory();
132+
// This might throw GroupMismatchException if the Ratis group has been closed by other datanodes
133+
final Collection<RaftPeer> peers = ratisServer.getRaftPeersInPipeline(pipelineID);
134+
// Try to send remove group for the other datanodes first, ignoring GroupMismatchException
135+
// if the Ratis group has been closed in the other datanodes
136+
peers.stream()
137+
.filter(peer -> !peer.getId().equals(ratisServer.getServer().getId()))
138+
.forEach(peer -> {
139+
try (RaftClient client = newRaftClient.apply(peer, ozoneContainer.getTlsClientConfig())) {
140+
client.getGroupManagementApi(peer.getId())
141+
.remove(raftGroupId, shouldDeleteRatisLogDirectory, !shouldDeleteRatisLogDirectory);
142+
} catch (GroupMismatchException ae) {
143+
// ignore silently since this means that the group has been closed by earlier close pipeline
144+
// command in another datanode
145+
LOG.debug("Failed to remove group {} for pipeline {} on peer {} since the group has " +
146+
"been removed by earlier close pipeline command handled in another datanode", raftGroupId,
147+
pipelineID, peer.getId());
148+
} catch (IOException ioe) {
149+
LOG.warn("Failed to remove group {} of pipeline {} on peer {}",
150+
raftGroupId, pipelineID, peer.getId(), ioe);
151+
}
152+
});
153+
}
154+
// Remove the Ratis group from the current datanode pipeline, might throw GroupMismatchException as
155+
// well. It is a no-op for XceiverServerSpi implementations (e.g. XceiverServerGrpc)
156+
server.removeGroup(pipelineIdProto);
157+
LOG.info("Close Pipeline {} command on datanode {}.", pipelineID, dn);
158+
} else {
159+
LOG.debug("Ignoring close pipeline command for pipeline {} on datanode {} " +
160+
"as it does not exist", pipelineID, dn);
138161
}
139-
// Remove the Ratis group from the current datanode pipeline, might throw GroupMismatchException as
140-
// well. It is a no-op for XceiverServerSpi implementations (e.g. XceiverServerGrpc)
141-
server.removeGroup(pipelineIdProto);
142-
LOG.info("Close Pipeline {} command on datanode {}.", pipelineID, dn);
143-
} else {
144-
LOG.debug("Ignoring close pipeline command for pipeline {} on datanode {} " +
145-
"as it does not exist", pipelineID, dn);
146-
}
147-
} catch (IOException e) {
148-
Throwable gme = HddsClientUtils.containsException(e, GroupMismatchException.class);
149-
if (gme != null) {
150-
// ignore silently since this means that the group has been closed by earlier close pipeline
151-
// command in another datanode
152-
LOG.debug("The group for pipeline {} on datanode {} has been removed by earlier close " +
153-
"pipeline command handled in another datanode", pipelineID, dn);
154-
} else {
155-
LOG.error("Can't close pipeline {}", pipelineID, e);
162+
} catch (IOException e) {
163+
Throwable gme = HddsClientUtils.containsException(e, GroupMismatchException.class);
164+
if (gme != null) {
165+
// ignore silently since this means that the group has been closed by earlier close pipeline
166+
// command in another datanode
167+
LOG.debug("The group for pipeline {} on datanode {} has been removed by earlier close " +
168+
"pipeline command handled in another datanode", pipelineID, dn);
169+
} else {
170+
LOG.error("Can't close pipeline {}", pipelineID, e);
171+
}
172+
} finally {
173+
long endTime = Time.monotonicNow();
174+
this.opsLatencyMs.add(endTime - startTime);
156175
}
157-
} finally {
158-
long endTime = Time.monotonicNow();
159-
this.opsLatencyMs.add(endTime - startTime);
160-
}
161-
}, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
176+
}, executor).whenComplete((v, e) -> {
177+
queuedCount.decrementAndGet();
178+
pipelinesInProgress.remove(pipelineUUID);
179+
});
180+
} catch (RejectedExecutionException ex) {
181+
queuedCount.decrementAndGet();
182+
pipelinesInProgress.remove(pipelineUUID);
183+
LOG.warn("Close Pipeline command for pipeline {} is rejected as " +
184+
"command queue has reached max size.", pipelineID);
185+
}
162186
}
163187

164188
/**

0 commit comments

Comments
 (0)