Skip to content

Commit c1d59e3

Browse files
authored
Merge pull request containerd#9661 from dmcgowan/update-cri-container-events-multisub
Add support for multiple subscribers to CRI container events
2 parents 95d24b6 + 2e8e033 commit c1d59e3

File tree

7 files changed

+381
-45
lines changed

7 files changed

+381
-45
lines changed

integration/container_event_test.go

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,21 @@ func TestContainerEvents(t *testing.T) {
3636
ctx, cancel := context.WithCancel(context.Background())
3737
t.Cleanup(cancel)
3838

39-
t.Log("Set up container events streaming client")
40-
containerEventsStreamingClient, err := runtimeService.GetContainerEvents(ctx, &runtime.GetEventsRequest{})
39+
t.Log("Set up container events streaming clients")
40+
containerEventsStreamingClient1, err := runtimeService.GetContainerEvents(ctx, &runtime.GetEventsRequest{})
4141
assert.NoError(t, err)
42-
containerEventsChan := make(chan *runtime.ContainerEventResponse)
42+
containerEventsStreamingClient2, err := runtimeService.GetContainerEvents(ctx, &runtime.GetEventsRequest{})
43+
assert.NoError(t, err)
44+
containerEventsChan1 := make(chan *runtime.ContainerEventResponse)
45+
containerEventsChan2 := make(chan *runtime.ContainerEventResponse)
4346

44-
go listenToEventChannel(ctx, t, containerEventsChan, containerEventsStreamingClient)
47+
go listenToEventChannel(ctx, t, containerEventsChan1, containerEventsStreamingClient1)
48+
go listenToEventChannel(ctx, t, containerEventsChan2, containerEventsStreamingClient2)
4549
// drain all events emitted by previous tests.
46-
drainContainerEventsChan(containerEventsChan)
50+
drainContainerEventsChan(containerEventsChan1)
51+
drainContainerEventsChan(containerEventsChan2)
52+
53+
containerEventsChannels := []chan *runtime.ContainerEventResponse{containerEventsChan1, containerEventsChan2}
4754

4855
t.Logf("Step 1: RunPodSandbox and check for expected events")
4956
sandboxName := "container_events_sandbox"
@@ -56,19 +63,19 @@ func TestContainerEvents(t *testing.T) {
5663
expectedPodSandboxStatus := &runtime.PodSandboxStatus{State: runtime.PodSandboxState_SANDBOX_NOTREADY}
5764
t.Logf("Step 6: StopPodSandbox and check events")
5865
assert.NoError(t, runtimeService.StopPodSandbox(sb))
59-
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
66+
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
6067
t.Logf("Step 7: RemovePodSandbox and check events")
6168
assert.NoError(t, runtimeService.RemovePodSandbox(sb))
62-
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, nil, expectedContainerStates)
69+
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, nil, expectedContainerStates)
6370
})
6471

6572
// PodSandbox ready, container state list empty
6673
expectedPodSandboxStatus := &runtime.PodSandboxStatus{State: runtime.PodSandboxState_SANDBOX_READY}
6774
expectedContainerStates := []runtime.ContainerState{}
6875
// PodSandbox created. Check for start event for podsandbox container. Should be zero containers in the podsandbox
69-
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
76+
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
7077
// PodSandbox started. Check for start event for podsandbox container. Should be zero containers in the podsandbox
71-
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
78+
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
7279

7380
t.Logf("Step 2: CreateContainer and check events")
7481
pauseImage := images.Get(images.Pause)
@@ -82,26 +89,26 @@ func TestContainerEvents(t *testing.T) {
8289
cn, err := runtimeService.CreateContainer(sb, containerConfig, sbConfig)
8390
require.NoError(t, err)
8491
expectedContainerStates = []runtime.ContainerState{runtime.ContainerState_CONTAINER_CREATED}
85-
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
92+
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
8693

8794
t.Cleanup(func() {
8895
t.Logf("Step 5: RemoveContainer and check events")
8996
assert.NoError(t, runtimeService.RemoveContainer(cn))
9097
// No container status after the container is removed
9198
expectedContainerStates := []runtime.ContainerState{}
92-
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
99+
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
93100
})
94101

95102
t.Logf("Step 3: StartContainer and check events")
96103
require.NoError(t, runtimeService.StartContainer(cn))
97104
expectedContainerStates = []runtime.ContainerState{runtime.ContainerState_CONTAINER_RUNNING}
98-
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
105+
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
99106

100107
t.Cleanup(func() {
101108
t.Logf("Step 4: StopContainer and check events")
102109
assert.NoError(t, runtimeService.StopContainer(cn, 10))
103110
expectedContainerStates := []runtime.ContainerState{runtime.ContainerState_CONTAINER_EXITED}
104-
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
111+
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
105112
})
106113
}
107114

@@ -133,24 +140,26 @@ func drainContainerEventsChan(containerEventsChan chan *runtime.ContainerEventRe
133140
}
134141
}
135142

136-
func checkContainerEventResponse(t *testing.T, containerEventsChan chan *runtime.ContainerEventResponse, expectedType runtime.ContainerEventType, expectedPodSandboxStatus *runtime.PodSandboxStatus, expectedContainerStates []runtime.ContainerState) {
143+
func checkContainerEventResponse(t *testing.T, containerEventsChans []chan *runtime.ContainerEventResponse, expectedType runtime.ContainerEventType, expectedPodSandboxStatus *runtime.PodSandboxStatus, expectedContainerStates []runtime.ContainerState) {
137144
t.Helper()
138-
var resp *runtime.ContainerEventResponse
139-
select {
140-
case resp = <-containerEventsChan:
141-
case <-time.After(readContainerEventChannelTimeout):
142-
t.Error("assertContainerEventResponse: timeout waiting for events from channel")
143-
}
144-
t.Logf("Container Event response received: %+v", *resp)
145-
assert.Equal(t, expectedType, resp.ContainerEventType)
145+
for _, ch := range containerEventsChans {
146+
var resp *runtime.ContainerEventResponse
147+
select {
148+
case resp = <-ch:
149+
case <-time.After(readContainerEventChannelTimeout):
150+
t.Error("assertContainerEventResponse: timeout waiting for events from channel")
151+
}
152+
t.Logf("Container Event response received: %+v", *resp)
153+
assert.Equal(t, expectedType, resp.ContainerEventType)
146154

147-
// Checking only the State field of PodSandboxStatus
148-
if expectedPodSandboxStatus != nil {
149-
assert.Equal(t, expectedPodSandboxStatus.State, resp.PodSandboxStatus.State)
150-
}
155+
// Checking only the State field of PodSandboxStatus
156+
if expectedPodSandboxStatus != nil {
157+
assert.Equal(t, expectedPodSandboxStatus.State, resp.PodSandboxStatus.State)
158+
}
151159

152-
// Checking only the State field of ContainersStatus
153-
for i, cs := range resp.ContainersStatuses {
154-
assert.Equal(t, expectedContainerStates[i], cs.State)
160+
// Checking only the State field of ContainersStatus
161+
for i, cs := range resp.ContainersStatuses {
162+
assert.Equal(t, expectedContainerStates[i], cs.State)
163+
}
155164
}
156165
}

integration/main_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ const (
6363

6464
var (
6565
runtimeService cri.RuntimeService
66+
runtimeService2 cri.RuntimeService // to test GetContainerEvents broadcast
6667
imageService cri.ImageManagerService
6768
containerdClient *containerd.Client
6869
containerdEndpoint string
@@ -87,6 +88,10 @@ func ConnectDaemons() error {
8788
if err != nil {
8889
return fmt.Errorf("failed to create runtime service: %w", err)
8990
}
91+
runtimeService2, err = remote.NewRuntimeService(*criEndpoint, timeout)
92+
if err != nil {
93+
return fmt.Errorf("failed to create runtime service: %w", err)
94+
}
9095
imageService, err = remote.NewImageService(*criEndpoint, timeout)
9196
if err != nil {
9297
return fmt.Errorf("failed to create image service: %w", err)
@@ -98,6 +103,10 @@ func ConnectDaemons() error {
98103
if err != nil {
99104
return fmt.Errorf("failed to list containers: %w", err)
100105
}
106+
_, err = runtimeService2.ListContainers(&runtime.ContainerFilter{})
107+
if err != nil {
108+
return fmt.Errorf("failed to list containers: %w", err)
109+
}
101110
_, err = imageService.ListImages(&runtime.ImageFilter{})
102111
if err != nil {
103112
return fmt.Errorf("failed to list images: %w", err)

internal/eventq/eventq.go

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package eventq
18+
19+
import (
20+
"io"
21+
"time"
22+
)
23+
24+
type EventQueue[T any] struct {
25+
events chan<- T
26+
subscriberC chan<- eventSubscription[T]
27+
shutdownC chan struct{}
28+
}
29+
30+
type eventSubscription[T any] struct {
31+
c chan<- T
32+
closeC chan struct{}
33+
}
34+
35+
func (sub eventSubscription[T]) publish(event T) bool {
36+
select {
37+
case <-sub.closeC:
38+
return false
39+
case sub.c <- event:
40+
return true
41+
}
42+
}
43+
44+
func (sub eventSubscription[T]) Close() error {
45+
select {
46+
case <-sub.closeC:
47+
default:
48+
close(sub.closeC)
49+
}
50+
return nil
51+
}
52+
53+
// New provides a queue for sending messages to one or more
54+
// subscribers. Messages are held for the given discardAfter duration
55+
// if there are no subscribers.
56+
func New[T any](discardAfter time.Duration, discardFn func(T)) EventQueue[T] {
57+
events := make(chan T)
58+
subscriberC := make(chan eventSubscription[T])
59+
shutdownC := make(chan struct{})
60+
61+
go func() {
62+
type queuedEvent struct {
63+
event T
64+
discardAt time.Time
65+
}
66+
67+
var discardQueue []queuedEvent
68+
var discardTime <-chan time.Time
69+
var subscribers []eventSubscription[T]
70+
for {
71+
select {
72+
case <-shutdownC:
73+
for _, event := range discardQueue {
74+
discardFn(event.event)
75+
}
76+
for _, sub := range subscribers {
77+
close(sub.c)
78+
}
79+
return
80+
case event := <-events:
81+
if len(subscribers) > 0 {
82+
active := subscribers[:0]
83+
for _, sub := range subscribers {
84+
if sub.publish(event) {
85+
active = append(active, sub)
86+
}
87+
}
88+
subscribers = active
89+
}
90+
if len(subscribers) == 0 {
91+
discardQueue = append(discardQueue, queuedEvent{
92+
event: event,
93+
discardAt: time.Now().Add(discardAfter),
94+
})
95+
if discardTime == nil {
96+
discardTime = time.After(time.Until(discardQueue[0].discardAt).Abs())
97+
}
98+
}
99+
case s := <-subscriberC:
100+
var closed bool
101+
for i, event := range discardQueue {
102+
if !s.publish(event.event) {
103+
discardQueue = discardQueue[i:]
104+
closed = true
105+
break
106+
}
107+
}
108+
if !closed {
109+
discardQueue = nil
110+
discardTime = nil
111+
subscribers = append(subscribers, s)
112+
}
113+
case t := <-discardTime:
114+
toDiscard := discardQueue
115+
discardQueue = nil
116+
for i, event := range toDiscard {
117+
if t.After(event.discardAt) {
118+
discardFn(event.event)
119+
} else {
120+
discardQueue = toDiscard[i:]
121+
break
122+
}
123+
}
124+
if len(discardQueue) == 0 {
125+
discardTime = nil
126+
} else {
127+
// Wait until next item in discard queue plus a small buffer to collect a burst of events
128+
discardTime = time.After(time.Until(discardQueue[0].discardAt).Abs() + 10*time.Millisecond)
129+
}
130+
}
131+
132+
}
133+
}()
134+
135+
return EventQueue[T]{
136+
events: events,
137+
subscriberC: subscriberC,
138+
shutdownC: shutdownC,
139+
}
140+
}
141+
142+
func (eq *EventQueue[T]) Shutdown() {
143+
defer close(eq.shutdownC)
144+
eq.shutdownC <- struct{}{}
145+
}
146+
147+
func (eq *EventQueue[T]) Send(event T) {
148+
select {
149+
case <-eq.shutdownC:
150+
case eq.events <- event:
151+
}
152+
}
153+
154+
func (eq *EventQueue[T]) Subscribe() (<-chan T, io.Closer) {
155+
c := make(chan T, 100)
156+
subscription := eventSubscription[T]{
157+
c: c,
158+
closeC: make(chan struct{}),
159+
}
160+
eq.subscriberC <- subscription
161+
162+
return c, subscription
163+
}

0 commit comments

Comments
 (0)