Skip to content

Commit ff464f3

Browse files
authored
Merge pull request containerd#9779 from dmcgowan/move-image-event-publishing
Move image event publishing to metadata store
2 parents 805ed8e + 9eb9038 commit ff464f3

File tree

8 files changed

+59
-83
lines changed

8 files changed

+59
-83
lines changed

core/metadata/db.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,20 @@ func (m *DB) Update(fn func(*bolt.Tx) error) error {
270270
return err
271271
}
272272

273+
// Publisher returns an event publisher if one is configured
274+
// and the current context is not inside a transaction.
275+
func (m *DB) Publisher(ctx context.Context) events.Publisher {
276+
_, ok := ctx.Value(transactionKey{}).(*bolt.Tx)
277+
if ok {
278+
// Do no publish events within a transaction
279+
return nil
280+
}
281+
if m.dbopts.publisher != nil {
282+
return m.dbopts.publisher
283+
}
284+
return nil
285+
}
286+
273287
// RegisterMutationCallback registers a function to be called after a metadata
274288
// mutations has been performed.
275289
//

core/metadata/images.go

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"sync/atomic"
2626
"time"
2727

28+
eventstypes "github.com/containerd/containerd/v2/api/events"
2829
"github.com/containerd/containerd/v2/core/images"
2930
"github.com/containerd/containerd/v2/core/metadata/boltutil"
3031
"github.com/containerd/containerd/v2/pkg/epoch"
@@ -164,6 +165,15 @@ func (s *imageStore) Create(ctx context.Context, image images.Image) (images.Ima
164165
return images.Image{}, err
165166
}
166167

168+
if publisher := s.db.Publisher(ctx); publisher != nil {
169+
if err := publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{
170+
Name: image.Name,
171+
Labels: image.Labels,
172+
}); err != nil {
173+
return images.Image{}, err
174+
}
175+
}
176+
167177
return image, nil
168178
}
169179

@@ -256,6 +266,15 @@ func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths
256266
return images.Image{}, err
257267
}
258268

269+
if publisher := s.db.Publisher(ctx); publisher != nil {
270+
if err := publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{
271+
Name: updated.Name,
272+
Labels: updated.Labels,
273+
}); err != nil {
274+
return images.Image{}, err
275+
}
276+
}
277+
259278
return updated, nil
260279

261280
}
@@ -273,7 +292,7 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del
273292
}
274293
}
275294

276-
return update(ctx, s.db, func(tx *bolt.Tx) error {
295+
err = update(ctx, s.db, func(tx *bolt.Tx) error {
277296
bkt := getImagesBucket(tx, namespace)
278297
if bkt == nil {
279298
return fmt.Errorf("image %q: %w", name, errdefs.ErrNotFound)
@@ -310,6 +329,19 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del
310329

311330
return nil
312331
})
332+
if err != nil {
333+
return err
334+
}
335+
336+
if publisher := s.db.Publisher(ctx); publisher != nil {
337+
if err := publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{
338+
Name: name,
339+
}); err != nil {
340+
return err
341+
}
342+
}
343+
344+
return nil
313345
}
314346

315347
func validateImage(image *images.Image) error {

core/metadata/snapshot.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -279,8 +279,8 @@ func (s *snapshotter) Prepare(ctx context.Context, key, parent string, opts ...s
279279
return nil, err
280280
}
281281

282-
if s.db.dbopts.publisher != nil {
283-
if err := s.db.dbopts.publisher.Publish(ctx, "/snapshot/prepare", &eventstypes.SnapshotPrepare{
282+
if publisher := s.db.Publisher(ctx); publisher != nil {
283+
if err := publisher.Publish(ctx, "/snapshot/prepare", &eventstypes.SnapshotPrepare{
284284
Key: key,
285285
Parent: parent,
286286
Snapshotter: s.name,
@@ -634,8 +634,8 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
634634
return err
635635
}
636636

637-
if s.db.dbopts.publisher != nil {
638-
if err := s.db.dbopts.publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{
637+
if publisher := s.db.Publisher(ctx); publisher != nil {
638+
if err := publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{
639639
Key: key,
640640
Name: name,
641641
Snapshotter: s.name,
@@ -704,8 +704,8 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error {
704704
return err
705705
}
706706

707-
if s.db.dbopts.publisher != nil {
708-
return s.db.dbopts.publisher.Publish(ctx, "/snapshot/remove", &eventstypes.SnapshotRemove{
707+
if publisher := s.db.Publisher(ctx); publisher != nil {
708+
return publisher.Publish(ctx, "/snapshot/remove", &eventstypes.SnapshotRemove{
709709
Key: key,
710710
Snapshotter: s.name,
711711
})

internal/cri/server/images/image_pull.go

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import (
3838
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
3939
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
4040

41-
eventstypes "github.com/containerd/containerd/v2/api/events"
4241
containerd "github.com/containerd/containerd/v2/client"
4342
"github.com/containerd/containerd/v2/core/diff"
4443
containerdimages "github.com/containerd/containerd/v2/core/images"
@@ -322,14 +321,6 @@ func (c *CRIImageService) createImageReference(ctx context.Context, name string,
322321
// TODO: Call CRIImageService directly
323322
oldImg, err := c.images.Create(ctx, img)
324323
if err == nil {
325-
if c.publisher != nil {
326-
if err := c.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{
327-
Name: img.Name,
328-
Labels: img.Labels,
329-
}); err != nil {
330-
return err
331-
}
332-
}
333324
return nil
334325
} else if !errdefs.IsAlreadyExists(err) {
335326
return err
@@ -338,16 +329,6 @@ func (c *CRIImageService) createImageReference(ctx context.Context, name string,
338329
return nil
339330
}
340331
_, err = c.images.Update(ctx, img, "target", "labels."+crilabels.ImageLabelKey)
341-
if err == nil && c.publisher != nil {
342-
if c.publisher != nil {
343-
if err := c.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{
344-
Name: img.Name,
345-
Labels: img.Labels,
346-
}); err != nil {
347-
return err
348-
}
349-
}
350-
}
351332
return err
352333
}
353334

internal/cri/server/images/image_remove.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"context"
2121
"fmt"
2222

23-
eventstypes "github.com/containerd/containerd/v2/api/events"
2423
"github.com/containerd/containerd/v2/core/images"
2524
"github.com/containerd/containerd/v2/pkg/tracing"
2625
"github.com/containerd/errdefs"
@@ -63,14 +62,6 @@ func (c *GRPCCRIImageService) RemoveImage(ctx context.Context, r *runtime.Remove
6362
if err := c.imageStore.Update(ctx, ref); err != nil {
6463
return nil, fmt.Errorf("failed to update image reference %q for %q: %w", ref, image.ID, err)
6564
}
66-
67-
if c.publisher != nil {
68-
if err := c.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{
69-
Name: ref,
70-
}); err != nil {
71-
return nil, err
72-
}
73-
}
7465
continue
7566
}
7667
return nil, fmt.Errorf("failed to delete image reference %q for %q: %w", ref, image.ID, err)

internal/cri/server/images/service.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
imagestore "github.com/containerd/containerd/v2/internal/cri/store/image"
2929
snapshotstore "github.com/containerd/containerd/v2/internal/cri/store/snapshot"
3030
"github.com/containerd/containerd/v2/internal/kmutex"
31-
"github.com/containerd/containerd/v2/pkg/events"
3231
"github.com/containerd/log"
3332
"github.com/containerd/platforms"
3433
docker "github.com/distribution/reference"
@@ -54,8 +53,6 @@ type CRIImageService struct {
5453
// images is the lower level image store used for raw storage,
5554
// no event publishing should currently be assumed
5655
images images.Store
57-
// publisher is the events publisher
58-
publisher events.Publisher
5956
// client is a subset of the containerd client
6057
// and will be replaced by image store and transfer service
6158
client imageClient
@@ -88,8 +85,6 @@ type CRIImageServiceOptions struct {
8885

8986
Snapshotters map[string]snapshots.Snapshotter
9087

91-
Publisher events.Publisher
92-
9388
Client imageClient
9489
}
9590

plugins/cri/images/plugin.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
criconfig "github.com/containerd/containerd/v2/internal/cri/config"
2929
"github.com/containerd/containerd/v2/internal/cri/constants"
3030
"github.com/containerd/containerd/v2/internal/cri/server/images"
31-
"github.com/containerd/containerd/v2/pkg/events"
3231
"github.com/containerd/containerd/v2/plugins"
3332
"github.com/containerd/containerd/v2/plugins/services/warning"
3433
"github.com/containerd/log"
@@ -46,7 +45,6 @@ func init() {
4645
Config: &config,
4746
Requires: []plugin.Type{
4847
plugins.LeasePlugin,
49-
plugins.EventPlugin,
5048
plugins.MetadataPlugin,
5149
plugins.SandboxStorePlugin,
5250
plugins.ServicePlugin, // For client
@@ -60,11 +58,6 @@ func init() {
6058
}
6159
mdb := m.(*metadata.DB)
6260

63-
ep, err := ic.GetSingle(plugins.EventPlugin)
64-
if err != nil {
65-
return nil, err
66-
}
67-
6861
if warnings, err := criconfig.ValidateImageConfig(ic.Context, &config); err != nil {
6962
return nil, fmt.Errorf("invalid cri image config: %w", err)
7063
} else if len(warnings) > 0 {
@@ -84,7 +77,6 @@ func init() {
8477
RuntimePlatforms: map[string]images.ImagePlatform{},
8578
Snapshotters: map[string]snapshots.Snapshotter{},
8679
ImageFSPaths: map[string]string{},
87-
Publisher: ep.(events.Publisher),
8880
}
8981

9082
options.Client, err = containerd.New(

plugins/services/images/local.go

Lines changed: 6 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,11 @@ import (
2424
"google.golang.org/grpc/codes"
2525
"google.golang.org/grpc/status"
2626

27-
eventstypes "github.com/containerd/containerd/v2/api/events"
2827
imagesapi "github.com/containerd/containerd/v2/api/services/images/v1"
2928
"github.com/containerd/containerd/v2/core/images"
3029
"github.com/containerd/containerd/v2/core/metadata"
3130
"github.com/containerd/containerd/v2/pkg/deprecation"
3231
"github.com/containerd/containerd/v2/pkg/epoch"
33-
"github.com/containerd/containerd/v2/pkg/events"
3432
"github.com/containerd/containerd/v2/pkg/gc"
3533
"github.com/containerd/containerd/v2/plugins"
3634
"github.com/containerd/containerd/v2/plugins/services"
@@ -46,7 +44,6 @@ func init() {
4644
Type: plugins.ServicePlugin,
4745
ID: services.ImagesService,
4846
Requires: []plugin.Type{
49-
plugins.EventPlugin,
5047
plugins.MetadataPlugin,
5148
plugins.GCPlugin,
5249
plugins.WarningPlugin,
@@ -60,20 +57,15 @@ func init() {
6057
if err != nil {
6158
return nil, err
6259
}
63-
ep, err := ic.GetSingle(plugins.EventPlugin)
64-
if err != nil {
65-
return nil, err
66-
}
6760
w, err := ic.GetSingle(plugins.WarningPlugin)
6861
if err != nil {
6962
return nil, err
7063
}
7164

7265
return &local{
73-
store: metadata.NewImageStore(m.(*metadata.DB)),
74-
publisher: ep.(events.Publisher),
75-
gc: g.(gcScheduler),
76-
warnings: w.(warning.Service),
66+
store: metadata.NewImageStore(m.(*metadata.DB)),
67+
gc: g.(gcScheduler),
68+
warnings: w.(warning.Service),
7769
}, nil
7870
},
7971
})
@@ -84,10 +76,9 @@ type gcScheduler interface {
8476
}
8577

8678
type local struct {
87-
store images.Store
88-
gc gcScheduler
89-
publisher events.Publisher
90-
warnings warning.Service
79+
store images.Store
80+
gc gcScheduler
81+
warnings warning.Service
9182
}
9283

9384
var _ imagesapi.ImagesClient = &local{}
@@ -136,13 +127,6 @@ func (l *local) Create(ctx context.Context, req *imagesapi.CreateImageRequest, _
136127

137128
resp.Image = imageToProto(&created)
138129

139-
if err := l.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{
140-
Name: resp.Image.Name,
141-
Labels: resp.Image.Labels,
142-
}); err != nil {
143-
return nil, err
144-
}
145-
146130
l.emitSchema1DeprecationWarning(ctx, &image)
147131
return &resp, nil
148132

@@ -175,13 +159,6 @@ func (l *local) Update(ctx context.Context, req *imagesapi.UpdateImageRequest, _
175159

176160
resp.Image = imageToProto(&updated)
177161

178-
if err := l.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{
179-
Name: resp.Image.Name,
180-
Labels: resp.Image.Labels,
181-
}); err != nil {
182-
return nil, err
183-
}
184-
185162
l.emitSchema1DeprecationWarning(ctx, &image)
186163
return &resp, nil
187164
}
@@ -200,12 +177,6 @@ func (l *local) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest, _
200177
return nil, errdefs.ToGRPC(err)
201178
}
202179

203-
if err := l.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{
204-
Name: req.Name,
205-
}); err != nil {
206-
return nil, err
207-
}
208-
209180
if req.Sync {
210181
if _, err := l.gc.ScheduleAndWait(ctx); err != nil {
211182
return nil, err

0 commit comments

Comments
 (0)