Skip to content

Commit 4abaf5e

Browse files
committed
QMP connection pool
1 parent 6526969 commit 4abaf5e

File tree

3 files changed

+91
-8
lines changed

3 files changed

+91
-8
lines changed

lib/hypervisor/qemu/pool.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package qemu
2+
3+
import (
4+
"sync"
5+
)
6+
7+
// clientPool manages singleton QMP connections per socket path.
8+
// QEMU's QMP socket only allows one connection at a time, so we must
9+
// reuse existing connections rather than creating new ones.
10+
var clientPool = struct {
11+
sync.RWMutex
12+
clients map[string]*QEMU
13+
}{
14+
clients: make(map[string]*QEMU),
15+
}
16+
17+
// GetOrCreate returns an existing QEMU client for the socket path,
18+
// or creates a new one if none exists.
19+
func GetOrCreate(socketPath string) (*QEMU, error) {
20+
// Try read lock first for existing connection
21+
clientPool.RLock()
22+
if client, ok := clientPool.clients[socketPath]; ok {
23+
clientPool.RUnlock()
24+
return client, nil
25+
}
26+
clientPool.RUnlock()
27+
28+
// Need to create new connection - acquire write lock
29+
clientPool.Lock()
30+
defer clientPool.Unlock()
31+
32+
// Double-check after acquiring write lock
33+
if client, ok := clientPool.clients[socketPath]; ok {
34+
return client, nil
35+
}
36+
37+
// Create new client
38+
client, err := newClient(socketPath)
39+
if err != nil {
40+
return nil, err
41+
}
42+
43+
clientPool.clients[socketPath] = client
44+
return client, nil
45+
}
46+
47+
// Remove closes and removes a client from the pool.
48+
// Called automatically on errors to allow fresh reconnection.
49+
func Remove(socketPath string) {
50+
clientPool.Lock()
51+
defer clientPool.Unlock()
52+
53+
if client, ok := clientPool.clients[socketPath]; ok {
54+
client.client.Close()
55+
delete(clientPool.clients, socketPath)
56+
}
57+
}

lib/hypervisor/qemu/qemu.go

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,23 @@ import (
1111

1212
// QEMU implements hypervisor.Hypervisor for QEMU VMM.
1313
type QEMU struct {
14-
client *Client
14+
client *Client
15+
socketPath string // for self-removal from pool on error
1516
}
1617

17-
// New creates a new QEMU client for an existing QMP socket.
18+
// New returns a QEMU client for the given socket path.
19+
// Uses a connection pool to ensure only one connection per socket exists.
1820
func New(socketPath string) (*QEMU, error) {
21+
return GetOrCreate(socketPath)
22+
}
23+
24+
// newClient creates a new QEMU client (internal, used by pool).
25+
func newClient(socketPath string) (*QEMU, error) {
1926
client, err := NewClient(socketPath)
2027
if err != nil {
2128
return nil, fmt.Errorf("create qemu client: %w", err)
2229
}
23-
return &QEMU{client: client}, nil
30+
return &QEMU{client: client, socketPath: socketPath}, nil
2431
}
2532

2633
// Verify QEMU implements the interface
@@ -40,18 +47,29 @@ func (q *QEMU) Capabilities() hypervisor.Capabilities {
4047
// DeleteVM removes the VM configuration from QEMU.
4148
// This sends a graceful shutdown signal to the guest.
4249
func (q *QEMU) DeleteVM(ctx context.Context) error {
43-
return q.client.SystemPowerdown()
50+
if err := q.client.SystemPowerdown(); err != nil {
51+
Remove(q.socketPath)
52+
return err
53+
}
54+
return nil
4455
}
4556

4657
// Shutdown stops the QEMU process.
4758
func (q *QEMU) Shutdown(ctx context.Context) error {
48-
return q.client.Quit()
59+
if err := q.client.Quit(); err != nil {
60+
Remove(q.socketPath)
61+
return err
62+
}
63+
// Connection is gone after quit, remove from pool
64+
Remove(q.socketPath)
65+
return nil
4966
}
5067

5168
// GetVMInfo returns current VM state.
5269
func (q *QEMU) GetVMInfo(ctx context.Context) (*hypervisor.VMInfo, error) {
5370
status, err := q.client.Status()
5471
if err != nil {
72+
Remove(q.socketPath)
5573
return nil, fmt.Errorf("query status: %w", err)
5674
}
5775

@@ -85,12 +103,20 @@ func (q *QEMU) GetVMInfo(ctx context.Context) (*hypervisor.VMInfo, error) {
85103

86104
// Pause suspends VM execution.
87105
func (q *QEMU) Pause(ctx context.Context) error {
88-
return q.client.Stop()
106+
if err := q.client.Stop(); err != nil {
107+
Remove(q.socketPath)
108+
return err
109+
}
110+
return nil
89111
}
90112

91113
// Resume continues VM execution.
92114
func (q *QEMU) Resume(ctx context.Context) error {
93-
return q.client.Continue()
115+
if err := q.client.Continue(); err != nil {
116+
Remove(q.socketPath)
117+
return err
118+
}
119+
return nil
94120
}
95121

96122
// Snapshot creates a VM snapshot.

lib/instances/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ func (m *manager) RotateLogs(ctx context.Context, maxBytes int64, maxFiles int)
264264
m.paths.InstanceHypemanLog(inst.Id),
265265
}
266266
for _, logPath := range logPaths {
267-
if err := rotateLogIfNeeded(logPath, maxBytes, maxFiles); err != nil {
267+
if err := rotateLogIfNeeded(logPath, maxBytes, maxFiles); err != nil {
268268
lastErr = err // Continue with other logs, but track error
269269
}
270270
}

0 commit comments

Comments
 (0)