Skip to content

Commit e0d31fd

Browse files
authored
feat:bandwidth-aware scheduling (#883)
Signed-off-by: Yuqi Wu <wuyuqi22@mails.ucas.ac.cn>
1 parent 41a03eb commit e0d31fd

File tree

18 files changed

+1095
-495
lines changed

18 files changed

+1095
-495
lines changed

cmd/kuscia/confloader/kuscia_config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,9 @@ func (lite *LiteKusciaConfig) OverwriteKusciaConfig(kusciaConfig *KusciaConfig)
199199
if lite.ReservedResources.Memory != "" {
200200
kusciaConfig.Agent.ReservedResources.Memory = lite.ReservedResources.Memory
201201
}
202+
if lite.ReservedResources.Bandwidth != "" {
203+
kusciaConfig.Agent.ReservedResources.Bandwidth = lite.ReservedResources.Bandwidth
204+
}
202205

203206
for _, p := range lite.Agent.Plugins {
204207
for j, pp := range kusciaConfig.Agent.Plugins {
@@ -278,6 +281,9 @@ func (autonomy *AutonomyKusciaConfig) OverwriteKusciaConfig(kusciaConfig *Kuscia
278281
if autonomy.ReservedResources.Memory != "" {
279282
kusciaConfig.Agent.ReservedResources.Memory = autonomy.ReservedResources.Memory
280283
}
284+
if autonomy.ReservedResources.Bandwidth != "" {
285+
kusciaConfig.Agent.ReservedResources.Bandwidth = autonomy.ReservedResources.Bandwidth
286+
}
281287

282288
for _, p := range autonomy.Agent.Plugins {
283289
for j, pp := range kusciaConfig.Agent.Plugins {

docs/locales/en/LC_MESSAGES/reference/apis/kusciajob_cn.po

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -845,6 +845,14 @@ msgstr "memory"
845845
msgid "参与方可用内存资源上限"
846846
msgstr "Maximum available memory resources for the party"
847847

848+
#: ../../reference/apis/kusciajob_cn.md:121
849+
msgid "kuscia.io/bandwidth"
850+
msgstr "kuscia.io/bandwidth"
851+
852+
#: ../../reference/apis/kusciajob_cn.md:121
853+
msgid "参与方可用网络带宽下限"
854+
msgstr "Minimum available network bandwidth for the party"
855+
848856
#: ../../reference/apis/kusciajob_cn.md:723
849857
msgid "PartyStatus"
850858
msgstr "PartyStatus"

docs/locales/en/LC_MESSAGES/reference/concepts/kusciajob_cn.po

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -596,12 +596,17 @@ msgstr ""
596596
"`tasks[].parties[].role`: Role of the task participant, defined by the engine. Common examples include Host and Guest. Kuscia combines this with the role field in [appImage](./appimage_cn.md#appimage-ref) to select the appropriate deployment template for launching the engine."
597597

598598
#: ../../reference/concepts/kusciajob_cn.md:396
599-
msgid ""
600-
"`tasks[].parties[].resources`:表示任务参与方的资源信息,详见 [K8s "
601-
"资源要求和限制](https://kubernetes.io/docs/concepts/configuration/manage-"
602-
"resources-containers/)。"
603-
msgstr ""
604-
"`tasks[].parties[].resources`: Resource information for the task participant. See [K8s Resource Requirements and Limits](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/) for details."
599+
msgid "`tasks[].parties[].resources`:表示任务参与方的资源信息,详见 [K8s 资源要求和限制](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/)。 除了 K8s 原生的 `cpu`、`memory` 等资源字段外,Kuscia 还扩展了带宽资源字段,用于调度时感知节点带宽能力:"
600+
msgstr "`tasks[].parties[].resources`: Specifies resource information for each party in a task. See [K8s resource requests and limits](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/). In addition to native K8s fields such as `cpu` and `memory`, Kuscia extends bandwidth resource fields to enable the scheduler to be aware of node bandwidth capacity:"
601+
602+
msgid "`kuscia.io/bandwidth`:表示任务对网络带宽的请求与限制。该字段与 cpu、memory 一样参与调度决策。填写时只需数值,不需要写单位;单位默认为 Mbps。"
603+
msgstr "`kuscia.io/bandwidth`: Specifies the task’s request and limit for network bandwidth. This field participates in scheduling decisions in the same way as cpu and memory. Only a numeric value should be provided, without units; the unit is implicitly Mbps."
604+
605+
msgid "`requests.kuscia.io/bandwidth`:表示任务运行所需的最小带宽需求。例如 `10`,则调度器只会把任务分配到具备 ≥ 10 Mbps 可用带宽容量的节点上。"
606+
msgstr "`requests.kuscia.io/bandwidth`: Specifies the minimum bandwidth required for task execution. For example, with `10`, the scheduler will only assign tasks to nodes with ≥ 10 Mbps of available bandwidth capacity."
607+
608+
msgid "`limits.kuscia.io/bandwidth`:对于自定义资源,limits应与requests保持一致。"
609+
msgstr "`limits.kuscia.io/bandwidth`: For custom resources, limits should be set equal to requests."
605610

606611
#: ../../reference/concepts/kusciajob_cn.md:397
607612
msgid "`tasks[].parties[].bandwidthLimits`:任务参与方的带宽限制信息。"

docs/reference/apis/kusciajob_cn.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -902,10 +902,11 @@ curl -k -X POST 'https://localhost:8082/api/v1/job/cancel' \
902902

903903
### JobResource
904904

905-
| 字段 | 类型 | 选填 | 描述 |
906-
|--------|------------|------|------|
907-
| cpu | string | 可选 | 参与方可用 CPU 资源上限 |
908-
| memory | string | 可选 | 参与方可用内存资源上限 |
905+
| 字段 | 类型 | 选填 | 描述 |
906+
|---------------------|--------|------|-----------------|
907+
| cpu | string | 可选 | 参与方可用 CPU 资源上限 |
908+
| memory | string | 可选 | 参与方可用内存资源上限 |
909+
| kuscia.io/bandwidth | string | 可选 | 参与方可用网络带宽下限 |
909910

910911
{#party-status}
911912

docs/reference/concepts/kusciajob_cn.md

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -334,15 +334,15 @@ spec:
334334
parties:
335335
- domainID: alice
336336
resources:
337-
limits: {"cpu": 2, "memory": "4Gi"}
338-
requests: {"cpu": 1, "memory": "2Gi"}
337+
limits: {"cpu": 2, "memory": "4Gi", "kuscia.io/bandwidth": "10"}
338+
requests: {"cpu": 1, "memory": "2Gi", "kuscia.io/bandwidth": "10"}
339339
bandwidthLimits:
340340
- destinationID: bob
341341
limitKBps: 1000
342342
- domainID: bob
343343
resources:
344-
limits: {"cpu": 2, "memory": "4Gi"}
345-
requests: {"cpu": 1, "memory": "2Gi"}
344+
limits: {"cpu": 2, "memory": "4Gi", "kuscia.io/bandwidth": "20"}
345+
requests: {"cpu": 1, "memory": "2Gi", "kuscia.io/bandwidth": "20"}
346346
bandwidthLimits:
347347
- destinationID: alice
348348
limitKBps: 1000
@@ -388,7 +388,10 @@ KusciaJob `spec` 的子字段详细介绍如下:
388388
- `tasks[].parties`:表示任务参与方的信息。
389389
- `tasks[].parties[].domainID`:表示任务参与方的节点 ID。
390390
- `tasks[].parties[].role`:表示任务参与方的角色,这个是由引擎自定义的;比如常见的 Host 、Guest ,Kuscia 会结合 [appImage](./appimage_cn.md#appimage-ref) 中的 role 字段,选择对应的部署模版启动引擎。
391-
- `tasks[].parties[].resources`:表示任务参与方的资源信息,详见 [K8s 资源要求和限制](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/)。
391+
- `tasks[].parties[].resources`:表示任务参与方的资源信息,详见 [K8s 资源要求和限制](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/)。 除了 K8s 原生的 `cpu`、`memory` 等资源字段外,Kuscia 还扩展了带宽资源字段,用于调度时感知节点带宽能力:
392+
- `kuscia.io/bandwidth`:表示任务对网络带宽的请求与限制。该字段与 cpu、memory 一样参与调度决策。填写时只需数值,不需要写单位;单位默认为 Mbps。
393+
- `requests.kuscia.io/bandwidth`:表示任务运行所需的最小带宽需求。例如 `10`,则调度器只会把任务分配到具备 ≥ 10 Mbps 可用带宽容量的节点上。
394+
- `limits.kuscia.io/bandwidth`:对于自定义资源,limits应与requests保持一致。
392395
- `tasks[].parties[].bandwidthLimits`:任务参与方的带宽限制信息。
393396
- `tasks[].parties[].bandwidthLimits[].destinationID`:表示限制 `tasks[].parties[].domainID` 发出,目的地为 `destinationID` 的 HTTP 流量 Request/Response 整体带宽。
394397
- `tasks[].parties[].bandwidthLimits[].limitKBps`:表示上述带宽限制的取值,单位为 KiB/s。

etc/conf/kuscia.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ capacity:
5454
memory: # 8Gi
5555
pods: # 500
5656
storage: # 100Gi
57+
bandwidth: # 1000 (custom bandwidth resource; the unit is Mbps, so 1000 means 1000 Mbps)
5758

5859
# Agent image configs
5960
image:

pkg/agent/config/agent_config.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@ const (
4141
defaultK8sClientMaxQPS = 250
4242
defaultPodsCapacity = "500"
4343

44-
DefaultReservedCPU = "0.5"
45-
DefaultReservedMemory = "500Mi"
44+
DefaultReservedCPU = "0.5"
45+
DefaultReservedMemory = "500Mi"
46+
DefaultReservedBandwidth = "10" // Mbps
4647

4748
defaultCRIRemoteEndpoint = "unix:///home/kuscia/containerd/run/containerd.sock"
4849
defaultResolvConfig = "/etc/resolv.conf"
@@ -80,14 +81,16 @@ type AgentLogCfg struct {
8081
// CapacityCfg holds the capacity settings of the agent configuration.
// Every value is kept as the raw string read from the corresponding YAML key.
type CapacityCfg struct {
8182
CPU string `yaml:"cpu"`
8283
Memory string `yaml:"memory"`
84+
// NOTE(review): presumably expressed in Mbps, as in the sample kuscia.yaml — confirm.
Bandwidth string `yaml:"bandwidth"`
8385
Pods string `yaml:"pods"`
8486
Storage string `yaml:"storage"`
8587
EphemeralStorage string `yaml:"ephemeralStorage"`
8688
}
8789

8890
type ReservedResourcesCfg struct {
89-
CPU string `yaml:"cpu"`
90-
Memory string `yaml:"memory"`
91+
CPU string `yaml:"cpu"`
92+
Memory string `yaml:"memory"`
93+
Bandwidth string `yaml:"bandwidth"`
9194
}
9295

9396
type KubeConnCfg struct {
@@ -290,8 +293,9 @@ func DefaultStaticAgentConfig() *AgentConfig {
290293
Pods: defaultPodsCapacity,
291294
},
292295
ReservedResources: ReservedResourcesCfg{
293-
CPU: DefaultReservedCPU,
294-
Memory: DefaultReservedMemory,
296+
CPU: DefaultReservedCPU,
297+
Memory: DefaultReservedMemory,
298+
Bandwidth: DefaultReservedBandwidth,
295299
},
296300
AllowPrivileged: false,
297301
Log: AgentLogCfg{
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright 2025 Ant Group Co., Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package bandwidthfilter
16+
17+
import (
18+
"context"
19+
"strings"
20+
21+
corev1 "k8s.io/api/core/v1"
22+
23+
"github.com/secretflow/kuscia/pkg/agent/config"
24+
"github.com/secretflow/kuscia/pkg/agent/middleware/hook"
25+
"github.com/secretflow/kuscia/pkg/agent/middleware/plugin"
26+
"github.com/secretflow/kuscia/pkg/common"
27+
"github.com/secretflow/kuscia/pkg/utils/nlog"
28+
)
29+
30+
func Register() {
31+
plugin.Register(common.PluginNameBandwidthFilter, &bandwidthFilter{})
32+
}
33+
34+
// bandwidthFilterConfig is the plugin-specific configuration decoded in Init.
type bandwidthFilterConfig struct {
	// StripOnRunk controls whether bandwidth resources are stripped when the
	// agent runs in runk. A nil pointer means "not configured", in which case
	// Init keeps its default of true.
	StripOnRunk *bool `json:"stripOnRunk" yaml:"stripOnRunk"`
}
38+
39+
// bandwidthFilter is a hook plugin that removes the bandwidth extended
// resource from a pod at the K8s provider sync-pod hook point.
type bandwidthFilter struct {
	// initialized is set by Init once the filter has been registered with the
	// hook system; CanExec reports false while it is unset.
	initialized bool
	// stripOnRunk is the effective value of the stripOnRunk option.
	stripOnRunk bool
	// isRunk records whether the configured runtime equals config.K8sRuntime.
	isRunk bool
}
44+
45+
// Type implements plugin.Plugin.
46+
func (bf *bandwidthFilter) Type() string {
47+
return hook.PluginType
48+
}
49+
50+
// Init implements plugin.Plugin.
51+
// Only registers to the hook system when runtime == runk and stripOnRunk is set to true.
52+
func (bf *bandwidthFilter) Init(ctx context.Context, deps *plugin.Dependencies, cfg *config.PluginCfg) error {
53+
// Default: On
54+
bf.stripOnRunk = true
55+
56+
if cfg != nil {
57+
var c bandwidthFilterConfig
58+
if err := cfg.Config.Decode(&c); err == nil && c.StripOnRunk != nil {
59+
bf.stripOnRunk = *c.StripOnRunk
60+
}
61+
}
62+
63+
rt := ""
64+
if deps != nil && deps.AgentConfig != nil {
65+
rt = strings.ToLower(strings.TrimSpace(deps.AgentConfig.Provider.Runtime))
66+
}
67+
bf.isRunk = rt == config.K8sRuntime
68+
69+
nlog.Infof("[bandwidthfilter] Init: detected runtime=%q (raw=%q), stripOnRunk=%v",
70+
rt, deps.AgentConfig.Provider.Runtime, bf.stripOnRunk)
71+
72+
// If the runtime is not runk, or the configuration disables it, the plugin will not be registered.
73+
if !bf.isRunk || !bf.stripOnRunk {
74+
nlog.Infof("Plugin %s NOT registered (runtime=%q, stripOnRunk=%v)",
75+
common.PluginNameBandwidthFilter, rt, bf.stripOnRunk)
76+
return nil
77+
}
78+
79+
bf.initialized = true
80+
hook.Register(common.PluginNameBandwidthFilter, bf)
81+
nlog.Infof("Plugin %s registered (runtime=%q)", common.PluginNameBandwidthFilter, rt)
82+
return nil
83+
}
84+
85+
// Only executed at the stage “before syncing to the external Kubernetes”.
86+
func (bf *bandwidthFilter) CanExec(ctx hook.Context) bool {
87+
if !bf.initialized {
88+
return false
89+
}
90+
switch ctx.Point() {
91+
case hook.PointK8sProviderSyncPod:
92+
_, ok := ctx.(*hook.K8sProviderSyncPodContext)
93+
return ok
94+
default:
95+
return false
96+
}
97+
}
98+
99+
// ExecHook: strips the bandwidth extended resource from the BkPod before it is dispatched.
100+
func (bf *bandwidthFilter) ExecHook(ctx hook.Context) (*hook.Result, error) {
101+
syncCtx, ok := ctx.(*hook.K8sProviderSyncPodContext)
102+
if !ok || syncCtx.BkPod == nil {
103+
return &hook.Result{}, nil
104+
}
105+
106+
scrub := func(rr *corev1.ResourceRequirements) {
107+
if rr == nil {
108+
return
109+
}
110+
delete(rr.Limits, common.ResourceBandwidth)
111+
delete(rr.Requests, common.ResourceBandwidth)
112+
}
113+
114+
for i := range syncCtx.BkPod.Spec.Containers {
115+
scrub(&syncCtx.BkPod.Spec.Containers[i].Resources)
116+
}
117+
for i := range syncCtx.BkPod.Spec.InitContainers {
118+
scrub(&syncCtx.BkPod.Spec.InitContainers[i].Resources)
119+
}
120+
for i := range syncCtx.BkPod.Spec.EphemeralContainers {
121+
scrub(&syncCtx.BkPod.Spec.EphemeralContainers[i].Resources)
122+
}
123+
if syncCtx.BkPod.Spec.Overhead != nil {
124+
delete(syncCtx.BkPod.Spec.Overhead, common.ResourceBandwidth)
125+
}
126+
127+
return &hook.Result{}, nil
128+
}

0 commit comments

Comments
 (0)