Skip to content

Commit d65976b

Browse files
authored
Merge pull request containerd#10163 from dmcgowan/transfer-ttrpc-support
Add support for ttrpc in transfer and streaming service
2 parents ef12da2 + 05a3171 commit d65976b

File tree

3 files changed

+173
-79
lines changed

3 files changed

+173
-79
lines changed

client/transfer.go

Lines changed: 3 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,11 @@ package client
1818

1919
import (
2020
"context"
21-
"errors"
22-
"io"
2321

24-
streamingapi "github.com/containerd/containerd/api/services/streaming/v1"
25-
transferapi "github.com/containerd/containerd/api/services/transfer/v1"
2622
"github.com/containerd/containerd/v2/core/streaming"
23+
streamproxy "github.com/containerd/containerd/v2/core/streaming/proxy"
2724
"github.com/containerd/containerd/v2/core/transfer"
2825
"github.com/containerd/containerd/v2/core/transfer/proxy"
29-
"github.com/containerd/containerd/v2/pkg/protobuf"
30-
"github.com/containerd/errdefs"
31-
"github.com/containerd/typeurl/v2"
3226
)
3327

3428
func (c *Client) Transfer(ctx context.Context, src interface{}, dest interface{}, opts ...transfer.Opt) error {
@@ -38,72 +32,9 @@ func (c *Client) Transfer(ctx context.Context, src interface{}, dest interface{}
3832
}
3933
defer done(ctx)
4034

41-
return proxy.NewTransferrer(transferapi.NewTransferClient(c.conn), c.streamCreator()).Transfer(ctx, src, dest, opts...)
35+
return proxy.NewTransferrer(c.conn, c.streamCreator()).Transfer(ctx, src, dest, opts...)
4236
}
4337

4438
func (c *Client) streamCreator() streaming.StreamCreator {
45-
return &streamCreator{
46-
client: streamingapi.NewStreamingClient(c.conn),
47-
}
48-
}
49-
50-
type streamCreator struct {
51-
client streamingapi.StreamingClient
52-
}
53-
54-
func (sc *streamCreator) Create(ctx context.Context, id string) (streaming.Stream, error) {
55-
stream, err := sc.client.Stream(ctx)
56-
if err != nil {
57-
return nil, err
58-
}
59-
60-
a, err := typeurl.MarshalAny(&streamingapi.StreamInit{
61-
ID: id,
62-
})
63-
if err != nil {
64-
return nil, err
65-
}
66-
err = stream.Send(protobuf.FromAny(a))
67-
if err != nil {
68-
if !errors.Is(err, io.EOF) {
69-
err = errdefs.FromGRPC(err)
70-
}
71-
return nil, err
72-
}
73-
74-
// Receive an ack that stream is init and ready
75-
if _, err = stream.Recv(); err != nil {
76-
if !errors.Is(err, io.EOF) {
77-
err = errdefs.FromGRPC(err)
78-
}
79-
return nil, err
80-
}
81-
82-
return &clientStream{
83-
s: stream,
84-
}, nil
85-
}
86-
87-
type clientStream struct {
88-
s streamingapi.Streaming_StreamClient
89-
}
90-
91-
func (cs *clientStream) Send(a typeurl.Any) (err error) {
92-
err = cs.s.Send(protobuf.FromAny(a))
93-
if !errors.Is(err, io.EOF) {
94-
err = errdefs.FromGRPC(err)
95-
}
96-
return
97-
}
98-
99-
func (cs *clientStream) Recv() (a typeurl.Any, err error) {
100-
a, err = cs.s.Recv()
101-
if !errors.Is(err, io.EOF) {
102-
err = errdefs.FromGRPC(err)
103-
}
104-
return
105-
}
106-
107-
func (cs *clientStream) Close() error {
108-
return cs.s.CloseSend()
39+
return streamproxy.NewStreamCreator(c.conn)
10940
}

core/streaming/proxy/streaming.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package proxy
18+
19+
import (
20+
"context"
21+
"errors"
22+
"fmt"
23+
"io"
24+
25+
streamingapi "github.com/containerd/containerd/api/services/streaming/v1"
26+
"github.com/containerd/containerd/v2/core/streaming"
27+
"github.com/containerd/containerd/v2/pkg/protobuf"
28+
"github.com/containerd/errdefs"
29+
"github.com/containerd/ttrpc"
30+
"github.com/containerd/typeurl/v2"
31+
"google.golang.org/grpc"
32+
)
33+
34+
// NewStreamCreator returns a new stream creator which can communicate over a GRPC
35+
// or TTRPC connection using the containerd streaming API.
36+
func NewStreamCreator(client any) streaming.StreamCreator {
37+
switch c := client.(type) {
38+
case streamingapi.StreamingClient:
39+
return &streamCreator{
40+
client: convertClient{c},
41+
}
42+
case grpc.ClientConnInterface:
43+
return &streamCreator{
44+
client: convertClient{streamingapi.NewStreamingClient(c)},
45+
}
46+
case streamingapi.TTRPCStreamingClient:
47+
return &streamCreator{
48+
client: c,
49+
}
50+
case *ttrpc.Client:
51+
return &streamCreator{
52+
client: streamingapi.NewTTRPCStreamingClient(c),
53+
}
54+
case streaming.StreamCreator:
55+
return c
56+
default:
57+
panic(fmt.Errorf("unsupported stream client %T: %w", client, errdefs.ErrNotImplemented))
58+
}
59+
}
60+
61+
type convertClient struct {
62+
streamingapi.StreamingClient
63+
}
64+
65+
func (c convertClient) Stream(ctx context.Context) (streamingapi.TTRPCStreaming_StreamClient, error) {
66+
return c.StreamingClient.Stream(ctx)
67+
}
68+
69+
type streamCreator struct {
70+
client streamingapi.TTRPCStreamingClient
71+
}
72+
73+
func (sc *streamCreator) Create(ctx context.Context, id string) (streaming.Stream, error) {
74+
stream, err := sc.client.Stream(ctx)
75+
if err != nil {
76+
return nil, err
77+
}
78+
79+
a, err := typeurl.MarshalAny(&streamingapi.StreamInit{
80+
ID: id,
81+
})
82+
if err != nil {
83+
return nil, err
84+
}
85+
err = stream.Send(protobuf.FromAny(a))
86+
if err != nil {
87+
if !errors.Is(err, io.EOF) {
88+
err = errdefs.FromGRPC(err)
89+
}
90+
return nil, err
91+
}
92+
93+
// Receive an ack that stream is init and ready
94+
if _, err = stream.Recv(); err != nil {
95+
if !errors.Is(err, io.EOF) {
96+
err = errdefs.FromGRPC(err)
97+
}
98+
return nil, err
99+
}
100+
101+
return &clientStream{
102+
s: stream,
103+
}, nil
104+
}
105+
106+
type clientStream struct {
107+
s streamingapi.TTRPCStreaming_StreamClient
108+
}
109+
110+
func (cs *clientStream) Send(a typeurl.Any) (err error) {
111+
err = cs.s.Send(protobuf.FromAny(a))
112+
if !errors.Is(err, io.EOF) {
113+
err = errdefs.FromGRPC(err)
114+
}
115+
return
116+
}
117+
118+
func (cs *clientStream) Recv() (a typeurl.Any, err error) {
119+
a, err = cs.s.Recv()
120+
if !errors.Is(err, io.EOF) {
121+
err = errdefs.FromGRPC(err)
122+
}
123+
return
124+
}
125+
126+
func (cs *clientStream) Close() error {
127+
return cs.s.CloseSend()
128+
}

core/transfer/proxy/transfer.go

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,35 +19,70 @@ package proxy
1919
import (
2020
"context"
2121
"errors"
22+
"fmt"
2223
"io"
2324

25+
"google.golang.org/grpc"
2426
"google.golang.org/protobuf/types/known/anypb"
27+
"google.golang.org/protobuf/types/known/emptypb"
2528

2629
transferapi "github.com/containerd/containerd/api/services/transfer/v1"
2730
transfertypes "github.com/containerd/containerd/api/types/transfer"
2831
"github.com/containerd/containerd/v2/core/streaming"
2932
"github.com/containerd/containerd/v2/core/transfer"
3033
tstreaming "github.com/containerd/containerd/v2/core/transfer/streaming"
3134
"github.com/containerd/containerd/v2/pkg/oci"
35+
"github.com/containerd/errdefs"
3236
"github.com/containerd/log"
37+
"github.com/containerd/ttrpc"
3338
"github.com/containerd/typeurl/v2"
3439
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
3540
)
3641

3742
type proxyTransferrer struct {
38-
client transferapi.TransferClient
43+
client transferapi.TTRPCTransferService
3944
streamCreator streaming.StreamCreator
4045
}
4146

42-
// NewTransferrer returns a new transferrer which communicates over a GRPC
43-
// connection using the containerd transfer API
44-
func NewTransferrer(client transferapi.TransferClient, sc streaming.StreamCreator) transfer.Transferrer {
45-
return &proxyTransferrer{
46-
client: client,
47-
streamCreator: sc,
47+
// NewTransferrer returns a new transferrer which can communicate over a GRPC
48+
// or TTRPC connection using the containerd transfer API
49+
func NewTransferrer(client any, sc streaming.StreamCreator) transfer.Transferrer {
50+
switch c := client.(type) {
51+
case transferapi.TransferClient:
52+
return &proxyTransferrer{
53+
client: convertClient{c},
54+
streamCreator: sc,
55+
}
56+
case grpc.ClientConnInterface:
57+
return &proxyTransferrer{
58+
client: convertClient{transferapi.NewTransferClient(c)},
59+
streamCreator: sc,
60+
}
61+
case transferapi.TTRPCTransferService:
62+
return &proxyTransferrer{
63+
client: c,
64+
streamCreator: sc,
65+
}
66+
case *ttrpc.Client:
67+
return &proxyTransferrer{
68+
client: transferapi.NewTTRPCTransferClient(c),
69+
streamCreator: sc,
70+
}
71+
case transfer.Transferrer:
72+
return c
73+
default:
74+
panic(fmt.Errorf("unsupported stream client %T: %w", client, errdefs.ErrNotImplemented))
4875
}
4976
}
5077

78+
type convertClient struct {
79+
transferapi.TransferClient
80+
}
81+
82+
func (c convertClient) Transfer(ctx context.Context, r *transferapi.TransferRequest) (*emptypb.Empty, error) {
83+
return c.TransferClient.Transfer(ctx, r)
84+
}
85+
5186
func (p *proxyTransferrer) Transfer(ctx context.Context, src interface{}, dst interface{}, opts ...transfer.Opt) error {
5287
o := &transfer.Config{}
5388
for _, opt := range opts {

0 commit comments

Comments
 (0)