@@ -97,19 +97,37 @@ func InReverseDependencyOrder(ctx context.Context, project *types.Project, fn fu
97
97
}
98
98
99
99
func (t * graphTraversal ) visit (ctx context.Context , g * Graph ) error {
100
- nodes := t .extremityNodesFn (g )
100
+ expect := len (g .Vertices )
101
+ if expect == 0 {
102
+ return nil
103
+ }
101
104
102
105
eg , ctx := errgroup .WithContext (ctx )
103
106
if t .maxConcurrency > 0 {
104
- eg .SetLimit (t .maxConcurrency )
107
+ eg .SetLimit (t .maxConcurrency + 1 )
105
108
}
106
- t .run (ctx , g , eg , nodes )
109
+ nodeCh := make (chan * Vertex )
110
+ eg .Go (func () error {
111
+ for node := range nodeCh {
112
+ expect --
113
+ if expect == 0 {
114
+ close (nodeCh )
115
+ return nil
116
+ }
117
+ t .run (ctx , g , eg , t .adjacentNodesFn (node ), nodeCh )
118
+ }
119
+ return nil
120
+ })
107
121
108
- return eg .Wait ()
122
+ nodes := t .extremityNodesFn (g )
123
+ t .run (ctx , g , eg , nodes , nodeCh )
124
+
125
+ err := eg .Wait ()
126
+ return err
109
127
}
110
128
111
129
// Note: this could be `graph.walk` or whatever
112
- func (t * graphTraversal ) run (ctx context.Context , graph * Graph , eg * errgroup.Group , nodes []* Vertex ) {
130
+ func (t * graphTraversal ) run (ctx context.Context , graph * Graph , eg * errgroup.Group , nodes []* Vertex , nodeCh chan * Vertex ) {
113
131
for _ , node := range nodes {
114
132
// Don't start this service yet if all of its children have
115
133
// not been started yet.
@@ -125,14 +143,11 @@ func (t *graphTraversal) run(ctx context.Context, graph *Graph, eg *errgroup.Gro
125
143
126
144
eg .Go (func () error {
127
145
err := t .visitorFn (ctx , node .Service )
128
- if err ! = nil {
129
- return err
146
+ if err = = nil {
147
+ graph . UpdateStatus ( node . Key , t . targetServiceStatus )
130
148
}
131
-
132
- graph .UpdateStatus (node .Key , t .targetServiceStatus )
133
-
134
- t .run (ctx , graph , eg , t .adjacentNodesFn (node ))
135
- return nil
149
+ nodeCh <- node
150
+ return err
136
151
})
137
152
}
138
153
}
0 commit comments