Skip to content

Commit 86530c0

Browse files
committed
Move image event publishing to metadata store
The metadata store is in the best place to handle events directly after the database has been updated. This prevents every user of the image store interface from having to know whether or not they are responsible for publishing events and avoid double events if the grpc local service is used. Signed-off-by: Derek McGowan <[email protected]>
1 parent 2f807b6 commit 86530c0

File tree

6 files changed

+39
-77
lines changed

6 files changed

+39
-77
lines changed

core/metadata/images.go

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"sync/atomic"
2626
"time"
2727

28+
eventstypes "github.com/containerd/containerd/v2/api/events"
2829
"github.com/containerd/containerd/v2/core/images"
2930
"github.com/containerd/containerd/v2/core/metadata/boltutil"
3031
"github.com/containerd/containerd/v2/pkg/epoch"
@@ -164,6 +165,15 @@ func (s *imageStore) Create(ctx context.Context, image images.Image) (images.Ima
164165
return images.Image{}, err
165166
}
166167

168+
if s.db.dbopts.publisher != nil {
169+
if err := s.db.dbopts.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{
170+
Name: image.Name,
171+
Labels: image.Labels,
172+
}); err != nil {
173+
return images.Image{}, err
174+
}
175+
}
176+
167177
return image, nil
168178
}
169179

@@ -256,6 +266,15 @@ func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths
256266
return images.Image{}, err
257267
}
258268

269+
if s.db.dbopts.publisher != nil {
270+
if err := s.db.dbopts.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{
271+
Name: updated.Name,
272+
Labels: updated.Labels,
273+
}); err != nil {
274+
return images.Image{}, err
275+
}
276+
}
277+
259278
return updated, nil
260279

261280
}
@@ -273,7 +292,7 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del
273292
}
274293
}
275294

276-
return update(ctx, s.db, func(tx *bolt.Tx) error {
295+
err = update(ctx, s.db, func(tx *bolt.Tx) error {
277296
bkt := getImagesBucket(tx, namespace)
278297
if bkt == nil {
279298
return fmt.Errorf("image %q: %w", name, errdefs.ErrNotFound)
@@ -310,6 +329,19 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del
310329

311330
return nil
312331
})
332+
if err != nil {
333+
return err
334+
}
335+
336+
if s.db.dbopts.publisher != nil {
337+
if err := s.db.dbopts.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{
338+
Name: name,
339+
}); err != nil {
340+
return err
341+
}
342+
}
343+
344+
return nil
313345
}
314346

315347
func validateImage(image *images.Image) error {

internal/cri/server/images/image_pull.go

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import (
3838
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
3939
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
4040

41-
eventstypes "github.com/containerd/containerd/v2/api/events"
4241
containerd "github.com/containerd/containerd/v2/client"
4342
"github.com/containerd/containerd/v2/core/diff"
4443
containerdimages "github.com/containerd/containerd/v2/core/images"
@@ -322,14 +321,6 @@ func (c *CRIImageService) createImageReference(ctx context.Context, name string,
322321
// TODO: Call CRIImageService directly
323322
oldImg, err := c.images.Create(ctx, img)
324323
if err == nil {
325-
if c.publisher != nil {
326-
if err := c.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{
327-
Name: img.Name,
328-
Labels: img.Labels,
329-
}); err != nil {
330-
return err
331-
}
332-
}
333324
return nil
334325
} else if !errdefs.IsAlreadyExists(err) {
335326
return err
@@ -338,16 +329,6 @@ func (c *CRIImageService) createImageReference(ctx context.Context, name string,
338329
return nil
339330
}
340331
_, err = c.images.Update(ctx, img, "target", "labels."+crilabels.ImageLabelKey)
341-
if err == nil && c.publisher != nil {
342-
if c.publisher != nil {
343-
if err := c.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{
344-
Name: img.Name,
345-
Labels: img.Labels,
346-
}); err != nil {
347-
return err
348-
}
349-
}
350-
}
351332
return err
352333
}
353334

internal/cri/server/images/image_remove.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"context"
2121
"fmt"
2222

23-
eventstypes "github.com/containerd/containerd/v2/api/events"
2423
"github.com/containerd/containerd/v2/core/images"
2524
"github.com/containerd/containerd/v2/pkg/tracing"
2625
"github.com/containerd/errdefs"
@@ -63,14 +62,6 @@ func (c *GRPCCRIImageService) RemoveImage(ctx context.Context, r *runtime.Remove
6362
if err := c.imageStore.Update(ctx, ref); err != nil {
6463
return nil, fmt.Errorf("failed to update image reference %q for %q: %w", ref, image.ID, err)
6564
}
66-
67-
if c.publisher != nil {
68-
if err := c.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{
69-
Name: ref,
70-
}); err != nil {
71-
return nil, err
72-
}
73-
}
7465
continue
7566
}
7667
return nil, fmt.Errorf("failed to delete image reference %q for %q: %w", ref, image.ID, err)

internal/cri/server/images/service.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
imagestore "github.com/containerd/containerd/v2/internal/cri/store/image"
2929
snapshotstore "github.com/containerd/containerd/v2/internal/cri/store/snapshot"
3030
"github.com/containerd/containerd/v2/internal/kmutex"
31-
"github.com/containerd/containerd/v2/pkg/events"
3231
"github.com/containerd/log"
3332
"github.com/containerd/platforms"
3433
docker "github.com/distribution/reference"
@@ -54,8 +53,6 @@ type CRIImageService struct {
5453
// images is the lower level image store used for raw storage,
5554
// no event publishing should currently be assumed
5655
images images.Store
57-
// publisher is the events publisher
58-
publisher events.Publisher
5956
// client is a subset of the containerd client
6057
// and will be replaced by image store and transfer service
6158
client imageClient
@@ -88,8 +85,6 @@ type CRIImageServiceOptions struct {
8885

8986
Snapshotters map[string]snapshots.Snapshotter
9087

91-
Publisher events.Publisher
92-
9388
Client imageClient
9489
}
9590

plugins/cri/images/plugin.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
criconfig "github.com/containerd/containerd/v2/internal/cri/config"
2929
"github.com/containerd/containerd/v2/internal/cri/constants"
3030
"github.com/containerd/containerd/v2/internal/cri/server/images"
31-
"github.com/containerd/containerd/v2/pkg/events"
3231
"github.com/containerd/containerd/v2/plugins"
3332
"github.com/containerd/containerd/v2/plugins/services/warning"
3433
"github.com/containerd/log"
@@ -46,7 +45,6 @@ func init() {
4645
Config: &config,
4746
Requires: []plugin.Type{
4847
plugins.LeasePlugin,
49-
plugins.EventPlugin,
5048
plugins.MetadataPlugin,
5149
plugins.SandboxStorePlugin,
5250
plugins.ServicePlugin, // For client
@@ -60,11 +58,6 @@ func init() {
6058
}
6159
mdb := m.(*metadata.DB)
6260

63-
ep, err := ic.GetSingle(plugins.EventPlugin)
64-
if err != nil {
65-
return nil, err
66-
}
67-
6861
if warnings, err := criconfig.ValidateImageConfig(ic.Context, &config); err != nil {
6962
return nil, fmt.Errorf("invalid cri image config: %w", err)
7063
} else if len(warnings) > 0 {
@@ -84,7 +77,6 @@ func init() {
8477
RuntimePlatforms: map[string]images.ImagePlatform{},
8578
Snapshotters: map[string]snapshots.Snapshotter{},
8679
ImageFSPaths: map[string]string{},
87-
Publisher: ep.(events.Publisher),
8880
}
8981

9082
options.Client, err = containerd.New(

plugins/services/images/local.go

Lines changed: 6 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,11 @@ import (
2424
"google.golang.org/grpc/codes"
2525
"google.golang.org/grpc/status"
2626

27-
eventstypes "github.com/containerd/containerd/v2/api/events"
2827
imagesapi "github.com/containerd/containerd/v2/api/services/images/v1"
2928
"github.com/containerd/containerd/v2/core/images"
3029
"github.com/containerd/containerd/v2/core/metadata"
3130
"github.com/containerd/containerd/v2/pkg/deprecation"
3231
"github.com/containerd/containerd/v2/pkg/epoch"
33-
"github.com/containerd/containerd/v2/pkg/events"
3432
"github.com/containerd/containerd/v2/pkg/gc"
3533
"github.com/containerd/containerd/v2/plugins"
3634
"github.com/containerd/containerd/v2/plugins/services"
@@ -46,7 +44,6 @@ func init() {
4644
Type: plugins.ServicePlugin,
4745
ID: services.ImagesService,
4846
Requires: []plugin.Type{
49-
plugins.EventPlugin,
5047
plugins.MetadataPlugin,
5148
plugins.GCPlugin,
5249
plugins.WarningPlugin,
@@ -60,20 +57,15 @@ func init() {
6057
if err != nil {
6158
return nil, err
6259
}
63-
ep, err := ic.GetSingle(plugins.EventPlugin)
64-
if err != nil {
65-
return nil, err
66-
}
6760
w, err := ic.GetSingle(plugins.WarningPlugin)
6861
if err != nil {
6962
return nil, err
7063
}
7164

7265
return &local{
73-
store: metadata.NewImageStore(m.(*metadata.DB)),
74-
publisher: ep.(events.Publisher),
75-
gc: g.(gcScheduler),
76-
warnings: w.(warning.Service),
66+
store: metadata.NewImageStore(m.(*metadata.DB)),
67+
gc: g.(gcScheduler),
68+
warnings: w.(warning.Service),
7769
}, nil
7870
},
7971
})
@@ -84,10 +76,9 @@ type gcScheduler interface {
8476
}
8577

8678
type local struct {
87-
store images.Store
88-
gc gcScheduler
89-
publisher events.Publisher
90-
warnings warning.Service
79+
store images.Store
80+
gc gcScheduler
81+
warnings warning.Service
9182
}
9283

9384
var _ imagesapi.ImagesClient = &local{}
@@ -136,13 +127,6 @@ func (l *local) Create(ctx context.Context, req *imagesapi.CreateImageRequest, _
136127

137128
resp.Image = imageToProto(&created)
138129

139-
if err := l.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{
140-
Name: resp.Image.Name,
141-
Labels: resp.Image.Labels,
142-
}); err != nil {
143-
return nil, err
144-
}
145-
146130
l.emitSchema1DeprecationWarning(ctx, &image)
147131
return &resp, nil
148132

@@ -175,13 +159,6 @@ func (l *local) Update(ctx context.Context, req *imagesapi.UpdateImageRequest, _
175159

176160
resp.Image = imageToProto(&updated)
177161

178-
if err := l.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{
179-
Name: resp.Image.Name,
180-
Labels: resp.Image.Labels,
181-
}); err != nil {
182-
return nil, err
183-
}
184-
185162
l.emitSchema1DeprecationWarning(ctx, &image)
186163
return &resp, nil
187164
}
@@ -200,12 +177,6 @@ func (l *local) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest, _
200177
return nil, errdefs.ToGRPC(err)
201178
}
202179

203-
if err := l.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{
204-
Name: req.Name,
205-
}); err != nil {
206-
return nil, err
207-
}
208-
209180
if req.Sync {
210181
if _, err := l.gc.ScheduleAndWait(ctx); err != nil {
211182
return nil, err

0 commit comments

Comments
 (0)