Skip to content

Commit 37b1b23

Browse files
authored
feat(job): add kubeflow PaddleJob (#1150)
* feat(job): add kubeflow PaddleJob * update ut * change the annotation of job KindGroupVersion * minor change
1 parent 89706c4 commit 37b1b23

File tree

11 files changed

+700
-50
lines changed

11 files changed

+700
-50
lines changed

config/server/default/job/job_template.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,33 @@ spec:
9595
# paddle-collective-job
9696
---
9797
apiVersion: "kubeflow.org/v1"
98+
kind: PaddleJob
99+
metadata:
100+
name: paddle-simple-cpu
101+
namespace: kubeflow
102+
spec:
103+
paddleReplicaSpecs:
104+
Worker:
105+
replicas: 2
106+
restartPolicy: OnFailure
107+
template:
108+
spec:
109+
containers:
110+
- name: paddle
111+
image: registry.baidubce.com/paddlepaddle/paddle:2.4.0rc0-cpu
112+
command:
113+
- python
114+
args:
115+
- "-m"
116+
- paddle.distributed.launch
117+
- "run_check"
118+
ports:
119+
- containerPort: 37777
120+
name: master
121+
imagePullPolicy: Always
122+
# PaddleJob-kubeflow.org/v1-collective
123+
---
124+
apiVersion: "kubeflow.org/v1"
98125
kind: "PyTorchJob"
99126
metadata:
100127
name: "pytorch-dist-sendrecv"
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package v1
18+
19+
import (
20+
commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
21+
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
22+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23+
)
24+
25+
const (
26+
// PaddleJobDefaultPortName is name of the port used to communicate between Master and
27+
// workers.
28+
PaddleJobDefaultPortName = "master"
29+
// PaddleJobDefaultContainerName is the name of the PaddleJob container.
30+
PaddleJobDefaultContainerName = "paddle"
31+
// PaddleJobDefaultPort is default value of the port.
32+
PaddleJobDefaultPort = 36543
33+
// PaddleJobDefaultRestartPolicy is default RestartPolicy for PaddleReplicaSpec.
34+
PaddleJobDefaultRestartPolicy = commonv1.RestartPolicyOnFailure
35+
// PaddleJobKind is the kind name.
36+
PaddleJobKind = "PaddleJob"
37+
// PaddleJobPlural is the plural for paddleJob.
38+
PaddleJobPlural = "paddlejobs"
39+
// PaddleJobSingular is the singular for paddleJob.
40+
PaddleJobSingular = "paddlejob"
41+
// PaddleJobFrameworkName is the name of the ML Framework
42+
PaddleJobFrameworkName = "paddle"
43+
// PaddleJobReplicaTypeMaster is the type of Master of distributed Paddle
44+
PaddleJobReplicaTypeMaster commonv1.ReplicaType = "Master"
45+
// PaddleJobReplicaTypeWorker is the type for workers of distributed Paddle.
46+
PaddleJobReplicaTypeWorker commonv1.ReplicaType = "Worker"
47+
)
48+
49+
// +genclient
50+
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
51+
// +resource:path=paddlejob
52+
//+kubebuilder:object:root=true
53+
//+kubebuilder:subresource:status
54+
//+kubebuilder:printcolumn:name="State",type=string,JSONPath=`.status.conditions[-1:].type`
55+
//+kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp`
56+
// +kubebuilder:subresource:scale:specpath=.spec.paddleReplicaSpecs.Worker.replicas,statuspath=.status.replicaStatuses.Worker.active,selectorpath=.status.replicaStatuses.Worker.selector
57+
58+
// PaddleJob represents a PaddleJob resource.
59+
type PaddleJob struct {
60+
// Standard Kubernetes type metadata.
61+
metav1.TypeMeta `json:",inline"`
62+
63+
metav1.ObjectMeta `json:"metadata,omitempty"`
64+
65+
// Specification of the desired state of the PaddleJob.
66+
Spec PaddleJobSpec `json:"spec,omitempty"`
67+
68+
// Most recently observed status of the PaddleJob.
69+
// Read-only (modified by the system).
70+
Status commonv1.JobStatus `json:"status,omitempty"`
71+
}
72+
73+
// PaddleJobSpec is a desired state description of the PaddleJob.
74+
type PaddleJobSpec struct {
75+
// RunPolicy encapsulates various runtime policies of the distributed training
76+
// job, for example how to clean up resources and how long the job can stay
77+
// active.
78+
//+kubebuilder:validation:Optional
79+
RunPolicy commonv1.RunPolicy `json:"runPolicy"`
80+
81+
// ElasticPolicy holds the elastic policy for paddle job.
82+
ElasticPolicy *PaddleElasticPolicy `json:"elasticPolicy,omitempty"`
83+
84+
// A map of PaddleReplicaType (type) to ReplicaSpec (value). Specifies the Paddle cluster configuration.
85+
// For example,
86+
// {
87+
// "Master": PaddleReplicaSpec,
88+
// "Worker": PaddleReplicaSpec,
89+
// }
90+
PaddleReplicaSpecs map[commonv1.ReplicaType]*commonv1.ReplicaSpec `json:"paddleReplicaSpecs"`
91+
}
92+
93+
// PaddleElasticPolicy holds the elastic policy for a paddle job: the replica
// bounds, the pod restart limit and the metrics used for autoscaling.
type PaddleElasticPolicy struct {
94+
// MinReplicas is the lower limit for the number of replicas to which the training job
95+
// can scale down. It defaults to null.
96+
// +optional
97+
MinReplicas *int32 `json:"minReplicas,omitempty"`
98+
// MaxReplicas is the upper limit for the number of pods that can be set by the autoscaler; it cannot be smaller than MinReplicas and defaults to null.
99+
// +optional
100+
MaxReplicas *int32 `json:"maxReplicas,omitempty"`
101+
102+
// MaxRestarts is the limit for restart times of pods in elastic mode.
103+
// +optional
104+
MaxRestarts *int32 `json:"maxRestarts,omitempty"`
105+
106+
// Metrics contains the specifications which are used to calculate the
107+
// desired replica count (the maximum replica count across all metrics will
108+
// be used). The desired replica count is calculated by multiplying the
109+
// ratio between the target value and the current value by the current
110+
// number of pods. Ergo, metrics used must decrease as the pod count is
111+
// increased, and vice-versa. See the individual metric source types for
112+
// more information about how each type of metric must respond.
113+
// If not set, the HPA will not be created.
114+
// +optional
115+
Metrics []autoscalingv2.MetricSpec `json:"metrics,omitempty"`
116+
}
117+
118+
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
119+
// +resource:path=paddlejobs
120+
//+kubebuilder:object:root=true
121+
122+
// PaddleJobList is a list of PaddleJobs.
123+
type PaddleJobList struct {
124+
// Standard type metadata.
125+
metav1.TypeMeta `json:",inline"`
126+
127+
// Standard list metadata.
128+
metav1.ListMeta `json:"metadata,omitempty"`
129+
130+
// List of PaddleJobs.
131+
Items []PaddleJob `json:"items"`
132+
}

pkg/apiserver/controller/job/create.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -620,11 +620,13 @@ func getFrameworkRoles(framework schema.Framework) map[schema.MemberRole]int {
620620
func buildJob(request *CreateJobInfo) (*model.Job, error) {
621621
log.Debugf("begin build job with request: %#v", request)
622622
// build main job config
623-
conf := buildMainConf(request)
623+
conf, err := buildMainConf(request)
624+
if err != nil {
625+
return nil, err
626+
}
624627
// convert job members if necessary
625628
var members []schema.Member
626629
var templateJson string
627-
var err error
628630
if len(request.Members) != 0 {
629631
members = buildMembers(request)
630632
log.Debugf("members is %v", members)
@@ -652,7 +654,7 @@ func buildJob(request *CreateJobInfo) (*model.Job, error) {
652654
return jobInfo, nil
653655
}
654656

655-
func buildMainConf(request *CreateJobInfo) *schema.Conf {
657+
func buildMainConf(request *CreateJobInfo) (*schema.Conf, error) {
656658
log.Debugf("buildMainConf request %v", request)
657659
var conf = &schema.Conf{
658660
Name: request.Name,
@@ -681,12 +683,16 @@ func buildMainConf(request *CreateJobInfo) *schema.Conf {
681683
if request.Type == schema.TypeWorkflow {
682684
conf.KindGroupVersion = schema.WorkflowKindGroupVersion
683685
} else {
684-
conf.KindGroupVersion = schema.ToKindGroupVersion("", request.Framework, conf.Annotations)
686+
var err error
687+
conf.KindGroupVersion, err = schema.ToKindGroupVersion("", request.Framework, conf.Annotations)
688+
if err != nil {
689+
return nil, err
690+
}
685691
}
686692

687693
// TODO: remove job mode
688694
conf.SetEnv(schema.EnvJobMode, request.Mode)
689-
return conf
695+
return conf, nil
690696
}
691697

692698
func buildMembers(request *CreateJobInfo) []schema.Member {

pkg/common/k8s/discovery.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ var DiscoveryHandlerFunc = http.HandlerFunc(func(w http.ResponseWriter, req *htt
4848
{Name: "pytorchjobs", Namespaced: true, Kind: "PyTorchJob"},
4949
{Name: "tfjobs", Namespaced: true, Kind: "TFJob"},
5050
{Name: "mpijobs", Namespaced: true, Kind: "MPIJob"},
51+
{Name: "paddlejobs", Namespaced: true, Kind: "PaddleJob"},
5152
},
5253
}
5354
case "/apis/argoproj.io/v1alpha1":

pkg/common/schema/job.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,8 @@ const (
152152
JobIDLabel = "paddleflow-job-id"
153153
JobTTLSeconds = "paddleflow/job-ttl-seconds"
154154
JobLabelFramework = "paddleflow-job-framework"
155-
// JobKindAnnotation kind for job
156-
JobKindAnnotation = "paddleflow/job-kind"
157-
JobGroupVersionAnnotation = "paddleflow/job-group-version"
155+
// JobKindGroupVersionAnnotation is the annotation key that carries the KindGroupVersion of a job, format: {kind}.{group}/{version}
156+
JobKindGroupVersionAnnotation = "paddleflow/job-kind-group-version"
158157

159158
VolcanoJobNameLabel = "volcano.sh/job-name"
160159
QueueLabelKey = "volcano.sh/queue-name"

pkg/common/schema/kind_version.go

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -36,43 +36,53 @@ func NewKindGroupVersion(kind, group, version string) KindGroupVersion {
3636
}
3737

3838
func (kv KindGroupVersion) String() string {
39-
return fmt.Sprintf("kind: %s, groupVersion: %s/%s", kv.Kind, kv.Group, kv.APIVersion)
39+
return fmt.Sprintf("kind: %s, groupVersion: %s", kv.Kind, kv.GroupVersion())
4040
}
4141

42-
func ToKindGroupVersion(clusterType string, framework Framework, annotations map[string]string) KindGroupVersion {
42+
func (kv KindGroupVersion) GroupVersion() string {
43+
return fmt.Sprintf("%s/%s", kv.Group, kv.APIVersion)
44+
}
45+
46+
func ToKindGroupVersion(clusterType string, framework Framework, annotations map[string]string) (KindGroupVersion, error) {
4347
// TODO: get KindGroupVersion for different cluster
44-
// get KindGroupVersion from job annotations
45-
kind := annotations[JobKindAnnotation]
46-
groupVersion := annotations[JobGroupVersionAnnotation]
47-
if kind != "" && groupVersion != "" {
48-
group, version, err := FromGroupVersion(groupVersion)
48+
// 1. get KindGroupVersion from default
49+
kindGV := frameworkKindGroupVersionMap[framework]
50+
// 2. get KindGroupVersion from job annotations if set
51+
kindGroupVersionStr := annotations[JobKindGroupVersionAnnotation]
52+
if kindGroupVersionStr != "" {
53+
kv, err := parseKindGroupVersion(kindGroupVersionStr)
4954
if err == nil {
50-
return NewKindGroupVersion(kind, group, version)
55+
kindGV = kv
56+
} else {
57+
return KindGroupVersion{}, err
5158
}
5259
}
53-
// get KindGroupVersion from default
54-
kv, ok := frameworkKindGroupVersionMap[framework]
55-
if !ok {
56-
return KindGroupVersion{}
60+
// 3. check KindGroupVersion
61+
_, find := JobKindGroupVersionMap[kindGV]
62+
if !find {
63+
return KindGroupVersion{}, fmt.Errorf("the KindGroupVersion %s is not supported", kindGV)
5764
}
58-
return kv
65+
return kindGV, nil
5966
}
6067

61-
func FromGroupVersion(groupVersion string) (string, string, error) {
62-
group := ""
63-
version := ""
64-
var err error
65-
items := strings.Split(groupVersion, "/")
66-
switch len(items) {
67-
case 1:
68-
version = items[0]
69-
case 2:
70-
version = items[1]
71-
group = items[0]
72-
default:
73-
err = fmt.Errorf("unexpected GroupVersion string: %v", groupVersion)
68+
// parseKindGroupVersion convert KinGroupVersion str to struct, str format is {kind}.{group}/{version}
69+
func parseKindGroupVersion(kindGVStr string) (KindGroupVersion, error) {
70+
kindGV := KindGroupVersion{}
71+
kindVersion := strings.Split(kindGVStr, "/")
72+
if len(kindVersion) != 2 {
73+
return kindGV, fmt.Errorf("the KindGroupVersion %s is invalid, "+
74+
"and it's format must be {kind}.{group}/{version}", kindGVStr)
75+
}
76+
// get kind and group
77+
kindGroups := strings.Split(kindVersion[0], ".")
78+
if len(kindGroups) < 2 {
79+
return kindGV, fmt.Errorf("the KindGroupVersion %s is invalid, "+
80+
"and it's format must be {kind}.{group}/{version}", kindGVStr)
7481
}
75-
return group, version, err
82+
kindGV.Kind = kindGroups[0]
83+
kindGV.Group = strings.TrimPrefix(kindVersion[0], kindGV.Kind+".")
84+
kindGV.APIVersion = kindVersion[1]
85+
return kindGV, nil
7686
}
7787

7888
var (
@@ -103,6 +113,7 @@ var (
103113
StandaloneKindGroupVersion: true,
104114
SparkKindGroupVersion: true,
105115
PaddleKindGroupVersion: true,
116+
KFPaddleKindGroupVersion: true,
106117
PyTorchKindGroupVersion: true,
107118
TFKindGroupVersion: true,
108119
MXNetKindGroupVersion: true,

0 commit comments

Comments
 (0)