Skip to content

Commit 4e13b7c

Browse files
committed
Async API
1 parent db95987 commit 4e13b7c

File tree

13 files changed

+1181
-139
lines changed

13 files changed

+1181
-139
lines changed

cmd/api/api/api_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func newTestService(t *testing.T) *ApiService {
2424

2525
return &ApiService{
2626
Config: cfg,
27-
ImageManager: images.NewManager(cfg.DataDir, ociClient),
27+
ImageManager: images.NewManager(cfg.DataDir, ociClient, 1),
2828
InstanceManager: instances.NewManager(cfg.DataDir),
2929
VolumeManager: volumes.NewManager(cfg.DataDir),
3030
}

cmd/api/api/images.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (s *ApiService) CreateImage(ctx context.Context, request oapi.CreateImageRe
4444
}, nil
4545
}
4646
}
47-
return oapi.CreateImage201JSONResponse(*img), nil
47+
return oapi.CreateImage202JSONResponse(*img), nil
4848
}
4949

5050
// GetImage gets image details
@@ -70,6 +70,33 @@ func (s *ApiService) GetImage(ctx context.Context, request oapi.GetImageRequestO
7070
return oapi.GetImage200JSONResponse(*img), nil
7171
}
7272

73+
// GetImageProgress streams build progress via SSE
74+
func (s *ApiService) GetImageProgress(ctx context.Context, request oapi.GetImageProgressRequestObject) (oapi.GetImageProgressResponseObject, error) {
75+
log := logger.FromContext(ctx)
76+
77+
progressChan, err := s.ImageManager.GetProgress(ctx, request.Id)
78+
if err != nil {
79+
switch {
80+
case errors.Is(err, images.ErrNotFound):
81+
return oapi.GetImageProgress404JSONResponse{
82+
Code: "not_found",
83+
Message: "image not found",
84+
}, nil
85+
default:
86+
log.Error("failed to get progress", "error", err, "id", request.Id)
87+
return oapi.GetImageProgress500JSONResponse{
88+
Code: "internal_error",
89+
Message: "failed to get progress",
90+
}, nil
91+
}
92+
}
93+
94+
// Return SSE stream (uses helper from progress.go)
95+
return oapi.GetImageProgress200TexteventStreamResponse{
96+
Body: images.ToSSEReader(progressChan),
97+
}, nil
98+
}
99+
73100
// DeleteImage deletes an image
74101
func (s *ApiService) DeleteImage(ctx context.Context, request oapi.DeleteImageRequestObject) (oapi.DeleteImageResponseObject, error) {
75102
log := logger.FromContext(ctx)

cmd/api/api/images_test.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
package api
22

33
import (
4+
"bufio"
5+
"encoding/json"
6+
"strings"
47
"testing"
8+
"time"
59

10+
"github.com/onkernel/hypeman/lib/images"
611
"github.com/onkernel/hypeman/lib/oapi"
712
"github.com/stretchr/testify/assert"
813
"github.com/stretchr/testify/require"
@@ -33,3 +38,117 @@ func TestGetImage_NotFound(t *testing.T) {
3338
assert.Equal(t, "image not found", notFound.Message)
3439
}
3540

41+
func TestCreateImage_AsyncWithSSE(t *testing.T) {
42+
svc := newTestService(t)
43+
ctx := ctx()
44+
45+
// 1. Create image (should return 202 Accepted immediately)
46+
createResp, err := svc.CreateImage(ctx, oapi.CreateImageRequestObject{
47+
Body: &oapi.CreateImageRequest{
48+
Name: "docker.io/library/alpine:latest",
49+
},
50+
})
51+
require.NoError(t, err)
52+
53+
acceptedResp, ok := createResp.(oapi.CreateImage202JSONResponse)
54+
require.True(t, ok, "expected 202 accepted response")
55+
56+
img := oapi.Image(acceptedResp)
57+
require.Equal(t, "docker.io/library/alpine:latest", img.Name)
58+
require.Equal(t, "img-alpine-latest", img.Id)
59+
require.Contains(t, []oapi.ImageStatus{images.StatusPending, images.StatusPulling}, img.Status)
60+
require.Equal(t, 0, img.Progress)
61+
62+
// 2. Stream progress via SSE
63+
progressResp, err := svc.GetImageProgress(ctx, oapi.GetImageProgressRequestObject{
64+
Id: img.Id,
65+
})
66+
require.NoError(t, err)
67+
68+
sseResp, ok := progressResp.(oapi.GetImageProgress200TexteventStreamResponse)
69+
if !ok {
70+
t.Fatalf("expected SSE stream, got %T", progressResp)
71+
}
72+
73+
// Read SSE events
74+
scanner := bufio.NewScanner(sseResp.Body)
75+
lastProgress := 0
76+
sawPulling := false
77+
sawUnpacking := false
78+
sawConverting := false
79+
80+
timeout := time.After(3 * time.Minute)
81+
done := make(chan bool)
82+
83+
go func() {
84+
for scanner.Scan() {
85+
line := scanner.Text()
86+
if !strings.HasPrefix(line, "data: ") {
87+
continue
88+
}
89+
90+
data := strings.TrimPrefix(line, "data: ")
91+
var update images.ProgressUpdate
92+
if err := json.Unmarshal([]byte(data), &update); err != nil {
93+
continue
94+
}
95+
96+
t.Logf("SSE: status=%s, progress=%d%%", update.Status, update.Progress)
97+
98+
// Track which phases we see
99+
if update.Status == images.StatusPulling {
100+
sawPulling = true
101+
}
102+
if update.Status == images.StatusUnpacking {
103+
sawUnpacking = true
104+
}
105+
if update.Status == images.StatusConverting {
106+
sawConverting = true
107+
}
108+
109+
// Progress should be monotonic
110+
require.GreaterOrEqual(t, update.Progress, lastProgress)
111+
lastProgress = update.Progress
112+
113+
// Stop when ready
114+
if update.Status == images.StatusReady {
115+
require.Equal(t, 100, update.Progress)
116+
done <- true
117+
return
118+
}
119+
120+
// Fail on error
121+
if update.Status == images.StatusFailed {
122+
t.Fatalf("Build failed: %v", update.Error)
123+
}
124+
}
125+
}()
126+
127+
// Wait for completion or timeout
128+
select {
129+
case <-done:
130+
// Success
131+
case <-timeout:
132+
t.Fatal("Build did not complete within 3 minutes")
133+
}
134+
135+
// Verify we saw at least one intermediate phase (build might be too fast to catch all)
136+
sawAnyPhase := sawPulling || sawUnpacking || sawConverting
137+
require.True(t, sawAnyPhase || lastProgress == 100, "should see at least one build phase or final state")
138+
139+
// 3. Verify final image state
140+
getResp, err := svc.GetImage(ctx, oapi.GetImageRequestObject{Id: img.Id})
141+
require.NoError(t, err)
142+
143+
imgResp, ok := getResp.(oapi.GetImage200JSONResponse)
144+
require.True(t, ok, "expected 200 response")
145+
146+
finalImg := oapi.Image(imgResp)
147+
require.Equal(t, oapi.ImageStatus(images.StatusReady), finalImg.Status)
148+
require.Equal(t, 100, finalImg.Progress)
149+
require.NotNil(t, finalImg.SizeBytes)
150+
require.Greater(t, *finalImg.SizeBytes, int64(0))
151+
require.Nil(t, finalImg.QueuePosition)
152+
require.Nil(t, finalImg.Error)
153+
}
154+

cmd/api/config/config.go

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,20 @@ package config
22

33
import (
44
"os"
5+
"strconv"
56

67
"github.com/joho/godotenv"
78
)
89

910
type Config struct {
10-
Port string
11-
DataDir string
12-
BridgeName string
13-
SubnetCIDR string
14-
SubnetGateway string
15-
JwtSecret string
16-
DNSServer string
11+
Port string
12+
DataDir string
13+
BridgeName string
14+
SubnetCIDR string
15+
SubnetGateway string
16+
JwtSecret string
17+
DNSServer string
18+
MaxConcurrentBuilds int
1719
}
1820

1921
// Load loads configuration from environment variables
@@ -23,13 +25,14 @@ func Load() *Config {
2325
_ = godotenv.Load()
2426

2527
cfg := &Config{
26-
Port: getEnv("PORT", "8080"),
27-
DataDir: getEnv("DATA_DIR", "/var/lib/hypeman"),
28-
BridgeName: getEnv("BRIDGE_NAME", "vmbr0"),
29-
SubnetCIDR: getEnv("SUBNET_CIDR", "192.168.100.0/24"),
30-
SubnetGateway: getEnv("SUBNET_GATEWAY", "192.168.100.1"),
31-
JwtSecret: getEnv("JWT_SECRET", ""),
32-
DNSServer: getEnv("DNS_SERVER", "1.1.1.1"),
28+
Port: getEnv("PORT", "8080"),
29+
DataDir: getEnv("DATA_DIR", "/var/lib/hypeman"),
30+
BridgeName: getEnv("BRIDGE_NAME", "vmbr0"),
31+
SubnetCIDR: getEnv("SUBNET_CIDR", "192.168.100.0/24"),
32+
SubnetGateway: getEnv("SUBNET_GATEWAY", "192.168.100.1"),
33+
JwtSecret: getEnv("JWT_SECRET", ""),
34+
DNSServer: getEnv("DNS_SERVER", "1.1.1.1"),
35+
MaxConcurrentBuilds: getEnvInt("MAX_CONCURRENT_BUILDS", 1),
3336
}
3437

3538
return cfg
@@ -42,3 +45,12 @@ func getEnv(key, defaultValue string) string {
4245
return defaultValue
4346
}
4447

48+
func getEnvInt(key string, defaultValue int) int {
49+
if value := os.Getenv(key); value != "" {
50+
if intVal, err := strconv.Atoi(value); err == nil {
51+
return intVal
52+
}
53+
}
54+
return defaultValue
55+
}
56+

0 commit comments

Comments
 (0)