Skip to content

Commit 9104e6a

Browse files
committed
Add events proxy interface
Signed-off-by: Derek McGowan <[email protected]>
1 parent 9a2b855 commit 9104e6a

File tree

2 files changed

+224
-2
lines changed

2 files changed

+224
-2
lines changed

client/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
containersapi "github.com/containerd/containerd/v2/api/services/containers/v1"
3030
contentapi "github.com/containerd/containerd/v2/api/services/content/v1"
3131
diffapi "github.com/containerd/containerd/v2/api/services/diff/v1"
32-
eventsapi "github.com/containerd/containerd/v2/api/services/events/v1"
3332
imagesapi "github.com/containerd/containerd/v2/api/services/images/v1"
3433
introspectionapi "github.com/containerd/containerd/v2/api/services/introspection/v1"
3534
leasesapi "github.com/containerd/containerd/v2/api/services/leases/v1"
@@ -43,6 +42,7 @@ import (
4342
"github.com/containerd/containerd/v2/core/content"
4443
contentproxy "github.com/containerd/containerd/v2/core/content/proxy"
4544
"github.com/containerd/containerd/v2/core/events"
45+
eventsproxy "github.com/containerd/containerd/v2/core/events/proxy"
4646
"github.com/containerd/containerd/v2/core/images"
4747
"github.com/containerd/containerd/v2/core/leases"
4848
leasesproxy "github.com/containerd/containerd/v2/core/leases/proxy"
@@ -708,7 +708,7 @@ func (c *Client) EventService() EventService {
708708
}
709709
c.connMu.Lock()
710710
defer c.connMu.Unlock()
711-
return NewEventServiceFromClient(eventsapi.NewEventsClient(c.conn))
711+
return eventsproxy.NewRemoteEvents(c.conn)
712712
}
713713

714714
// SandboxStore returns the underlying sandbox store client

core/events/proxy/remote_events.go

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package proxy
18+
19+
import (
20+
"context"
21+
"fmt"
22+
23+
api "github.com/containerd/containerd/v2/api/services/events/v1"
24+
"github.com/containerd/containerd/v2/api/types"
25+
"github.com/containerd/containerd/v2/core/events"
26+
"github.com/containerd/containerd/v2/protobuf"
27+
"github.com/containerd/errdefs"
28+
"github.com/containerd/ttrpc"
29+
"github.com/containerd/typeurl/v2"
30+
"google.golang.org/grpc"
31+
)
32+
33+
type EventService interface {
34+
events.Publisher
35+
events.Forwarder
36+
events.Subscriber
37+
}
38+
39+
func NewRemoteEvents(client any) EventService {
40+
switch c := client.(type) {
41+
case api.EventsClient:
42+
return &grpcEventsProxy{
43+
client: c,
44+
}
45+
case api.TTRPCEventsClient:
46+
return &ttrpcEventsProxy{
47+
client: c,
48+
}
49+
case grpc.ClientConnInterface:
50+
return &grpcEventsProxy{
51+
client: api.NewEventsClient(c),
52+
}
53+
case *ttrpc.Client:
54+
return &ttrpcEventsProxy{
55+
client: api.NewTTRPCEventsClient(c),
56+
}
57+
default:
58+
panic(fmt.Errorf("unsupported events client %T: %w", client, errdefs.ErrNotImplemented))
59+
}
60+
}
61+
62+
type grpcEventsProxy struct {
63+
client api.EventsClient
64+
}
65+
66+
func (p *grpcEventsProxy) Publish(ctx context.Context, topic string, event events.Event) error {
67+
evt, err := typeurl.MarshalAny(event)
68+
if err != nil {
69+
return err
70+
}
71+
req := &api.PublishRequest{
72+
Topic: topic,
73+
Event: protobuf.FromAny(evt),
74+
}
75+
if _, err := p.client.Publish(ctx, req); err != nil {
76+
return errdefs.FromGRPC(err)
77+
}
78+
return nil
79+
}
80+
81+
func (p *grpcEventsProxy) Forward(ctx context.Context, envelope *events.Envelope) error {
82+
req := &api.ForwardRequest{
83+
Envelope: &types.Envelope{
84+
Timestamp: protobuf.ToTimestamp(envelope.Timestamp),
85+
Namespace: envelope.Namespace,
86+
Topic: envelope.Topic,
87+
Event: protobuf.FromAny(envelope.Event),
88+
},
89+
}
90+
if _, err := p.client.Forward(ctx, req); err != nil {
91+
return errdefs.FromGRPC(err)
92+
}
93+
return nil
94+
}
95+
96+
func (p *grpcEventsProxy) Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) {
97+
var (
98+
evq = make(chan *events.Envelope)
99+
errq = make(chan error, 1)
100+
)
101+
102+
errs = errq
103+
ch = evq
104+
105+
session, err := p.client.Subscribe(ctx, &api.SubscribeRequest{
106+
Filters: filters,
107+
})
108+
if err != nil {
109+
errq <- err
110+
close(errq)
111+
return
112+
}
113+
114+
go func() {
115+
defer close(errq)
116+
117+
for {
118+
ev, err := session.Recv()
119+
if err != nil {
120+
errq <- err
121+
return
122+
}
123+
124+
select {
125+
case evq <- &events.Envelope{
126+
Timestamp: protobuf.FromTimestamp(ev.Timestamp),
127+
Namespace: ev.Namespace,
128+
Topic: ev.Topic,
129+
Event: ev.Event,
130+
}:
131+
case <-ctx.Done():
132+
if cerr := ctx.Err(); cerr != context.Canceled {
133+
errq <- cerr
134+
}
135+
return
136+
}
137+
}
138+
}()
139+
140+
return ch, errs
141+
}
142+
143+
type ttrpcEventsProxy struct {
144+
client api.TTRPCEventsClient
145+
}
146+
147+
func (p *ttrpcEventsProxy) Publish(ctx context.Context, topic string, event events.Event) error {
148+
evt, err := typeurl.MarshalAny(event)
149+
if err != nil {
150+
return err
151+
}
152+
req := &api.PublishRequest{
153+
Topic: topic,
154+
Event: protobuf.FromAny(evt),
155+
}
156+
if _, err := p.client.Publish(ctx, req); err != nil {
157+
return errdefs.FromGRPC(err)
158+
}
159+
return nil
160+
}
161+
162+
func (p *ttrpcEventsProxy) Forward(ctx context.Context, envelope *events.Envelope) error {
163+
req := &api.ForwardRequest{
164+
Envelope: &types.Envelope{
165+
Timestamp: protobuf.ToTimestamp(envelope.Timestamp),
166+
Namespace: envelope.Namespace,
167+
Topic: envelope.Topic,
168+
Event: protobuf.FromAny(envelope.Event),
169+
},
170+
}
171+
if _, err := p.client.Forward(ctx, req); err != nil {
172+
return errdefs.FromGRPC(err)
173+
}
174+
return nil
175+
}
176+
177+
func (p *ttrpcEventsProxy) Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) {
178+
var (
179+
evq = make(chan *events.Envelope)
180+
errq = make(chan error, 1)
181+
)
182+
183+
errs = errq
184+
ch = evq
185+
186+
session, err := p.client.Subscribe(ctx, &api.SubscribeRequest{
187+
Filters: filters,
188+
})
189+
if err != nil {
190+
errq <- err
191+
close(errq)
192+
return
193+
}
194+
195+
go func() {
196+
defer close(errq)
197+
198+
for {
199+
ev, err := session.Recv()
200+
if err != nil {
201+
errq <- err
202+
return
203+
}
204+
205+
select {
206+
case evq <- &events.Envelope{
207+
Timestamp: protobuf.FromTimestamp(ev.Timestamp),
208+
Namespace: ev.Namespace,
209+
Topic: ev.Topic,
210+
Event: ev.Event,
211+
}:
212+
case <-ctx.Done():
213+
if cerr := ctx.Err(); cerr != context.Canceled {
214+
errq <- cerr
215+
}
216+
return
217+
}
218+
}
219+
}()
220+
221+
return ch, errs
222+
}

0 commit comments

Comments
 (0)