Skip to content

Commit ec04e4f

Browse files
committed
Add streaming proxy
Signed-off-by: Derek McGowan <[email protected]> Signed-off-by: Derek McGowan <[email protected]>
1 parent 193af78 commit ec04e4f

File tree

2 files changed

+130
-70
lines changed

2 files changed

+130
-70
lines changed

client/transfer.go

Lines changed: 2 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,12 @@ package client
1818

1919
import (
2020
"context"
21-
"errors"
22-
"io"
2321

24-
streamingapi "github.com/containerd/containerd/api/services/streaming/v1"
2522
transferapi "github.com/containerd/containerd/api/services/transfer/v1"
2623
"github.com/containerd/containerd/v2/core/streaming"
24+
streamproxy "github.com/containerd/containerd/v2/core/streaming/proxy"
2725
"github.com/containerd/containerd/v2/core/transfer"
2826
"github.com/containerd/containerd/v2/core/transfer/proxy"
29-
"github.com/containerd/containerd/v2/pkg/protobuf"
30-
"github.com/containerd/errdefs"
31-
"github.com/containerd/typeurl/v2"
3227
)
3328

3429
func (c *Client) Transfer(ctx context.Context, src interface{}, dest interface{}, opts ...transfer.Opt) error {
@@ -42,68 +37,5 @@ func (c *Client) Transfer(ctx context.Context, src interface{}, dest interface{}
4237
}
4338

4439
func (c *Client) streamCreator() streaming.StreamCreator {
45-
return &streamCreator{
46-
client: streamingapi.NewStreamingClient(c.conn),
47-
}
48-
}
49-
50-
type streamCreator struct {
51-
client streamingapi.StreamingClient
52-
}
53-
54-
func (sc *streamCreator) Create(ctx context.Context, id string) (streaming.Stream, error) {
55-
stream, err := sc.client.Stream(ctx)
56-
if err != nil {
57-
return nil, err
58-
}
59-
60-
a, err := typeurl.MarshalAny(&streamingapi.StreamInit{
61-
ID: id,
62-
})
63-
if err != nil {
64-
return nil, err
65-
}
66-
err = stream.Send(protobuf.FromAny(a))
67-
if err != nil {
68-
if !errors.Is(err, io.EOF) {
69-
err = errdefs.FromGRPC(err)
70-
}
71-
return nil, err
72-
}
73-
74-
// Receive an ack that stream is init and ready
75-
if _, err = stream.Recv(); err != nil {
76-
if !errors.Is(err, io.EOF) {
77-
err = errdefs.FromGRPC(err)
78-
}
79-
return nil, err
80-
}
81-
82-
return &clientStream{
83-
s: stream,
84-
}, nil
85-
}
86-
87-
type clientStream struct {
88-
s streamingapi.Streaming_StreamClient
89-
}
90-
91-
func (cs *clientStream) Send(a typeurl.Any) (err error) {
92-
err = cs.s.Send(protobuf.FromAny(a))
93-
if !errors.Is(err, io.EOF) {
94-
err = errdefs.FromGRPC(err)
95-
}
96-
return
97-
}
98-
99-
func (cs *clientStream) Recv() (a typeurl.Any, err error) {
100-
a, err = cs.s.Recv()
101-
if !errors.Is(err, io.EOF) {
102-
err = errdefs.FromGRPC(err)
103-
}
104-
return
105-
}
106-
107-
func (cs *clientStream) Close() error {
108-
return cs.s.CloseSend()
40+
return streamproxy.NewStreamCreator(c.conn)
10941
}

core/streaming/proxy/streaming.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package proxy
18+
19+
import (
20+
"context"
21+
"errors"
22+
"fmt"
23+
"io"
24+
25+
streamingapi "github.com/containerd/containerd/api/services/streaming/v1"
26+
"github.com/containerd/containerd/v2/core/streaming"
27+
"github.com/containerd/containerd/v2/pkg/protobuf"
28+
"github.com/containerd/errdefs"
29+
"github.com/containerd/ttrpc"
30+
"github.com/containerd/typeurl/v2"
31+
"google.golang.org/grpc"
32+
)
33+
34+
// NewStreamCreator returns a new stream creator which can communicate over a GRPC
35+
// or TTRPC connection using the containerd streaming API.
36+
func NewStreamCreator(client any) streaming.StreamCreator {
37+
switch c := client.(type) {
38+
case streamingapi.StreamingClient:
39+
return &streamCreator{
40+
client: convertClient{c},
41+
}
42+
case grpc.ClientConnInterface:
43+
return &streamCreator{
44+
client: convertClient{streamingapi.NewStreamingClient(c)},
45+
}
46+
case streamingapi.TTRPCStreamingClient:
47+
return &streamCreator{
48+
client: c,
49+
}
50+
case *ttrpc.Client:
51+
return &streamCreator{
52+
client: streamingapi.NewTTRPCStreamingClient(c),
53+
}
54+
case streaming.StreamCreator:
55+
return c
56+
default:
57+
panic(fmt.Errorf("unsupported stream client %T: %w", client, errdefs.ErrNotImplemented))
58+
}
59+
}
60+
61+
type convertClient struct {
62+
streamingapi.StreamingClient
63+
}
64+
65+
func (c convertClient) Stream(ctx context.Context) (streamingapi.TTRPCStreaming_StreamClient, error) {
66+
return c.StreamingClient.Stream(ctx)
67+
}
68+
69+
type streamCreator struct {
70+
client streamingapi.TTRPCStreamingClient
71+
}
72+
73+
func (sc *streamCreator) Create(ctx context.Context, id string) (streaming.Stream, error) {
74+
stream, err := sc.client.Stream(ctx)
75+
if err != nil {
76+
return nil, err
77+
}
78+
79+
a, err := typeurl.MarshalAny(&streamingapi.StreamInit{
80+
ID: id,
81+
})
82+
if err != nil {
83+
return nil, err
84+
}
85+
err = stream.Send(protobuf.FromAny(a))
86+
if err != nil {
87+
if !errors.Is(err, io.EOF) {
88+
err = errdefs.FromGRPC(err)
89+
}
90+
return nil, err
91+
}
92+
93+
// Receive an ack that stream is init and ready
94+
if _, err = stream.Recv(); err != nil {
95+
if !errors.Is(err, io.EOF) {
96+
err = errdefs.FromGRPC(err)
97+
}
98+
return nil, err
99+
}
100+
101+
return &clientStream{
102+
s: stream,
103+
}, nil
104+
}
105+
106+
type clientStream struct {
107+
s streamingapi.TTRPCStreaming_StreamClient
108+
}
109+
110+
func (cs *clientStream) Send(a typeurl.Any) (err error) {
111+
err = cs.s.Send(protobuf.FromAny(a))
112+
if !errors.Is(err, io.EOF) {
113+
err = errdefs.FromGRPC(err)
114+
}
115+
return
116+
}
117+
118+
func (cs *clientStream) Recv() (a typeurl.Any, err error) {
119+
a, err = cs.s.Recv()
120+
if !errors.Is(err, io.EOF) {
121+
err = errdefs.FromGRPC(err)
122+
}
123+
return
124+
}
125+
126+
func (cs *clientStream) Close() error {
127+
return cs.s.CloseSend()
128+
}

0 commit comments

Comments
 (0)