Skip to content

Commit 9da4f23

Browse files
author
Aitozi
committed
[FRS-66] Introduce shuffleworker deployed by statefulset
1 parent 9f6d4e0 commit 9da4f23

File tree

9 files changed

+530
-24
lines changed

9 files changed

+530
-24
lines changed

shuffle-core/src/main/java/com/alibaba/flink/shuffle/core/config/KubernetesOptions.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,18 @@
1818

1919
import com.alibaba.flink.shuffle.common.config.ConfigOption;
2020

21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
2124
import java.util.Collections;
2225
import java.util.List;
2326
import java.util.Map;
2427

2528
/** This class holds configuration constants used by the remote shuffle deployment. */
2629
public class KubernetesOptions {
2730

31+
private static final Logger LOG = LoggerFactory.getLogger(KubernetesOptions.class);
32+
2833
// --------------------------------------------------------------------------------------------
2934
// Common configurations.
3035
// --------------------------------------------------------------------------------------------
@@ -277,6 +282,41 @@ public class KubernetesOptions {
277282
+ "value:value1,effect:NoSchedule;key:key2,operator:Exists,"
278283
+ "effect:NoExecute,tolerationSeconds:6000.");
279284

285+
/** The number of shuffler worker. */
286+
public static final ConfigOption<Integer> SHUFFLE_WORKER_REPLICAS =
287+
new ConfigOption<Integer>("remote-shuffle.kubernetes.worker.replicas")
288+
.defaultValue(1)
289+
.description("The number of shuffle worker");
290+
291+
/** The access mode of the shuffle worker's pvc. */
292+
public static final ConfigOption<List<String>> SHUFFLE_WORKER_PVC_ACCESS_MODE =
293+
new ConfigOption<List<String>>("remote-shuffle.kubernetes.worker.pvc.access-mode")
294+
.defaultValue(Collections.singletonList("ReadWriteOnce"))
295+
.description("The access mode of the shuffle worker's pvc");
296+
297+
/** The storage class of the pvc. */
298+
public static final ConfigOption<String> SHUFFLE_WORKER_PVC_STORAGE_CLASS =
299+
new ConfigOption<String>("remote-shuffle.kubernetes.worker.pvc.storage-class")
300+
.defaultValue("alibabacloud-filesystem-ssd")
301+
.description("The storage class of the pvc");
302+
303+
/** The storage size of each pvc. */
304+
public static final ConfigOption<String> SHUFFLE_WORKER_PVC_STORAGE_SIZE =
305+
new ConfigOption<String>("remote-shuffle.kubernetes.worker.pvc.storage.size")
306+
.defaultValue("100Gi")
307+
.description("The storage size of each pvc.");
308+
309+
/** The mount path of the worker pvc. */
310+
public static final ConfigOption<String> SHUFFLE_WORKER_PVC_MOUNT_PATH =
311+
new ConfigOption<String>("remote-shuffle.kubernetes.worker.pvc.mount.path")
312+
.defaultValue("/shuffle-data-pvc")
313+
.description("The mount path of the worker pvc");
314+
315+
public static final ConfigOption<String> SHUFFLE_WORKER_DEPLOY_MODE =
316+
new ConfigOption<String>("remote-shuffle.kubernetes.worker.deploy.mode")
317+
.defaultValue(WorkerMode.DAEMON_SETS.name())
318+
.description("The deploy mode of the shuffle worker");
319+
280320
/**
281321
* The prefix of Kubernetes resource limit factor. It should not be less than 1. The resource
282322
* could be cpu, memory, ephemeral-storage and all other types supported by Kubernetes.
@@ -291,6 +331,20 @@ public class KubernetesOptions {
291331
public static final String SHUFFLE_WORKER_RESOURCE_LIMIT_FACTOR_PREFIX =
292332
"remote-shuffle.kubernetes.worker.limit-factor.";
293333

334+
/** The mode of the shuffle worker. */
335+
public enum WorkerMode {
336+
STATEFUL_SETS,
337+
DAEMON_SETS;
338+
339+
public static WorkerMode fromString(String value) {
340+
try {
341+
return WorkerMode.valueOf(value);
342+
} catch (Exception e) {
343+
LOG.warn("Failed to parse the worker mode: " + value, e);
344+
return DAEMON_SETS;
345+
}
346+
}
347+
}
294348
// ------------------------------------------------------------------------
295349

296350
/** Not intended to be instantiated. */

shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/parameters/KubernetesShuffleWorkerParameters.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.alibaba.flink.shuffle.kubernetes.operator.util.Constants;
2525
import com.alibaba.flink.shuffle.kubernetes.operator.util.KubernetesUtils;
2626

27+
import io.fabric8.kubernetes.api.model.Quantity;
2728
import io.fabric8.kubernetes.api.model.apps.DaemonSet;
2829

2930
import java.util.ArrayList;
@@ -41,7 +42,7 @@
4142
* ShuffleWorkers configuration to kubernetes {@link DaemonSet} configuration.
4243
*/
4344
public class KubernetesShuffleWorkerParameters extends AbstractKubernetesParameters
44-
implements KubernetesDaemonSetParameters {
45+
implements KubernetesDaemonSetParameters, KubernetesStatefulSetParameters {
4546

4647
private final ShuffleWorkerProcessSpec shuffleWorkerProcessSpec;
4748

@@ -189,8 +190,41 @@ public String getDaemonSetName() {
189190
return KubernetesUtils.getShuffleWorkersNameWithClusterId(getClusterId());
190191
}
191192

193+
@Override
194+
public String getStatefulSetName() {
195+
return KubernetesUtils.getShuffleWorkersNameWithClusterId(getClusterId());
196+
}
197+
192198
@Override
193199
public KubernetesPodParameters getPodTemplateParameters() {
194200
return this;
195201
}
202+
203+
@Override
204+
public int getReplicas() {
205+
return conf.getInteger(KubernetesOptions.SHUFFLE_WORKER_REPLICAS);
206+
}
207+
208+
@Override
209+
public Map<String, Quantity> getStorageResource() {
210+
String size = conf.getString(KubernetesOptions.SHUFFLE_WORKER_PVC_STORAGE_SIZE);
211+
Map<String, Quantity> resource = new HashMap<>();
212+
resource.put(Constants.RESOURCE_NAME_STORAGE, new Quantity(size));
213+
return resource;
214+
}
215+
216+
@Override
217+
public String getStorageClass() {
218+
return conf.getString(KubernetesOptions.SHUFFLE_WORKER_PVC_STORAGE_CLASS);
219+
}
220+
221+
@Override
222+
public List<String> getAccessMode() {
223+
return conf.getList(KubernetesOptions.SHUFFLE_WORKER_PVC_ACCESS_MODE, String.class);
224+
}
225+
226+
@Override
227+
public String getMountPath() {
228+
return conf.getString(KubernetesOptions.SHUFFLE_WORKER_PVC_MOUNT_PATH);
229+
}
196230
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2021 The Flink Remote Shuffle Project
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.flink.shuffle.kubernetes.operator.parameters;
18+
19+
import io.fabric8.kubernetes.api.model.Quantity;
20+
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
/** The parameters for StatefulSet. */
25+
public interface KubernetesStatefulSetParameters extends KubernetesCommonParameters {
26+
27+
String getStatefulSetName();
28+
29+
KubernetesPodParameters getPodTemplateParameters();
30+
31+
int getReplicas();
32+
33+
Map<String, Quantity> getStorageResource();
34+
35+
String getStorageClass();
36+
37+
List<String> getAccessMode();
38+
39+
String getMountPath();
40+
}

shuffle-kubernetes-operator/src/main/java/com/alibaba/flink/shuffle/kubernetes/operator/reconciler/RemoteShuffleApplicationReconciler.java

Lines changed: 55 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,18 @@
1919
import com.alibaba.flink.shuffle.common.config.Configuration;
2020
import com.alibaba.flink.shuffle.common.functions.RunnableWithException;
2121
import com.alibaba.flink.shuffle.core.config.ClusterOptions;
22+
import com.alibaba.flink.shuffle.core.config.KubernetesOptions;
2223
import com.alibaba.flink.shuffle.kubernetes.operator.crd.RemoteShuffleApplication;
2324
import com.alibaba.flink.shuffle.kubernetes.operator.parameters.K8sRemoteShuffleFileConfigsParameters;
2425
import com.alibaba.flink.shuffle.kubernetes.operator.parameters.KubernetesConfigMapParameters;
2526
import com.alibaba.flink.shuffle.kubernetes.operator.parameters.KubernetesDaemonSetParameters;
2627
import com.alibaba.flink.shuffle.kubernetes.operator.parameters.KubernetesShuffleManagerParameters;
2728
import com.alibaba.flink.shuffle.kubernetes.operator.parameters.KubernetesShuffleWorkerParameters;
29+
import com.alibaba.flink.shuffle.kubernetes.operator.parameters.KubernetesStatefulSetParameters;
2830
import com.alibaba.flink.shuffle.kubernetes.operator.resources.KubernetesConfigMapBuilder;
2931
import com.alibaba.flink.shuffle.kubernetes.operator.resources.KubernetesDaemonSetBuilder;
3032
import com.alibaba.flink.shuffle.kubernetes.operator.resources.KubernetesDeploymentBuilder;
33+
import com.alibaba.flink.shuffle.kubernetes.operator.resources.KubernetesStatefulSetBuilder;
3134
import com.alibaba.flink.shuffle.kubernetes.operator.util.Constants;
3235
import com.alibaba.flink.shuffle.kubernetes.operator.util.KubernetesInternalOptions;
3336
import com.alibaba.flink.shuffle.kubernetes.operator.util.KubernetesUtils;
@@ -36,6 +39,7 @@
3639
import io.fabric8.kubernetes.api.model.HasMetadata;
3740
import io.fabric8.kubernetes.api.model.apps.DaemonSet;
3841
import io.fabric8.kubernetes.api.model.apps.Deployment;
42+
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
3943
import io.fabric8.kubernetes.client.KubernetesClient;
4044
import org.slf4j.Logger;
4145
import org.slf4j.LoggerFactory;
@@ -48,7 +52,7 @@
4852

4953
/**
5054
* The {@link RemoteShuffleApplicationReconciler} is responsible for reconciling {@link
51-
* RemoteShuffleApplication} to the desired state which is described by {@link
55+
* RemoteShuffleApplication} to the desired state which is described by {@code
5256
* RemoteShuffleApplication#spec}.
5357
*/
5458
public class RemoteShuffleApplicationReconciler {
@@ -130,27 +134,56 @@ private void reconcileShuffleManager(Configuration configuration, HasMetadata ow
130134
*/
131135
private void reconcileShuffleWorkers(Configuration configuration, HasMetadata owner) {
132136

133-
KubernetesDaemonSetParameters shuffleWorkerParameters =
134-
new KubernetesShuffleWorkerParameters(configuration);
135-
DaemonSet daemonSet =
136-
new KubernetesDaemonSetBuilder()
137-
.buildKubernetesResourceFrom(shuffleWorkerParameters);
138-
KubernetesUtils.setOwnerReference(daemonSet, owner);
139-
140-
LOG.info("Reconcile shuffle workers {}.", daemonSet.getMetadata().getName());
141-
executeReconcileWithRetry(
142-
() -> {
143-
LOG.debug("Try to create or update DaemonSet {}.", daemonSet.toString());
144-
this.kubeClient
145-
.apps()
146-
.daemonSets()
147-
.inNamespace(
148-
checkNotNull(
149-
configuration.getString(
150-
KubernetesInternalOptions.NAMESPACE)))
151-
.createOrReplace(daemonSet);
152-
},
153-
daemonSet.getMetadata().getName());
137+
KubernetesOptions.WorkerMode mode =
138+
KubernetesOptions.WorkerMode.fromString(
139+
configuration.getString(KubernetesOptions.SHUFFLE_WORKER_DEPLOY_MODE));
140+
141+
switch (mode) {
142+
case STATEFUL_SETS:
143+
KubernetesStatefulSetParameters statefulSetParameters =
144+
new KubernetesShuffleWorkerParameters(configuration);
145+
StatefulSet statefulSet =
146+
KubernetesStatefulSetBuilder.INSTANCE.buildKubernetesResourceFrom(
147+
statefulSetParameters);
148+
KubernetesUtils.setOwnerReference(statefulSet, owner);
149+
LOG.info("Reconcile shuffle workers {}.", statefulSet.getMetadata().getName());
150+
executeReconcileWithRetry(
151+
() -> {
152+
LOG.debug("Try to create or update StatefulSet {}.", statefulSet);
153+
this.kubeClient
154+
.apps()
155+
.statefulSets()
156+
.inNamespace(
157+
checkNotNull(
158+
configuration.getString(
159+
KubernetesInternalOptions.NAMESPACE)))
160+
.createOrReplace(statefulSet);
161+
},
162+
statefulSet.getMetadata().getName());
163+
break;
164+
case DAEMON_SETS:
165+
KubernetesDaemonSetParameters shuffleWorkerParameters =
166+
new KubernetesShuffleWorkerParameters(configuration);
167+
DaemonSet daemonSet =
168+
new KubernetesDaemonSetBuilder()
169+
.buildKubernetesResourceFrom(shuffleWorkerParameters);
170+
KubernetesUtils.setOwnerReference(daemonSet, owner);
171+
LOG.info("Reconcile shuffle workers {}.", daemonSet.getMetadata().getName());
172+
executeReconcileWithRetry(
173+
() -> {
174+
LOG.debug(
175+
"Try to create or update DaemonSet {}.", daemonSet.toString());
176+
this.kubeClient
177+
.apps()
178+
.daemonSets()
179+
.inNamespace(
180+
checkNotNull(
181+
configuration.getString(
182+
KubernetesInternalOptions.NAMESPACE)))
183+
.createOrReplace(daemonSet);
184+
},
185+
daemonSet.getMetadata().getName());
186+
}
154187
}
155188

156189
/**

0 commit comments

Comments
 (0)