Skip to content

Commit 4e4c907

Browse files
authored
Merge pull request firecracker-microvm#483 from kzys/fix-attach-3
Re-establish a connection when vsock breaks
2 parents 37b281d + 568c840 commit 4e4c907

File tree

12 files changed

+1379
-17
lines changed

12 files changed

+1379
-17
lines changed

agent/ioproxy_handler.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"). You may
4+
// not use this file except in compliance with the License. A copy of the
5+
// License is located at
6+
//
7+
// http://aws.amazon.com/apache2.0/
8+
//
9+
// or in the "license" file accompanying this file. This file is distributed
10+
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
// express or implied. See the License for the specific language governing
12+
// permissions and limitations under the License.
13+
14+
package main
15+
16+
import (
17+
"context"
18+
19+
"github.com/containerd/containerd/log"
20+
"github.com/containerd/containerd/runtime/v2/task"
21+
"github.com/firecracker-microvm/firecracker-containerd/internal/vm"
22+
ioproxy "github.com/firecracker-microvm/firecracker-containerd/proto/service/ioproxy/ttrpc"
23+
"github.com/golang/protobuf/ptypes/empty"
24+
)
25+
26+
// ioProxyHandler implements IOProxyService that exposes the state of
27+
// IOProxy instances.
28+
type ioProxyHandler struct {
29+
runcService task.TaskService
30+
taskManager vm.TaskManager
31+
}
32+
33+
var _ ioproxy.IOProxyService = &ioProxyHandler{}
34+
35+
// State returns whether the given exec's IOProxy is still open or not.
36+
func (ps *ioProxyHandler) State(_ context.Context, req *ioproxy.StateRequest) (*ioproxy.StateResponse, error) {
37+
open, err := ps.taskManager.IsProxyOpen(req.ID, req.ExecID)
38+
if err != nil {
39+
return nil, err
40+
}
41+
return &ioproxy.StateResponse{IsOpen: open}, nil
42+
}
43+
44+
// Attach a new IOProxy to the given exec.
45+
func (ps *ioProxyHandler) Attach(ctx context.Context, req *ioproxy.AttachRequest) (*empty.Empty, error) {
46+
state, err := ps.runcService.State(ctx, &task.StateRequest{ID: req.ID, ExecID: req.ExecID})
47+
if err != nil {
48+
return nil, err
49+
}
50+
51+
logger := log.G(ctx).WithField("TaskID", req.ID).WithField("ExecID", req.ExecID)
52+
53+
var proxy vm.IOProxy
54+
if vm.IsAgentOnlyIO(state.Stdout, logger) {
55+
proxy = vm.NewNullIOProxy()
56+
} else {
57+
proxy = vm.NewIOConnectorProxy(
58+
vm.InputPair(req.StdinPort, state.Stdin),
59+
vm.OutputPair(state.Stdout, req.StdoutPort),
60+
vm.OutputPair(state.Stderr, req.StderrPort),
61+
)
62+
}
63+
64+
err = ps.taskManager.AttachIO(ctx, req.ID, req.ExecID, proxy)
65+
if err != nil {
66+
return nil, err
67+
}
68+
69+
return &empty.Empty{}, nil
70+
}

agent/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/firecracker-microvm/firecracker-containerd/internal/vm"
3838

3939
drivemount "github.com/firecracker-microvm/firecracker-containerd/proto/service/drivemount/ttrpc"
40+
ioproxy "github.com/firecracker-microvm/firecracker-containerd/proto/service/ioproxy/ttrpc"
4041
)
4142

4243
const (
@@ -111,6 +112,11 @@ func main() {
111112
}
112113
drivemount.RegisterDriveMounterService(server, dh)
113114

115+
ioproxy.RegisterIOProxyService(server, &ioProxyHandler{
116+
runcService: taskService.runcService,
117+
taskManager: taskService.taskManager,
118+
})
119+
114120
// Run ttrpc over vsock
115121

116122
vsockLogger := log.G(shimCtx).WithField("port", port)

agent/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func NewTaskService(
9191
shimCtx context.Context,
9292
shimCancel context.CancelFunc,
9393
publisher shim.Publisher,
94-
) (taskAPI.TaskService, error) {
94+
) (*TaskService, error) {
9595
// We provide an empty string for "id" as the service manages multiple tasks; there is no single
9696
// "id" being managed. As noted in the comments of the called code, the "id" arg is only used by
9797
// the Cleanup function, so it will never be invoked as part of the task service API, which is all

internal/vm/agent.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,12 @@ func (*nullIOProxy) start(proc *vmProc) (ioInitDone <-chan error, ioCopyDone <-c
5454

5555
return initCh, copyCh
5656
}
57+
58+
// Close is no-op.
59+
func (*nullIOProxy) Close() {
60+
}
61+
62+
// IsOpen always returns true, since this proxy implementation is no-op.
63+
func (*nullIOProxy) IsOpen() bool {
64+
return true
65+
}

internal/vm/ioproxy.go

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"context"
1818
"io"
1919
"strings"
20+
"sync"
2021
"time"
2122

2223
"github.com/firecracker-microvm/firecracker-containerd/internal"
@@ -36,6 +37,12 @@ type IOProxy interface {
3637
// process. It returns two channels, one to indicate io initialization
3738
// is completed and one to indicate io copying is completed.
3839
start(proc *vmProc) (ioInitDone <-chan error, ioCopyDone <-chan error)
40+
41+
// Close the proxy.
42+
Close()
43+
44+
// IsOpen returns true if the proxy hasn't been closed.
45+
IsOpen() bool
3946
}
4047

4148
// IOConnector is function that begins initializing an IO connection (i.e.
@@ -71,6 +78,7 @@ func (connectorPair *IOConnectorPair) proxy(
7178
logger *logrus.Entry,
7279
timeoutAfterExit time.Duration,
7380
) (ioInitDone <-chan error, ioCopyDone <-chan error) {
81+
// initDone might not have to be buffered. We only send ioInitErr once.
7482
initDone := make(chan error, 2)
7583
copyDone := make(chan error)
7684

@@ -152,6 +160,10 @@ type ioConnectorSet struct {
152160
stdin *IOConnectorPair
153161
stdout *IOConnectorPair
154162
stderr *IOConnectorPair
163+
164+
// closeMu is needed since Close() will be called from different goroutines.
165+
closeMu sync.Mutex
166+
closed bool
155167
}
156168

157169
// NewIOConnectorProxy implements the IOProxy interface for a set of
@@ -163,12 +175,33 @@ func NewIOConnectorProxy(stdin, stdout, stderr *IOConnectorPair) IOProxy {
163175
stdin: stdin,
164176
stdout: stdout,
165177
stderr: stderr,
178+
closed: false,
166179
}
167180
}
168181

182+
func (ioConnectorSet *ioConnectorSet) Close() {
183+
ioConnectorSet.closeMu.Lock()
184+
defer ioConnectorSet.closeMu.Unlock()
185+
186+
ioConnectorSet.closed = true
187+
}
188+
189+
func (ioConnectorSet *ioConnectorSet) IsOpen() bool {
190+
ioConnectorSet.closeMu.Lock()
191+
defer ioConnectorSet.closeMu.Unlock()
192+
193+
return !ioConnectorSet.closed
194+
}
195+
196+
// start starts goroutines to copy stdio and returns two channels.
197+
// The first channel returns its initialization error. The second channel returns errors from copying.
169198
func (ioConnectorSet *ioConnectorSet) start(proc *vmProc) (ioInitDone <-chan error, ioCopyDone <-chan error) {
170199
var initErrG errgroup.Group
171-
var copyErrG errgroup.Group
200+
201+
// When one of the goroutines returns an error, we will cancel
202+
// the rest of goroutines through the ctx below.
203+
copyErrG, ctx := errgroup.WithContext(proc.ctx)
204+
172205
waitErrs := func(initErrCh, copyErrCh <-chan error) {
173206
initErrG.Go(func() error { return <-initErrCh })
174207
copyErrG.Go(func() error { return <-copyErrCh })
@@ -177,24 +210,25 @@ func (ioConnectorSet *ioConnectorSet) start(proc *vmProc) (ioInitDone <-chan err
177210
if ioConnectorSet.stdin != nil {
178211
// For Stdin only, provide 0 as the timeout to wait after the proc exits before closing IO streams.
179212
// There's no reason to send stdin data to a proc that's already dead.
180-
waitErrs(ioConnectorSet.stdin.proxy(proc.ctx, proc.logger.WithField("stream", "stdin"), 0))
181-
213+
waitErrs(ioConnectorSet.stdin.proxy(ctx, proc.logger.WithField("stream", "stdin"), 0))
182214
} else {
183215
proc.logger.Debug("skipping proxy io for unset stdin")
184216
}
185217

186218
if ioConnectorSet.stdout != nil {
187-
waitErrs(ioConnectorSet.stdout.proxy(proc.ctx, proc.logger.WithField("stream", "stdout"), defaultIOFlushTimeout))
219+
waitErrs(ioConnectorSet.stdout.proxy(ctx, proc.logger.WithField("stream", "stdout"), defaultIOFlushTimeout))
188220
} else {
189221
proc.logger.Debug("skipping proxy io for unset stdout")
190222
}
191223

192224
if ioConnectorSet.stderr != nil {
193-
waitErrs(ioConnectorSet.stderr.proxy(proc.ctx, proc.logger.WithField("stream", "stderr"), defaultIOFlushTimeout))
225+
waitErrs(ioConnectorSet.stderr.proxy(ctx, proc.logger.WithField("stream", "stderr"), defaultIOFlushTimeout))
194226
} else {
195227
proc.logger.Debug("skipping proxy io for unset stderr")
196228
}
197229

230+
// These channels are not buffered, since we will close them right after having one error.
231+
// Callers must read the channels.
198232
initDone := make(chan error)
199233
go func() {
200234
defer close(initDone)
@@ -227,3 +261,29 @@ func logClose(logger *logrus.Entry, streams ...io.Closer) {
227261
logger.WithError(closeErr).Error("error closing io stream")
228262
}
229263
}
264+
265+
// InputPair returns an IOConnectorPair from the given vsock port to
266+
// the FIFO file.
267+
func InputPair(src uint32, dest string) *IOConnectorPair {
268+
if dest == "" {
269+
return nil
270+
}
271+
272+
return &IOConnectorPair{
273+
ReadConnector: VSockAcceptConnector(src),
274+
WriteConnector: WriteFIFOConnector(dest),
275+
}
276+
}
277+
278+
// OutputPair returns an IOConnectorPair from the given FIFO to
279+
// the vsock port.
280+
func OutputPair(src string, dest uint32) *IOConnectorPair {
281+
if src == "" {
282+
return nil
283+
}
284+
285+
return &IOConnectorPair{
286+
ReadConnector: ReadFIFOConnector(src),
287+
WriteConnector: VSockAcceptConnector(dest),
288+
}
289+
}

0 commit comments

Comments
 (0)