Skip to content

Commit ad8ebfd

Browse files
authored
feat(job): remove useless runtime client interface (#1151)
* feat(job): remove useless runtime client interface * update single job * add ut * fix ut
1 parent 37b1b23 commit ad8ebfd

File tree

19 files changed

+1462
-97
lines changed

19 files changed

+1462
-97
lines changed

pkg/common/k8s/discovery.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ var DiscoveryHandlerFunc = http.HandlerFunc(func(w http.ResponseWriter, req *htt
2525
APIResources: []metav1.APIResource{
2626
{Name: "queues", Namespaced: false, Kind: "Queue"},
2727
{Name: "elasticresourcequotas", Namespaced: false, Kind: "ElasticResourceQuota"},
28+
{Name: "podgroups", Namespaced: true, Kind: "PodGroup"},
2829
},
2930
}
3031
case "/apis/sparkoperator.k8s.io/v1beta2":

pkg/common/schema/job.go

Lines changed: 14 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,12 @@ type Framework string
2828
type MemberRole string
2929

3030
const (
31-
EnvJobType = "PF_JOB_TYPE"
32-
EnvJobQueueName = "PF_JOB_QUEUE_NAME"
33-
EnvJobQueueID = "PF_JOB_QUEUE_ID"
34-
EnvJobClusterName = "PF_JOB_CLUSTER_NAME"
35-
EnvJobClusterID = "PF_JOB_CLUSTER_ID"
36-
EnvJobNamespace = "PF_JOB_NAMESPACE"
37-
EnvJobUserName = "PF_USER_NAME"
38-
EnvJobFsID = "PF_FS_ID"
39-
EnvJobPVCName = "PF_JOB_PVC_NAME"
40-
EnvJobPriority = "PF_JOB_PRIORITY"
41-
EnvJobMode = "PF_JOB_MODE"
42-
EnvJobFramework = "PF_JOB_FRAMEWORK"
31+
EnvJobType = "PF_JOB_TYPE"
32+
EnvJobQueueName = "PF_JOB_QUEUE_NAME"
33+
EnvJobNamespace = "PF_JOB_NAMESPACE"
34+
EnvJobUserName = "PF_USER_NAME"
35+
EnvJobMode = "PF_JOB_MODE"
36+
EnvJobFramework = "PF_JOB_FRAMEWORK"
4337
// EnvJobYamlPath Additional configuration for a specific job
4438
EnvJobYamlPath = "PF_JOB_YAML_PATH"
4539
EnvIsCustomYaml = "PF_IS_CUSTOM_YAML"
@@ -52,32 +46,17 @@ const (
5246
EnvEnableJobQueueSync = "PF_JOB_QUEUE_SYNC"
5347

5448
// EnvJobModePS env
55-
EnvJobModePS = "PS"
56-
EnvJobPSPort = "PF_JOB_PS_PORT"
57-
EnvJobPServerReplicas = "PF_JOB_PSERVER_REPLICAS"
58-
EnvJobPServerFlavour = "PF_JOB_PSERVER_FLAVOUR"
59-
EnvJobPServerCommand = "PF_JOB_PSERVER_COMMAND"
60-
EnvJobWorkerReplicas = "PF_JOB_WORKER_REPLICAS"
61-
EnvJobWorkerFlavour = "PF_JOB_WORKER_FLAVOUR"
62-
EnvJobWorkerCommand = "PF_JOB_WORKER_COMMAND"
63-
49+
EnvJobModePS = "PS"
6450
// EnvJobModeCollective env
6551
EnvJobModeCollective = "Collective"
66-
EnvJobReplicas = "PF_JOB_REPLICAS"
6752
EnvJobFlavour = "PF_JOB_FLAVOUR"
6853
EnvJobLimitFlavour = "PF_JOB_LIMIT_FLAVOUR"
6954
EnvJobLimitFlavourNone = "NONE"
7055

71-
// EnvJobModePod env reuse EnvJobReplicas and EnvJobFlavour
72-
EnvJobModePod = "Pod"
73-
7456
// spark job env
75-
EnvJobSparkMainFile = "PF_JOB_SPARK_MAIN_FILE"
76-
EnvJobSparkMainClass = "PF_JOB_SPARK_MAIN_CLASS"
77-
EnvJobSparkArguments = "PF_JOB_SPARK_ARGUMENTS"
78-
EnvJobDriverFlavour = "PF_JOB_DRIVER_FLAVOUR"
79-
EnvJobExecutorReplicas = "PF_JOB_EXECUTOR_REPLICAS"
80-
EnvJobExecutorFlavour = "PF_JOB_EXECUTOR_FLAVOUR"
57+
EnvJobSparkMainFile = "PF_JOB_SPARK_MAIN_FILE"
58+
EnvJobSparkMainClass = "PF_JOB_SPARK_MAIN_CLASS"
59+
EnvJobSparkArguments = "PF_JOB_SPARK_ARGUMENTS"
8160

8261
// TODO move to framework
8362
TypeVcJob JobType = "vcjob"
@@ -155,9 +134,10 @@ const (
155134
// JobKindGroupVersionAnnotation KindGroupVersion for job, format: {kind}.{group}/{version}
156135
JobKindGroupVersionAnnotation = "paddleflow/job-kind-group-version"
157136

158-
VolcanoJobNameLabel = "volcano.sh/job-name"
159-
QueueLabelKey = "volcano.sh/queue-name"
160-
SparkAPPJobNameLabel = "sparkoperator.k8s.io/app-name"
137+
VolcanoJobNameLabel = "volcano.sh/job-name"
138+
QueueLabelKey = "volcano.sh/queue-name"
139+
SchedulingQueueLabelKey = "scheduling.volcano.sh/queue-name"
140+
SparkAPPJobNameLabel = "sparkoperator.k8s.io/app-name"
161141

162142
QueueNamespaceAnnotation = "paddleflow/queue-namespace"
163143

pkg/job/api/job_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserve.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package api
18+
19+
import (
20+
"fmt"
21+
"testing"
22+
23+
"github.com/stretchr/testify/assert"
24+
)
25+
26+
func TestPFJob(t *testing.T) {
27+
tfjob := PFJob{
28+
ID: "test-id",
29+
Name: "test-name",
30+
Namespace: "default",
31+
}
32+
t.Run("test GetID func", func(t *testing.T) {
33+
assert.Equal(t, tfjob.ID, tfjob.GetID())
34+
})
35+
36+
t.Run("test NamespacedName func", func(t *testing.T) {
37+
namespacedName := fmt.Sprintf("%s/%s", tfjob.Namespace, tfjob.ID)
38+
assert.Equal(t, namespacedName, tfjob.NamespacedName())
39+
})
40+
41+
t.Run("test UpdateLabels", func(t *testing.T) {
42+
tfjob.UpdateLabels(map[string]string{
43+
"a": "b",
44+
})
45+
assert.Equal(t, "b", tfjob.Labels["a"])
46+
})
47+
48+
t.Run("test UpdateAnnotations", func(t *testing.T) {
49+
tfjob.UpdateAnnotations(map[string]string{
50+
"paddleflow/xx": "b",
51+
})
52+
assert.Equal(t, "b", tfjob.Annotations["paddleflow/xx"])
53+
})
54+
55+
t.Run("test UpdateJobPriority", func(t *testing.T) {
56+
newPC := "high"
57+
tfjob.UpdateJobPriority(newPC)
58+
assert.Equal(t, newPC, tfjob.PriorityClassName)
59+
})
60+
}

pkg/job/api/queue_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserve.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package api
18+
19+
import (
20+
"fmt"
21+
"testing"
22+
"time"
23+
24+
"github.com/stretchr/testify/assert"
25+
)
26+
27+
func TestQueueInfo_JobOrderFn(t *testing.T) {
28+
currentTime := time.Now()
29+
testCases := []struct {
30+
name string
31+
left *PFJob
32+
right *PFJob
33+
wantAns bool
34+
}{
35+
{
36+
name: "left < right",
37+
left: &PFJob{
38+
CreateTime: time.Now(),
39+
},
40+
right: &PFJob{
41+
CreateTime: time.Now().Add(20 * time.Minute),
42+
},
43+
wantAns: true,
44+
},
45+
{
46+
name: "left = right",
47+
left: &PFJob{
48+
ID: "aa",
49+
CreateTime: currentTime,
50+
},
51+
right: &PFJob{
52+
ID: "ab",
53+
CreateTime: currentTime,
54+
},
55+
wantAns: true,
56+
},
57+
}
58+
59+
queueInfo := &QueueInfo{
60+
SortPolicies: []SortPolicy{},
61+
}
62+
for _, tc := range testCases {
63+
t.Run(tc.name, func(t *testing.T) {
64+
ans := queueInfo.JobOrderFn(tc.left, tc.right)
65+
assert.Equal(t, tc.wantAns, ans)
66+
})
67+
}
68+
}
69+
70+
func TestNewRegistry(t *testing.T) {
71+
r := make(Registry)
72+
73+
pc := func(configuration Arguments) (SortPolicy, error) { return nil, nil }
74+
t.Run("test registry", func(t *testing.T) {
75+
err := r.Register("p1", pc)
76+
assert.Equal(t, nil, err)
77+
78+
err = r.Register("p1", pc)
79+
assert.Equal(t, fmt.Errorf("a sort policy named p1 already exists"), err)
80+
81+
err = r.Unregister("p1")
82+
assert.Equal(t, nil, err)
83+
84+
err = r.Unregister("p2")
85+
assert.Equal(t, fmt.Errorf("no sort policy named p2 exists"), err)
86+
})
87+
88+
t.Run("test new registry", func(t *testing.T) {
89+
QueueSortPolicies["priority"] = pc
90+
91+
ans := NewRegistry([]string{"priority"})
92+
assert.Equal(t, 1, len(ans))
93+
})
94+
}

pkg/job/runtime_v2/client/k3s_runtime_client.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -316,11 +316,6 @@ func (k3s *K3SRuntimeClient) startDynamicListener(listenerType string, stopCh <-
316316
return err
317317
}
318318

319-
// ListNodeQuota resource api for cluster nodes
320-
func (k3s *K3SRuntimeClient) ListNodeQuota(ctx context.Context) (pfschema.QuotaSummary, []pfschema.NodeQuotaInfo, error) {
321-
return pfschema.QuotaSummary{}, nil, nil
322-
}
323-
324319
func (k3s *K3SRuntimeClient) GetTaskLogV2(namespace, name string, logPage utils.LogPage) ([]pfschema.TaskLogInfo, error) {
325320
return getTaskLogV2(k3s.Client, namespace, name, logPage)
326321
}

pkg/job/runtime_v2/client/k3s_runtime_client_test.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -124,16 +124,6 @@ func TestK3SRuntimeClient_Cluster(t *testing.T) {
124124
assert.Contains(t, runtimeClient.ClusterName(), "mockNodeName")
125125
}
126126

127-
func TestK3SRuntimeClient_ListNodeQuota(t *testing.T) {
128-
// todo://add list node quota
129-
var server = httptest.NewServer(k8s.DiscoveryHandlerFunc)
130-
defer server.Close()
131-
runtimeClient := NewFakeK3SRuntimeClient(server)
132-
_, ret2, err := runtimeClient.ListNodeQuota(context.TODO())
133-
assert.Nil(t, ret2)
134-
assert.Nil(t, err)
135-
}
136-
137127
func TestK3SExecutor(t *testing.T) {
138128
var server = httptest.NewServer(k8s.DiscoveryHandlerFunc)
139129
defer server.Close()

pkg/job/runtime_v2/client/kube_runtime_client.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -628,11 +628,6 @@ func (krc *KubeRuntimeClient) ClusterID() string {
628628
return clusterID
629629
}
630630

631-
func (krc *KubeRuntimeClient) ListNodeQuota(ctx context.Context) (pfschema.QuotaSummary, []pfschema.NodeQuotaInfo, error) {
632-
// TODO: add ListNodeQuota logic
633-
return pfschema.QuotaSummary{}, []pfschema.NodeQuotaInfo{}, nil
634-
}
635-
636631
func (krc *KubeRuntimeClient) GetGVR(gvk schema.GroupVersionKind) (meta.RESTMapping, error) {
637632
gvr, ok := krc.GVKToGVR.Load(gvk.String())
638633
if ok {

pkg/job/runtime_v2/framework/interface.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,4 @@ type RuntimeClientInterface interface {
8585
RegisterListener(listenerType string, workQueue workqueue.RateLimitingInterface) error
8686

8787
StartListener(listenerType string, stopCh <-chan struct{}) error
88-
89-
// ListNodeQuota resource api for cluster nodes
90-
ListNodeQuota(ctx context.Context) (pfschema.QuotaSummary, []pfschema.NodeQuotaInfo, error)
9188
}

0 commit comments

Comments
 (0)