Skip to content

Commit b8dfb4d

Browse files
committed
cri: support io by streaming api
Signed-off-by: Abel Feng <[email protected]>
1 parent a26c686 commit b8dfb4d

File tree

8 files changed

+364
-38
lines changed

8 files changed

+364
-38
lines changed

core/transfer/streaming/reader.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package streaming
18+
19+
import (
20+
"context"
21+
"errors"
22+
"fmt"
23+
"io"
24+
25+
transferapi "github.com/containerd/containerd/api/types/transfer"
26+
"github.com/containerd/containerd/v2/core/streaming"
27+
"github.com/containerd/typeurl/v2"
28+
)
29+
30+
type readByteStream struct {
31+
ctx context.Context
32+
stream streaming.Stream
33+
window int32
34+
updated chan struct{}
35+
errCh chan error
36+
remaining []byte
37+
}
38+
39+
func ReadByteStream(ctx context.Context, stream streaming.Stream) io.ReadCloser {
40+
rbs := &readByteStream{
41+
ctx: ctx,
42+
stream: stream,
43+
window: 0,
44+
errCh: make(chan error),
45+
updated: make(chan struct{}, 1),
46+
}
47+
go func() {
48+
for {
49+
if rbs.window >= windowSize {
50+
select {
51+
case <-ctx.Done():
52+
return
53+
case <-rbs.updated:
54+
continue
55+
}
56+
}
57+
update := &transferapi.WindowUpdate{
58+
Update: windowSize,
59+
}
60+
anyType, err := typeurl.MarshalAny(update)
61+
if err != nil {
62+
rbs.errCh <- err
63+
return
64+
}
65+
if err := stream.Send(anyType); err == nil {
66+
rbs.window += windowSize
67+
} else if !errors.Is(err, io.EOF) {
68+
rbs.errCh <- err
69+
}
70+
}
71+
72+
}()
73+
return rbs
74+
}
75+
76+
func (r *readByteStream) Read(p []byte) (n int, err error) {
77+
plen := len(p)
78+
if len(r.remaining) > 0 {
79+
copied := copy(p, r.remaining)
80+
if len(r.remaining) > plen {
81+
r.remaining = r.remaining[plen:]
82+
} else {
83+
r.remaining = nil
84+
}
85+
return copied, nil
86+
}
87+
select {
88+
case <-r.ctx.Done():
89+
return 0, r.ctx.Err()
90+
case err := <-r.errCh:
91+
return 0, err
92+
default:
93+
}
94+
anyType, err := r.stream.Recv()
95+
if err != nil {
96+
return 0, err
97+
}
98+
i, err := typeurl.UnmarshalAny(anyType)
99+
if err != nil {
100+
return 0, err
101+
}
102+
switch v := i.(type) {
103+
case *transferapi.Data:
104+
n := copy(p, v.Data)
105+
if len(v.Data) > plen {
106+
r.remaining = v.Data[plen:]
107+
}
108+
r.window = r.window - int32(n)
109+
if r.window < windowSize {
110+
r.updated <- struct{}{}
111+
}
112+
return n, nil
113+
default:
114+
return 0, fmt.Errorf("stream received error type %v", v)
115+
}
116+
117+
}
118+
119+
func (r *readByteStream) Close() error {
120+
return r.stream.Close()
121+
}

internal/cri/config/config.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ const (
7171
// DefaultSandboxImage is the default image to use for sandboxes when empty or
7272
// for default configurations.
7373
DefaultSandboxImage = "registry.k8s.io/pause:3.9"
74+
// IOTypeFifo is container io implemented by creating named pipe
75+
IOTypeFifo = "fifo"
76+
// IOTypeStreaming is container io implemented by connecting the streaming api to sandbox endpoint
77+
IOTypeStreaming = "streaming"
7478
)
7579

7680
// Runtime struct to contain the type(ID), engine, and root variables for a default runtime
@@ -116,6 +120,11 @@ type Runtime struct {
116120
// shim - means use whatever Controller implementation provided by shim (e.g. use RemoteController).
117121
// podsandbox - means use Controller implementation from sbserver podsandbox package.
118122
Sandboxer string `toml:"sandboxer" json:"sandboxer"`
123+
// IOType defines how containerd transfer the io streams of the container
124+
// if it is not set, the named pipe will be created for the container
125+
// we can also set it to "streaming" to create a stream by streaming api,
126+
// and use it as a channel to transfer the io stream
127+
IOType string `toml:"io_type" json:"io_type"`
119128
}
120129

121130
// ContainerdConfig contains toml config related to containerd
@@ -527,6 +536,13 @@ func ValidateRuntimeConfig(ctx context.Context, c *RuntimeConfig) ([]deprecation
527536
r.Sandboxer = string(ModePodSandbox)
528537
c.ContainerdConfig.Runtimes[k] = r
529538
}
539+
540+
if len(r.IOType) == 0 {
541+
r.IOType = IOTypeFifo
542+
}
543+
if r.IOType != IOTypeStreaming && r.IOType != IOTypeFifo {
544+
return warnings, errors.New("`io_type` can only be `streaming` or `named_pipe`")
545+
}
530546
}
531547

532548
// Validation for drain_exec_sync_io_timeout

internal/cri/io/container_io.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@ package io
1818

1919
import (
2020
"errors"
21+
"fmt"
2122
"io"
2223
"strings"
2324
"sync"
2425

25-
"github.com/containerd/containerd/v2/pkg/cio"
2626
"github.com/containerd/log"
2727

2828
"github.com/containerd/containerd/v2/internal/cri/util"
29+
"github.com/containerd/containerd/v2/pkg/cio"
2930
cioutil "github.com/containerd/containerd/v2/pkg/ioutil"
3031
)
3132

@@ -39,7 +40,7 @@ type ContainerIO struct {
3940
id string
4041

4142
fifos *cio.FIFOSet
42-
*stdioPipes
43+
*stdioStream
4344

4445
stdoutGroup *cioutil.WriterGroup
4546
stderrGroup *cioutil.WriterGroup
@@ -71,6 +72,20 @@ func WithNewFIFOs(root string, tty, stdin bool) ContainerIOOpts {
7172
}
7273
}
7374

75+
// WithStreams creates new streams for the container io.
76+
func WithStreams(address string, tty, stdin bool) ContainerIOOpts {
77+
return func(c *ContainerIO) error {
78+
if address == "" {
79+
return fmt.Errorf("address can not be empty for io stream")
80+
}
81+
fifos, err := newStreams(address, c.id, tty, stdin)
82+
if err != nil {
83+
return err
84+
}
85+
return WithFIFOs(fifos)(c)
86+
}
87+
}
88+
7489
// NewContainerIO creates container io.
7590
func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err error) {
7691
c := &ContainerIO{
@@ -87,11 +102,11 @@ func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err err
87102
return nil, errors.New("fifos are not set")
88103
}
89104
// Create actual fifos.
90-
stdio, closer, err := newStdioPipes(c.fifos)
105+
stdio, closer, err := newStdioStream(c.fifos)
91106
if err != nil {
92107
return nil, err
93108
}
94-
c.stdioPipes = stdio
109+
c.stdioStream = stdio
95110
c.closer = closer
96111
return c, nil
97112
}

internal/cri/io/exec_io.go

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,36 +20,55 @@ import (
2020
"io"
2121
"sync"
2222

23+
"github.com/containerd/log"
24+
2325
"github.com/containerd/containerd/v2/pkg/cio"
2426
cioutil "github.com/containerd/containerd/v2/pkg/ioutil"
25-
"github.com/containerd/log"
2627
)
2728

2829
// ExecIO holds the exec io.
2930
type ExecIO struct {
3031
id string
3132
fifos *cio.FIFOSet
32-
*stdioPipes
33+
*stdioStream
3334
closer *wgCloser
3435
}
3536

3637
var _ cio.IO = &ExecIO{}
3738

38-
// NewExecIO creates exec io.
39-
func NewExecIO(id, root string, tty, stdin bool) (*ExecIO, error) {
39+
// NewFifoExecIO creates exec io by named pipes.
40+
func NewFifoExecIO(id, root string, tty, stdin bool) (*ExecIO, error) {
4041
fifos, err := newFifos(root, id, tty, stdin)
4142
if err != nil {
4243
return nil, err
4344
}
44-
stdio, closer, err := newStdioPipes(fifos)
45+
stdio, closer, err := newStdioStream(fifos)
46+
if err != nil {
47+
return nil, err
48+
}
49+
return &ExecIO{
50+
id: id,
51+
fifos: fifos,
52+
stdioStream: stdio,
53+
closer: closer,
54+
}, nil
55+
}
56+
57+
// NewStreamExecIO creates exec io with streaming.
58+
func NewStreamExecIO(id, address string, tty, stdin bool) (*ExecIO, error) {
59+
fifos, err := newStreams(address, id, tty, stdin)
60+
if err != nil {
61+
return nil, err
62+
}
63+
stdio, closer, err := newStdioStream(fifos)
4564
if err != nil {
4665
return nil, err
4766
}
4867
return &ExecIO{
49-
id: id,
50-
fifos: fifos,
51-
stdioPipes: stdio,
52-
closer: closer,
68+
id: id,
69+
fifos: fifos,
70+
stdioStream: stdio,
71+
closer: closer,
5372
}, nil
5473
}
5574

0 commit comments

Comments
 (0)