Skip to content

Commit 82be7e9

Browse files
Erik Kriegkuuji
andcommitted
implements concurrency for deployments
Co-authored-by: kuuji <[email protected]>
1 parent 56f961d commit 82be7e9

File tree

1 file changed

+145
-82
lines changed

1 file changed

+145
-82
lines changed

pkg/devspace/deploy/deploy.go

Lines changed: 145 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package deploy
22

33
import (
4+
"fmt"
45
"io"
56
"strings"
67

78
config2 "github.com/loft-sh/devspace/pkg/devspace/config"
89
"github.com/loft-sh/devspace/pkg/devspace/dependency/types"
10+
"github.com/sirupsen/logrus"
911

1012
"github.com/loft-sh/devspace/pkg/devspace/config/versions/latest"
1113
"github.com/loft-sh/devspace/pkg/devspace/deploy/deployer"
@@ -16,6 +18,7 @@ import (
1618
"github.com/loft-sh/devspace/pkg/devspace/hook"
1719
kubectlclient "github.com/loft-sh/devspace/pkg/devspace/kubectl"
1820
"github.com/loft-sh/devspace/pkg/util/log"
21+
"github.com/loft-sh/devspace/pkg/util/scanner"
1922

2023
"github.com/pkg/errors"
2124
)
@@ -157,124 +160,184 @@ func (c *controller) getDeployClient(deployConfig *latest.DeploymentConfig, helm
157160
}
158161

159162
// Deploy deploys all deployments in the config
160-
func (c *controller) Deploy(options *Options, log log.Logger) error {
163+
func (c *controller) Deploy(options *Options, logLogger log.Logger) error {
161164
config := c.config.Config()
162165
if config.Deployments != nil && len(config.Deployments) > 0 {
163166
helmV2Clients := map[string]helmtypes.Client{}
164167

165168
// Execute before deployments deploy hook
166-
err := hook.ExecuteHooks(c.client, c.config, c.dependencies, nil, log, "before:deploy")
169+
err := hook.ExecuteHooks(c.client, c.config, c.dependencies, nil, logLogger, "before:deploy")
167170
if err != nil {
168171
return err
169172
}
170173

174+
var (
175+
concurrentDeployments []*latest.DeploymentConfig
176+
sequentialDeployments []*latest.DeploymentConfig
177+
)
178+
171179
for _, deployConfig := range config.Deployments {
172-
if deployConfig.Disabled {
173-
log.Debugf("Skip deployment %s, because it is disabled", deployConfig.Name)
174-
continue
180+
if deployConfig.Concurrency {
181+
concurrentDeployments = append(concurrentDeployments, deployConfig)
182+
} else {
183+
sequentialDeployments = append(sequentialDeployments, deployConfig)
175184
}
185+
}
176186

177-
if len(options.Deployments) > 0 {
178-
shouldSkip := true
179-
180-
for _, deployment := range options.Deployments {
181-
if deployment == strings.TrimSpace(deployConfig.Name) {
182-
shouldSkip = false
183-
break
187+
var (
188+
errChan = make(chan error)
189+
deployedChan = make(chan bool)
190+
)
191+
192+
for i, deployConfig := range concurrentDeployments {
193+
go func(deployConfig *latest.DeploymentConfig, deployNumber int) {
194+
// Create new logger to allow concurrent logging.
195+
reader, writer := io.Pipe()
196+
streamLog := log.NewStreamLogger(writer, logrus.InfoLevel)
197+
logsLog := log.NewPrefixLogger("["+deployConfig.Name+"] ", log.Colors[(len(log.Colors)-1)-(deployNumber%len(log.Colors))], logLogger)
198+
go func() {
199+
scanner := scanner.NewScanner(reader)
200+
for scanner.Scan() {
201+
logsLog.Info(scanner.Text())
184202
}
185-
}
203+
}()
186204

187-
if shouldSkip {
188-
continue
189-
}
190-
}
191-
192-
var (
193-
deployClient deployer.Interface
194-
err error
195-
method string
196-
)
197-
198-
if deployConfig.Kubectl != nil {
199-
deployClient, err = kubectl.New(c.config, c.dependencies, c.client, deployConfig, log)
205+
wasDeployed, err := c.deployOne(deployConfig, streamLog, options, helmV2Clients)
206+
_ = writer.Close()
200207
if err != nil {
201-
return errors.Errorf("error deploying: deployment %s error: %v", deployConfig.Name, err)
208+
errChan <- err
209+
} else {
210+
deployedChan <- wasDeployed
202211
}
212+
}(deployConfig, i)
213+
}
203214

204-
method = "kubectl"
205-
} else if deployConfig.Helm != nil {
206-
// Get helm client
207-
helmClient, err := GetCachedHelmClient(c.config.Config(), deployConfig, c.client, helmV2Clients, false, log)
208-
if err != nil {
209-
return err
210-
}
215+
logLogger.StartWait(fmt.Sprintf("Deploying %d deployments concurrently", len(concurrentDeployments)))
211216

212-
deployClient, err = helm.New(c.config, c.dependencies, helmClient, c.client, deployConfig, log)
213-
if err != nil {
214-
return errors.Errorf("error deploying: deployment %s error: %v", deployConfig.Name, err)
215-
}
217+
// Wait for concurrent deployments to complete before starting sequential deployments.
218+
for i := 0; i < len(concurrentDeployments); i++ {
219+
select {
220+
case err := <-errChan:
221+
return err
222+
case <-deployedChan:
223+
logLogger.StartWait(fmt.Sprintf("Deploying %d deployments concurrently", len(concurrentDeployments)-i-1))
216224

217-
method = "helm"
218-
} else {
219-
return errors.Errorf("error deploying: deployment %s has no deployment method", deployConfig.Name)
220225
}
226+
}
227+
logLogger.StopWait()
221228

222-
// Execute before deployment deploy hook
223-
err = hook.ExecuteHooks(c.client, c.config, c.dependencies, map[string]interface{}{
224-
"DEPLOY_NAME": deployConfig.Name,
225-
"DEPLOY_CONFIG": deployConfig,
226-
}, log, hook.EventsForSingle("before:deploy", deployConfig.Name).With("deploy.beforeDeploy")...)
229+
for _, deployConfig := range sequentialDeployments {
230+
_, err := c.deployOne(deployConfig, logLogger, options, helmV2Clients)
227231
if err != nil {
228232
return err
229233
}
234+
}
230235

231-
wasDeployed, err := deployClient.Deploy(options.ForceDeploy, options.BuiltImages)
232-
if err != nil {
233-
hookErr := hook.ExecuteHooks(c.client, c.config, c.dependencies, map[string]interface{}{
234-
"DEPLOY_NAME": deployConfig.Name,
235-
"DEPLOY_CONFIG": deployConfig,
236-
"ERROR": err,
237-
}, log, hook.EventsForSingle("error:deploy", deployConfig.Name).With("deploy.errorDeploy")...)
238-
if hookErr != nil {
239-
return hookErr
240-
}
236+
// Execute after deployments deploy hook
237+
err = hook.ExecuteHooks(c.client, c.config, c.dependencies, nil, logLogger, "after:deploy")
238+
if err != nil {
239+
return err
240+
}
241+
}
241242

242-
return errors.Errorf("error deploying %s: %v", deployConfig.Name, err)
243-
}
243+
return nil
244+
}
244245

245-
if wasDeployed {
246-
log.Donef("Successfully deployed %s with %s", deployConfig.Name, method)
246+
func (c *controller) deployOne(deployConfig *latest.DeploymentConfig, log log.Logger, options *Options, helmV2Clients map[string]helmtypes.Client) (bool, error) {
247+
if deployConfig.Disabled {
248+
log.Debugf("Skip deployment %s, because it is disabled", deployConfig.Name)
249+
return true, nil
250+
}
247251

248-
// Execute after deployment deploy hook
249-
err = hook.ExecuteHooks(c.client, c.config, c.dependencies, map[string]interface{}{
250-
"DEPLOY_NAME": deployConfig.Name,
251-
"DEPLOY_CONFIG": deployConfig,
252-
}, log, hook.EventsForSingle("after:deploy", deployConfig.Name).With("deploy.afterDeploy")...)
253-
if err != nil {
254-
return err
255-
}
256-
} else {
257-
log.Infof("Skipping deployment %s", deployConfig.Name)
252+
if len(options.Deployments) > 0 {
253+
shouldSkip := true
258254

259-
// Execute after deployment deploy hook
260-
err = hook.ExecuteHooks(c.client, c.config, c.dependencies, map[string]interface{}{
261-
"DEPLOY_NAME": deployConfig.Name,
262-
"DEPLOY_CONFIG": deployConfig,
263-
}, log, hook.EventsForSingle("skip:deploy", deployConfig.Name)...)
264-
if err != nil {
265-
return err
266-
}
255+
for _, deployment := range options.Deployments {
256+
if deployment == strings.TrimSpace(deployConfig.Name) {
257+
shouldSkip = false
258+
break
267259
}
268260
}
269261

270-
// Execute after deployments deploy hook
271-
err = hook.ExecuteHooks(c.client, c.config, c.dependencies, nil, log, "after:deploy")
262+
if shouldSkip {
263+
return true, nil
264+
}
265+
}
266+
267+
var (
268+
deployClient deployer.Interface
269+
err error
270+
method string
271+
)
272+
273+
if deployConfig.Kubectl != nil {
274+
deployClient, err = kubectl.New(c.config, c.dependencies, c.client, deployConfig, log)
272275
if err != nil {
273-
return err
276+
return true, errors.Errorf("error deploying: deployment %s error: %v", deployConfig.Name, err)
277+
}
278+
279+
method = "kubectl"
280+
} else if deployConfig.Helm != nil {
281+
// Get helm client
282+
helmClient, err := GetCachedHelmClient(c.config.Config(), deployConfig, c.client, helmV2Clients, false, log)
283+
if err != nil {
284+
return true, err
274285
}
286+
287+
deployClient, err = helm.New(c.config, c.dependencies, helmClient, c.client, deployConfig, log)
288+
if err != nil {
289+
return true, errors.Errorf("error deploying: deployment %s error: %v", deployConfig.Name, err)
290+
}
291+
292+
method = "helm"
293+
} else {
294+
return true, errors.Errorf("error deploying: deployment %s has no deployment method", deployConfig.Name)
295+
}
296+
// Execute before deployment deploy hook
297+
err = hook.ExecuteHooks(c.client, c.config, c.dependencies, map[string]interface{}{
298+
"DEPLOY_NAME": deployConfig.Name,
299+
"DEPLOY_CONFIG": deployConfig,
300+
}, log, hook.EventsForSingle("before:deploy", deployConfig.Name).With("deploy.beforeDeploy")...)
301+
if err != nil {
302+
return true, err
275303
}
276304

277-
return nil
305+
wasDeployed, err := deployClient.Deploy(options.ForceDeploy, options.BuiltImages)
306+
if err != nil {
307+
hookErr := hook.ExecuteHooks(c.client, c.config, c.dependencies, map[string]interface{}{
308+
"DEPLOY_NAME": deployConfig.Name,
309+
"DEPLOY_CONFIG": deployConfig,
310+
"ERROR": err,
311+
}, log, hook.EventsForSingle("error:deploy", deployConfig.Name).With("deploy.errorDeploy")...)
312+
if hookErr != nil {
313+
return true, hookErr
314+
}
315+
316+
return true, errors.Errorf("error deploying %s: %v", deployConfig.Name, err)
317+
}
318+
319+
if wasDeployed {
320+
log.Donef("Successfully deployed %s with %s", deployConfig.Name, method)
321+
// Execute after deployment deploy hook
322+
err = hook.ExecuteHooks(c.client, c.config, c.dependencies, map[string]interface{}{
323+
"DEPLOY_NAME": deployConfig.Name,
324+
"DEPLOY_CONFIG": deployConfig,
325+
}, log, hook.EventsForSingle("after:deploy", deployConfig.Name).With("deploy.afterDeploy")...)
326+
if err != nil {
327+
return true, err
328+
}
329+
} else {
330+
log.Infof("Skipping deployment %s", deployConfig.Name)
331+
// Execute skip deploy hook
332+
err = hook.ExecuteHooks(c.client, c.config, c.dependencies, map[string]interface{}{
333+
"DEPLOY_NAME": deployConfig.Name,
334+
"DEPLOY_CONFIG": deployConfig,
335+
}, log, hook.EventsForSingle("skip:deploy", deployConfig.Name)...)
336+
if err != nil {
337+
return true, err
338+
}
339+
}
340+
return false, nil
278341
}
279342

280343
// Purge removes all deployments or a set of deployments from the cluster

0 commit comments

Comments
 (0)