Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/Microsoft/go-winio v0.6.2
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
github.com/buger/goterm v1.0.4
github.com/compose-spec/compose-go/v2 v2.4.5
github.com/compose-spec/compose-go/v2 v2.4.6-0.20241203131247-9a9cc5d9c345
github.com/containerd/containerd v1.7.24
github.com/containerd/platforms v0.2.1
github.com/davecgh/go-spew v1.1.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 h1:QVw89YDxXxEe+l8gU8E
github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/codahale/rfc6979 v0.0.0-20141003034818-6a90f24967eb h1:EDmT6Q9Zs+SbUoc7Ik9EfrFqcylYqgPZ9ANSbTAntnE=
github.com/codahale/rfc6979 v0.0.0-20141003034818-6a90f24967eb/go.mod h1:ZjrT6AXHbDs86ZSdt/osfBi5qfexBrKUdONk989Wnk4=
github.com/compose-spec/compose-go/v2 v2.4.5 h1:p4ih4Jb6VgGPLPxh3fSFVKAjFHtZd+7HVLCSFzcFx9Y=
github.com/compose-spec/compose-go/v2 v2.4.5/go.mod h1:lFN0DrMxIncJGYAXTfWuajfwj5haBJqrBkarHcnjJKc=
github.com/compose-spec/compose-go/v2 v2.4.6-0.20241203131247-9a9cc5d9c345 h1:oLm7hga9jjaDedg+dqsWiI1GeRrcGLBPxu8W0VfpiKA=
github.com/compose-spec/compose-go/v2 v2.4.6-0.20241203131247-9a9cc5d9c345/go.mod h1:lFN0DrMxIncJGYAXTfWuajfwj5haBJqrBkarHcnjJKc=
github.com/containerd/cgroups v1.1.0 h1:v8rEWFl6EoqHB+swVNjVoCJE8o3jX7e8nqBGPLaDFBM=
github.com/containerd/cgroups/v3 v3.0.3 h1:S5ByHZ/h9PMe5IOQoN7E+nMc2UcLEM/V48DGDJ9kip0=
github.com/containerd/cgroups/v3 v3.0.3/go.mod h1:8HBe7V3aWGLFPd/k03swSIsGjZhHI2WzJmticMgVuz0=
Expand Down
142 changes: 85 additions & 57 deletions pkg/compose/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import (
"os"
"path"
"path/filepath"
"sort"
"strconv"
"strings"
"time"

"github.com/compose-spec/compose-go/v2/types"
ccli "github.com/docker/cli/cli/command/container"
pathutil "github.com/docker/compose/v2/internal/paths"
"github.com/docker/compose/v2/internal/sync"
"github.com/docker/compose/v2/pkg/api"
Expand All @@ -48,7 +48,7 @@ const quietPeriod = 500 * time.Millisecond
// fileEvent contains the Compose service and modified host system path.
type fileEvent struct {
sync.PathMapping
Action types.WatchAction
Trigger types.Trigger
}

// getSyncImplementation returns an appropriate sync implementation for the
Expand Down Expand Up @@ -298,7 +298,7 @@ func maybeFileEvent(trigger types.Trigger, hostPath string, ignore watch.PathMat
}

return &fileEvent{
Action: trigger.Action,
Trigger: trigger,
PathMapping: sync.PathMapping{
HostPath: hostPath,
ContainerPath: containerPath,
Expand Down Expand Up @@ -336,7 +336,10 @@ func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project)
}

if trigger.Action == types.WatchActionRebuild && service.Build == nil {
return nil, fmt.Errorf("service %s doesn't have a build section, can't apply 'rebuild' on watch", service.Name)
return nil, fmt.Errorf("service %s doesn't have a build section, can't apply %s on watch", types.WatchActionRebuild, service.Name)
}
if trigger.Action == types.WatchActionSyncExec && len(trigger.Exec.Command) == 0 {
return nil, fmt.Errorf("can't watch with action %q on service %s wihtout a command", types.WatchActionSyncExec, service.Name)
}

config.Watch[i] = trigger
Expand All @@ -352,24 +355,17 @@ func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time.
out := make(chan []fileEvent)
go func() {
defer close(out)
seen := make(map[fileEvent]time.Time)
seen := make(map[string]fileEvent)
flushEvents := func() {
if len(seen) == 0 {
return
}
events := make([]fileEvent, 0, len(seen))
for e := range seen {
for _, e := range seen {
events = append(events, e)
}
// sort batch by oldest -> newest
// (if an event is seen > 1 per batch, it gets the latest timestamp)
sort.SliceStable(events, func(i, j int) bool {
x := events[i]
y := events[j]
return seen[x].Before(seen[y])
})
out <- events
seen = make(map[fileEvent]time.Time)
seen = make(map[string]fileEvent)
}

t := clock.NewTicker(delay)
Expand All @@ -386,7 +382,10 @@ func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time.
flushEvents()
return
}
seen[e] = time.Now()
if _, ok := seen[e.HostPath]; !ok {
// already know updated path, first rule in watch configuration wins
seen[e.HostPath] = e
}
t.Reset(delay)
}
}
Expand Down Expand Up @@ -485,49 +484,10 @@ func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Pr
pathMappings := make([]sync.PathMapping, len(batch))
restartService := false
for i := range batch {
if batch[i].Action == types.WatchActionRebuild {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Rebuilding service %q after changes were detected...", serviceName))
// restrict the build to ONLY this service, not any of its dependencies
options.Build.Services = []string{serviceName}
imageNameToIdMap, err := s.build(ctx, project, *options.Build, nil)

if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Build failed. Error: %v", err))
return err
}

if options.Prune {
s.pruneDanglingImagesOnRebuild(ctx, project.Name, imageNameToIdMap)
}

options.LogTo.Log(api.WatchLogger, fmt.Sprintf("service %q successfully built", serviceName))

err = s.create(ctx, project, api.CreateOptions{
Services: []string{serviceName},
Inherit: true,
Recreate: api.RecreateForce,
})
if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Failed to recreate service after update. Error: %v", err))
return err
}

services := []string{serviceName}
p, err := project.WithSelectedServices(services)
if err != nil {
return err
}
err = s.start(ctx, project.Name, api.StartOptions{
Project: p,
Services: services,
AttachTo: services,
}, nil)
if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Application failed to start after update. Error: %v", err))
}
return nil
if batch[i].Trigger.Action == types.WatchActionRebuild {
return s.rebuild(ctx, project, serviceName, options)
}
if batch[i].Action == types.WatchActionSyncRestart {
if batch[i].Trigger.Action == types.WatchActionSyncRestart {
restartService = true
}
pathMappings[i] = batch[i].PathMapping
Expand All @@ -554,7 +514,75 @@ func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Pr
options.LogTo.Log(
api.WatchLogger,
fmt.Sprintf("service %q restarted", serviceName))
}
eg, ctx := errgroup.WithContext(ctx)
for _, b := range batch {
if b.Trigger.Action == types.WatchActionSyncExec {
containers, err := s.getContainers(ctx, project.Name, oneOffExclude, false, serviceName)
if err != nil {
return err
}
x := b.Trigger.Exec
for _, c := range containers {
eg.Go(func() error {
exec := ccli.NewExecOptions()
exec.User = x.User
exec.Privileged = x.Privileged
exec.Command = x.Command
exec.Workdir = x.WorkingDir
for _, v := range x.Environment.ToMapping().Values() {
err := exec.Env.Set(v)
if err != nil {
return err
}
}
return ccli.RunExec(ctx, s.dockerCli, c.ID, exec)
})
}
}
}
return eg.Wait()
}

func (s *composeService) rebuild(ctx context.Context, project *types.Project, serviceName string, options api.WatchOptions) error {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Rebuilding service %q after changes were detected...", serviceName))
// restrict the build to ONLY this service, not any of its dependencies
options.Build.Services = []string{serviceName}
imageNameToIdMap, err := s.build(ctx, project, *options.Build, nil)

if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Build failed. Error: %v", err))
return err
}

if options.Prune {
s.pruneDanglingImagesOnRebuild(ctx, project.Name, imageNameToIdMap)
}

options.LogTo.Log(api.WatchLogger, fmt.Sprintf("service %q successfully built", serviceName))

err = s.create(ctx, project, api.CreateOptions{
Services: []string{serviceName},
Inherit: true,
Recreate: api.RecreateForce,
})
if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Failed to recreate service after update. Error: %v", err))
return err
}

services := []string{serviceName}
p, err := project.WithSelectedServices(services)
if err != nil {
return err
}
err = s.start(ctx, project.Name, api.StartOptions{
Project: p,
Services: services,
AttachTo: services,
}, nil)
if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Application failed to start after update. Error: %v", err))
}
return nil
}
Expand Down
23 changes: 17 additions & 6 deletions pkg/compose/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"
"os"
"slices"
"strings"
"testing"
"time"

Expand All @@ -42,23 +44,32 @@ func TestDebounceBatching(t *testing.T) {
ctx, stop := context.WithCancel(context.Background())
t.Cleanup(stop)

trigger := types.Trigger{
Path: "/",
}
matcher := watch.EmptyMatcher{}
eventBatchCh := batchDebounceEvents(ctx, clock, quietPeriod, ch)
for i := 0; i < 100; i++ {
var action types.WatchAction = "a"
var path = "/a"
if i%2 == 0 {
action = "b"
path = "/b"
}
ch <- fileEvent{Action: action}

event := maybeFileEvent(trigger, path, matcher)
require.NotNil(t, event)
ch <- *event
}
// we sent 100 events + the debouncer
clock.BlockUntil(101)
clock.Advance(quietPeriod)
select {
case batch := <-eventBatchCh:
require.ElementsMatch(t, batch, []fileEvent{
{Action: "a"},
{Action: "b"},
slices.SortFunc(batch, func(a, b fileEvent) int {
return strings.Compare(a.HostPath, b.HostPath)
})
assert.Equal(t, len(batch), 2)
assert.Equal(t, batch[0].HostPath, "/a")
assert.Equal(t, batch[1].HostPath, "/b")
case <-time.After(50 * time.Millisecond):
t.Fatal("timed out waiting for events")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<html lang="en">
<head>
<meta charset="utf-8">
<title>Docker Nginx</title>
<title>Static file 2</title>
</head>
<body>
<h2>Hello from Nginx container</h2>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<html lang="en">
<head>
<meta charset="utf-8">
<title>Docker Nginx</title>
<title>Static file 2</title>
</head>
<body>
<h2>Hello from Nginx container</h2>
Expand Down
14 changes: 14 additions & 0 deletions pkg/e2e/fixtures/watch/exec.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
services:
test:
build:
dockerfile_inline: FROM alpine
command: ping localhost
volumes:
- /data
develop:
watch:
- path: .
target: /data
action: sync+exec
exec:
command: echo "SUCCESS"
38 changes: 38 additions & 0 deletions pkg/e2e/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package e2e

import (
"bytes"
"crypto/rand"
"fmt"
"os"
Expand Down Expand Up @@ -289,3 +290,40 @@ func doTest(t *testing.T, svcName string) {

testComplete.Store(true)
}

func TestWatchExec(t *testing.T) {
c := NewCLI(t)
const projectName = "test_watch_exec"

defer c.cleanupWithDown(t, projectName)

tmpdir := t.TempDir()
composeFilePath := filepath.Join(tmpdir, "compose.yaml")
CopyFile(t, filepath.Join("fixtures", "watch", "exec.yaml"), composeFilePath)
cmd := c.NewDockerComposeCmd(t, "-p", projectName, "-f", composeFilePath, "up", "--watch")
buffer := bytes.NewBuffer(nil)
cmd.Stdout = buffer
watch := icmd.StartCmd(cmd)

poll.WaitOn(t, func(l poll.LogT) poll.Result {
out := buffer.String()
if strings.Contains(out, "64 bytes from") {
return poll.Success()
}
return poll.Continue("%v", watch.Stdout())
})

t.Logf("Create new file")

testFile := filepath.Join(tmpdir, "test")
require.NoError(t, os.WriteFile(testFile, []byte("test\n"), 0o600))

poll.WaitOn(t, func(l poll.LogT) poll.Result {
out := buffer.String()
if strings.Contains(out, "SUCCESS") {
return poll.Success()
}
return poll.Continue("%v", out)
})
c.RunDockerComposeCmdNoCheck(t, "-p", projectName, "kill", "-s", "9")
}
Loading