Skip to content

Commit c2fe4ba

Browse files
authored
feat(region): pod containers start in dependency order (#23668)
1 parent bce7790 commit c2fe4ba

File tree

9 files changed

+429
-10
lines changed

9 files changed

+429
-10
lines changed

pkg/apis/container.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ type ContainerSpec struct {
144144
StartupProbe *ContainerProbe `json:"startup_probe,omitempty"`
145145
AlwaysRestart bool `json:"always_restart"`
146146
Primary bool `json:"primary"`
147+
148+
// DependsOn is a list of container name which this container depends on when pod start
149+
// Only works for containers created & started by pod-create & server-start
150+
DependsOn []string `json:"depends_on,omitempty"`
147151
}
148152

149153
func (c *ContainerSpec) NeedProbe() bool {

pkg/compute/guestdrivers/pod.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
4141
"yunion.io/x/onecloud/pkg/compute/models"
4242
"yunion.io/x/onecloud/pkg/compute/options"
43+
"yunion.io/x/onecloud/pkg/compute/utils"
4344
"yunion.io/x/onecloud/pkg/httperrors"
4445
"yunion.io/x/onecloud/pkg/mcclient"
4546
"yunion.io/x/onecloud/pkg/util/pod/remotecommand/spdy"
@@ -89,6 +90,7 @@ func (p *SPodDriver) ValidateCreateData(ctx context.Context, userCred mcclient.T
8990

9091
ctrNames := sets.NewString()
9192
volUniqNames := sets.NewString()
93+
9294
for idx, ctr := range input.Pod.Containers {
9395
if err := p.validateContainerData(ctx, userCred, idx, input.Name, ctr, input); err != nil {
9496
return nil, errors.Wrapf(err, "data of %d container", idx)
@@ -109,6 +111,15 @@ func (p *SPodDriver) ValidateCreateData(ctx context.Context, userCred mcclient.T
109111
}
110112
}
111113

114+
err := utils.TopologicalSortContainers(
115+
input.Pod.Containers,
116+
func(ctr *api.PodContainerCreateInput) string { return ctr.Name },
117+
func(ctr *api.PodContainerCreateInput) []string { return ctr.DependsOn },
118+
)
119+
if err != nil {
120+
return nil, errors.Wrap(err, "invalid container dependency")
121+
}
122+
112123
return input, nil
113124
}
114125

pkg/compute/models/containers.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"yunion.io/x/onecloud/pkg/cloudcommon/db"
3939
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
4040
"yunion.io/x/onecloud/pkg/compute/options"
41+
"yunion.io/x/onecloud/pkg/compute/utils"
4142
"yunion.io/x/onecloud/pkg/httperrors"
4243
"yunion.io/x/onecloud/pkg/mcclient"
4344
"yunion.io/x/onecloud/pkg/mcclient/auth"
@@ -205,6 +206,27 @@ func (m *SContainerManager) ValidateSpec(ctx context.Context, userCred mcclient.
205206
return errors.Wrap(err, "validate probe configuration")
206207
}
207208

209+
if ctr != nil {
210+
// only detect loop when update container
211+
ctrs, err := m.GetContainersByPod(pod.GetId())
212+
if err != nil {
213+
return errors.Wrap(err, "get containers by pod")
214+
}
215+
for idx, container := range ctrs {
216+
if container.GetId() == ctr.GetId() {
217+
ctrs[idx].Spec = spec
218+
}
219+
}
220+
err = utils.TopologicalSortContainers(
221+
ctrs,
222+
func(ctr SContainer) string { return ctr.Name },
223+
func(ctr SContainer) []string { return ctr.Spec.DependsOn },
224+
)
225+
if err != nil {
226+
return errors.Wrap(err, "validate topological sort")
227+
}
228+
}
229+
208230
return nil
209231
}
210232

pkg/compute/tasks/guest/pod_create_task.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func (t *PodCreateTask) OnPodCreated(ctx context.Context, guest *models.SGuest,
6262
}
6363

6464
for idx, ctr := range ctrs {
65-
if err := ctr.StartCreateTask(ctx, t.GetUserCred(), t.GetTaskId(), t.GetParams()); err != nil {
65+
if err := ctr.StartCreateTask(ctx, t.GetUserCred(), t.GetTaskId(), nil); err != nil {
6666
t.onCreateContainerError(ctx, guest, errors.Wrapf(err, "start container %d creation task", idx))
6767
return
6868
}
@@ -96,8 +96,18 @@ func (t *PodCreateTask) OnContainerCreated(ctx context.Context, guest *models.SG
9696
}
9797
}
9898
if isAllCreated {
99-
t.SetStage("OnStatusSynced", nil)
100-
guest.StartSyncstatus(ctx, t.GetUserCred(), t.GetTaskId())
99+
if jsonutils.QueryBoolean(t.GetParams(), "auto_start", false) {
100+
t.SetStage("OnContainerStarted", nil)
101+
task, err := taskman.TaskManager.NewTask(ctx, "PodStartContainerInDependencyOrderTask", guest, t.GetUserCred(), nil, t.GetTaskId(), "", nil)
102+
if err != nil {
103+
t.SetStageFailed(ctx, jsonutils.NewString(errors.Wrap(err, "New PodStartContainerInDependencyOrderTask").Error()))
104+
return
105+
}
106+
task.ScheduleRun(nil)
107+
} else {
108+
t.SetStage("OnStatusSynced", nil)
109+
guest.StartSyncstatus(ctx, t.GetUserCred(), t.GetTaskId())
110+
}
101111
}
102112
}
103113

@@ -106,6 +116,16 @@ func (t *PodCreateTask) OnContainerCreatedFailed(ctx context.Context, guest *mod
106116
t.SetStageFailed(ctx, data)
107117
}
108118

119+
func (t *PodCreateTask) OnContainerStartedFailed(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
120+
guest.SetStatus(ctx, t.GetUserCred(), api.POD_STATUS_CREATE_CONTAINER_FAILED, data.String())
121+
t.SetStageFailed(ctx, data)
122+
}
123+
124+
func (t *PodCreateTask) OnContainerStarted(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
125+
t.SetStage("OnStatusSynced", nil)
126+
guest.StartSyncstatus(ctx, t.GetUserCred(), t.GetTaskId())
127+
}
128+
109129
func (t *PodCreateTask) OnStatusSynced(ctx context.Context, guest *models.SGuest, data jsonutils.JSONObject) {
110130
t.SetStageComplete(ctx, nil)
111131
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package guest
2+
3+
import (
4+
"context"
5+
6+
"yunion.io/x/jsonutils"
7+
"yunion.io/x/log"
8+
"yunion.io/x/pkg/errors"
9+
10+
"yunion.io/x/onecloud/pkg/cloudcommon/db"
11+
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
12+
"yunion.io/x/onecloud/pkg/compute/models"
13+
"yunion.io/x/onecloud/pkg/compute/utils"
14+
)
15+
16+
type PodStartContainerInDependencyOrderTask struct {
17+
SGuestBaseTask
18+
}
19+
20+
func init() {
21+
taskman.RegisterTask(PodStartContainerInDependencyOrderTask{})
22+
}
23+
24+
func (task *PodStartContainerInDependencyOrderTask) taskFailed(ctx context.Context, pod *models.SGuest, err string) {
25+
task.SetStageFailed(ctx, jsonutils.NewString(err))
26+
}
27+
28+
func (task *PodStartContainerInDependencyOrderTask) taskComplete(ctx context.Context, pod *models.SGuest) {
29+
task.SetStageComplete(ctx, nil)
30+
}
31+
32+
func (t *PodStartContainerInDependencyOrderTask) OnInit(ctx context.Context, obj db.IStandaloneModel, body jsonutils.JSONObject) {
33+
pod := obj.(*models.SGuest)
34+
ctrs, err := models.GetContainerManager().GetContainersByPod(pod.GetId())
35+
if err != nil {
36+
t.SetStageFailed(ctx, jsonutils.NewString(err.Error()))
37+
return
38+
}
39+
40+
// init graph
41+
dep, err := utils.NewDependencyTopoGraph(
42+
ctrs,
43+
func(ctr models.SContainer) string { return ctr.Id },
44+
func(ctr models.SContainer) string { return ctr.Name },
45+
func(ctr models.SContainer) []string { return ctr.Spec.DependsOn },
46+
)
47+
if err != nil {
48+
t.taskFailed(ctx, pod, errors.Wrap(err, "New pod container dependency").Error())
49+
return
50+
}
51+
52+
t.SaveParams(jsonutils.Marshal(dep).(*jsonutils.JSONDict))
53+
54+
t.requestContainersStart(ctx, pod, nil)
55+
}
56+
57+
func (t *PodStartContainerInDependencyOrderTask) requestContainersStart(ctx context.Context, pod *models.SGuest, body jsonutils.JSONObject) {
58+
dep := new(utils.DependencyTopoGraph[models.SContainer])
59+
if err := t.GetParams().Unmarshal(dep); err != nil {
60+
t.taskFailed(ctx, pod, errors.Wrap(err, "Unmarshal container order").Error())
61+
return
62+
}
63+
64+
fetchById := func(uuid string) models.SContainer {
65+
pctr, err := models.GetContainerManager().FetchById(uuid)
66+
if err != nil {
67+
log.Infof("FetchById %s error: %s", uuid, err.Error())
68+
return models.SContainer{}
69+
}
70+
return *pctr.(*models.SContainer)
71+
}
72+
currentBatch := dep.GetNextBatch(fetchById)
73+
if currentBatch == nil {
74+
t.taskComplete(ctx, pod)
75+
return
76+
}
77+
78+
// start current Batch
79+
t.SetStage("OnContainerStarted", jsonutils.Marshal(dep).(*jsonutils.JSONDict))
80+
if err := models.GetContainerManager().StartBatchStartTask(ctx, t.GetUserCred(), currentBatch, t.GetId()); err != nil {
81+
t.OnContainerStartedFailed(ctx, pod, jsonutils.NewString(err.Error()))
82+
return
83+
}
84+
}
85+
86+
func (t *PodStartContainerInDependencyOrderTask) OnContainerStarted(ctx context.Context, pod *models.SGuest, data jsonutils.JSONObject) {
87+
t.requestContainersStart(ctx, pod, nil)
88+
}
89+
90+
func (t *PodStartContainerInDependencyOrderTask) OnContainerStartedFailed(ctx context.Context, pod *models.SGuest, data jsonutils.JSONObject) {
91+
t.taskFailed(ctx, pod, data.String())
92+
}

pkg/compute/tasks/guest/pod_start_task.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,19 @@ func (t *PodStartTask) OnInit(ctx context.Context, obj db.IStandaloneModel, body
4545

4646
func (t *PodStartTask) OnPodStarted(ctx context.Context, pod *models.SGuest, _ jsonutils.JSONObject) {
4747
pod.SetStatus(ctx, t.GetUserCred(), api.POD_STATUS_STARTING_CONTAINER, "")
48-
ctrs, err := models.GetContainerManager().GetContainersByPod(pod.GetId())
49-
if err != nil {
50-
t.OnContainerStartedFailed(ctx, pod, jsonutils.NewString(errors.Wrap(err, "GetContainersByPod").Error()))
51-
return
52-
}
48+
// ctrs, err := models.GetContainerManager().GetContainersByPod(pod.GetId())
49+
// if err != nil {
50+
// t.OnContainerStartedFailed(ctx, pod, jsonutils.NewString(errors.Wrap(err, "GetContainersByPod").Error()))
51+
// return
52+
// }
5353
t.SetStage("OnContainerStarted", nil)
54-
if err := models.GetContainerManager().StartBatchStartTask(ctx, t.GetUserCred(), ctrs, t.GetId()); err != nil {
55-
t.OnContainerStartedFailed(ctx, pod, jsonutils.NewString(err.Error()))
54+
55+
task, err := taskman.TaskManager.NewTask(ctx, "PodStartContainerInDependencyOrderTask", pod, t.GetUserCred(), nil, t.GetTaskId(), "", nil)
56+
if err != nil {
57+
t.SetStageFailed(ctx, jsonutils.NewString(errors.Wrap(err, "New PodStartContainerInDependencyOrderTask").Error()))
5658
return
5759
}
60+
task.ScheduleRun(nil)
5861
}
5962

6063
func (t *PodStartTask) OnPodStartedFailed(ctx context.Context, pod *models.SGuest, reason jsonutils.JSONObject) {

pkg/compute/utils/doc.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package utils // import "yunion.io/x/onecloud/pkg/compute/utils"
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package utils
2+
3+
import (
4+
"yunion.io/x/pkg/errors"
5+
)
6+
7+
type GetObjIdName[T any] func(T) string
8+
type GetDependencies[T any] func(T) []string
9+
10+
func TopologicalSortContainers[T any](objs []T, getName GetObjIdName[T], getDependencies GetDependencies[T]) error {
11+
if len(objs) == 0 {
12+
return nil
13+
}
14+
15+
// Build a dependency graph and an in-degree table
16+
graph := make(map[string][]string)
17+
inDegree := make(map[string]int)
18+
19+
// init graph and inDegree
20+
for _, obj := range objs {
21+
inDegree[getName(obj)] = 0
22+
}
23+
for _, obj := range objs {
24+
oName := getName(obj)
25+
for _, dep := range getDependencies(obj) {
26+
if _, exists := inDegree[dep]; !exists {
27+
return errors.Errorf("The dependent container %s does not exist.", dep)
28+
}
29+
graph[dep] = append(graph[dep], oName)
30+
inDegree[oName]++
31+
}
32+
}
33+
34+
// Topological sorting: use a queue to process nodes with an in-degree of 0
35+
queue := []string{}
36+
for name, degree := range inDegree {
37+
if degree == 0 {
38+
queue = append(queue, name)
39+
}
40+
}
41+
42+
sorted := []string{}
43+
for len(queue) > 0 {
44+
current := queue[0]
45+
queue = queue[1:]
46+
sorted = append(sorted, current)
47+
48+
// Decrease the in-degree of neighboring nodes
49+
for _, neighbor := range graph[current] {
50+
inDegree[neighbor]--
51+
if inDegree[neighbor] == 0 {
52+
queue = append(queue, neighbor)
53+
}
54+
}
55+
}
56+
57+
// Check whether a cycle exists
58+
if len(sorted) != len(objs) {
59+
return errors.Errorf("There is a circular dependency among the dependencies.")
60+
}
61+
62+
return nil
63+
}
64+
65+
type DependencyTopoGraph[T any] struct {
66+
Graph map[string][]string
67+
Degree map[string]int
68+
Leafs []string // containers whose in-degree is zero
69+
}
70+
71+
func NewDependencyTopoGraph[T any](
72+
objs []T,
73+
getId GetObjIdName[T],
74+
getName GetObjIdName[T],
75+
getDependencies GetDependencies[T],
76+
) (*DependencyTopoGraph[T], error) {
77+
depGraph := &DependencyTopoGraph[T]{
78+
Graph: make(map[string][]string),
79+
Degree: make(map[string]int),
80+
Leafs: make([]string, 0, len(objs)),
81+
}
82+
83+
nameToUUID := make(map[string]string)
84+
85+
for _, obj := range objs {
86+
uuid := getId(obj)
87+
name := getName(obj)
88+
89+
depGraph.Degree[uuid] = 0
90+
nameToUUID[name] = uuid
91+
}
92+
93+
for _, obj := range objs {
94+
for _, dep := range getDependencies(obj) {
95+
depId := nameToUUID[dep]
96+
uuid := getId(obj)
97+
depGraph.Graph[depId] = append(depGraph.Graph[depId], uuid)
98+
depGraph.Degree[uuid]++
99+
}
100+
}
101+
102+
for uuid, indegree := range depGraph.Degree {
103+
if indegree == 0 {
104+
depGraph.Leafs = append(depGraph.Leafs, uuid)
105+
}
106+
}
107+
108+
return depGraph, nil
109+
}
110+
111+
type FetchObjById[T any] func(string) T
112+
113+
func (dep *DependencyTopoGraph[T]) GetNextBatch(fetchById FetchObjById[T]) []T {
114+
if len(dep.Leafs) == 0 {
115+
return nil
116+
}
117+
118+
objs := make([]T, 0, len(dep.Leafs))
119+
120+
nextLeafs := make([]string, 0)
121+
122+
for _, uuid := range dep.Leafs {
123+
objs = append(objs, fetchById(uuid))
124+
for _, neighbor := range dep.Graph[uuid] {
125+
dep.Degree[neighbor]--
126+
if dep.Degree[neighbor] == 0 {
127+
nextLeafs = append(nextLeafs, neighbor)
128+
}
129+
}
130+
}
131+
132+
// log.Infof("Get next batch: %s", dep.Leafs)
133+
134+
dep.Leafs = nextLeafs
135+
136+
return objs
137+
}

0 commit comments

Comments
 (0)