Skip to content

Commit 7d33f90

Browse files
committed
feat: add watch command
1 parent c29f582 commit 7d33f90

File tree

8 files changed

+299
-14
lines changed

8 files changed

+299
-14
lines changed

e2e/tests/pipelines/pipelines.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package pipelines
22

33
import (
4+
"context"
45
"github.com/loft-sh/devspace/cmd"
56
"github.com/loft-sh/devspace/cmd/flags"
67
"github.com/loft-sh/devspace/e2e/framework"
78
"github.com/loft-sh/devspace/e2e/kube"
89
"github.com/loft-sh/devspace/pkg/util/factory"
910
"github.com/onsi/ginkgo"
11+
"io/ioutil"
1012
"os"
13+
"time"
1114
)
1215

1316
var _ = DevSpaceDescribe("portforward", func() {
@@ -48,4 +51,69 @@ var _ = DevSpaceDescribe("portforward", func() {
4851
framework.ExpectNoError(err)
4952
framework.ExpectLocalFileContentsImmediately("test.txt", "Hello World!\n")
5053
})
54+
55+
ginkgo.It("should exec container", func() {
56+
tempDir, err := framework.CopyToTempDir("tests/pipelines/testdata/watch")
57+
framework.ExpectNoError(err)
58+
defer framework.CleanupTempDir(initialDir, tempDir)
59+
60+
ns, err := kubeClient.CreateNamespace("pipelines")
61+
framework.ExpectNoError(err)
62+
defer framework.ExpectDeleteNamespace(kubeClient, ns)
63+
64+
done := make(chan error)
65+
cancelCtx, cancel := context.WithCancel(context.Background())
66+
defer cancel()
67+
go func() {
68+
devCmd := &cmd.DevCmd{
69+
GlobalFlags: &flags.GlobalFlags{
70+
NoWarn: true,
71+
Namespace: ns,
72+
},
73+
Ctx: cancelCtx,
74+
}
75+
err := devCmd.Run(f)
76+
if err != nil {
77+
f.GetLog().Errorf("error: %v", err)
78+
}
79+
done <- err
80+
}()
81+
82+
framework.ExpectLocalFileContents("test.yaml", "Hello World\n")
83+
framework.ExpectLocalFileContents("test2.yaml", "Hello World\n")
84+
85+
// make a change to a txt file
86+
err = ioutil.WriteFile("test.txt", []byte("abc.txt"), 0777)
87+
framework.ExpectNoError(err)
88+
err = ioutil.WriteFile("test2.txt", []byte("abc123.txt"), 0777)
89+
framework.ExpectNoError(err)
90+
time.Sleep(time.Millisecond * 500)
91+
err = ioutil.WriteFile("test3.txt", []byte("abc456.txt"), 0777)
92+
framework.ExpectNoError(err)
93+
err = ioutil.WriteFile("test4.txt", []byte("abc789.txt"), 0777)
94+
framework.ExpectNoError(err)
95+
96+
framework.ExpectLocalFileContents("test.yaml", "Hello World\nHello World\n")
97+
framework.ExpectLocalFileContents("test2.yaml", "Hello World\nHello World\n")
98+
99+
// make a change to a txt file
100+
err = ioutil.WriteFile("test4.txt", []byte("abc.txt"), 0777)
101+
framework.ExpectNoError(err)
102+
err = ioutil.WriteFile("test5.txt", []byte("abc123.txt"), 0777)
103+
framework.ExpectNoError(err)
104+
time.Sleep(time.Millisecond * 500)
105+
err = ioutil.WriteFile("test6.txt", []byte("abc456.txt"), 0777)
106+
framework.ExpectNoError(err)
107+
err = ioutil.WriteFile("test7.txt", []byte("abc789.txt"), 0777)
108+
framework.ExpectNoError(err)
109+
110+
framework.ExpectLocalFileContents("test.yaml", "Hello World\nHello World\nHello World\n")
111+
framework.ExpectLocalFileContents("test2.yaml", "Hello World\nHello World\nHello World\n")
112+
113+
cancel()
114+
err = <-done
115+
if err != nil && err != context.Canceled {
116+
framework.ExpectNoError(err)
117+
}
118+
})
51119
})
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
version: v2beta1
2+
name: watch
3+
4+
pipelines:
5+
other:
6+
steps:
7+
- run: |-
8+
echo "Hello World" >> test.yaml
9+
sleep 1000
10+
echo "Hello World2" >> test.yaml
11+
12+
other2:
13+
steps:
14+
- run: |-
15+
echo "Hello World" >> test2.yaml
16+
17+
dev:
18+
steps:
19+
- run: |-
20+
watch -p *.txt -- run_pipelines other other2
Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package commands
22

33
import (
4+
"context"
45
"fmt"
56
"strconv"
67
"time"
78
)
89

9-
func Sleep(args []string) error {
10+
func Sleep(ctx context.Context, args []string) error {
1011
if len(args) != 1 {
1112
return fmt.Errorf("usage: sleep seconds")
1213
}
@@ -16,6 +17,9 @@ func Sleep(args []string) error {
1617
return fmt.Errorf("usage: sleep seconds")
1718
}
1819

19-
time.Sleep(time.Duration(duration) * time.Second)
20+
select {
21+
case <-ctx.Done():
22+
case <-time.After(time.Duration(duration) * time.Second):
23+
}
2024
return nil
2125
}

pkg/devspace/pipeline/engine/basichandler/handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ var BasicCommands = map[string]func(ctx context.Context, args []string) error{
2929
return enginecommands.IsCommand(ctx, args)
3030
},
3131
"sleep": func(ctx context.Context, args []string) error {
32-
return handleError(interp.HandlerCtx(ctx), enginecommands.Sleep(args))
32+
return handleError(interp.HandlerCtx(ctx), enginecommands.Sleep(ctx, args))
3333
},
3434
"cat": func(ctx context.Context, args []string) error {
3535
hc := interp.HandlerCtx(ctx)

pkg/devspace/pipeline/engine/pipelinehandler/commands/run_pipelines.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,23 @@ func RunPipelines(ctx *devspacecontext.Context, pipeline types.Pipeline, args []
2121
if err != nil {
2222
return errors.Wrap(err, "parse args")
2323
}
24-
if len(args) == 0 {
25-
return fmt.Errorf("no pipeline to run specified")
26-
}
2724

2825
pipelines := []*latest.Pipeline{}
2926
for _, arg := range args {
27+
if arg == "" {
28+
continue
29+
}
30+
3031
pipelineConfig, ok := ctx.Config.Config().Pipelines[arg]
3132
if !ok {
3233
return fmt.Errorf("couldn't find pipeline %s", arg)
3334
}
3435

3536
pipelines = append(pipelines, pipelineConfig)
3637
}
38+
if len(pipelines) == 0 {
39+
return fmt.Errorf("no pipeline to run specified")
40+
}
3741

3842
return pipeline.StartNewPipelines(ctx, pipelines, options.PipelineOptions)
3943
}
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
package commands
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/bmatcuk/doublestar"
7+
"github.com/jessevdk/go-flags"
8+
devspacecontext "github.com/loft-sh/devspace/pkg/devspace/context"
9+
"github.com/loft-sh/devspace/pkg/devspace/pipeline/engine"
10+
"github.com/loft-sh/devspace/pkg/devspace/pipeline/types"
11+
"github.com/loft-sh/devspace/pkg/util/log"
12+
"github.com/loft-sh/devspace/pkg/util/tomb"
13+
"github.com/loft-sh/notify"
14+
"github.com/pkg/errors"
15+
"mvdan.cc/sh/v3/interp"
16+
"os"
17+
"path/filepath"
18+
"strings"
19+
"time"
20+
)
21+
22+
type WatchOptions struct {
23+
FailOnError bool `long:"fail-on-error" description:"If true the command will fail on an error while running the sub command"`
24+
25+
Paths []string `long:"path" short:"p" description:"The paths to watch. Can be patterns in the form of ./**/my-file.txt"`
26+
}
27+
28+
func Watch(devCtx *devspacecontext.Context, pipeline types.Pipeline, args []string, newHandler NewHandlerFn) error {
29+
devCtx.Log.Debugf("watch %s", strings.Join(args, " "))
30+
options := &WatchOptions{}
31+
args, err := flags.ParseArgs(options, args)
32+
if err != nil {
33+
return errors.Wrap(err, "parse args")
34+
}
35+
if len(options.Paths) == 0 {
36+
return fmt.Errorf("usage: watch --path MY_PATH -- my_command")
37+
}
38+
if len(args) == 0 {
39+
return fmt.Errorf("usage: watch --path MY_PATH -- my_command")
40+
}
41+
42+
w := &watcher{}
43+
hc := interp.HandlerCtx(devCtx.Context)
44+
return w.Watch(devCtx.Context, options.Paths, options.FailOnError, func(ctx context.Context) error {
45+
devCtx := devCtx.WithContext(ctx)
46+
_, err := engine.ExecutePipelineShellCommand(ctx, args[0]+" $@", args[1:], hc.Dir, false, hc.Stdout, hc.Stderr, hc.Stdin, hc.Env, newHandler(devCtx, hc.Stdout, pipeline))
47+
return err
48+
}, devCtx.Log)
49+
}
50+
51+
type watcher struct{}
52+
53+
func (w *watcher) Watch(ctx context.Context, patterns []string, failOnError bool, action func(ctx context.Context) error, log log.Logger) error {
54+
// prepare patterns
55+
for i, p := range patterns {
56+
patterns[i] = strings.TrimSuffix(strings.TrimPrefix(strings.TrimSpace(p), "./"), "/")
57+
}
58+
59+
// get folders from patterns
60+
pathsToWatch := map[string]bool{}
61+
for _, p := range patterns {
62+
patternsSplitted := strings.Split(filepath.ToSlash(p), "/")
63+
lastIndex := len(patternsSplitted) - 1
64+
for i, s := range patternsSplitted {
65+
if strings.Contains(s, "*") {
66+
lastIndex = i
67+
break
68+
}
69+
}
70+
71+
targetPath := strings.Join(patternsSplitted[:lastIndex], "/")
72+
if targetPath == "" {
73+
targetPath = "."
74+
}
75+
76+
absolutePath, err := filepath.Abs(filepath.FromSlash(targetPath))
77+
if err != nil {
78+
return errors.Wrap(err, "error resolving "+targetPath)
79+
}
80+
81+
absolutePath, err = filepath.EvalSymlinks(absolutePath)
82+
if err != nil {
83+
return errors.Wrap(err, "eval symlinks")
84+
}
85+
86+
pathsToWatch[absolutePath] = true
87+
}
88+
89+
watchTree := notify.NewTree()
90+
defer watchTree.Close()
91+
92+
globalChannel := make(chan string, 100)
93+
for targetPath := range pathsToWatch {
94+
stat, err := os.Stat(targetPath)
95+
if err != nil {
96+
if os.IsNotExist(err) {
97+
return fmt.Errorf("cannot watch %s as the directory or file must exist", targetPath)
98+
}
99+
100+
return errors.Wrap(err, "stat watch path "+targetPath)
101+
}
102+
103+
// watch recursive if target path is a directory
104+
watchPath := targetPath
105+
if stat.IsDir() {
106+
watchPath = filepath.Join(watchPath, "...")
107+
}
108+
109+
// start watching
110+
eventChannel := make(chan notify.EventInfo, 100)
111+
log.Debugf("Start watching %v", targetPath)
112+
err = watchTree.Watch(watchPath, eventChannel, func(s string) bool {
113+
return false
114+
}, notify.All)
115+
if err != nil {
116+
return errors.Wrap(err, "start watching "+targetPath)
117+
}
118+
defer watchTree.Stop(eventChannel)
119+
120+
go func(base string, eventChannel chan notify.EventInfo) {
121+
for {
122+
select {
123+
case <-ctx.Done():
124+
return
125+
case e := <-eventChannel:
126+
// make relative
127+
relPath, err := filepath.Rel(base, e.Path())
128+
if err != nil {
129+
log.Debugf("error converting path %s: %v", e.Path(), err)
130+
} else {
131+
globalChannel <- filepath.ToSlash(relPath)
132+
}
133+
}
134+
}
135+
}(targetPath, eventChannel)
136+
}
137+
138+
// start command
139+
return w.handleCommand(ctx, patterns, failOnError, action, globalChannel, log)
140+
}
141+
142+
func (w *watcher) handleCommand(ctx context.Context, patterns []string, failOnError bool, action func(ctx context.Context) error, events chan string, log log.Logger) error {
143+
t := w.startCommand(ctx, action)
144+
numEvents := 0
145+
for {
146+
select {
147+
case <-ctx.Done():
148+
t.Kill(nil)
149+
<-t.Dead()
150+
return nil
151+
case e := <-events:
152+
// check if match
153+
for _, p := range patterns {
154+
hasMatched, _ := doublestar.Match(p, e)
155+
if hasMatched {
156+
numEvents++
157+
break
158+
}
159+
}
160+
case <-time.After(time.Second * 2):
161+
if numEvents > 0 {
162+
// kill application and wait for exit
163+
log.Infof("Restarting command...")
164+
t.Kill(nil)
165+
select {
166+
case <-ctx.Done():
167+
return nil
168+
case <-t.Dead():
169+
}
170+
171+
// restart the command
172+
t = w.startCommand(ctx, action)
173+
numEvents = 0
174+
}
175+
}
176+
177+
// check if terminated
178+
if failOnError && t.Terminated() && t.Err() != nil {
179+
return t.Err()
180+
}
181+
}
182+
}
183+
184+
func (w *watcher) startCommand(ctx context.Context, action func(ctx context.Context) error) *tomb.Tomb {
185+
t, tombCtx := tomb.WithContext(ctx)
186+
t.Go(func() error {
187+
return action(tombCtx)
188+
})
189+
return t
190+
}

pkg/devspace/pipeline/engine/pipelinehandler/handler.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ var PipelineCommands = map[string]func(devCtx *devspacecontext.Context, pipeline
3030
"run_default_pipeline": func(devCtx *devspacecontext.Context, pipeline types.Pipeline, args []string) error {
3131
return commands.RunDefaultPipeline(devCtx, pipeline, args, NewPipelineExecHandler)
3232
},
33+
"watch": func(devCtx *devspacecontext.Context, pipeline types.Pipeline, args []string) error {
34+
return commands.Watch(devCtx, pipeline, args, NewPipelineExecHandler)
35+
},
3336
"run_pipelines": func(devCtx *devspacecontext.Context, pipeline types.Pipeline, args []string) error {
3437
return commands.RunPipelines(devCtx, pipeline, args)
3538
},

0 commit comments

Comments
 (0)