Skip to content

Commit 2e5cac1

Browse files
wanglijie95zhuzhurk
authored andcommitted
[FLINK-28145][runtime] Let ResourceManager support blocklist
This closes #20218.
1 parent 9815caa commit 2e5cac1

18 files changed

+148
-7
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import org.apache.flink.api.common.time.Time;
2525
import org.apache.flink.api.java.tuple.Tuple2;
2626
import org.apache.flink.runtime.blob.TransientBlobKey;
27+
import org.apache.flink.runtime.blocklist.BlockedNode;
28+
import org.apache.flink.runtime.blocklist.BlocklistContext;
29+
import org.apache.flink.runtime.blocklist.BlocklistHandler;
2730
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
2831
import org.apache.flink.runtime.clusterframework.types.AllocationID;
2932
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -96,6 +99,7 @@
9699
import java.util.stream.Collectors;
97100

98101
import static org.apache.flink.util.Preconditions.checkNotNull;
102+
import static org.apache.flink.util.Preconditions.checkState;
99103

100104
/**
101105
* ResourceManager implementation. The resource manager is responsible for resource de-/allocation
@@ -157,6 +161,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
157161

158162
private final DelegationTokenManager delegationTokenManager;
159163

164+
protected final BlocklistHandler blocklistHandler;
165+
160166
public ResourceManager(
161167
RpcService rpcService,
162168
UUID leaderSessionId,
@@ -165,6 +171,7 @@ public ResourceManager(
165171
DelegationTokenManager delegationTokenManager,
166172
SlotManager slotManager,
167173
ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory,
174+
BlocklistHandler.Factory blocklistHandlerFactory,
168175
JobLeaderIdService jobLeaderIdService,
169176
ClusterInformation clusterInformation,
170177
FatalErrorHandler fatalErrorHandler,
@@ -189,6 +196,12 @@ public ResourceManager(
189196
this.jmResourceIdRegistrations = new HashMap<>(4);
190197
this.taskExecutors = new HashMap<>(8);
191198
this.taskExecutorGatewayFutures = new HashMap<>(8);
199+
this.blocklistHandler =
200+
blocklistHandlerFactory.create(
201+
new ResourceManagerBlocklistContext(),
202+
this::getNodeIdOfTaskManager,
203+
getMainThreadExecutor(),
204+
log);
192205

193206
this.jobManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
194207
this.taskManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
@@ -246,7 +259,10 @@ private void startResourceManagerServices() throws Exception {
246259
startHeartbeatServices();
247260

248261
slotManager.start(
249-
getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
262+
getFencingToken(),
263+
getMainThreadExecutor(),
264+
new ResourceActionsImpl(),
265+
blocklistHandler::isBlockedTaskManager);
250266

251267
delegationTokenManager.start();
252268

@@ -833,10 +849,21 @@ public CompletableFuture<TaskExecutorThreadInfoGateway> requestTaskExecutorThrea
833849
}
834850
}
835851

852+
@Override
853+
public CompletableFuture<Acknowledge> notifyNewBlockedNodes(Collection<BlockedNode> newNodes) {
854+
blocklistHandler.addNewBlockedNodes(newNodes);
855+
return CompletableFuture.completedFuture(Acknowledge.get());
856+
}
857+
836858
// ------------------------------------------------------------------------
837859
// Internal methods
838860
// ------------------------------------------------------------------------
839861

862+
private String getNodeIdOfTaskManager(ResourceID taskManagerId) {
863+
checkState(taskExecutors.containsKey(taskManagerId));
864+
return taskExecutors.get(taskManagerId).getNodeId();
865+
}
866+
840867
/**
841868
* Registers a new JobMaster.
842869
*
@@ -1444,6 +1471,18 @@ public Void retrievePayload(ResourceID resourceID) {
14441471
}
14451472
}
14461473

1474+
private class ResourceManagerBlocklistContext implements BlocklistContext {
1475+
@Override
1476+
public void blockResources(Collection<BlockedNode> blockedNodes) {}
1477+
1478+
@Override
1479+
public void unblockResources(Collection<BlockedNode> unBlockedNodes) {
1480+
// when a node is unblocked, we should trigger the resource requirements because the
1481+
// slots on this node become available again.
1482+
slotManager.triggerResourceRequirementsCheck();
1483+
}
1484+
}
1485+
14471486
// ------------------------------------------------------------------------
14481487
// Resource Management
14491488
// ------------------------------------------------------------------------

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.flink.api.java.tuple.Tuple2;
2525
import org.apache.flink.runtime.blob.BlobServer;
2626
import org.apache.flink.runtime.blob.TransientBlobKey;
27+
import org.apache.flink.runtime.blocklist.BlocklistListener;
2728
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
2829
import org.apache.flink.runtime.clusterframework.types.AllocationID;
2930
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -55,7 +56,7 @@
5556

5657
/** The {@link ResourceManager}'s RPC gateway interface. */
5758
public interface ResourceManagerGateway
58-
extends FencedRpcGateway<ResourceManagerId>, ClusterPartitionManager {
59+
extends FencedRpcGateway<ResourceManagerId>, ClusterPartitionManager, BlocklistListener {
5960

6061
/**
6162
* Register a {@link JobMaster} at the resource manager.

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.runtime.resourcemanager;
2020

2121
import org.apache.flink.api.common.time.Time;
22+
import org.apache.flink.runtime.blocklist.BlocklistHandler;
2223
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
2324
import org.apache.flink.runtime.clusterframework.types.ResourceID;
2425
import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -57,6 +58,7 @@ public StandaloneResourceManager(
5758
DelegationTokenManager delegationTokenManager,
5859
SlotManager slotManager,
5960
ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory,
61+
BlocklistHandler.Factory blocklistHandlerFactory,
6062
JobLeaderIdService jobLeaderIdService,
6163
ClusterInformation clusterInformation,
6264
FatalErrorHandler fatalErrorHandler,
@@ -72,6 +74,7 @@ public StandaloneResourceManager(
7274
delegationTokenManager,
7375
slotManager,
7476
clusterPartitionTrackerFactory,
77+
blocklistHandlerFactory,
7578
jobLeaderIdService,
7679
clusterInformation,
7780
fatalErrorHandler,

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.flink.configuration.Configuration;
2525
import org.apache.flink.configuration.ConfigurationUtils;
2626
import org.apache.flink.configuration.ResourceManagerOptions;
27+
import org.apache.flink.runtime.blocklist.BlocklistUtils;
2728
import org.apache.flink.runtime.clusterframework.types.ResourceID;
2829
import org.apache.flink.runtime.entrypoint.ClusterInformation;
2930
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -82,6 +83,7 @@ protected ResourceManager<ResourceID> createResourceManager(
8283
delegationTokenManager,
8384
resourceManagerRuntimeServices.getSlotManager(),
8485
ResourceManagerPartitionTrackerImpl::new,
86+
BlocklistUtils.loadBlocklistHandlerFactory(configuration),
8587
resourceManagerRuntimeServices.getJobLeaderIdService(),
8688
clusterInformation,
8789
fatalErrorHandler,

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.api.common.time.Time;
2323
import org.apache.flink.configuration.AkkaOptions;
2424
import org.apache.flink.configuration.Configuration;
25+
import org.apache.flink.runtime.blocklist.BlocklistHandler;
2526
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
2627
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
2728
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
@@ -50,7 +51,6 @@
5051

5152
import java.time.Duration;
5253
import java.util.Collection;
53-
import java.util.Collections;
5454
import java.util.HashMap;
5555
import java.util.HashSet;
5656
import java.util.Map;
@@ -113,6 +113,7 @@ public ActiveResourceManager(
113113
DelegationTokenManager delegationTokenManager,
114114
SlotManager slotManager,
115115
ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory,
116+
BlocklistHandler.Factory blocklistHandlerFactory,
116117
JobLeaderIdService jobLeaderIdService,
117118
ClusterInformation clusterInformation,
118119
FatalErrorHandler fatalErrorHandler,
@@ -129,6 +130,7 @@ public ActiveResourceManager(
129130
delegationTokenManager,
130131
slotManager,
131132
clusterPartitionTrackerFactory,
133+
blocklistHandlerFactory,
132134
jobLeaderIdService,
133135
clusterInformation,
134136
fatalErrorHandler,
@@ -157,12 +159,11 @@ public ActiveResourceManager(
157159
@Override
158160
protected void initialize() throws ResourceManagerException {
159161
try {
160-
// TODO: pass the blocked node retriever
161162
resourceManagerDriver.initialize(
162163
this,
163164
new GatewayMainThreadExecutor(),
164165
ioExecutor,
165-
() -> Collections.emptySet());
166+
blocklistHandler::getAllBlockedNodeIds);
166167
} catch (Exception e) {
167168
throw new ResourceManagerException("Cannot initialize resource provider.", e);
168169
}

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.configuration.MemorySize;
2626
import org.apache.flink.configuration.ResourceManagerOptions;
2727
import org.apache.flink.configuration.TaskManagerOptions;
28+
import org.apache.flink.runtime.blocklist.BlocklistUtils;
2829
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
2930
import org.apache.flink.runtime.clusterframework.types.ResourceID;
3031
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
@@ -119,6 +120,7 @@ public ResourceManager<WorkerType> createResourceManager(
119120
delegationTokenManager,
120121
resourceManagerRuntimeServices.getSlotManager(),
121122
ResourceManagerPartitionTrackerImpl::new,
123+
BlocklistUtils.loadBlocklistHandlerFactory(configuration),
122124
resourceManagerRuntimeServices.getJobLeaderIdService(),
123125
clusterInformation,
124126
fatalErrorHandler,

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DeclarativeSlotManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,11 @@ public void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) {
185185
}
186186
}
187187

188+
@Override
189+
public void triggerResourceRequirementsCheck() {
190+
checkResourceRequirementsWithDelay();
191+
}
192+
188193
// ---------------------------------------------------------------------------------------------
189194
// Component lifecycle methods
190195
// ---------------------------------------------------------------------------------------------

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,11 @@ public void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) {
172172
}
173173
}
174174

175+
@Override
176+
public void triggerResourceRequirementsCheck() {
177+
checkResourceRequirementsWithDelay();
178+
}
179+
175180
// ---------------------------------------------------------------------------------------------
176181
// Component lifecycle methods
177182
// ---------------------------------------------------------------------------------------------

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,4 +152,10 @@ boolean registerTaskManager(
152152
void freeSlot(SlotID slotId, AllocationID allocationId);
153153

154154
void setFailUnfulfillableRequest(boolean failUnfulfillableRequest);
155+
156+
/**
157+
* Trigger the resource requirement check. This method will be called when some slot statuses
158+
* changed.
159+
*/
160+
void triggerResourceRequirementsCheck();
155161
}

flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
import org.apache.flink.api.common.JobStatus;
2323
import org.apache.flink.api.common.time.Time;
2424
import org.apache.flink.core.testutils.OneShotLatch;
25+
import org.apache.flink.runtime.blocklist.BlockedNode;
26+
import org.apache.flink.runtime.blocklist.BlocklistHandler;
27+
import org.apache.flink.runtime.blocklist.DefaultBlocklistHandler;
28+
import org.apache.flink.runtime.blocklist.NoOpBlocklistHandler;
2529
import org.apache.flink.runtime.clusterframework.types.ResourceID;
2630
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
2731
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -62,6 +66,7 @@
6266
import org.junit.jupiter.api.BeforeEach;
6367
import org.junit.jupiter.api.Test;
6468

69+
import java.time.Duration;
6570
import java.util.Collections;
6671
import java.util.UUID;
6772
import java.util.concurrent.CompletableFuture;
@@ -511,6 +516,33 @@ void testDisconnectTaskManager() throws Exception {
511516
assertThat(stopWorkerFuture.get()).isEqualTo(taskExecutorId);
512517
}
513518

519+
@Test
520+
void testUnblockResourcesWillTriggerResourceRequirementsCheck() throws Exception {
521+
522+
final CompletableFuture<Void> triggerRequirementsCheckFuture = new CompletableFuture<>();
523+
524+
final SlotManager slotManager =
525+
new TestingSlotManagerBuilder()
526+
.setTriggerRequirementsCheckConsumer(
527+
triggerRequirementsCheckFuture::complete)
528+
.createSlotManager();
529+
resourceManager =
530+
new ResourceManagerBuilder()
531+
.withSlotManager(slotManager)
532+
.withBlocklistHandlerFactory(
533+
new DefaultBlocklistHandler.Factory(Duration.ofMillis(100L)))
534+
.buildAndStart();
535+
536+
final ResourceManagerGateway resourceManagerGateway =
537+
resourceManager.getSelfGateway(ResourceManagerGateway.class);
538+
539+
resourceManagerGateway.notifyNewBlockedNodes(
540+
Collections.singleton(
541+
new BlockedNode("node", "Test cause", System.currentTimeMillis())));
542+
543+
triggerRequirementsCheckFuture.get();
544+
}
545+
514546
private void testDisconnectJobManager(JobStatus jobStatus) throws Exception {
515547
final TestingJobMasterGateway jobMasterGateway =
516548
new TestingJobMasterGatewayBuilder()
@@ -598,6 +630,8 @@ private class ResourceManagerBuilder {
598630
private HeartbeatServices heartbeatServices = null;
599631
private JobLeaderIdService jobLeaderIdService = null;
600632
private SlotManager slotManager = null;
633+
private BlocklistHandler.Factory blocklistHandlerFactory =
634+
new NoOpBlocklistHandler.Factory();
601635
private Function<ResourceID, Boolean> stopWorkerFunction = null;
602636

603637
private ResourceManagerBuilder withHeartbeatServices(HeartbeatServices heartbeatServices) {
@@ -616,6 +650,12 @@ private ResourceManagerBuilder withSlotManager(SlotManager slotManager) {
616650
return this;
617651
}
618652

653+
private ResourceManagerBuilder withBlocklistHandlerFactory(
654+
BlocklistHandler.Factory blocklistHandlerFactory) {
655+
this.blocklistHandlerFactory = blocklistHandlerFactory;
656+
return this;
657+
}
658+
619659
private ResourceManagerBuilder withStopWorkerFunction(
620660
Function<ResourceID, Boolean> stopWorkerFunction) {
621661
this.stopWorkerFunction = stopWorkerFunction;
@@ -655,6 +695,7 @@ private TestingResourceManager buildAndStart() throws Exception {
655695
new NoOpDelegationTokenManager(),
656696
slotManager,
657697
NoOpResourceManagerPartitionTracker::get,
698+
blocklistHandlerFactory,
658699
jobLeaderIdService,
659700
testingFatalErrorHandler,
660701
UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),

0 commit comments

Comments
 (0)