Skip to content

Commit 6729203

Browse files
authored
Merge pull request #715 from kzys/agent-fail-fast
Fail-fast if a VM is terminated
2 parents 1595039 + e6028be commit 6729203

File tree

2 files changed

+149
-17
lines changed

2 files changed

+149
-17
lines changed

runtime/service.go

Lines changed: 95 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1230,7 +1230,11 @@ func (s *service) Create(requestCtx context.Context, request *taskAPI.CreateTask
12301230
request.Rootfs = nil
12311231
}
12321232

1233-
resp, err := s.taskManager.CreateTask(requestCtx, request, s.agentClient, ioConnectorSet)
1233+
agent, err := s.agent()
1234+
if err != nil {
1235+
return nil, err
1236+
}
1237+
resp, err := s.taskManager.CreateTask(requestCtx, request, agent, ioConnectorSet)
12341238
if err != nil {
12351239
err = fmt.Errorf("failed to create task: %w", err)
12361240
logger.WithError(err).Error()
@@ -1253,7 +1257,11 @@ func (s *service) Start(requestCtx context.Context, req *taskAPI.StartRequest) (
12531257
defer logPanicAndDie(log.G(requestCtx))
12541258

12551259
log.G(requestCtx).WithFields(logrus.Fields{"task_id": req.ID, "exec_id": req.ExecID}).Debug("start")
1256-
resp, err := s.agentClient.Start(requestCtx, req)
1260+
agent, err := s.agent()
1261+
if err != nil {
1262+
return nil, err
1263+
}
1264+
resp, err := agent.Start(requestCtx, req)
12571265
if err != nil {
12581266
return nil, err
12591267
}
@@ -1267,7 +1275,11 @@ func (s *service) Delete(requestCtx context.Context, req *taskAPI.DeleteRequest)
12671275

12681276
logger.Debug("delete")
12691277

1270-
resp, err := s.taskManager.DeleteProcess(requestCtx, req, s.agentClient)
1278+
agent, err := s.agent()
1279+
if err != nil {
1280+
return nil, err
1281+
}
1282+
resp, err := s.taskManager.DeleteProcess(requestCtx, req, agent)
12711283
if err != nil {
12721284
return nil, err
12731285
}
@@ -1316,6 +1328,11 @@ func (s *service) Exec(requestCtx context.Context, req *taskAPI.ExecProcessReque
13161328
logger := s.logger.WithField("task_id", req.ID).WithField("exec_id", req.ExecID)
13171329
logger.Debug("exec")
13181330

1331+
agent, err := s.agent()
1332+
if err != nil {
1333+
return nil, err
1334+
}
1335+
13191336
// no OCI config bytes to provide for Exec, just leave those fields empty
13201337
extraData, err := s.generateExtraData(nil, req.Spec)
13211338
if err != nil {
@@ -1336,7 +1353,7 @@ func (s *service) Exec(requestCtx context.Context, req *taskAPI.ExecProcessReque
13361353
return nil, err
13371354
}
13381355

1339-
resp, err := s.taskManager.ExecProcess(requestCtx, req, s.agentClient, ioConnectorSet)
1356+
resp, err := s.taskManager.ExecProcess(requestCtx, req, agent, ioConnectorSet)
13401357
if err != nil {
13411358
return nil, err
13421359
}
@@ -1359,7 +1376,11 @@ func (s *service) ResizePty(requestCtx context.Context, req *taskAPI.ResizePtyRe
13591376
defer logPanicAndDie(log.G(requestCtx))
13601377

13611378
log.G(requestCtx).WithFields(logrus.Fields{"task_id": req.ID, "exec_id": req.ExecID}).Debug("resize_pty")
1362-
resp, err := s.agentClient.ResizePty(requestCtx, req)
1379+
agent, err := s.agent()
1380+
if err != nil {
1381+
return nil, err
1382+
}
1383+
resp, err := agent.ResizePty(requestCtx, req)
13631384
if err != nil {
13641385
return nil, err
13651386
}
@@ -1373,7 +1394,11 @@ func (s *service) State(requestCtx context.Context, req *taskAPI.StateRequest) (
13731394

13741395
logger := log.G(requestCtx).WithFields(logrus.Fields{"task_id": req.ID, "exec_id": req.ExecID})
13751396
logger.Debug("state")
1376-
resp, err := s.agentClient.State(requestCtx, req)
1397+
agent, err := s.agent()
1398+
if err != nil {
1399+
return nil, err
1400+
}
1401+
resp, err := agent.State(requestCtx, req)
13771402
if err != nil {
13781403
return nil, err
13791404
}
@@ -1451,7 +1476,11 @@ func (s *service) Pause(requestCtx context.Context, req *taskAPI.PauseRequest) (
14511476
defer logPanicAndDie(log.G(requestCtx))
14521477

14531478
log.G(requestCtx).WithField("task_id", req.ID).Debug("pause")
1454-
resp, err := s.agentClient.Pause(requestCtx, req)
1479+
agent, err := s.agent()
1480+
if err != nil {
1481+
return nil, err
1482+
}
1483+
resp, err := agent.Pause(requestCtx, req)
14551484
if err != nil {
14561485
return nil, err
14571486
}
@@ -1464,7 +1493,11 @@ func (s *service) Resume(requestCtx context.Context, req *taskAPI.ResumeRequest)
14641493
defer logPanicAndDie(log.G(requestCtx))
14651494

14661495
log.G(requestCtx).WithField("task_id", req.ID).Debug("resume")
1467-
resp, err := s.agentClient.Resume(requestCtx, req)
1496+
agent, err := s.agent()
1497+
if err != nil {
1498+
return nil, err
1499+
}
1500+
resp, err := agent.Resume(requestCtx, req)
14681501
if err != nil {
14691502
return nil, err
14701503
}
@@ -1477,7 +1510,11 @@ func (s *service) Kill(requestCtx context.Context, req *taskAPI.KillRequest) (*t
14771510
defer logPanicAndDie(log.G(requestCtx))
14781511

14791512
log.G(requestCtx).WithFields(logrus.Fields{"task_id": req.ID, "exec_id": req.ExecID}).Debug("kill")
1480-
resp, err := s.agentClient.Kill(requestCtx, req)
1513+
agent, err := s.agent()
1514+
if err != nil {
1515+
return nil, err
1516+
}
1517+
resp, err := agent.Kill(requestCtx, req)
14811518
if err != nil {
14821519
return nil, err
14831520
}
@@ -1489,7 +1526,11 @@ func (s *service) Pids(requestCtx context.Context, req *taskAPI.PidsRequest) (*t
14891526
defer logPanicAndDie(log.G(requestCtx))
14901527

14911528
log.G(requestCtx).WithField("task_id", req.ID).Debug("pids")
1492-
resp, err := s.agentClient.Pids(requestCtx, req)
1529+
agent, err := s.agent()
1530+
if err != nil {
1531+
return nil, err
1532+
}
1533+
resp, err := agent.Pids(requestCtx, req)
14931534
if err != nil {
14941535
return nil, err
14951536
}
@@ -1502,7 +1543,11 @@ func (s *service) CloseIO(requestCtx context.Context, req *taskAPI.CloseIOReques
15021543
defer logPanicAndDie(log.G(requestCtx))
15031544

15041545
log.G(requestCtx).WithFields(logrus.Fields{"task_id": req.ID, "exec_id": req.ExecID}).Debug("close_io")
1505-
resp, err := s.agentClient.CloseIO(requestCtx, req)
1546+
agent, err := s.agent()
1547+
if err != nil {
1548+
return nil, err
1549+
}
1550+
resp, err := agent.CloseIO(requestCtx, req)
15061551
if err != nil {
15071552
return nil, err
15081553
}
@@ -1515,7 +1560,11 @@ func (s *service) Checkpoint(requestCtx context.Context, req *taskAPI.Checkpoint
15151560
defer logPanicAndDie(log.G(requestCtx))
15161561

15171562
log.G(requestCtx).WithFields(logrus.Fields{"task_id": req.ID, "path": req.Path}).Info("checkpoint")
1518-
resp, err := s.agentClient.Checkpoint(requestCtx, req)
1563+
agent, err := s.agent()
1564+
if err != nil {
1565+
return nil, err
1566+
}
1567+
resp, err := agent.Checkpoint(requestCtx, req)
15191568
if err != nil {
15201569
return nil, err
15211570
}
@@ -1528,7 +1577,11 @@ func (s *service) Connect(requestCtx context.Context, req *taskAPI.ConnectReques
15281577
defer logPanicAndDie(log.G(requestCtx))
15291578

15301579
log.G(requestCtx).WithField("id", req.ID).Debug("connect")
1531-
resp, err := s.agentClient.Connect(requestCtx, req)
1580+
agent, err := s.agent()
1581+
if err != nil {
1582+
return nil, err
1583+
}
1584+
resp, err := agent.Connect(requestCtx, req)
15321585
if err != nil {
15331586
return nil, err
15341587
}
@@ -1614,7 +1667,11 @@ func (s *service) terminate(ctx context.Context) (retErr error) {
16141667
}
16151668

16161669
s.logger.Info("gracefully shutdown VM")
1617-
_, err = s.agentClient.Shutdown(ctx, &taskAPI.ShutdownRequest{ID: s.vmID, Now: true})
1670+
agent, err := s.agent()
1671+
if err != nil {
1672+
return err
1673+
}
1674+
_, err = agent.Shutdown(ctx, &taskAPI.ShutdownRequest{ID: s.vmID, Now: true})
16181675
if err != nil {
16191676
s.logger.WithError(err).Error("failed to call in-VM agent")
16201677
return
@@ -1634,7 +1691,11 @@ func (s *service) Stats(requestCtx context.Context, req *taskAPI.StatsRequest) (
16341691
defer logPanicAndDie(log.G(requestCtx))
16351692
log.G(requestCtx).WithField("task_id", req.ID).Debug("stats")
16361693

1637-
resp, err := s.agentClient.Stats(requestCtx, req)
1694+
agent, err := s.agent()
1695+
if err != nil {
1696+
return nil, err
1697+
}
1698+
resp, err := agent.Stats(requestCtx, req)
16381699
if err != nil {
16391700
return nil, err
16401701
}
@@ -1647,7 +1708,11 @@ func (s *service) Update(requestCtx context.Context, req *taskAPI.UpdateTaskRequ
16471708
defer logPanicAndDie(log.G(requestCtx))
16481709
log.G(requestCtx).WithField("task_id", req.ID).Debug("update")
16491710

1650-
resp, err := s.agentClient.Update(requestCtx, req)
1711+
agent, err := s.agent()
1712+
if err != nil {
1713+
return nil, err
1714+
}
1715+
resp, err := agent.Update(requestCtx, req)
16511716
if err != nil {
16521717
return nil, err
16531718
}
@@ -1660,7 +1725,11 @@ func (s *service) Wait(requestCtx context.Context, req *taskAPI.WaitRequest) (*t
16601725
defer logPanicAndDie(log.G(requestCtx))
16611726
log.G(requestCtx).WithFields(logrus.Fields{"task_id": req.ID, "exec_id": req.ExecID}).Debug("wait")
16621727

1663-
resp, err := s.agentClient.Wait(requestCtx, req)
1728+
agent, err := s.agent()
1729+
if err != nil {
1730+
return nil, err
1731+
}
1732+
resp, err := agent.Wait(requestCtx, req)
16641733
if err != nil {
16651734
return nil, err
16661735
}
@@ -1715,3 +1784,12 @@ func (s *service) monitorVMExit() {
17151784
s.logger.WithError(err).Error("failed to clean up the VM")
17161785
}
17171786
}
1787+
1788+
// agent returns a client to talk to in-VM agent, only if the VM is not terminated.
1789+
func (s *service) agent() (taskAPI.TaskService, error) {
1790+
pid, _ := s.machine.PID()
1791+
if pid == 0 {
1792+
return nil, status.Errorf(codes.NotFound, "failed to find VM %q", s.vmID)
1793+
}
1794+
return s.agentClient, nil
1795+
}

runtime/service_integ_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1899,6 +1899,60 @@ func TestStopVM_Isolated(t *testing.T) {
18991899
}
19001900
}
19011901

1902+
func TestStopVMFailFast_Isolated(t *testing.T) {
1903+
integtest.Prepare(t)
1904+
1905+
client, err := containerd.New(integtest.ContainerdSockPath, containerd.WithDefaultRuntime(firecrackerRuntime))
1906+
require.NoError(t, err, "unable to create client to containerd service at %s, is containerd running?", integtest.ContainerdSockPath)
1907+
defer client.Close()
1908+
1909+
name := testNameToVMID(t.Name())
1910+
1911+
ctx := namespaces.WithNamespace(context.Background(), "default")
1912+
1913+
image, err := alpineImage(ctx, client, defaultSnapshotterName)
1914+
require.NoError(t, err, "failed to get alpine image")
1915+
1916+
fcClient, err := integtest.NewFCControlClient(integtest.ContainerdSockPath)
1917+
require.NoError(t, err)
1918+
1919+
_, err = fcClient.CreateVM(ctx, &proto.CreateVMRequest{VMID: name})
1920+
require.NoError(t, err)
1921+
1922+
c, err := client.NewContainer(ctx,
1923+
"container-"+name,
1924+
containerd.WithSnapshotter(defaultSnapshotterName),
1925+
containerd.WithNewSnapshot("snapshot-"+name, image),
1926+
containerd.WithNewSpec(oci.WithProcessArgs("/bin/echo", "-n", "hello"), firecrackeroci.WithVMID(name)),
1927+
)
1928+
require.NoError(t, err)
1929+
1930+
var stdout, stderr bytes.Buffer
1931+
task, err := c.NewTask(ctx, cio.NewCreator(cio.WithStreams(nil, &stdout, &stderr)))
1932+
require.NoError(t, err)
1933+
1934+
err = task.Start(ctx)
1935+
require.NoError(t, err)
1936+
1937+
processes, err := findProcess(ctx, findFirecracker)
1938+
require.NoError(t, err, "failed waiting for expected firecracker process %q to come up", firecrackerProcessName)
1939+
require.Len(t, processes, 1, "expected only one firecracker process to exist")
1940+
fc := processes[0]
1941+
err = fc.KillWithContext(ctx)
1942+
require.NoError(t, err)
1943+
1944+
// Unix signals are asynchronous.
1945+
// We have to wait for a moment to make sure that the process is died.
1946+
err = internal.WaitForPidToExit(ctx, time.Second, fc.Pid)
1947+
assert.NoError(t, err)
1948+
1949+
_, err = task.Delete(ctx)
1950+
assert.ErrorIs(t, err, errdefs.ErrNotFound)
1951+
1952+
err = c.Delete(ctx)
1953+
assert.NoError(t, err)
1954+
}
1955+
19021956
func TestExec_Isolated(t *testing.T) {
19031957
integtest.Prepare(t)
19041958

0 commit comments

Comments
 (0)