Skip to content

Commit 90a8667

Browse files
author
Maksym Pavlenko
authored
Merge pull request containerd#10190 from abel-von/fix-streaming-io-path
fix: modify streaming io url and add docs of sandboxer and io_type
2 parents 4fa8ce9 + 0b113d7 commit 90a8667

File tree

5 files changed

+68
-13
lines changed

5 files changed

+68
-13
lines changed

docs/cri/config.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,17 @@ version = 2
369369
# See https://github.com/containerd/containerd/issues/6657 for context.
370370
snapshotter = ""
371371

372+
# sandboxer is the sandbox controller for the runtime.
373+
# The default sandbox controller is the podsandbox controller, which create a "pause" container as a sandbox.
374+
# We can create our own "shim" sandbox controller by implementing the sandbox api defined in runtime/sandbox/v1/sandbox.proto in our shim, and specifiy the sandboxer to "shim" here.
375+
# We can also run a grpc or ttrpc server to serve the sandbox controller API defined in services/sandbox/v1/sandbox.proto, and define a ProxyPlugin of "sandbox" type, and specify the name of the ProxyPlugin here.
376+
sandboxer = ""
377+
378+
# io_type is the way containerd get stdin/stdout/stderr from container or the execed process.
379+
# The default value is "fifo", in which containerd will create a set of named pipes and transfer io by them.
380+
# Currently the value of "streaming" is supported, in this way, sandbox should serve streaming api defined in services/streaming/v1/streaming.proto, and containerd will connect to sandbox's endpoint and create a set of streams to it, as channels to transfer io of container or process.
381+
io_type = ""
382+
372383
# 'plugins."io.containerd.grpc.v1.cri".containerd.runtimes.runc.options' is options specific to
373384
# "io.containerd.runc.v1" and "io.containerd.runc.v2". Its corresponding options type is:
374385
# https://github.com/containerd/containerd/blob/v1.3.2/runtime/v2/runc/options/oci.pb.go#L26 .

internal/cri/io/container_io.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,17 @@ func WithNewFIFOs(root string, tty, stdin bool) ContainerIOOpts {
7373
}
7474

7575
// WithStreams creates new streams for the container io.
76+
// The stream address is in format of `protocol://address?stream_id=xyz`.
77+
// It allocates ContainerID-stdin, ContainerID-stdout and ContainerID-stderr as streaming IDs.
78+
// For example, that advertiser address of shim is `ttrpc+unix:///run/demo.sock` and container ID is `app`.
79+
// There are three streams if stdin is enabled and TTY is disabled.
80+
//
81+
// - Stdin: ttrpc+unix:///run/demo.sock?stream_id=app-stdin
82+
// - Stdout: ttrpc+unix:///run/demo.sock?stream_id=app-stdout
83+
// - stderr: ttrpc+unix:///run/demo.sock?stream_id=app-stderr
84+
//
85+
// The streaming IDs will be used as unique key to establish stream tunnel.
86+
// And it should support reconnection with the same streaming ID if containerd restarts.
7687
func WithStreams(address string, tty, stdin bool) ContainerIOOpts {
7788
return func(c *ContainerIO) error {
7889
if address == "" {

internal/cri/io/exec_io.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,17 @@ func NewFifoExecIO(id, root string, tty, stdin bool) (*ExecIO, error) {
5555
}
5656

5757
// NewStreamExecIO creates exec io with streaming.
58+
// The stream address is in format of `protocol://address?stream_id=xyz`.
59+
// It allocates ExecID-stdin, ExecID-stdout and ExecID-stderr as streaming IDs.
60+
// For example, that advertiser address of shim is `ttrpc+unix:///run/demo.sock` and exec ID is `app`.
61+
// There are three streams if stdin is enabled and TTY is disabled.
62+
//
63+
// - Stdin: ttrpc+unix:///run/demo.sock?stream_id=app-stdin
64+
// - Stdout: ttrpc+unix:///run/demo.sock?stream_id=app-stdout
65+
// - stderr: ttrpc+unix:///run/demo.sock?stream_id=app-stderr
66+
//
67+
// The streaming IDs will be used as unique key to establish stream tunnel.
68+
// And it should support reconnection with the same streaming ID if containerd restarts.
5869
func NewStreamExecIO(id, address string, tty, stdin bool) (*ExecIO, error) {
5970
fifos, err := newStreams(address, id, tty, stdin)
6071
if err != nil {

internal/cri/io/helpers.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,13 @@ func newStreams(address, id string, tty, stdin bool) (*cio.FIFOSet, error) {
105105
fifos := cio.NewFIFOSet(cio.Config{}, func() error { return nil })
106106
if stdin {
107107
streamID := id + "-stdin"
108-
fifos.Stdin = fmt.Sprintf("%s/streaming?id=%s", address, streamID)
108+
fifos.Stdin = fmt.Sprintf("%s?streaming_id=%s", address, streamID)
109109
}
110110
stdoutStreamID := id + "-stdout"
111-
fifos.Stdout = fmt.Sprintf("%s/streaming?id=%s", address, stdoutStreamID)
111+
fifos.Stdout = fmt.Sprintf("%s?streaming_id=%s", address, stdoutStreamID)
112112
if !tty {
113113
stderrStreamID := id + "-stderr"
114-
fifos.Stderr = fmt.Sprintf("%s/streaming?id=%s", address, stderrStreamID)
114+
fifos.Stderr = fmt.Sprintf("%s?streaming_id=%s", address, stderrStreamID)
115115
}
116116
fifos.Terminal = tty
117117
return fifos, nil
@@ -209,6 +209,8 @@ func openOutputStream(ctx context.Context, url string) (io.ReadCloser, error) {
209209
}
210210

211211
func openStream(ctx context.Context, urlStr string) (streamingapi.Stream, error) {
212+
// urlStr should be in the form of:
213+
// <ttrpc|grpc>+<unix|vsock|hvsock>://<uds-path|vsock-cid:vsock-port|uds-path:hvsock-port>?streaming_id=<stream-id>
212214
u, err := url.Parse(urlStr)
213215
if err != nil {
214216
return nil, fmt.Errorf("address url parse error: %v", err)
@@ -221,12 +223,7 @@ func openStream(ctx context.Context, urlStr string) (streamingapi.Stream, error)
221223
" the form of <protocol>+<unix|vsock|tcp>, i.e. ttrpc+unix or grpc+vsock")
222224
}
223225

224-
if u.Path != "streaming" {
225-
// TODO, support connect stream other than streaming api
226-
return nil, fmt.Errorf("only <address>/streaming?id=xxx supported")
227-
}
228-
229-
id := u.Query().Get("id")
226+
id := u.Query().Get("streaming_id")
230227
if id == "" {
231228
return nil, fmt.Errorf("no stream id in url queries")
232229
}

internal/cri/server/restart.go

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,6 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe
257257
defer cancel()
258258
id := cntr.ID()
259259
containerDir := c.getContainerRootDir(id)
260-
volatileContainerDir := c.getVolatileContainerRootDir(id)
261260
var container containerstore.Container
262261
// Load container metadata.
263262
exts, err := cntr.Extensions(ctx)
@@ -336,9 +335,7 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe
336335
// NOTE: Another possibility is that we've tried to start the container, but
337336
// containerd got restarted during that. In that case, we still
338337
// treat the container as `CREATED`.
339-
containerIO, err = cio.NewContainerIO(id,
340-
cio.WithNewFIFOs(volatileContainerDir, meta.Config.GetTty(), meta.Config.GetStdin()),
341-
)
338+
containerIO, err = c.createContainerIO(id, meta.SandboxID, meta.Config)
342339
if err != nil {
343340
return fmt.Errorf("failed to create container io: %w", err)
344341
}
@@ -465,3 +462,31 @@ func cleanupOrphanedIDDirs(ctx context.Context, cntrs []containerd.Container, ba
465462
}
466463
return nil
467464
}
465+
466+
func (c *criService) createContainerIO(containerID, sandboxID string, config *runtime.ContainerConfig) (*cio.ContainerIO, error) {
467+
if config == nil {
468+
return nil, fmt.Errorf("ContainerConfig should not be nil when create container io")
469+
}
470+
sb, err := c.sandboxStore.Get(sandboxID)
471+
if err != nil {
472+
return nil, fmt.Errorf("an error occurred when try to find sandbox %q: %w", sandboxID, err)
473+
}
474+
ociRuntime, err := c.config.GetSandboxRuntime(sb.Config, sb.Metadata.RuntimeHandler)
475+
if err != nil {
476+
return nil, fmt.Errorf("failed to get sandbox runtime: %w", err)
477+
}
478+
var containerIO *cio.ContainerIO
479+
switch ociRuntime.IOType {
480+
case criconfig.IOTypeStreaming:
481+
containerIO, err = cio.NewContainerIO(containerID,
482+
cio.WithStreams(sb.Endpoint.Address, config.GetTty(), config.GetStdin()))
483+
default:
484+
volatileContainerRootDir := c.getVolatileContainerRootDir(containerID)
485+
containerIO, err = cio.NewContainerIO(containerID,
486+
cio.WithNewFIFOs(volatileContainerRootDir, config.GetTty(), config.GetStdin()))
487+
}
488+
if err != nil {
489+
return nil, fmt.Errorf("failed to create container io: %w", err)
490+
}
491+
return containerIO, nil
492+
}

0 commit comments

Comments
 (0)