Skip to content

Commit b0b6669

Browse files
authored
[FLINK-37813][runtime] Pass numberSlots via WorkerRegistration instead of SlotReport (#27100)
* [FLINK-37813][runtime] Pass numberSlots via WorkerRegistration instead of SlotReport. SlotReport should only report the current slot usage, while numberSlots is an attribute of the TaskManager and should not be included in the SlotReport.
1 parent e46a212 commit b0b6669

File tree

10 files changed

+89
-18
lines changed

10 files changed

+89
-18
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -525,7 +525,7 @@ public CompletableFuture<Acknowledge> sendSlotReport(
525525
WorkerResourceSpec workerResourceSpec =
526526
WorkerResourceSpec.fromTotalResourceProfile(
527527
workerTypeWorkerRegistration.getTotalResourceProfile(),
528-
slotReport.getNumSlotStatus());
528+
workerTypeWorkerRegistration.getNumberSlots());
529529
onWorkerRegistered(workerTypeWorkerRegistration.getWorker(), workerResourceSpec);
530530
} else if (registrationResult == SlotManager.RegistrationResult.REJECTED) {
531531
closeTaskManagerConnection(
@@ -1083,7 +1083,8 @@ private RegistrationResponse registerTaskExecutorInternal(
10831083
taskExecutorRegistration.getMemoryConfiguration(),
10841084
taskExecutorRegistration.getTotalResourceProfile(),
10851085
taskExecutorRegistration.getDefaultSlotResourceProfile(),
1086-
taskExecutorRegistration.getNodeId());
1086+
taskExecutorRegistration.getNodeId(),
1087+
taskExecutorRegistration.getNumberSlots());
10871088

10881089
log.info(
10891090
"Registering TaskManager with ResourceID {} ({}) at ResourceManager",

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -64,6 +64,9 @@ public class TaskExecutorRegistration implements Serializable {
6464
*/
6565
private final String nodeId;
6666

67+
/** Number of slots in static slot allocation. */
68+
private final int numberSlots;
69+
6770
public TaskExecutorRegistration(
6871
final String taskExecutorAddress,
6972
final ResourceID resourceId,
@@ -73,7 +76,8 @@ public TaskExecutorRegistration(
7376
final TaskExecutorMemoryConfiguration memoryConfiguration,
7477
final ResourceProfile defaultSlotResourceProfile,
7578
final ResourceProfile totalResourceProfile,
76-
final String nodeId) {
79+
final String nodeId,
80+
final int numberSlots) {
7781
this.taskExecutorAddress = checkNotNull(taskExecutorAddress);
7882
this.resourceId = checkNotNull(resourceId);
7983
this.dataPort = dataPort;
@@ -83,6 +87,7 @@ public TaskExecutorRegistration(
8387
this.defaultSlotResourceProfile = checkNotNull(defaultSlotResourceProfile);
8488
this.totalResourceProfile = checkNotNull(totalResourceProfile);
8589
this.nodeId = checkNotNull(nodeId);
90+
this.numberSlots = numberSlots;
8691
}
8792

8893
public String getTaskExecutorAddress() {
@@ -120,4 +125,8 @@ public ResourceProfile getTotalResourceProfile() {
120125
public String getNodeId() {
121126
return nodeId;
122127
}
128+
129+
public int getNumberSlots() {
130+
return numberSlots;
131+
}
123132
}

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,8 @@ public class WorkerRegistration<WorkerType extends ResourceIDRetrievable>
4545

4646
private final String nodeId;
4747

48+
private final int numberSlots;
49+
4850
public WorkerRegistration(
4951
TaskExecutorGateway taskExecutorGateway,
5052
WorkerType worker,
@@ -54,7 +56,8 @@ public WorkerRegistration(
5456
TaskExecutorMemoryConfiguration memoryConfiguration,
5557
ResourceProfile totalResourceProfile,
5658
ResourceProfile defaultSlotResourceProfile,
57-
String nodeId) {
59+
String nodeId,
60+
int numberSlots) {
5861

5962
super(worker.getResourceID(), taskExecutorGateway);
6063

@@ -66,6 +69,7 @@ public WorkerRegistration(
6669
this.totalResourceProfile = Preconditions.checkNotNull(totalResourceProfile);
6770
this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile);
6871
this.nodeId = Preconditions.checkNotNull(nodeId);
72+
this.numberSlots = numberSlots;
6973
}
7074

7175
public WorkerType getWorker() {
@@ -99,4 +103,8 @@ public ResourceProfile getTotalResourceProfile() {
99103
public String getNodeId() {
100104
return nodeId;
101105
}
106+
107+
public int getNumberSlots() {
108+
return numberSlots;
109+
}
102110
}

flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1568,7 +1568,8 @@ private void connectToResourceManager() {
15681568
memoryConfiguration,
15691569
taskManagerConfiguration.getDefaultSlotResourceProfile(),
15701570
taskManagerConfiguration.getTotalResourceProfile(),
1571-
unresolvedTaskManagerLocation.getNodeId());
1571+
unresolvedTaskManagerLocation.getNodeId(),
1572+
taskManagerConfiguration.getNumberSlots());
15721573

15731574
resourceManagerConnection =
15741575
new TaskExecutorToResourceManagerConnection(

flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -187,7 +187,8 @@ static void registerTaskExecutor(
187187
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
188188
ResourceProfile.ZERO,
189189
ResourceProfile.ZERO,
190-
taskExecutorAddress);
190+
taskExecutorAddress,
191+
1);
191192
final CompletableFuture<RegistrationResponse> registrationFuture =
192193
resourceManagerGateway.registerTaskExecutor(
193194
taskExecutorRegistration, TestingUtils.TIMEOUT);

flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -222,7 +222,8 @@ void testDelayedRegisterTaskExecutor() throws Exception {
222222
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
223223
DEFAULT_SLOT_PROFILE,
224224
DEFAULT_SLOT_PROFILE,
225-
taskExecutorGateway.getAddress());
225+
taskExecutorGateway.getAddress(),
226+
1);
226227

227228
CompletableFuture<RegistrationResponse> firstFuture =
228229
rmGateway.registerTaskExecutor(taskExecutorRegistration, fastTimeout);
@@ -287,7 +288,8 @@ void testDisconnectTaskExecutor() throws Exception {
287288
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
288289
DEFAULT_SLOT_PROFILE,
289290
DEFAULT_SLOT_PROFILE.multiply(numberSlots),
290-
taskExecutorGateway.getAddress());
291+
taskExecutorGateway.getAddress(),
292+
numberSlots);
291293
final RegistrationResponse registrationResponse =
292294
rmGateway.registerTaskExecutor(taskExecutorRegistration, TIMEOUT).get();
293295
assertThat(registrationResponse).isInstanceOf(TaskExecutorRegistrationSuccess.class);
@@ -364,7 +366,8 @@ private CompletableFuture<RegistrationResponse> registerTaskExecutor(
364366
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
365367
DEFAULT_SLOT_PROFILE,
366368
DEFAULT_SLOT_PROFILE,
367-
taskExecutorAddress),
369+
taskExecutorAddress,
370+
1),
368371
TIMEOUT);
369372
}
370373
}

flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -254,7 +254,8 @@ private void registerTaskExecutor(
254254
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
255255
ResourceProfile.ZERO,
256256
ResourceProfile.ZERO,
257-
taskExecutorAddress);
257+
taskExecutorAddress,
258+
1);
258259
final CompletableFuture<RegistrationResponse> registrationFuture =
259260
resourceManagerGateway.registerTaskExecutor(
260261
taskExecutorRegistration, TestingUtils.TIMEOUT);
@@ -767,7 +768,8 @@ private void registerTaskExecutorAndSlot(
767768
1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
768769
ResourceProfile.fromResources(1, 1024),
769770
ResourceProfile.fromResources(1, 1024).multiply(slotCount),
770-
taskExecutorGateway.getAddress());
771+
taskExecutorGateway.getAddress(),
772+
slotCount);
771773
RegistrationResponse registrationResult =
772774
resourceManagerGateway
773775
.registerTaskExecutor(taskExecutorRegistration, TestingUtils.TIMEOUT)

flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1245,7 +1245,8 @@ CompletableFuture<RegistrationResponse> registerTaskExecutor(
12451245
TESTING_CONFIG,
12461246
ResourceProfile.ZERO,
12471247
ResourceProfile.ZERO,
1248-
resourceID.toString());
1248+
resourceID.toString(),
1249+
1);
12491250

12501251
return resourceManager
12511252
.getSelfGateway(ResourceManagerGateway.class)

flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRecoveryTest.java

Lines changed: 49 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -26,12 +26,15 @@
2626
import org.apache.flink.runtime.clusterframework.types.AllocationID;
2727
import org.apache.flink.runtime.clusterframework.types.ResourceID;
2828
import org.apache.flink.runtime.clusterframework.types.SlotID;
29+
import org.apache.flink.runtime.entrypoint.ClusterInformation;
2930
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
3031
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
32+
import org.apache.flink.runtime.instance.InstanceID;
3133
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
3234
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
3335
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
3436
import org.apache.flink.runtime.messages.Acknowledge;
37+
import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
3538
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
3639
import org.apache.flink.runtime.rpc.TestingRpcService;
3740
import org.apache.flink.runtime.rpc.TestingRpcServiceExtension;
@@ -63,8 +66,19 @@ class TaskExecutorRecoveryTest {
6366
new EachCallbackWrapper<>(rpcServiceExtension);
6467

6568
@Test
66-
void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir)
67-
throws Exception {
69+
void testRecoveredTaskExecutorWillRestoreAllocationStateWithFixedSlotRequest(
70+
@TempDir File tempDir) throws Exception {
71+
testRecoveredTaskExecutorWillRestoreAllocationState(tempDir, false);
72+
}
73+
74+
@Test
75+
void testRecoveredTaskExecutorWillRestoreAllocationStateWithDynamicSlotRequest(
76+
@TempDir File tempDir) throws Exception {
77+
testRecoveredTaskExecutorWillRestoreAllocationState(tempDir, true);
78+
}
79+
80+
private void testRecoveredTaskExecutorWillRestoreAllocationState(
81+
File tempDir, boolean useDynamicRequest) throws Exception {
6882
final ResourceID resourceId = ResourceID.generate();
6983

7084
final Configuration configuration = new Configuration();
@@ -82,6 +96,20 @@ void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir)
8296
return CompletableFuture.completedFuture(Acknowledge.get());
8397
});
8498

99+
final ArrayBlockingQueue<TaskExecutorRegistration> taskExecutorRegistrations =
100+
new ArrayBlockingQueue<>(2);
101+
102+
testingResourceManagerGateway.setRegisterTaskExecutorFunction(
103+
taskExecutorRegistration -> {
104+
taskExecutorRegistrations.offer(taskExecutorRegistration);
105+
return CompletableFuture.completedFuture(
106+
new TaskExecutorRegistrationSuccess(
107+
new InstanceID(),
108+
taskExecutorRegistration.getResourceId(),
109+
new ClusterInformation("localhost", 1234),
110+
null));
111+
});
112+
85113
final TestingRpcService rpcService = rpcServiceExtension.getTestingRpcService();
86114
rpcService.registerGateway(
87115
testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
@@ -118,8 +146,14 @@ void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir)
118146

119147
assertThat(slotReport.getNumSlotStatus(), is(2));
120148

149+
final TaskExecutorRegistration taskExecutorRegistration = taskExecutorRegistrations.take();
150+
assertThat(taskExecutorRegistration.getNumberSlots(), is(2));
151+
121152
final SlotStatus slotStatus = slotReport.iterator().next();
122-
final SlotID allocatedSlotID = slotStatus.getSlotID();
153+
final SlotID allocatedSlotID =
154+
useDynamicRequest
155+
? SlotID.getDynamicSlotID(slotStatus.getSlotID().getResourceID())
156+
: slotStatus.getSlotID();
123157

124158
final AllocationID allocationId = new AllocationID();
125159
taskExecutorGateway
@@ -160,16 +194,26 @@ void testRecoveredTaskExecutorWillRestoreAllocationState(@TempDir File tempDir)
160194
recoveredTaskExecutor.start();
161195

162196
final TaskExecutorSlotReport recoveredSlotReport = queue.take();
163-
197+
final int expectedNumberOfSlots = useDynamicRequest ? 3 : 2;
198+
assertThat(
199+
recoveredSlotReport.getSlotReport().getNumSlotStatus(), is(expectedNumberOfSlots));
164200
for (SlotStatus status : recoveredSlotReport.getSlotReport()) {
165-
if (status.getSlotID().equals(allocatedSlotID)) {
201+
boolean isAllocatedSlot =
202+
useDynamicRequest
203+
? status.getSlotID().getSlotNumber() == 2
204+
: status.getSlotID().equals(allocatedSlotID);
205+
if (isAllocatedSlot) {
166206
assertThat(status.getJobID(), is(jobId));
167207
assertThat(status.getAllocationID(), is(allocationId));
168208
} else {
169209
assertThat(status.getJobID(), is(nullValue()));
170210
}
171211
}
172212

213+
final TaskExecutorRegistration recoveredTaskExecutorRegistration =
214+
taskExecutorRegistrations.take();
215+
assertThat(recoveredTaskExecutorRegistration.getNumberSlots(), is(2));
216+
173217
final Collection<SlotOffer> take = offeredSlots.take();
174218

175219
assertThat(take, hasSize(1));

flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ void testResourceManagerRegistrationIsRejected() {
140140
TASK_MANAGER_MEMORY_CONFIGURATION,
141141
ResourceProfile.ZERO,
142142
ResourceProfile.ZERO,
143-
TASK_MANAGER_NODE_ID);
143+
TASK_MANAGER_NODE_ID,
144+
1);
144145
return new TaskExecutorToResourceManagerConnection(
145146
LOGGER,
146147
rpcService,

0 commit comments

Comments
 (0)