Skip to content

Commit 2e8e033

Browse files
surikdmcgowan
authored andcommitted
Add multi-subscriber support to GetContainerEvents CRI API
This commit addresses issue containerd#7318 by introducing events broadcasting to the current implementation. The integration/container_event_test.go is extended to demonstrate the broadcasting capabilities of two simultaneous connected clients. Signed-off-by: Yury Gargay <[email protected]>
1 parent e7eb08e commit 2e8e033

File tree

2 files changed

+47
-29
lines changed

2 files changed

+47
-29
lines changed

integration/container_event_test.go

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,21 @@ func TestContainerEvents(t *testing.T) {
3636
ctx, cancel := context.WithCancel(context.Background())
3737
t.Cleanup(cancel)
3838

39-
t.Log("Set up container events streaming client")
40-
containerEventsStreamingClient, err := runtimeService.GetContainerEvents(ctx, &runtime.GetEventsRequest{})
39+
t.Log("Set up container events streaming clients")
40+
containerEventsStreamingClient1, err := runtimeService.GetContainerEvents(ctx, &runtime.GetEventsRequest{})
4141
assert.NoError(t, err)
42-
containerEventsChan := make(chan *runtime.ContainerEventResponse)
42+
containerEventsStreamingClient2, err := runtimeService.GetContainerEvents(ctx, &runtime.GetEventsRequest{})
43+
assert.NoError(t, err)
44+
containerEventsChan1 := make(chan *runtime.ContainerEventResponse)
45+
containerEventsChan2 := make(chan *runtime.ContainerEventResponse)
4346

44-
go listenToEventChannel(ctx, t, containerEventsChan, containerEventsStreamingClient)
47+
go listenToEventChannel(ctx, t, containerEventsChan1, containerEventsStreamingClient1)
48+
go listenToEventChannel(ctx, t, containerEventsChan2, containerEventsStreamingClient2)
4549
// drain all events emitted by previous tests.
46-
drainContainerEventsChan(containerEventsChan)
50+
drainContainerEventsChan(containerEventsChan1)
51+
drainContainerEventsChan(containerEventsChan2)
52+
53+
containerEventsChannels := []chan *runtime.ContainerEventResponse{containerEventsChan1, containerEventsChan2}
4754

4855
t.Logf("Step 1: RunPodSandbox and check for expected events")
4956
sandboxName := "container_events_sandbox"
@@ -56,19 +63,19 @@ func TestContainerEvents(t *testing.T) {
5663
expectedPodSandboxStatus := &runtime.PodSandboxStatus{State: runtime.PodSandboxState_SANDBOX_NOTREADY}
5764
t.Logf("Step 6: StopPodSandbox and check events")
5865
assert.NoError(t, runtimeService.StopPodSandbox(sb))
59-
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
66+
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
6067
t.Logf("Step 7: RemovePodSandbox and check events")
6168
assert.NoError(t, runtimeService.RemovePodSandbox(sb))
62-
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, nil, expectedContainerStates)
69+
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, nil, expectedContainerStates)
6370
})
6471

6572
// PodSandbox ready, container state list empty
6673
expectedPodSandboxStatus := &runtime.PodSandboxStatus{State: runtime.PodSandboxState_SANDBOX_READY}
6774
expectedContainerStates := []runtime.ContainerState{}
6875
// PodSandbox created. Check for start event for podsandbox container. Should be zero containers in the podsandbox
69-
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
76+
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
7077
// PodSandbox started. Check for start event for podsandbox container. Should be zero containers in the podsandbox
71-
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
78+
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
7279

7380
t.Logf("Step 2: CreateContainer and check events")
7481
pauseImage := images.Get(images.Pause)
@@ -82,26 +89,26 @@ func TestContainerEvents(t *testing.T) {
8289
cn, err := runtimeService.CreateContainer(sb, containerConfig, sbConfig)
8390
require.NoError(t, err)
8491
expectedContainerStates = []runtime.ContainerState{runtime.ContainerState_CONTAINER_CREATED}
85-
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
92+
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
8693

8794
t.Cleanup(func() {
8895
t.Logf("Step 5: RemoveContainer and check events")
8996
assert.NoError(t, runtimeService.RemoveContainer(cn))
9097
// No container status after the container is removed
9198
expectedContainerStates := []runtime.ContainerState{}
92-
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
99+
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
93100
})
94101

95102
t.Logf("Step 3: StartContainer and check events")
96103
require.NoError(t, runtimeService.StartContainer(cn))
97104
expectedContainerStates = []runtime.ContainerState{runtime.ContainerState_CONTAINER_RUNNING}
98-
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
105+
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
99106

100107
t.Cleanup(func() {
101108
t.Logf("Step 4: StopContainer and check events")
102109
assert.NoError(t, runtimeService.StopContainer(cn, 10))
103110
expectedContainerStates := []runtime.ContainerState{runtime.ContainerState_CONTAINER_EXITED}
104-
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
111+
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
105112
})
106113
}
107114

@@ -133,24 +140,26 @@ func drainContainerEventsChan(containerEventsChan chan *runtime.ContainerEventRe
133140
}
134141
}
135142

136-
func checkContainerEventResponse(t *testing.T, containerEventsChan chan *runtime.ContainerEventResponse, expectedType runtime.ContainerEventType, expectedPodSandboxStatus *runtime.PodSandboxStatus, expectedContainerStates []runtime.ContainerState) {
143+
func checkContainerEventResponse(t *testing.T, containerEventsChans []chan *runtime.ContainerEventResponse, expectedType runtime.ContainerEventType, expectedPodSandboxStatus *runtime.PodSandboxStatus, expectedContainerStates []runtime.ContainerState) {
137144
t.Helper()
138-
var resp *runtime.ContainerEventResponse
139-
select {
140-
case resp = <-containerEventsChan:
141-
case <-time.After(readContainerEventChannelTimeout):
142-
t.Error("assertContainerEventResponse: timeout waiting for events from channel")
143-
}
144-
t.Logf("Container Event response received: %+v", *resp)
145-
assert.Equal(t, expectedType, resp.ContainerEventType)
145+
for _, ch := range containerEventsChans {
146+
var resp *runtime.ContainerEventResponse
147+
select {
148+
case resp = <-ch:
149+
case <-time.After(readContainerEventChannelTimeout):
150+
t.Error("assertContainerEventResponse: timeout waiting for events from channel")
151+
}
152+
t.Logf("Container Event response received: %+v", *resp)
153+
assert.Equal(t, expectedType, resp.ContainerEventType)
146154

147-
// Checking only the State field of PodSandboxStatus
148-
if expectedPodSandboxStatus != nil {
149-
assert.Equal(t, expectedPodSandboxStatus.State, resp.PodSandboxStatus.State)
150-
}
155+
// Checking only the State field of PodSandboxStatus
156+
if expectedPodSandboxStatus != nil {
157+
assert.Equal(t, expectedPodSandboxStatus.State, resp.PodSandboxStatus.State)
158+
}
151159

152-
// Checking only the State field of ContainersStatus
153-
for i, cs := range resp.ContainersStatuses {
154-
assert.Equal(t, expectedContainerStates[i], cs.State)
160+
// Checking only the State field of ContainersStatus
161+
for i, cs := range resp.ContainersStatuses {
162+
assert.Equal(t, expectedContainerStates[i], cs.State)
163+
}
155164
}
156165
}

integration/main_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ const (
6363

6464
var (
6565
runtimeService cri.RuntimeService
66+
runtimeService2 cri.RuntimeService // to test GetContainerEvents broadcast
6667
imageService cri.ImageManagerService
6768
containerdClient *containerd.Client
6869
containerdEndpoint string
@@ -87,6 +88,10 @@ func ConnectDaemons() error {
8788
if err != nil {
8889
return fmt.Errorf("failed to create runtime service: %w", err)
8990
}
91+
runtimeService2, err = remote.NewRuntimeService(*criEndpoint, timeout)
92+
if err != nil {
93+
return fmt.Errorf("failed to create runtime service: %w", err)
94+
}
9095
imageService, err = remote.NewImageService(*criEndpoint, timeout)
9196
if err != nil {
9297
return fmt.Errorf("failed to create image service: %w", err)
@@ -98,6 +103,10 @@ func ConnectDaemons() error {
98103
if err != nil {
99104
return fmt.Errorf("failed to list containers: %w", err)
100105
}
106+
_, err = runtimeService2.ListContainers(&runtime.ContainerFilter{})
107+
if err != nil {
108+
return fmt.Errorf("failed to list containers: %w", err)
109+
}
101110
_, err = imageService.ListImages(&runtime.ImageFilter{})
102111
if err != nil {
103112
return fmt.Errorf("failed to list images: %w", err)

0 commit comments

Comments
 (0)