Skip to content

Commit eba4e8b

Browse files
committed
Open a FIFO with O_RDONLY to correctly see EOF
FIFOConnector was opening a FIFO with O_RDWR, but we used the FIFO only for either reading or writing, not both. On the reading case, read(2) on the FIFO wouldn't return EOF and that made IOConnectionPair#proxy()'s goroutine keep running. Signed-off-by: Kazuyoshi Kato <[email protected]>
1 parent 2f4b9bf commit eba4e8b

File tree

5 files changed

+70
-79
lines changed

5 files changed

+70
-79
lines changed

agent/service.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,15 +228,15 @@ func (ts *TaskService) Create(requestCtx context.Context, req *taskAPI.CreateTas
228228
req.Stdin = fifoSet.Stdin
229229
stdinConnectorPair = &vm.IOConnectorPair{
230230
ReadConnector: vm.VSockAcceptConnector(extraData.StdinPort),
231-
WriteConnector: vm.FIFOConnector(fifoSet.Stdin),
231+
WriteConnector: vm.WriteFIFOConnector(fifoSet.Stdin),
232232
}
233233
}
234234

235235
var stdoutConnectorPair *vm.IOConnectorPair
236236
if req.Stdout != "" {
237237
req.Stdout = fifoSet.Stdout
238238
stdoutConnectorPair = &vm.IOConnectorPair{
239-
ReadConnector: vm.FIFOConnector(fifoSet.Stdout),
239+
ReadConnector: vm.ReadFIFOConnector(fifoSet.Stdout),
240240
WriteConnector: vm.VSockAcceptConnector(extraData.StdoutPort),
241241
}
242242
}
@@ -245,7 +245,7 @@ func (ts *TaskService) Create(requestCtx context.Context, req *taskAPI.CreateTas
245245
if req.Stderr != "" {
246246
req.Stderr = fifoSet.Stderr
247247
stderrConnectorPair = &vm.IOConnectorPair{
248-
ReadConnector: vm.FIFOConnector(fifoSet.Stderr),
248+
ReadConnector: vm.ReadFIFOConnector(fifoSet.Stderr),
249249
WriteConnector: vm.VSockAcceptConnector(extraData.StderrPort),
250250
}
251251
}
@@ -454,7 +454,7 @@ func (ts *TaskService) Exec(requestCtx context.Context, req *taskAPI.ExecProcess
454454
req.Stdin = fifoSet.Stdin
455455
stdinConnectorPair = &vm.IOConnectorPair{
456456
ReadConnector: vm.VSockAcceptConnector(extraData.StdinPort),
457-
WriteConnector: vm.FIFOConnector(fifoSet.Stdin),
457+
WriteConnector: vm.WriteFIFOConnector(fifoSet.Stdin),
458458
}
459459
ts.addCleanup(taskExecID, func() error {
460460
return os.RemoveAll(req.Stdin)
@@ -465,7 +465,7 @@ func (ts *TaskService) Exec(requestCtx context.Context, req *taskAPI.ExecProcess
465465
if req.Stdout != "" {
466466
req.Stdout = fifoSet.Stdout
467467
stdoutConnectorPair = &vm.IOConnectorPair{
468-
ReadConnector: vm.FIFOConnector(fifoSet.Stdout),
468+
ReadConnector: vm.ReadFIFOConnector(fifoSet.Stdout),
469469
WriteConnector: vm.VSockAcceptConnector(extraData.StdoutPort),
470470
}
471471
ts.addCleanup(taskExecID, func() error {
@@ -477,7 +477,7 @@ func (ts *TaskService) Exec(requestCtx context.Context, req *taskAPI.ExecProcess
477477
if req.Stderr != "" {
478478
req.Stderr = fifoSet.Stderr
479479
stderrConnectorPair = &vm.IOConnectorPair{
480-
ReadConnector: vm.FIFOConnector(fifoSet.Stderr),
480+
ReadConnector: vm.ReadFIFOConnector(fifoSet.Stderr),
481481
WriteConnector: vm.VSockAcceptConnector(extraData.StderrPort),
482482
}
483483
ts.addCleanup(taskExecID, func() error {

internal/vm/fifo.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,16 @@ import (
2121
"github.com/sirupsen/logrus"
2222
)
2323

24-
// FIFOConnector adapts containerd's fifo package to the IOConnector interface
25-
func FIFOConnector(path string) IOConnector {
24+
// fifoConnector adapts containerd's fifo package to the IOConnector interface
25+
func fifoConnector(path string, flag int) IOConnector {
2626
return func(procCtx context.Context, logger *logrus.Entry) <-chan IOConnectorResult {
2727
returnCh := make(chan IOConnectorResult, 1)
2828
defer close(returnCh)
2929

3030
// We open the FIFO synchronously to ensure that the FIFO is created (via O_CREAT) before
31-
// it is passed to any task service. O_RDWR ensures that we don't block on the syscall
31+
// it is passed to any task service. O_NONBLOCK ensures that we don't block on the syscall
3232
// level (as documented in the fifo pkg).
33-
fifo, err := fifo.OpenFifo(procCtx, path, syscall.O_CREAT|syscall.O_RDWR, 0300)
33+
fifo, err := fifo.OpenFifo(procCtx, path, syscall.O_CREAT|syscall.O_NONBLOCK|flag, 0300)
3434
returnCh <- IOConnectorResult{
3535
ReadWriteCloser: fifo,
3636
Err: err,
@@ -39,3 +39,13 @@ func FIFOConnector(path string) IOConnector {
3939
return returnCh
4040
}
4141
}
42+
43+
// ReadFIFOConnector returns a FIFO which is open for reading
44+
func ReadFIFOConnector(path string) IOConnector {
45+
return fifoConnector(path, syscall.O_RDONLY)
46+
}
47+
48+
// WriteFIFOConnector returns a FIFO which is open for writing
49+
func WriteFIFOConnector(path string) IOConnector {
50+
return fifoConnector(path, syscall.O_WRONLY)
51+
}

internal/vm/ioproxy.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (connectorPair *IOConnectorPair) proxy(
128128
logger.Debug("begin copying io")
129129
defer logger.Debug("end copying io")
130130

131-
_, err := io.CopyBuffer(writer, reader, make([]byte, internal.DefaultBufferSize))
131+
size, err := io.CopyBuffer(writer, reader, make([]byte, internal.DefaultBufferSize))
132132
if err != nil {
133133
if strings.Contains(err.Error(), "use of closed network connection") ||
134134
strings.Contains(err.Error(), "file already closed") {
@@ -138,6 +138,8 @@ func (connectorPair *IOConnectorPair) proxy(
138138
}
139139
copyDone <- err
140140
}
141+
logger.Debugf("copied %d", size)
142+
defer logClose(logger, reader, writer)
141143
}()
142144

143145
return initDone, copyDone

internal/vm/task.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,9 @@ func (m *taskManager) monitorExit(proc *vmProc, taskService taskAPI.TaskService)
269269
ID: proc.taskID,
270270
ExecID: proc.execID,
271271
})
272+
273+
<-proc.ioCopyDone
274+
272275
proc.cancel()
273276

274277
if waitErr == context.Canceled {

runtime/service.go

Lines changed: 44 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,46 @@ func (s *service) buildRootDrive(req *proto.CreateVMRequest) []models.Drive {
787787
return builder.Build()
788788
}
789789

790+
func (s *service) newIOProxy(logger *logrus.Entry, stdin, stdout, stderr string, extraData *proto.ExtraData) (vm.IOProxy, error) {
791+
var ioConnectorSet vm.IOProxy
792+
793+
relVSockPath, err := s.jailer.JailPath().FirecrackerVSockRelPath()
794+
if err != nil {
795+
return nil, errors.Wrapf(err, "failed to get relative path to firecracker vsock")
796+
}
797+
798+
if vm.IsAgentOnlyIO(stdout, logger) {
799+
ioConnectorSet = vm.NewNullIOProxy()
800+
} else {
801+
var stdinConnectorPair *vm.IOConnectorPair
802+
if stdin != "" {
803+
stdinConnectorPair = &vm.IOConnectorPair{
804+
ReadConnector: vm.ReadFIFOConnector(stdin),
805+
WriteConnector: vm.VSockDialConnector(relVSockPath, extraData.StdinPort),
806+
}
807+
}
808+
809+
var stdoutConnectorPair *vm.IOConnectorPair
810+
if stdout != "" {
811+
stdoutConnectorPair = &vm.IOConnectorPair{
812+
ReadConnector: vm.VSockDialConnector(relVSockPath, extraData.StdoutPort),
813+
WriteConnector: vm.WriteFIFOConnector(stdout),
814+
}
815+
}
816+
817+
var stderrConnectorPair *vm.IOConnectorPair
818+
if stderr != "" {
819+
stderrConnectorPair = &vm.IOConnectorPair{
820+
ReadConnector: vm.VSockDialConnector(relVSockPath, extraData.StderrPort),
821+
WriteConnector: vm.WriteFIFOConnector(stderr),
822+
}
823+
}
824+
825+
ioConnectorSet = vm.NewIOConnectorProxy(stdinConnectorPair, stdoutConnectorPair, stderrConnectorPair)
826+
}
827+
return ioConnectorSet, nil
828+
}
829+
790830
func (s *service) Create(requestCtx context.Context, request *taskAPI.CreateTaskRequest) (*taskAPI.CreateTaskResponse, error) {
791831
logger := s.logger.WithField("task_id", request.ID)
792832
defer logPanicAndDie(logger)
@@ -857,41 +897,9 @@ func (s *service) Create(requestCtx context.Context, request *taskAPI.CreateTask
857897
return nil, err
858898
}
859899

860-
relVSockPath, err := s.jailer.JailPath().FirecrackerVSockRelPath()
900+
ioConnectorSet, err := s.newIOProxy(logger, request.Stdin, request.Stdout, request.Stderr, extraData)
861901
if err != nil {
862-
return nil, errors.Wrapf(err, "failed to get relative path to firecracker vsock")
863-
}
864-
865-
var ioConnectorSet vm.IOProxy
866-
867-
if vm.IsAgentOnlyIO(request.Stdout, logger) {
868-
ioConnectorSet = vm.NewNullIOProxy()
869-
} else {
870-
var stdinConnectorPair *vm.IOConnectorPair
871-
if request.Stdin != "" {
872-
stdinConnectorPair = &vm.IOConnectorPair{
873-
ReadConnector: vm.FIFOConnector(request.Stdin),
874-
WriteConnector: vm.VSockDialConnector(relVSockPath, extraData.StdinPort),
875-
}
876-
}
877-
878-
var stdoutConnectorPair *vm.IOConnectorPair
879-
if request.Stdout != "" {
880-
stdoutConnectorPair = &vm.IOConnectorPair{
881-
ReadConnector: vm.VSockDialConnector(relVSockPath, extraData.StdoutPort),
882-
WriteConnector: vm.FIFOConnector(request.Stdout),
883-
}
884-
}
885-
886-
var stderrConnectorPair *vm.IOConnectorPair
887-
if request.Stderr != "" {
888-
stderrConnectorPair = &vm.IOConnectorPair{
889-
ReadConnector: vm.VSockDialConnector(relVSockPath, extraData.StderrPort),
890-
WriteConnector: vm.FIFOConnector(request.Stderr),
891-
}
892-
}
893-
894-
ioConnectorSet = vm.NewIOConnectorProxy(stdinConnectorPair, stdoutConnectorPair, stderrConnectorPair)
902+
return nil, err
895903
}
896904

897905
// override the request with the bundle dir that should be used inside the VM
@@ -987,41 +995,9 @@ func (s *service) Exec(requestCtx context.Context, req *taskAPI.ExecProcessReque
987995
return nil, err
988996
}
989997

990-
relVSockPath, err := s.jailer.JailPath().FirecrackerVSockRelPath()
998+
ioConnectorSet, err := s.newIOProxy(logger, req.Stdin, req.Stdout, req.Stderr, extraData)
991999
if err != nil {
992-
return nil, errors.Wrapf(err, "failed to get relative path to firecracker vsock")
993-
}
994-
995-
var ioConnectorSet vm.IOProxy
996-
997-
if vm.IsAgentOnlyIO(req.Stdout, logger) {
998-
ioConnectorSet = vm.NewNullIOProxy()
999-
} else {
1000-
var stdinConnectorPair *vm.IOConnectorPair
1001-
if req.Stdin != "" {
1002-
stdinConnectorPair = &vm.IOConnectorPair{
1003-
ReadConnector: vm.FIFOConnector(req.Stdin),
1004-
WriteConnector: vm.VSockDialConnector(relVSockPath, extraData.StdinPort),
1005-
}
1006-
}
1007-
1008-
var stdoutConnectorPair *vm.IOConnectorPair
1009-
if req.Stdout != "" {
1010-
stdoutConnectorPair = &vm.IOConnectorPair{
1011-
ReadConnector: vm.VSockDialConnector(relVSockPath, extraData.StdoutPort),
1012-
WriteConnector: vm.FIFOConnector(req.Stdout),
1013-
}
1014-
}
1015-
1016-
var stderrConnectorPair *vm.IOConnectorPair
1017-
if req.Stderr != "" {
1018-
stderrConnectorPair = &vm.IOConnectorPair{
1019-
ReadConnector: vm.VSockDialConnector(relVSockPath, extraData.StderrPort),
1020-
WriteConnector: vm.FIFOConnector(req.Stderr),
1021-
}
1022-
}
1023-
1024-
ioConnectorSet = vm.NewIOConnectorProxy(stdinConnectorPair, stdoutConnectorPair, stderrConnectorPair)
1000+
return nil, err
10251001
}
10261002

10271003
resp, err := s.taskManager.ExecProcess(requestCtx, req, s.agentClient, ioConnectorSet)

0 commit comments

Comments
 (0)