Skip to content

Commit a7ac1f4

Browse files
committed
fix: improve observability
1 parent 656c306 commit a7ac1f4

File tree

12 files changed

+642
-260
lines changed

12 files changed

+642
-260
lines changed

cmd/docker-sync/cmd/sync.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type syncImage struct {
2323
Tags []string `yaml:"tags"`
2424
MutableTags []string `yaml:"mutableTags"`
2525
IgnoredTags []string `yaml:"ignoredTags"`
26+
Purge bool `yaml:"purge"`
2627
}
2728

2829
type syncAuth struct {
@@ -74,13 +75,15 @@ var syncCmd = &cobra.Command{
7475
tags, _ := cmd.Flags().GetStringSlice("tags")
7576
mutableTags, _ := cmd.Flags().GetStringSlice("mutable-tags")
7677
ignoredTags, _ := cmd.Flags().GetStringSlice("ignored-tags")
78+
purge, _ := cmd.Flags().GetBool("purge")
7779

7880
cnf.Sync.Images = append(cnf.Sync.Images, syncImage{
7981
Source: source,
8082
Targets: []string{target},
8183
Tags: tags,
8284
MutableTags: mutableTags,
8385
IgnoredTags: ignoredTags,
86+
Purge: purge,
8487
})
8588

8689
var registries []syncRegistry
@@ -195,5 +198,7 @@ func init() {
195198
syncCmd.Flags().StringP("target-token", "", os.Getenv("DOCKER_SYNC_TARGET_TOKEN"), "target registry token")
196199
syncCmd.Flags().StringP("target-username", os.Getenv("DOCKER_SYNC_TARGET_USERNAME"), "", "target registry username")
197200

201+
syncCmd.Flags().BoolP("purge", "p", false, "Purge tags not in the source registry")
202+
198203
// For more registries and advanced options, please use a configuration file
199204
}

config/s3.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@ var (
88
WithDefaultValue(10),
99
WithValidInt())
1010

11+
// SyncS3MaxPurgeConcurrency is the maximum number of concurrent purges to S3.
12+
SyncS3MaxPurgeConcurrency = NewKey("sync.s3.maxPurgeConcurrency",
13+
WithDefaultValue(100),
14+
WithValidInt())
15+
1116
// SyncS3ObjectCacheEnabled enables S3 object-seem cache.
1217
SyncS3ObjectCacheEnabled = NewKey("sync.s3.objectCache.enabled",
1318
WithDefaultValue(true),

entrypoint.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/bin/sh
22

3-
# check if /config.yaml exists
4-
if [ ! -f /config.yaml ]; then
3+
# check if /config.yaml exists and first argument isn't 'sync'
4+
if [ ! -f /config.yaml ] && [ "$1" != "sync" ]; then
55
/docker-sync mergeYaml -o /config.yaml -f /config_map.yaml -f /secret.yaml
66
if [ $? -ne 0 ]; then
77
echo "Error merging YAML files. Exiting."

internal/sync/images.go

Lines changed: 19 additions & 251 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,11 @@ import (
55
"fmt"
66
"slices"
77
"strings"
8-
"time"
98

10-
"github.com/Altinity/docker-sync/config"
119
"github.com/Altinity/docker-sync/internal/telemetry"
1210
"github.com/Altinity/docker-sync/structs"
1311
"github.com/cenkalti/backoff/v4"
14-
"github.com/containers/image/v5/copy"
1512
"github.com/containers/image/v5/docker"
16-
"github.com/containers/image/v5/signature"
1713
"github.com/containers/image/v5/types"
1814
"github.com/rs/zerolog/log"
1915
"go.opentelemetry.io/otel/attribute"
@@ -34,87 +30,6 @@ func checkRateLimit(err error) error {
3430
return backoff.Permanent(err)
3531
}
3632

37-
func push(ctx context.Context, image *structs.Image, dst string, tag string) error {
38-
return backoff.RetryNotify(func() error {
39-
switch getRepositoryType(dst) {
40-
case S3CompatibleRepository:
41-
fields := strings.Split(dst, ":")
42-
43-
var fn func(context.Context, *structs.Image, string, string, string) error
44-
45-
switch fields[0] {
46-
case "r2":
47-
fn = pushR2
48-
case "s3":
49-
fn = pushS3
50-
default:
51-
return fmt.Errorf("unsupported bucket destination: %s", dst)
52-
}
53-
54-
if err := fn(ctx, image, dst, fields[3], tag); err != nil {
55-
return err
56-
}
57-
58-
return nil
59-
case OCIRepository:
60-
srcAuth, _ := getSkopeoAuth(ctx, image.GetSourceRegistry(), image.GetSourceRepository())
61-
dstAuth, _ := getSkopeoAuth(ctx, image.GetRegistry(dst), image.GetRepository(dst))
62-
63-
dstRef, err := docker.ParseReference(fmt.Sprintf("//%s:%s", dst, tag))
64-
if err != nil {
65-
return err
66-
}
67-
68-
srcRef, err := docker.ParseReference(fmt.Sprintf("//%s:%s", image.Source, tag))
69-
if err != nil {
70-
return err
71-
}
72-
73-
srcCtx := &types.SystemContext{
74-
DockerAuthConfig: srcAuth,
75-
}
76-
dstCtx := &types.SystemContext{
77-
DockerAuthConfig: dstAuth,
78-
}
79-
80-
policy := &signature.Policy{Default: []signature.PolicyRequirement{signature.NewPRInsecureAcceptAnything()}}
81-
policyContext, err := signature.NewPolicyContext(policy)
82-
if err != nil {
83-
return err
84-
}
85-
86-
ch := make(chan types.ProgressProperties)
87-
defer close(ch)
88-
89-
chCtx, cancel := context.WithCancel(ctx)
90-
defer cancel()
91-
go dockerDataCounter(chCtx, image.Source, dst, ch)
92-
93-
_, err = copy.Image(ctx, policyContext, dstRef, srcRef, &copy.Options{
94-
SourceCtx: srcCtx,
95-
DestinationCtx: dstCtx,
96-
ImageListSelection: copy.CopyAllImages,
97-
ProgressInterval: time.Second,
98-
Progress: ch,
99-
})
100-
101-
return checkRateLimit(err)
102-
default:
103-
return fmt.Errorf("unsupported repository type")
104-
}
105-
}, backoff.WithMaxRetries(backoff.NewExponentialBackOff(
106-
backoff.WithInitialInterval(1*time.Minute),
107-
), config.SyncMaxErrors.UInt64()), func(err error, dur time.Duration) {
108-
log.Error().
109-
Err(err).
110-
Dur("backoff", dur).
111-
Str("image", image.Source).
112-
Str("tag", tag).
113-
Str("target", dst).
114-
Msg("Push failed")
115-
})
116-
}
117-
11833
func SyncImage(ctx context.Context, image *structs.Image) error {
11934
log.Info().
12035
Str("image", image.Source).
@@ -132,36 +47,11 @@ func SyncImage(ctx context.Context, image *structs.Image) error {
13247
DockerAuthConfig: srcAuth,
13348
}
13449

135-
var srcTags []string
136-
137-
if len(image.Tags) > 0 {
138-
for _, tag := range image.Tags {
139-
if tag == "@semver" {
140-
allTags, err := docker.GetRepositoryTags(ctx, srcCtx, srcRef)
141-
if err != nil {
142-
return err
143-
}
144-
145-
for _, t := range allTags {
146-
if isSemVerTag(t) {
147-
srcTags = append(srcTags, t)
148-
}
149-
}
150-
} else {
151-
srcTags = append(srcTags, tag)
152-
}
153-
}
154-
} else {
155-
srcTags, err = docker.GetRepositoryTags(ctx, srcCtx, srcRef)
156-
if err != nil {
157-
return err
158-
}
50+
srcTags, err := getSourceTags(ctx, image, srcCtx, srcRef)
51+
if err != nil {
52+
return err
15953
}
16054

161-
// Remove duplicate tags
162-
slices.Sort(srcTags)
163-
srcTags = slices.Compact(srcTags)
164-
16555
if len(srcTags) == 0 {
16656
log.Warn().
16757
Str("image", image.Source).
@@ -171,66 +61,25 @@ func SyncImage(ctx context.Context, image *structs.Image) error {
17161
return nil
17262
}
17363

64+
telemetry.MonitoredTags.Record(ctx, int64(len(srcTags)),
65+
metric.WithAttributes(
66+
attribute.KeyValue{
67+
Key: "image",
68+
Value: attribute.StringValue(image.Source),
69+
},
70+
),
71+
)
72+
17473
log.Info().
17574
Str("image", image.Source).
17675
Str("auth", srcAuthName).
17776
Int("tags", len(srcTags)).
17877
Msg("Found source tags")
17978

18079
// Get all tags from targets
181-
var dstTags []string
182-
183-
for _, dst := range image.Targets {
184-
switch getRepositoryType(dst) {
185-
case S3CompatibleRepository:
186-
fields := strings.Split(dst, ":")
187-
188-
tags, err := listS3Tags(ctx, dst, fields)
189-
if err != nil {
190-
return err
191-
}
192-
193-
if len(tags) > 0 {
194-
log.Info().
195-
Str("image", image.Source).
196-
Str("target", dst).
197-
Int("tags", len(tags)).
198-
Msg("Found destination tags")
199-
200-
dstTags = append(dstTags, tags...)
201-
}
202-
203-
continue
204-
case OCIRepository:
205-
dstRef, err := docker.ParseReference(fmt.Sprintf("//%s", dst))
206-
if err != nil {
207-
return err
208-
}
209-
210-
dstAuth, dstAuthName := getSkopeoAuth(ctx, image.GetRegistry(dst), image.GetRepository(dst))
211-
212-
dstCtx := &types.SystemContext{
213-
DockerAuthConfig: dstAuth,
214-
}
215-
216-
tags, err := docker.GetRepositoryTags(ctx, dstCtx, dstRef)
217-
if err != nil {
218-
return err
219-
}
220-
221-
if len(tags) > 0 {
222-
log.Info().
223-
Str("image", image.Source).
224-
Str("target", dst).
225-
Int("tags", len(tags)).
226-
Str("auth", dstAuthName).
227-
Msg("Found destination tags")
228-
229-
for _, tag := range tags {
230-
dstTags = append(dstTags, fmt.Sprintf("%s:%s", dst, tag))
231-
}
232-
}
233-
}
80+
dstTags, err := getDstTags(ctx, image)
81+
if err != nil {
82+
return err
23483
}
23584

23685
// Sync tags
@@ -243,92 +92,11 @@ func SyncImage(ctx context.Context, image *structs.Image) error {
24392
continue
24493
}
24594

246-
telemetry.Errors.Add(ctx, 0,
247-
metric.WithAttributes(
248-
attribute.KeyValue{
249-
Key: "image",
250-
Value: attribute.StringValue(image.Source),
251-
},
252-
attribute.KeyValue{
253-
Key: "tag",
254-
Value: attribute.StringValue(tag),
255-
},
256-
),
257-
)
258-
259-
if err := func() error {
260-
tag := tag
261-
262-
var actualDsts []string
263-
264-
for _, dst := range image.Targets {
265-
if !slices.Contains(image.MutableTags, tag) && slices.Contains(dstTags, fmt.Sprintf("%s:%s", dst, tag)) {
266-
continue
267-
}
268-
269-
actualDsts = append(actualDsts, dst)
270-
}
271-
272-
if len(actualDsts) == 0 {
273-
log.Debug().
274-
Str("image", image.Source).
275-
Str("tag", tag).
276-
Msg("Tag already exists in all targets, skipping")
277-
278-
return nil
279-
}
280-
281-
log.Info().
282-
Str("image", image.Source).
283-
Str("tag", tag).
284-
Strs("targets", image.Targets).
285-
Msg("Syncing tag")
286-
287-
for _, dst := range actualDsts {
288-
if err := push(ctx, image, dst, tag); err != nil {
289-
return err
290-
}
291-
}
292-
293-
return nil
294-
}(); err != nil {
295-
log.Error().
296-
Err(err).
297-
Str("image", image.Source).
298-
Str("tag", tag).
299-
Msg("Failed to sync tag")
300-
301-
telemetry.Errors.Add(ctx, 1,
302-
metric.WithAttributes(
303-
attribute.KeyValue{
304-
Key: "image",
305-
Value: attribute.StringValue(image.Source),
306-
},
307-
attribute.KeyValue{
308-
Key: "tag",
309-
Value: attribute.StringValue(tag),
310-
},
311-
attribute.KeyValue{
312-
Key: "error",
313-
Value: attribute.StringValue(err.Error()),
314-
},
315-
),
316-
)
317-
} else {
318-
telemetry.Syncs.Add(ctx, 1,
319-
metric.WithAttributes(
320-
attribute.KeyValue{
321-
Key: "image",
322-
Value: attribute.StringValue(image.Source),
323-
},
324-
attribute.KeyValue{
325-
Key: "tag",
326-
Value: attribute.StringValue(tag),
327-
},
328-
),
329-
)
330-
}
95+
syncTag(ctx, image, tag, dstTags)
33196
}
33297

98+
// Purge
99+
purge(ctx, image, srcTags, dstTags)
100+
333101
return nil
334102
}

0 commit comments

Comments
 (0)