Skip to content

Commit 1324e37

Browse files
committed
Fix listener
1 parent e8092e4 commit 1324e37

File tree

6 files changed

+82
-84
lines changed

6 files changed

+82
-84
lines changed

ctriface/iface.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ package ctriface
2525
import (
2626
"context"
2727
"encoding/json"
28+
"net"
2829
"os"
2930
"os/exec"
3031
"path/filepath"
@@ -504,6 +505,8 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID
504505
BackendType: fileBackend,
505506
BackendPath: snap.GetMemFilePath(),
506507
}
508+
509+
var uffdConn *net.UnixConn
507510

508511
if o.GetUPFEnabled() {
509512
logger.Debug("TEST: UPF is enabled")
@@ -517,6 +520,10 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID
517520
if err := o.memoryManager.FetchState(originVmID); err != nil {
518521
return nil, nil, err
519522
}
523+
524+
if uffdConn, err = o.memoryManager.ListenUffd(originVmID, o.uffdSockAddr); err != nil {
525+
return nil, nil, err
526+
}
520527
}
521528

522529
tStart = time.Now()
@@ -556,7 +563,6 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID
556563
}()
557564

558565
logger.Debug("TEST: CreatVM request sent")
559-
<-loadDone
560566

561567
if o.GetUPFEnabled() {
562568

@@ -571,18 +577,19 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID
571577
VMMStatePath: o.getSnapshotFile(vmID),
572578
WorkingSetPath: o.getWorkingSetFile(vmID),
573579
InstanceSockAddr: o.uffdSockAddr,
580+
UffdConn: uffdConn,
574581
}
575582
if err := o.memoryManager.RegisterVMFromSnap(originVmID, stateCfg); err != nil {
576583
logger.Error(err, "failed to register new VM with memory manager")
577584
}
578585

579586
logger.Debug("TEST: activate VM in mm")
580-
if activateErr = o.memoryManager.Activate(vmID); activateErr != nil {
587+
if activateErr = o.memoryManager.Activate(vmID, uffdConn); activateErr != nil {
581588
logger.Warn("Failed to activate VM in the memory manager", activateErr)
582589
}
583590
}
584591

585-
// <-loadDone
592+
<-loadDone
586593

587594
loadSnapshotMetric.MetricMap[metrics.LoadVMM] = metrics.ToUS(time.Since(tStart))
588595

ctriface/iface_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ import (
3333
log "github.com/sirupsen/logrus"
3434
"github.com/stretchr/testify/require"
3535
"github.com/vhive-serverless/vhive/snapshotting"
36-
37-
"github.com/vhive-serverless/vhive/lg"
3836
)
3937

4038
// TODO: Make it impossible to use lazy mode without UPF
@@ -105,7 +103,6 @@ func TestStartSnapStop(t *testing.T) {
105103
require.NoError(t, err, "Failed to load snapshot of VM")
106104

107105
log.Debug("TEST: LoadSnapshot completed")
108-
lg.UniLogger.Println("This is a test")
109106
_, err = orch.ResumeVM(ctx, vmID)
110107
require.NoError(t, err, "Failed to resume VM")
111108

ctriface/orch.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/vhive-serverless/vhive/devmapper"
3535

3636
log "github.com/sirupsen/logrus"
37+
lg "github.com/vhive-serverless/vhive/lg"
3738

3839
"github.com/containerd/containerd"
3940

@@ -129,12 +130,16 @@ func NewOrchestrator(snapshotter, hostIface string, opts ...OrchestratorOption)
129130
}
130131
defer file.Close()
131132

133+
lg.UniLogger.Println("TEST: created the uffd sock addr")
134+
132135
managerCfg := manager.MemoryManagerCfg{
133136
MetricsModeOn: o.isMetricsMode,
134137
UffdSockAddr: o.uffdSockAddr,
135138
}
136139
o.memoryManager = manager.NewMemoryManager(managerCfg)
137-
go o.memoryManager.ListenUffdSocket(o.uffdSockAddr)
140+
141+
// lg.UniLogger.Println("TEST: created a new memory manager. Start listen uffd socket")
142+
// go o.memoryManager.ListenUffdSocket(o.uffdSockAddr)
138143
}
139144

140145
log.Info("Creating containerd client")

lg/uni_logger.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
var UniLogger *log.Logger
1010

1111
func init() {
12-
file, err := os.OpenFile("output.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
12+
file, err := os.OpenFile("uni_output.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
1313
if err != nil {
1414
log.Fatalln("Failed to open log file:", err)
1515
}

memory/manager/manager.go

Lines changed: 26 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ func (m *MemoryManager) DeregisterVM(vmID string) error {
149149
}
150150

151151
// Activate Creates an epoller to serve page faults for the VM
152-
func (m *MemoryManager) Activate(vmID string) error {
152+
func (m *MemoryManager) Activate(vmID string, conn *net.UnixConn) error {
153153
logger := log.WithFields(log.Fields{"vmID": vmID})
154154

155155
logger.Debug("Activating instance in the memory manager")
@@ -164,14 +164,6 @@ func (m *MemoryManager) Activate(vmID string) error {
164164

165165
logger.Debug("TEST: Activate: fetch snapstate by vmID for UFFD")
166166

167-
// originID, ok := m.origins[vmID]
168-
169-
// if !ok {
170-
// logger.Debug("TEST: not loaded from snapshot")
171-
// }
172-
173-
// state, ok = m.instances[originID]
174-
175167
state, ok = m.instances[vmID]
176168

177169
if !ok {
@@ -187,29 +179,21 @@ func (m *MemoryManager) Activate(vmID string) error {
187179
return errors.New("VM already active")
188180
}
189181

190-
select {
191-
case <-m.startEpollingCh:
192-
if err := state.mapGuestMemory(); err != nil {
193-
logger.Error("Failed to map guest memory")
194-
return err
195-
}
196-
197-
if err := state.getUFFD(); err != nil {
198-
logger.Error("Failed to get uffd")
199-
return err
200-
}
182+
if err := state.mapGuestMemory(); err != nil {
183+
logger.Error("Failed to map guest memory")
184+
return err
185+
}
201186

202-
state.setupStateOnActivate()
187+
if err := state.getUFFD(conn); err != nil {
188+
logger.Error("Failed to get uffd")
189+
return err
190+
}
203191

204-
go state.pollUserPageFaults(readyCh)
192+
state.setupStateOnActivate()
205193

206-
<-readyCh
194+
go state.pollUserPageFaults(readyCh)
207195

208-
case <-time.After(100 * time.Second):
209-
return errors.New("Uffd connection to firecracker timeout")
210-
default:
211-
return errors.New("Failed to start epoller")
212-
}
196+
<-readyCh
213197

214198
return nil
215199
}
@@ -409,37 +393,25 @@ func (m *MemoryManager) GetUPFLatencyStats(vmID string) ([]*metrics.Metric, erro
409393
return state.latencyMetrics, nil
410394
}
411395

412-
func (m *MemoryManager) ListenUffdSocket(uffdSockAddr string) error {
413-
log.Debug("Start listening to uffd socket")
396+
func (m *MemoryManager) ListenUffd(vmID string, uffdSockAddr string) (*net.UnixConn, error) {
397+
logger := log.WithFields(log.Fields{"vmID": vmID})
414398

415-
m.startEpollingOnce.Do(func() {
416-
m.startEpollingCh = make(chan struct{})
417-
})
399+
logger.Debug("listening to uffd")
418400

419-
ln, err := net.Listen("unix", uffdSockAddr)
420-
if err != nil {
421-
log.Errorf("Failed to listen on uffd socket: %v", err)
422-
return errors.New("Failed to listen on uffd socket")
423-
}
424-
defer ln.Close()
401+
m.Lock()
425402

426-
for {
427-
conn, err := ln.Accept()
428-
if err != nil {
429-
log.Printf("Failed to accept connection on uffd socket: %v", err)
430-
continue
431-
}
432-
go func(conn net.Conn) {
433-
defer conn.Close()
434-
if err := ln.Close(); err != nil {
435-
log.Printf("Failed to close uffd socket listener: %v", err)
436-
}
437-
close(m.startEpollingCh)
438-
}(conn)
439-
break
403+
state, ok := m.instances[vmID]
404+
if !ok {
405+
m.Unlock()
406+
logger.Error("VM not registered with the memory manager")
407+
return nil, errors.New("VM not registered with the memory manager")
440408
}
441409

442-
return nil
410+
m.Unlock()
411+
412+
conn, _ := state.ListenUffdSocket(uffdSockAddr)
413+
414+
return conn, nil
443415
}
444416

445417
// Deprecated

memory/manager/snapshot_state.go

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ package manager
2828
import "C"
2929

3030
import (
31-
"context"
3231
"encoding/binary"
3332
"errors"
3433
"fmt"
@@ -64,6 +63,7 @@ type SnapshotStateCfg struct {
6463

6564
VMMStatePath, GuestMemPath, WorkingSetPath string
6665

66+
UffdConn *net.UnixConn
6767
InstanceSockAddr string
6868
BaseDir string // base directory for the instance
6969
MetricsPath string // path to csv file where the metrics should be stored
@@ -134,37 +134,54 @@ func (s *SnapshotState) setupStateOnActivate() {
134134
}
135135
}
136136

137-
func (s *SnapshotState) getUFFD() error {
138-
var d net.Dialer
139-
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
140-
defer cancel()
137+
func (s *SnapshotState) ListenUffdSocket(uffdSockAddr string) (*net.UnixConn, error) {
138+
log.Debug("Start listening to uffd socket")
139+
ln, err := net.Listen("unix", uffdSockAddr)
140+
if err != nil {
141+
log.Errorf("Failed to listen on uffd socket: %v", err)
142+
return nil, errors.New("Failed to listen on uffd socket")
143+
}
144+
// defer ln.Close()
145+
146+
var uffdConn *net.UnixConn
141147

142148
for {
143-
c, err := d.DialContext(ctx, "unix", s.InstanceSockAddr)
149+
lg.UniLogger.Println("Listening ...")
150+
conn, err := ln.Accept()
144151
if err != nil {
145-
if ctx.Err() != nil {
146-
log.Error("Failed to dial within the context timeout")
147-
return err
148-
}
149-
time.Sleep(1 * time.Millisecond)
150-
continue
152+
log.Error("Failed to accept connection")
153+
return nil, err
151154
}
152-
log.Debugf("TEST: Dial uffd socket done: %s", s.InstanceSockAddr)
153155

154-
defer c.Close()
156+
sendfdConn, ok := conn.(*net.UnixConn)
157+
if !ok {
158+
log.Error("Failed to assert net.Conn to *net.UnixConn")
159+
return nil, fmt.Errorf("failed to assert net.Conn to *net.UnixConn")
160+
}
161+
s.SnapshotStateCfg.UffdConn = sendfdConn
162+
uffdConn = sendfdConn
163+
break
164+
// TODO: maybe need a synchronziation
165+
}
155166

156-
sendfdConn := c.(*net.UnixConn)
167+
return uffdConn, nil
168+
}
157169

158-
fs, err := fd.Get(sendfdConn, 1, []string{"a file"})
159-
if err != nil {
160-
log.Error("Failed to receive the uffd")
161-
return err
162-
}
163170

164-
s.userFaultFD = fs[0]
171+
func (s *SnapshotState) getUFFD(conn *net.UnixConn) error {
172+
// ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
173+
// defer cancel()
165174

166-
return nil
175+
// sendfdConn := s.SnapshotStateCfg.UffdConn
176+
sendfdConn := conn
177+
fs, err := fd.Get(sendfdConn, 1, []string{"a file"})
178+
if err != nil {
179+
log.Error("Failed to receive the uffd")
180+
return err
167181
}
182+
183+
s.userFaultFD = fs[0]
184+
return nil
168185
}
169186

170187
func (s *SnapshotState) processMetrics() {

0 commit comments

Comments
 (0)