Skip to content

Commit db4ed89

Browse files
authored
feat(desktop): synchronized file share integration (docker#11614)
1 parent 1b5fa3b commit db4ed89

File tree

17 files changed

+858
-131
lines changed

17 files changed

+858
-131
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ require (
3232
github.com/opencontainers/go-digest v1.0.0
3333
github.com/opencontainers/image-spec v1.1.0-rc6
3434
github.com/otiai10/copy v1.14.0
35+
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc
3536
github.com/sirupsen/logrus v1.9.3
3637
github.com/spf13/cobra v1.8.0
3738
github.com/spf13/pflag v1.0.5
@@ -169,6 +170,7 @@ require (
169170
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
170171
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
171172
google.golang.org/protobuf v1.31.0 // indirect
173+
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
172174
gopkg.in/inf.v0 v0.9.1 // indirect
173175
gopkg.in/yaml.v2 v2.4.0 // indirect
174176
k8s.io/api v0.26.7 // indirect

go.sum

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
425425
github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ=
426426
github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg=
427427
github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
428+
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o=
429+
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8=
428430
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
429431
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
430432
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
@@ -562,6 +564,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
562564
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
563565
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
564566
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
567+
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
565568
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
566569
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
567570
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
@@ -672,6 +675,8 @@ google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs
672675
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
673676
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
674677
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
678+
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
679+
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
675680
gopkg.in/cenkalti/backoff.v2 v2.2.1 h1:eJ9UAg01/HIHG987TwxvnzK2MgxXq97YY6rYDpY9aII=
676681
gopkg.in/cenkalti/backoff.v2 v2.2.1/go.mod h1:S0QdOvT2AlerfSBkp0O+dk+bbIMaNbEmVk876gPCthU=
677682
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

internal/desktop/client.go

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,18 @@
1717
package desktop
1818

1919
import (
20+
"bytes"
2021
"context"
2122
"encoding/json"
23+
"errors"
2224
"fmt"
25+
"io"
2326
"net"
2427
"net/http"
2528
"strings"
2629

2730
"github.com/docker/compose/v2/internal/memnet"
31+
"github.com/r3labs/sse"
2832
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
2933
)
3034

@@ -119,6 +123,175 @@ func (c *Client) FeatureFlags(ctx context.Context) (FeatureFlagResponse, error)
119123
return ret, nil
120124
}
121125

126+
type CreateFileShareRequest struct {
127+
HostPath string `json:"hostPath"`
128+
Labels map[string]string `json:"labels,omitempty"`
129+
}
130+
131+
type CreateFileShareResponse struct {
132+
FileShareID string `json:"fileShareID"`
133+
}
134+
135+
func (c *Client) CreateFileShare(ctx context.Context, r CreateFileShareRequest) (*CreateFileShareResponse, error) {
136+
rawBody, _ := json.Marshal(r)
137+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, backendURL("/mutagen/file-shares"), bytes.NewReader(rawBody))
138+
req.Header.Set("Content-Type", "application/json")
139+
if err != nil {
140+
return nil, err
141+
}
142+
resp, err := c.client.Do(req)
143+
if err != nil {
144+
return nil, err
145+
}
146+
defer func() {
147+
_ = resp.Body.Close()
148+
}()
149+
150+
if resp.StatusCode != http.StatusOK {
151+
errBody, _ := io.ReadAll(resp.Body)
152+
return nil, fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(errBody))
153+
}
154+
var ret CreateFileShareResponse
155+
if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil {
156+
return nil, err
157+
}
158+
return &ret, nil
159+
}
160+
161+
type FileShareReceiverState struct {
162+
TotalReceivedSize uint64 `json:"totalReceivedSize"`
163+
}
164+
165+
type FileShareEndpoint struct {
166+
Path string `json:"path"`
167+
TotalFileSize uint64 `json:"totalFileSize,omitempty"`
168+
StagingProgress *FileShareReceiverState `json:"stagingProgress"`
169+
}
170+
171+
type FileShareSession struct {
172+
SessionID string `json:"identifier"`
173+
Alpha FileShareEndpoint `json:"alpha"`
174+
Beta FileShareEndpoint `json:"beta"`
175+
Labels map[string]string `json:"labels"`
176+
Status string `json:"status"`
177+
}
178+
179+
func (c *Client) ListFileShares(ctx context.Context) ([]FileShareSession, error) {
180+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, backendURL("/mutagen/file-shares"), http.NoBody)
181+
if err != nil {
182+
return nil, err
183+
}
184+
resp, err := c.client.Do(req)
185+
if err != nil {
186+
return nil, err
187+
}
188+
defer func() {
189+
_ = resp.Body.Close()
190+
}()
191+
192+
if resp.StatusCode != http.StatusOK {
193+
return nil, newHTTPStatusCodeError(resp)
194+
}
195+
196+
var ret []FileShareSession
197+
if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil {
198+
return nil, err
199+
}
200+
return ret, nil
201+
}
202+
203+
func (c *Client) DeleteFileShare(ctx context.Context, id string) error {
204+
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, backendURL("/mutagen/file-shares/"+id), http.NoBody)
205+
if err != nil {
206+
return err
207+
}
208+
resp, err := c.client.Do(req)
209+
if err != nil {
210+
return err
211+
}
212+
defer func() {
213+
_ = resp.Body.Close()
214+
}()
215+
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
216+
return newHTTPStatusCodeError(resp)
217+
}
218+
return nil
219+
}
220+
221+
type EventMessage[T any] struct {
222+
Value T
223+
Error error
224+
}
225+
226+
func newHTTPStatusCodeError(resp *http.Response) error {
227+
r := io.LimitReader(resp.Body, 2048)
228+
body, err := io.ReadAll(r)
229+
if err != nil {
230+
return fmt.Errorf("http status code %d", resp.StatusCode)
231+
}
232+
return fmt.Errorf("http status code %d: %s", resp.StatusCode, string(body))
233+
}
234+
235+
func (c *Client) StreamFileShares(ctx context.Context) (<-chan EventMessage[[]FileShareSession], error) {
236+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, backendURL("/mutagen/file-shares/stream"), http.NoBody)
237+
if err != nil {
238+
return nil, err
239+
}
240+
resp, err := c.client.Do(req)
241+
if err != nil {
242+
return nil, err
243+
}
244+
245+
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
246+
defer func() {
247+
_ = resp.Body.Close()
248+
}()
249+
return nil, newHTTPStatusCodeError(resp)
250+
}
251+
252+
events := make(chan EventMessage[[]FileShareSession])
253+
go func(ctx context.Context) {
254+
defer func() {
255+
_ = resp.Body.Close()
256+
for range events {
257+
// drain the channel
258+
}
259+
close(events)
260+
}()
261+
if err := readEvents(ctx, resp.Body, events); err != nil {
262+
select {
263+
case <-ctx.Done():
264+
case events <- EventMessage[[]FileShareSession]{Error: err}:
265+
}
266+
}
267+
}(ctx)
268+
return events, nil
269+
}
270+
271+
func readEvents[T any](ctx context.Context, r io.Reader, events chan<- EventMessage[T]) error {
272+
eventReader := sse.NewEventStreamReader(r)
273+
for {
274+
msg, err := eventReader.ReadEvent()
275+
if errors.Is(err, io.EOF) {
276+
return nil
277+
} else if err != nil {
278+
return fmt.Errorf("reading events: %w", err)
279+
}
280+
msg = bytes.TrimPrefix(msg, []byte("data: "))
281+
282+
var event T
283+
if err := json.Unmarshal(msg, &event); err != nil {
284+
return err
285+
}
286+
select {
287+
case <-ctx.Done():
288+
return context.Cause(ctx)
289+
case events <- EventMessage[T]{Value: event}:
290+
// event was sent to channel, read next
291+
}
292+
}
293+
}
294+
122295
// backendURL generates a URL for the given API path.
123296
//
124297
// NOTE: Custom transport handles communication. The host is to create a valid

internal/desktop/client_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
Copyright 2024 Docker Compose CLI authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package desktop
18+
19+
import (
20+
"context"
21+
"os"
22+
"testing"
23+
"time"
24+
25+
"github.com/stretchr/testify/require"
26+
)
27+
28+
func TestClientPing(t *testing.T) {
29+
if testing.Short() {
30+
t.Skip("Skipped in short mode - test connects to Docker Desktop")
31+
}
32+
desktopEndpoint := os.Getenv("COMPOSE_TEST_DESKTOP_ENDPOINT")
33+
if desktopEndpoint == "" {
34+
t.Skip("Skipping - COMPOSE_TEST_DESKTOP_ENDPOINT not defined")
35+
}
36+
37+
ctx, cancel := context.WithCancel(context.Background())
38+
t.Cleanup(cancel)
39+
40+
client := NewClient(desktopEndpoint)
41+
t.Cleanup(func() {
42+
_ = client.Close()
43+
})
44+
45+
now := time.Now()
46+
47+
ret, err := client.Ping(ctx)
48+
require.NoError(t, err)
49+
50+
serverTime := time.Unix(0, ret.ServerTime)
51+
require.True(t, now.Before(serverTime))
52+
}

0 commit comments

Comments
 (0)