Skip to content

Commit 568c840

Browse files
committed
Re-establish a connection when vsock breaks
The IOProxy struct connects a container's stdio inside a micro VM to the FIFO files on its host. containerd clients consume the FIFO files to access the container's stdio. However, IOProxy stopped itself whenever one of the streams returned an error, such as EPIPE. After that, reads from the FIFO files blocked indefinitely because the FIFOs no longer had any writers. This change adds ioProxyHandler and uses the handler on State() to make sure IOProxy is running. If it is not, ioProxyHandler#Attach() creates a new IOProxy instance. Fixes firecracker-microvm#482. Signed-off-by: Kazuyoshi Kato <[email protected]>
1 parent 03e42ae commit 568c840

File tree

12 files changed

+1314
-18
lines changed

12 files changed

+1314
-18
lines changed

agent/ioproxy_handler.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"). You may
4+
// not use this file except in compliance with the License. A copy of the
5+
// License is located at
6+
//
7+
// http://aws.amazon.com/apache2.0/
8+
//
9+
// or in the "license" file accompanying this file. This file is distributed
10+
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
// express or implied. See the License for the specific language governing
12+
// permissions and limitations under the License.
13+
14+
package main
15+
16+
import (
17+
"context"
18+
19+
"github.com/containerd/containerd/log"
20+
"github.com/containerd/containerd/runtime/v2/task"
21+
"github.com/firecracker-microvm/firecracker-containerd/internal/vm"
22+
ioproxy "github.com/firecracker-microvm/firecracker-containerd/proto/service/ioproxy/ttrpc"
23+
"github.com/golang/protobuf/ptypes/empty"
24+
)
25+
26+
// ioProxyHandler implements IOProxyService that exposes the state of
27+
// IOProxy instances.
28+
type ioProxyHandler struct {
29+
runcService task.TaskService
30+
taskManager vm.TaskManager
31+
}
32+
33+
var _ ioproxy.IOProxyService = &ioProxyHandler{}
34+
35+
// State returns whether the given exec's IOProxy is still open or not.
36+
func (ps *ioProxyHandler) State(_ context.Context, req *ioproxy.StateRequest) (*ioproxy.StateResponse, error) {
37+
open, err := ps.taskManager.IsProxyOpen(req.ID, req.ExecID)
38+
if err != nil {
39+
return nil, err
40+
}
41+
return &ioproxy.StateResponse{IsOpen: open}, nil
42+
}
43+
44+
// Attach a new IOProxy to the given exec.
45+
func (ps *ioProxyHandler) Attach(ctx context.Context, req *ioproxy.AttachRequest) (*empty.Empty, error) {
46+
state, err := ps.runcService.State(ctx, &task.StateRequest{ID: req.ID, ExecID: req.ExecID})
47+
if err != nil {
48+
return nil, err
49+
}
50+
51+
logger := log.G(ctx).WithField("TaskID", req.ID).WithField("ExecID", req.ExecID)
52+
53+
var proxy vm.IOProxy
54+
if vm.IsAgentOnlyIO(state.Stdout, logger) {
55+
proxy = vm.NewNullIOProxy()
56+
} else {
57+
proxy = vm.NewIOConnectorProxy(
58+
vm.InputPair(req.StdinPort, state.Stdin),
59+
vm.OutputPair(state.Stdout, req.StdoutPort),
60+
vm.OutputPair(state.Stderr, req.StderrPort),
61+
)
62+
}
63+
64+
err = ps.taskManager.AttachIO(ctx, req.ID, req.ExecID, proxy)
65+
if err != nil {
66+
return nil, err
67+
}
68+
69+
return &empty.Empty{}, nil
70+
}

agent/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/firecracker-microvm/firecracker-containerd/internal/vm"
3838

3939
drivemount "github.com/firecracker-microvm/firecracker-containerd/proto/service/drivemount/ttrpc"
40+
ioproxy "github.com/firecracker-microvm/firecracker-containerd/proto/service/ioproxy/ttrpc"
4041
)
4142

4243
const (
@@ -111,6 +112,11 @@ func main() {
111112
}
112113
drivemount.RegisterDriveMounterService(server, dh)
113114

115+
ioproxy.RegisterIOProxyService(server, &ioProxyHandler{
116+
runcService: taskService.runcService,
117+
taskManager: taskService.taskManager,
118+
})
119+
114120
// Run ttrpc over vsock
115121

116122
vsockLogger := log.G(shimCtx).WithField("port", port)

agent/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func NewTaskService(
9191
shimCtx context.Context,
9292
shimCancel context.CancelFunc,
9393
publisher shim.Publisher,
94-
) (taskAPI.TaskService, error) {
94+
) (*TaskService, error) {
9595
// We provide an empty string for "id" as the service manages multiple tasks; there is no single
9696
// "id" being managed. As noted in the comments of the called code, the "id" arg is only used by
9797
// the Cleanup function, so it will never be invoked as part of the task service API, which is all

internal/vm/agent.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,12 @@ func (*nullIOProxy) start(proc *vmProc) (ioInitDone <-chan error, ioCopyDone <-c
5454

5555
return initCh, copyCh
5656
}
57+
58+
// Close is no-op.
59+
func (*nullIOProxy) Close() {
60+
}
61+
62+
// IsOpen always returns true, since this proxy implementation is no-op.
63+
func (*nullIOProxy) IsOpen() bool {
64+
return true
65+
}

internal/vm/ioproxy.go

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"context"
1818
"io"
1919
"strings"
20+
"sync"
2021
"time"
2122

2223
"github.com/firecracker-microvm/firecracker-containerd/internal"
@@ -36,6 +37,12 @@ type IOProxy interface {
3637
// process. It returns two channels, one to indicate io initialization
3738
// is completed and one to indicate io copying is completed.
3839
start(proc *vmProc) (ioInitDone <-chan error, ioCopyDone <-chan error)
40+
41+
// Close the proxy.
42+
Close()
43+
44+
// IsOpen returns true if the proxy hasn't been closed.
45+
IsOpen() bool
3946
}
4047

4148
// IOConnector is a function that begins initializing an IO connection (i.e.
@@ -71,6 +78,7 @@ func (connectorPair *IOConnectorPair) proxy(
7178
logger *logrus.Entry,
7279
timeoutAfterExit time.Duration,
7380
) (ioInitDone <-chan error, ioCopyDone <-chan error) {
81+
// initDone might not have to be buffered. We only send ioInitErr once.
7482
initDone := make(chan error, 2)
7583
copyDone := make(chan error)
7684

@@ -152,6 +160,10 @@ type ioConnectorSet struct {
152160
stdin *IOConnectorPair
153161
stdout *IOConnectorPair
154162
stderr *IOConnectorPair
163+
164+
// closeMu is needed since Close() will be called from different goroutines.
165+
closeMu sync.Mutex
166+
closed bool
155167
}
156168

157169
// NewIOConnectorProxy implements the IOProxy interface for a set of
@@ -163,12 +175,33 @@ func NewIOConnectorProxy(stdin, stdout, stderr *IOConnectorPair) IOProxy {
163175
stdin: stdin,
164176
stdout: stdout,
165177
stderr: stderr,
178+
closed: false,
166179
}
167180
}
168181

182+
func (ioConnectorSet *ioConnectorSet) Close() {
183+
ioConnectorSet.closeMu.Lock()
184+
defer ioConnectorSet.closeMu.Unlock()
185+
186+
ioConnectorSet.closed = true
187+
}
188+
189+
func (ioConnectorSet *ioConnectorSet) IsOpen() bool {
190+
ioConnectorSet.closeMu.Lock()
191+
defer ioConnectorSet.closeMu.Unlock()
192+
193+
return !ioConnectorSet.closed
194+
}
195+
196+
// start starts goroutines to copy stdio and returns two channels.
197+
// The first channel returns its initialization error. The second channel returns errors from copying.
169198
func (ioConnectorSet *ioConnectorSet) start(proc *vmProc) (ioInitDone <-chan error, ioCopyDone <-chan error) {
170199
var initErrG errgroup.Group
171-
var copyErrG errgroup.Group
200+
201+
// When one of the goroutines returns an error, we will cancel
202+
// the rest of goroutines through the ctx below.
203+
copyErrG, ctx := errgroup.WithContext(proc.ctx)
204+
172205
waitErrs := func(initErrCh, copyErrCh <-chan error) {
173206
initErrG.Go(func() error { return <-initErrCh })
174207
copyErrG.Go(func() error { return <-copyErrCh })
@@ -177,24 +210,25 @@ func (ioConnectorSet *ioConnectorSet) start(proc *vmProc) (ioInitDone <-chan err
177210
if ioConnectorSet.stdin != nil {
178211
// For Stdin only, provide 0 as the timeout to wait after the proc exits before closing IO streams.
179212
// There's no reason to send stdin data to a proc that's already dead.
180-
waitErrs(ioConnectorSet.stdin.proxy(proc.ctx, proc.logger.WithField("stream", "stdin"), 0))
181-
213+
waitErrs(ioConnectorSet.stdin.proxy(ctx, proc.logger.WithField("stream", "stdin"), 0))
182214
} else {
183215
proc.logger.Debug("skipping proxy io for unset stdin")
184216
}
185217

186218
if ioConnectorSet.stdout != nil {
187-
waitErrs(ioConnectorSet.stdout.proxy(proc.ctx, proc.logger.WithField("stream", "stdout"), defaultIOFlushTimeout))
219+
waitErrs(ioConnectorSet.stdout.proxy(ctx, proc.logger.WithField("stream", "stdout"), defaultIOFlushTimeout))
188220
} else {
189221
proc.logger.Debug("skipping proxy io for unset stdout")
190222
}
191223

192224
if ioConnectorSet.stderr != nil {
193-
waitErrs(ioConnectorSet.stderr.proxy(proc.ctx, proc.logger.WithField("stream", "stderr"), defaultIOFlushTimeout))
225+
waitErrs(ioConnectorSet.stderr.proxy(ctx, proc.logger.WithField("stream", "stderr"), defaultIOFlushTimeout))
194226
} else {
195227
proc.logger.Debug("skipping proxy io for unset stderr")
196228
}
197229

230+
// These channels are not buffered, since we will close them right after having one error.
231+
// Callers must read the channels.
198232
initDone := make(chan error)
199233
go func() {
200234
defer close(initDone)
@@ -227,3 +261,29 @@ func logClose(logger *logrus.Entry, streams ...io.Closer) {
227261
logger.WithError(closeErr).Error("error closing io stream")
228262
}
229263
}
264+
265+
// InputPair returns an IOConnectorPair from the given vsock port to
266+
// the FIFO file.
267+
func InputPair(src uint32, dest string) *IOConnectorPair {
268+
if dest == "" {
269+
return nil
270+
}
271+
272+
return &IOConnectorPair{
273+
ReadConnector: VSockAcceptConnector(src),
274+
WriteConnector: WriteFIFOConnector(dest),
275+
}
276+
}
277+
278+
// OutputPair returns an IOConnectorPair from the given FIFO to
279+
// the vsock port.
280+
func OutputPair(src string, dest uint32) *IOConnectorPair {
281+
if src == "" {
282+
return nil
283+
}
284+
285+
return &IOConnectorPair{
286+
ReadConnector: ReadFIFOConnector(src),
287+
WriteConnector: VSockAcceptConnector(dest),
288+
}
289+
}

0 commit comments

Comments
 (0)