Skip to content

Commit 4081b0b

Browse files
authored
Merge pull request #421 from kzys/exec-test-master
Fix Exec()'s timeout issue
2 parents ca88e0f + d5d6985 commit 4081b0b

File tree

6 files changed

+250
-86
lines changed

6 files changed

+250
-86
lines changed

agent/service.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,15 +228,15 @@ func (ts *TaskService) Create(requestCtx context.Context, req *taskAPI.CreateTas
228228
req.Stdin = fifoSet.Stdin
229229
stdinConnectorPair = &vm.IOConnectorPair{
230230
ReadConnector: vm.VSockAcceptConnector(extraData.StdinPort),
231-
WriteConnector: vm.FIFOConnector(fifoSet.Stdin),
231+
WriteConnector: vm.WriteFIFOConnector(fifoSet.Stdin),
232232
}
233233
}
234234

235235
var stdoutConnectorPair *vm.IOConnectorPair
236236
if req.Stdout != "" {
237237
req.Stdout = fifoSet.Stdout
238238
stdoutConnectorPair = &vm.IOConnectorPair{
239-
ReadConnector: vm.FIFOConnector(fifoSet.Stdout),
239+
ReadConnector: vm.ReadFIFOConnector(fifoSet.Stdout),
240240
WriteConnector: vm.VSockAcceptConnector(extraData.StdoutPort),
241241
}
242242
}
@@ -245,7 +245,7 @@ func (ts *TaskService) Create(requestCtx context.Context, req *taskAPI.CreateTas
245245
if req.Stderr != "" {
246246
req.Stderr = fifoSet.Stderr
247247
stderrConnectorPair = &vm.IOConnectorPair{
248-
ReadConnector: vm.FIFOConnector(fifoSet.Stderr),
248+
ReadConnector: vm.ReadFIFOConnector(fifoSet.Stderr),
249249
WriteConnector: vm.VSockAcceptConnector(extraData.StderrPort),
250250
}
251251
}
@@ -454,7 +454,7 @@ func (ts *TaskService) Exec(requestCtx context.Context, req *taskAPI.ExecProcess
454454
req.Stdin = fifoSet.Stdin
455455
stdinConnectorPair = &vm.IOConnectorPair{
456456
ReadConnector: vm.VSockAcceptConnector(extraData.StdinPort),
457-
WriteConnector: vm.FIFOConnector(fifoSet.Stdin),
457+
WriteConnector: vm.WriteFIFOConnector(fifoSet.Stdin),
458458
}
459459
ts.addCleanup(taskExecID, func() error {
460460
return os.RemoveAll(req.Stdin)
@@ -465,7 +465,7 @@ func (ts *TaskService) Exec(requestCtx context.Context, req *taskAPI.ExecProcess
465465
if req.Stdout != "" {
466466
req.Stdout = fifoSet.Stdout
467467
stdoutConnectorPair = &vm.IOConnectorPair{
468-
ReadConnector: vm.FIFOConnector(fifoSet.Stdout),
468+
ReadConnector: vm.ReadFIFOConnector(fifoSet.Stdout),
469469
WriteConnector: vm.VSockAcceptConnector(extraData.StdoutPort),
470470
}
471471
ts.addCleanup(taskExecID, func() error {
@@ -477,7 +477,7 @@ func (ts *TaskService) Exec(requestCtx context.Context, req *taskAPI.ExecProcess
477477
if req.Stderr != "" {
478478
req.Stderr = fifoSet.Stderr
479479
stderrConnectorPair = &vm.IOConnectorPair{
480-
ReadConnector: vm.FIFOConnector(fifoSet.Stderr),
480+
ReadConnector: vm.ReadFIFOConnector(fifoSet.Stderr),
481481
WriteConnector: vm.VSockAcceptConnector(extraData.StderrPort),
482482
}
483483
ts.addCleanup(taskExecID, func() error {

internal/vm/fifo.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,16 @@ import (
2121
"github.com/sirupsen/logrus"
2222
)
2323

24-
// FIFOConnector adapts containerd's fifo package to the IOConnector interface
25-
func FIFOConnector(path string) IOConnector {
24+
// fifoConnector adapts containerd's fifo package to the IOConnector interface
25+
func fifoConnector(path string, flag int) IOConnector {
2626
return func(procCtx context.Context, logger *logrus.Entry) <-chan IOConnectorResult {
2727
returnCh := make(chan IOConnectorResult, 1)
2828
defer close(returnCh)
2929

3030
// We open the FIFO synchronously to ensure that the FIFO is created (via O_CREAT) before
31-
// it is passed to any task service. O_RDWR ensures that we don't block on the syscall
31+
// it is passed to any task service. O_NONBLOCK ensures that we don't block on the syscall
3232
// level (as documented in the fifo pkg).
33-
fifo, err := fifo.OpenFifo(procCtx, path, syscall.O_CREAT|syscall.O_RDWR, 0300)
33+
fifo, err := fifo.OpenFifo(procCtx, path, syscall.O_CREAT|syscall.O_NONBLOCK|flag, 0300)
3434
returnCh <- IOConnectorResult{
3535
ReadWriteCloser: fifo,
3636
Err: err,
@@ -39,3 +39,13 @@ func FIFOConnector(path string) IOConnector {
3939
return returnCh
4040
}
4141
}
42+
43+
// ReadFIFOConnector returns a FIFO which is open for reading
44+
func ReadFIFOConnector(path string) IOConnector {
45+
return fifoConnector(path, syscall.O_RDONLY)
46+
}
47+
48+
// WriteFIFOConnector returns a FIFO which is open for writing
49+
func WriteFIFOConnector(path string) IOConnector {
50+
return fifoConnector(path, syscall.O_WRONLY)
51+
}

internal/vm/ioproxy.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,20 +67,23 @@ type IOConnectorPair struct {
6767
}
6868

6969
func (connectorPair *IOConnectorPair) proxy(
70-
proc *vmProc,
70+
ctx context.Context,
7171
logger *logrus.Entry,
7272
timeoutAfterExit time.Duration,
7373
) (ioInitDone <-chan error, ioCopyDone <-chan error) {
7474
initDone := make(chan error, 2)
7575
copyDone := make(chan error)
7676

77+
ioCtx, ioCancel := context.WithCancel(context.Background())
78+
7779
// Start the initialization process. Any synchronous setup made by the connectors will
7880
// be completed after these lines. Async setup will be done once initDone is closed in
7981
// the goroutine below.
80-
readerResultCh := connectorPair.ReadConnector(proc.ctx, logger.WithField("direction", "read"))
81-
writerResultCh := connectorPair.WriteConnector(proc.ctx, logger.WithField("direction", "write"))
82+
readerResultCh := connectorPair.ReadConnector(ioCtx, logger.WithField("direction", "read"))
83+
writerResultCh := connectorPair.WriteConnector(ioCtx, logger.WithField("direction", "write"))
8284

8385
go func() {
86+
defer ioCancel()
8487
defer close(copyDone)
8588

8689
var reader io.ReadCloser
@@ -119,7 +122,7 @@ func (connectorPair *IOConnectorPair) proxy(
119122
// If the io streams close on their own before the timeout, the Close calls here
120123
// should just be no-ops.
121124
go func() {
122-
<-proc.ctx.Done()
125+
<-ctx.Done()
123126
time.AfterFunc(timeoutAfterExit, func() {
124127
logClose(logger, reader, writer)
125128
})
@@ -128,7 +131,8 @@ func (connectorPair *IOConnectorPair) proxy(
128131
logger.Debug("begin copying io")
129132
defer logger.Debug("end copying io")
130133

131-
_, err := io.CopyBuffer(writer, reader, make([]byte, internal.DefaultBufferSize))
134+
size, err := io.CopyBuffer(writer, reader, make([]byte, internal.DefaultBufferSize))
135+
logger.Debugf("copied %d", size)
132136
if err != nil {
133137
if strings.Contains(err.Error(), "use of closed network connection") ||
134138
strings.Contains(err.Error(), "file already closed") {
@@ -138,6 +142,7 @@ func (connectorPair *IOConnectorPair) proxy(
138142
}
139143
copyDone <- err
140144
}
145+
defer logClose(logger, reader, writer)
141146
}()
142147

143148
return initDone, copyDone
@@ -172,19 +177,20 @@ func (ioConnectorSet *ioConnectorSet) start(proc *vmProc) (ioInitDone <-chan err
172177
if ioConnectorSet.stdin != nil {
173178
// For Stdin only, provide 0 as the timeout to wait after the proc exits before closing IO streams.
174179
// There's no reason to send stdin data to a proc that's already dead.
175-
waitErrs(ioConnectorSet.stdin.proxy(proc, proc.logger.WithField("stream", "stdin"), 0))
180+
waitErrs(ioConnectorSet.stdin.proxy(proc.ctx, proc.logger.WithField("stream", "stdin"), 0))
181+
176182
} else {
177183
proc.logger.Debug("skipping proxy io for unset stdin")
178184
}
179185

180186
if ioConnectorSet.stdout != nil {
181-
waitErrs(ioConnectorSet.stdout.proxy(proc, proc.logger.WithField("stream", "stdout"), defaultIOFlushTimeout))
187+
waitErrs(ioConnectorSet.stdout.proxy(proc.ctx, proc.logger.WithField("stream", "stdout"), defaultIOFlushTimeout))
182188
} else {
183189
proc.logger.Debug("skipping proxy io for unset stdout")
184190
}
185191

186192
if ioConnectorSet.stderr != nil {
187-
waitErrs(ioConnectorSet.stderr.proxy(proc, proc.logger.WithField("stream", "stderr"), defaultIOFlushTimeout))
193+
waitErrs(ioConnectorSet.stderr.proxy(proc.ctx, proc.logger.WithField("stream", "stderr"), defaultIOFlushTimeout))
188194
} else {
189195
proc.logger.Debug("skipping proxy io for unset stderr")
190196
}

internal/vm/ioproxy_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"). You may
4+
// not use this file except in compliance with the License. A copy of the
5+
// License is located at
6+
//
7+
// http://aws.amazon.com/apache2.0/
8+
//
9+
// or in the "license" file accompanying this file. This file is distributed
10+
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
// express or implied. See the License for the specific language governing
12+
// permissions and limitations under the License.
13+
14+
package vm
15+
16+
import (
17+
"context"
18+
"io/ioutil"
19+
"os"
20+
"path/filepath"
21+
"testing"
22+
23+
"github.com/sirupsen/logrus"
24+
"github.com/stretchr/testify/assert"
25+
"github.com/stretchr/testify/require"
26+
)
27+
28+
func fileConnector(path string, flag int) IOConnector {
29+
return func(procCtx context.Context, logger *logrus.Entry) <-chan IOConnectorResult {
30+
returnCh := make(chan IOConnectorResult, 1)
31+
defer close(returnCh)
32+
33+
file, err := os.OpenFile(path, flag, 0600)
34+
returnCh <- IOConnectorResult{
35+
ReadWriteCloser: file,
36+
Err: err,
37+
}
38+
39+
return returnCh
40+
}
41+
}
42+
43+
func TestProxy(t *testing.T) {
44+
dir, err := ioutil.TempDir("", t.Name())
45+
require.NoError(t, err)
46+
defer os.RemoveAll(dir)
47+
48+
ctx := context.Background()
49+
content := "hello world"
50+
51+
err = ioutil.WriteFile(filepath.Join(dir, "input"), []byte(content), 0600)
52+
require.NoError(t, err)
53+
54+
pair := &IOConnectorPair{
55+
ReadConnector: fileConnector(filepath.Join(dir, "input"), os.O_RDONLY),
56+
WriteConnector: fileConnector(filepath.Join(dir, "output"), os.O_CREATE|os.O_WRONLY),
57+
}
58+
initCh, copyCh := pair.proxy(ctx, logrus.WithFields(logrus.Fields{}), 0)
59+
60+
assert.Nil(t, <-initCh)
61+
assert.Nil(t, <-copyCh)
62+
63+
bytes, err := ioutil.ReadFile(filepath.Join(dir, "output"))
64+
require.NoError(t, err)
65+
assert.Equal(t, content, string(bytes))
66+
}

runtime/service.go

Lines changed: 44 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -793,6 +793,46 @@ func (s *service) buildRootDrive(req *proto.CreateVMRequest) []models.Drive {
793793
return builder.Build()
794794
}
795795

796+
func (s *service) newIOProxy(logger *logrus.Entry, stdin, stdout, stderr string, extraData *proto.ExtraData) (vm.IOProxy, error) {
797+
var ioConnectorSet vm.IOProxy
798+
799+
relVSockPath, err := s.jailer.JailPath().FirecrackerVSockRelPath()
800+
if err != nil {
801+
return nil, errors.Wrapf(err, "failed to get relative path to firecracker vsock")
802+
}
803+
804+
if vm.IsAgentOnlyIO(stdout, logger) {
805+
ioConnectorSet = vm.NewNullIOProxy()
806+
} else {
807+
var stdinConnectorPair *vm.IOConnectorPair
808+
if stdin != "" {
809+
stdinConnectorPair = &vm.IOConnectorPair{
810+
ReadConnector: vm.ReadFIFOConnector(stdin),
811+
WriteConnector: vm.VSockDialConnector(relVSockPath, extraData.StdinPort),
812+
}
813+
}
814+
815+
var stdoutConnectorPair *vm.IOConnectorPair
816+
if stdout != "" {
817+
stdoutConnectorPair = &vm.IOConnectorPair{
818+
ReadConnector: vm.VSockDialConnector(relVSockPath, extraData.StdoutPort),
819+
WriteConnector: vm.WriteFIFOConnector(stdout),
820+
}
821+
}
822+
823+
var stderrConnectorPair *vm.IOConnectorPair
824+
if stderr != "" {
825+
stderrConnectorPair = &vm.IOConnectorPair{
826+
ReadConnector: vm.VSockDialConnector(relVSockPath, extraData.StderrPort),
827+
WriteConnector: vm.WriteFIFOConnector(stderr),
828+
}
829+
}
830+
831+
ioConnectorSet = vm.NewIOConnectorProxy(stdinConnectorPair, stdoutConnectorPair, stderrConnectorPair)
832+
}
833+
return ioConnectorSet, nil
834+
}
835+
796836
func (s *service) Create(requestCtx context.Context, request *taskAPI.CreateTaskRequest) (*taskAPI.CreateTaskResponse, error) {
797837
logger := s.logger.WithField("task_id", request.ID)
798838
defer logPanicAndDie(logger)
@@ -863,41 +903,9 @@ func (s *service) Create(requestCtx context.Context, request *taskAPI.CreateTask
863903
return nil, err
864904
}
865905

866-
relVSockPath, err := s.jailer.JailPath().FirecrackerVSockRelPath()
906+
ioConnectorSet, err := s.newIOProxy(logger, request.Stdin, request.Stdout, request.Stderr, extraData)
867907
if err != nil {
868-
return nil, errors.Wrapf(err, "failed to get relative path to firecracker vsock")
869-
}
870-
871-
var ioConnectorSet vm.IOProxy
872-
873-
if vm.IsAgentOnlyIO(request.Stdout, logger) {
874-
ioConnectorSet = vm.NewNullIOProxy()
875-
} else {
876-
var stdinConnectorPair *vm.IOConnectorPair
877-
if request.Stdin != "" {
878-
stdinConnectorPair = &vm.IOConnectorPair{
879-
ReadConnector: vm.FIFOConnector(request.Stdin),
880-
WriteConnector: vm.VSockDialConnector(relVSockPath, extraData.StdinPort),
881-
}
882-
}
883-
884-
var stdoutConnectorPair *vm.IOConnectorPair
885-
if request.Stdout != "" {
886-
stdoutConnectorPair = &vm.IOConnectorPair{
887-
ReadConnector: vm.VSockDialConnector(relVSockPath, extraData.StdoutPort),
888-
WriteConnector: vm.FIFOConnector(request.Stdout),
889-
}
890-
}
891-
892-
var stderrConnectorPair *vm.IOConnectorPair
893-
if request.Stderr != "" {
894-
stderrConnectorPair = &vm.IOConnectorPair{
895-
ReadConnector: vm.VSockDialConnector(relVSockPath, extraData.StderrPort),
896-
WriteConnector: vm.FIFOConnector(request.Stderr),
897-
}
898-
}
899-
900-
ioConnectorSet = vm.NewIOConnectorProxy(stdinConnectorPair, stdoutConnectorPair, stderrConnectorPair)
908+
return nil, err
901909
}
902910

903911
// override the request with the bundle dir that should be used inside the VM
@@ -993,41 +1001,9 @@ func (s *service) Exec(requestCtx context.Context, req *taskAPI.ExecProcessReque
9931001
return nil, err
9941002
}
9951003

996-
relVSockPath, err := s.jailer.JailPath().FirecrackerVSockRelPath()
1004+
ioConnectorSet, err := s.newIOProxy(logger, req.Stdin, req.Stdout, req.Stderr, extraData)
9971005
if err != nil {
998-
return nil, errors.Wrapf(err, "failed to get relative path to firecracker vsock")
999-
}
1000-
1001-
var ioConnectorSet vm.IOProxy
1002-
1003-
if vm.IsAgentOnlyIO(req.Stdout, logger) {
1004-
ioConnectorSet = vm.NewNullIOProxy()
1005-
} else {
1006-
var stdinConnectorPair *vm.IOConnectorPair
1007-
if req.Stdin != "" {
1008-
stdinConnectorPair = &vm.IOConnectorPair{
1009-
ReadConnector: vm.FIFOConnector(req.Stdin),
1010-
WriteConnector: vm.VSockDialConnector(relVSockPath, extraData.StdinPort),
1011-
}
1012-
}
1013-
1014-
var stdoutConnectorPair *vm.IOConnectorPair
1015-
if req.Stdout != "" {
1016-
stdoutConnectorPair = &vm.IOConnectorPair{
1017-
ReadConnector: vm.VSockDialConnector(relVSockPath, extraData.StdoutPort),
1018-
WriteConnector: vm.FIFOConnector(req.Stdout),
1019-
}
1020-
}
1021-
1022-
var stderrConnectorPair *vm.IOConnectorPair
1023-
if req.Stderr != "" {
1024-
stderrConnectorPair = &vm.IOConnectorPair{
1025-
ReadConnector: vm.VSockDialConnector(relVSockPath, extraData.StderrPort),
1026-
WriteConnector: vm.FIFOConnector(req.Stderr),
1027-
}
1028-
}
1029-
1030-
ioConnectorSet = vm.NewIOConnectorProxy(stdinConnectorPair, stdoutConnectorPair, stderrConnectorPair)
1006+
return nil, err
10311007
}
10321008

10331009
resp, err := s.taskManager.ExecProcess(requestCtx, req, s.agentClient, ioConnectorSet)

0 commit comments

Comments
 (0)