Skip to content

Commit c595cdd

Browse files
committed
[FLINK-37813][runtime] Pass numberSlots via WorkerRegistration instead of SlotReport (#27100)
* [FLINK-37813][runtime] Pass numberSlots via WorkerRegistration instead of SlotReport. SlotReport should only report the current slot usage, whereas numberSlots is an attribute of the TaskManager and should not be included in the SlotReport.
1 parent 042956c commit c595cdd

File tree

10 files changed

+89
-18
lines changed

10 files changed

+89
-18
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -518,7 +518,7 @@ public CompletableFuture<Acknowledge> sendSlotReport(
518518
WorkerResourceSpec workerResourceSpec =
519519
WorkerResourceSpec.fromTotalResourceProfile(
520520
workerTypeWorkerRegistration.getTotalResourceProfile(),
521-
slotReport.getNumSlotStatus());
521+
workerTypeWorkerRegistration.getNumberSlots());
522522
onWorkerRegistered(workerTypeWorkerRegistration.getWorker(), workerResourceSpec);
523523
} else if (registrationResult == SlotManager.RegistrationResult.REJECTED) {
524524
closeTaskManagerConnection(
@@ -1074,7 +1074,8 @@ private RegistrationResponse registerTaskExecutorInternal(
10741074
taskExecutorRegistration.getMemoryConfiguration(),
10751075
taskExecutorRegistration.getTotalResourceProfile(),
10761076
taskExecutorRegistration.getDefaultSlotResourceProfile(),
1077-
taskExecutorRegistration.getNodeId());
1077+
taskExecutorRegistration.getNodeId(),
1078+
taskExecutorRegistration.getNumberSlots());
10781079

10791080
log.info(
10801081
"Registering TaskManager with ResourceID {} ({}) at ResourceManager",

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -64,6 +64,9 @@ public class TaskExecutorRegistration implements Serializable {
6464
*/
6565
private final String nodeId;
6666

67+
/** Number of slots in static slot allocation. */
68+
private final int numberSlots;
69+
6770
public TaskExecutorRegistration(
6871
final String taskExecutorAddress,
6972
final ResourceID resourceId,
@@ -73,7 +76,8 @@ public TaskExecutorRegistration(
7376
final TaskExecutorMemoryConfiguration memoryConfiguration,
7477
final ResourceProfile defaultSlotResourceProfile,
7578
final ResourceProfile totalResourceProfile,
76-
final String nodeId) {
79+
final String nodeId,
80+
final int numberSlots) {
7781
this.taskExecutorAddress = checkNotNull(taskExecutorAddress);
7882
this.resourceId = checkNotNull(resourceId);
7983
this.dataPort = dataPort;
@@ -83,6 +87,7 @@ public TaskExecutorRegistration(
8387
this.defaultSlotResourceProfile = checkNotNull(defaultSlotResourceProfile);
8488
this.totalResourceProfile = checkNotNull(totalResourceProfile);
8589
this.nodeId = checkNotNull(nodeId);
90+
this.numberSlots = numberSlots;
8691
}
8792

8893
public String getTaskExecutorAddress() {
@@ -120,4 +125,8 @@ public ResourceProfile getTotalResourceProfile() {
120125
public String getNodeId() {
121126
return nodeId;
122127
}
128+
129+
public int getNumberSlots() {
130+
return numberSlots;
131+
}
123132
}

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,8 @@ public class WorkerRegistration<WorkerType extends ResourceIDRetrievable>
4545

4646
private final String nodeId;
4747

48+
private final int numberSlots;
49+
4850
public WorkerRegistration(
4951
TaskExecutorGateway taskExecutorGateway,
5052
WorkerType worker,
@@ -54,7 +56,8 @@ public WorkerRegistration(
5456
TaskExecutorMemoryConfiguration memoryConfiguration,
5557
ResourceProfile totalResourceProfile,
5658
ResourceProfile defaultSlotResourceProfile,
57-
String nodeId) {
59+
String nodeId,
60+
int numberSlots) {
5861

5962
super(worker.getResourceID(), taskExecutorGateway);
6063

@@ -66,6 +69,7 @@ public WorkerRegistration(
6669
this.totalResourceProfile = Preconditions.checkNotNull(totalResourceProfile);
6770
this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile);
6871
this.nodeId = Preconditions.checkNotNull(nodeId);
72+
this.numberSlots = numberSlots;
6973
}
7074

7175
public WorkerType getWorker() {
@@ -99,4 +103,8 @@ public ResourceProfile getTotalResourceProfile() {
99103
public String getNodeId() {
100104
return nodeId;
101105
}
106+
107+
public int getNumberSlots() {
108+
return numberSlots;
109+
}
102110
}

flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1566,7 +1566,8 @@ private void connectToResourceManager() {
15661566
memoryConfiguration,
15671567
taskManagerConfiguration.getDefaultSlotResourceProfile(),
15681568
taskManagerConfiguration.getTotalResourceProfile(),
1569-
unresolvedTaskManagerLocation.getNodeId());
1569+
unresolvedTaskManagerLocation.getNodeId(),
1570+
taskManagerConfiguration.getNumberSlots());
15701571

15711572
resourceManagerConnection =
15721573
new TaskExecutorToResourceManagerConnection(

flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -187,7 +187,8 @@ static void registerTaskExecutor(
187187
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
188188
ResourceProfile.ZERO,
189189
ResourceProfile.ZERO,
190-
taskExecutorAddress);
190+
taskExecutorAddress,
191+
1);
191192
final CompletableFuture<RegistrationResponse> registrationFuture =
192193
resourceManagerGateway.registerTaskExecutor(
193194
taskExecutorRegistration, TestingUtils.TIMEOUT);

flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -222,7 +222,8 @@ void testDelayedRegisterTaskExecutor() throws Exception {
222222
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
223223
DEFAULT_SLOT_PROFILE,
224224
DEFAULT_SLOT_PROFILE,
225-
taskExecutorGateway.getAddress());
225+
taskExecutorGateway.getAddress(),
226+
1);
226227

227228
CompletableFuture<RegistrationResponse> firstFuture =
228229
rmGateway.registerTaskExecutor(taskExecutorRegistration, fastTimeout);
@@ -287,7 +288,8 @@ void testDisconnectTaskExecutor() throws Exception {
287288
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
288289
DEFAULT_SLOT_PROFILE,
289290
DEFAULT_SLOT_PROFILE.multiply(numberSlots),
290-
taskExecutorGateway.getAddress());
291+
taskExecutorGateway.getAddress(),
292+
numberSlots);
291293
final RegistrationResponse registrationResponse =
292294
rmGateway.registerTaskExecutor(taskExecutorRegistration, TIMEOUT).get();
293295
assertThat(registrationResponse).isInstanceOf(TaskExecutorRegistrationSuccess.class);
@@ -364,7 +366,8 @@ private CompletableFuture<RegistrationResponse> registerTaskExecutor(
364366
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
365367
DEFAULT_SLOT_PROFILE,
366368
DEFAULT_SLOT_PROFILE,
367-
taskExecutorAddress),
369+
taskExecutorAddress,
370+
1),
368371
TIMEOUT);
369372
}
370373
}

flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -255,7 +255,8 @@ private void registerTaskExecutor(
255255
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
256256
ResourceProfile.ZERO,
257257
ResourceProfile.ZERO,
258-
taskExecutorAddress);
258+
taskExecutorAddress,
259+
1);
259260
final CompletableFuture<RegistrationResponse> registrationFuture =
260261
resourceManagerGateway.registerTaskExecutor(
261262
taskExecutorRegistration, TestingUtils.TIMEOUT);
@@ -768,7 +769,8 @@ private void registerTaskExecutorAndSlot(
768769
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
769770
ResourceProfile.fromResources(1, 1024),
770771
ResourceProfile.fromResources(1, 1024).multiply(slotCount),
771-
taskExecutorGateway.getAddress());
772+
taskExecutorGateway.getAddress(),
773+
slotCount);
772774
RegistrationResponse registrationResult =
773775
resourceManagerGateway
774776
.registerTaskExecutor(taskExecutorRegistration, TestingUtils.TIMEOUT)

flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1246,7 +1246,8 @@ CompletableFuture<RegistrationResponse> registerTaskExecutor(
12461246
TESTING_CONFIG,
12471247
ResourceProfile.ZERO,
12481248
ResourceProfile.ZERO,
1249-
resourceID.toString());
1249+
resourceID.toString(),
1250+
1);
12501251

12511252
return resourceManager
12521253
.getSelfGateway(ResourceManagerGateway.class)

flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java

Lines changed: 49 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -27,12 +27,15 @@
2727
import org.apache.flink.runtime.clusterframework.types.AllocationID;
2828
import org.apache.flink.runtime.clusterframework.types.ResourceID;
2929
import org.apache.flink.runtime.clusterframework.types.SlotID;
30+
import org.apache.flink.runtime.entrypoint.ClusterInformation;
3031
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
3132
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
33+
import org.apache.flink.runtime.instance.InstanceID;
3234
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
3335
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
3436
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
3537
import org.apache.flink.runtime.messages.Acknowledge;
38+
import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
3639
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
3740
import org.apache.flink.runtime.rpc.TestingRpcService;
3841
import org.apache.flink.runtime.rpc.TestingRpcServiceExtension;
@@ -63,8 +66,19 @@ class TaskExecutorRecoveryTest {
6366
new EachCallbackWrapper<>(rpcServiceExtension);
6467

6568
@Test
66-
void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir)
67-
throws Exception {
69+
void testRecoveredTaskExecutorWillRestoreAllocationStateWithFixedSlotRequest(
70+
@TempDir File tempDir) throws Exception {
71+
testRecoveredTaskExecutorWillRestoreAllocationState(tempDir, false);
72+
}
73+
74+
@Test
75+
void testRecoveredTaskExecutorWillRestoreAllocationStateWithDynamicSlotRequest(
76+
@TempDir File tempDir) throws Exception {
77+
testRecoveredTaskExecutorWillRestoreAllocationState(tempDir, true);
78+
}
79+
80+
private void testRecoveredTaskExecutorWillRestoreAllocationState(
81+
File tempDir, boolean useDynamicRequest) throws Exception {
6882
final ResourceID resourceId = ResourceID.generate();
6983

7084
final Configuration configuration = new Configuration();
@@ -82,6 +96,20 @@ void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir)
8296
return CompletableFuture.completedFuture(Acknowledge.get());
8397
});
8498

99+
final ArrayBlockingQueue<TaskExecutorRegistration> taskExecutorRegistrations =
100+
new ArrayBlockingQueue<>(2);
101+
102+
testingResourceManagerGateway.setRegisterTaskExecutorFunction(
103+
taskExecutorRegistration -> {
104+
taskExecutorRegistrations.offer(taskExecutorRegistration);
105+
return CompletableFuture.completedFuture(
106+
new TaskExecutorRegistrationSuccess(
107+
new InstanceID(),
108+
taskExecutorRegistration.getResourceId(),
109+
new ClusterInformation("localhost", 1234),
110+
null));
111+
});
112+
85113
final TestingRpcService rpcService = rpcServiceExtension.getTestingRpcService();
86114
rpcService.registerGateway(
87115
testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
@@ -118,8 +146,14 @@ void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir)
118146

119147
assertThat(slotReport.getNumSlotStatus(), is(2));
120148

149+
final TaskExecutorRegistration taskExecutorRegistration = taskExecutorRegistrations.take();
150+
assertThat(taskExecutorRegistration.getNumberSlots(), is(2));
151+
121152
final SlotStatus slotStatus = slotReport.iterator().next();
122-
final SlotID allocatedSlotID = slotStatus.getSlotID();
153+
final SlotID allocatedSlotID =
154+
useDynamicRequest
155+
? SlotID.getDynamicSlotID(slotStatus.getSlotID().getResourceID())
156+
: slotStatus.getSlotID();
123157

124158
final AllocationID allocationId = new AllocationID();
125159
taskExecutorGateway
@@ -160,16 +194,26 @@ void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir)
160194
recoveredTaskExecutor.start();
161195

162196
final TaskExecutorSlotReport recoveredSlotReport = queue.take();
163-
197+
final int expectedNumberOfSlots = useDynamicRequest ? 3 : 2;
198+
assertThat(
199+
recoveredSlotReport.getSlotReport().getNumSlotStatus(), is(expectedNumberOfSlots));
164200
for (SlotStatus status : recoveredSlotReport.getSlotReport()) {
165-
if (status.getSlotID().equals(allocatedSlotID)) {
201+
boolean isAllocatedSlot =
202+
useDynamicRequest
203+
? status.getSlotID().getSlotNumber() == 2
204+
: status.getSlotID().equals(allocatedSlotID);
205+
if (isAllocatedSlot) {
166206
assertThat(status.getJobID(), is(jobId));
167207
assertThat(status.getAllocationID(), is(allocationId));
168208
} else {
169209
assertThat(status.getJobID(), is(nullValue()));
170210
}
171211
}
172212

213+
final TaskExecutorRegistration recoveredTaskExecutorRegistration =
214+
taskExecutorRegistrations.take();
215+
assertThat(recoveredTaskExecutorRegistration.getNumberSlots(), is(2));
216+
173217
final Collection<SlotOffer> take = offeredSlots.take();
174218

175219
assertThat(take, hasSize(1));

flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -140,7 +140,8 @@ void testResourceManagerRegistrationIsRejected() {
140140
TASK_MANAGER_MEMORY_CONFIGURATION,
141141
ResourceProfile.ZERO,
142142
ResourceProfile.ZERO,
143-
TASK_MANAGER_NODE_ID);
143+
TASK_MANAGER_NODE_ID,
144+
1);
144145
return new TaskExecutorToResourceManagerConnection(
145146
LOGGER,
146147
rpcService,

0 commit comments

Comments
 (0)