Skip to content

Commit 46e434f

Browse files
authored
(buildkit proxy) fix: double memory usage during commit (#54)
* refactor cache backend client and add logs * run workflow to test * disable proxy * systemd stuff * path fixes * halve memory usage * push to run * url fixes * rm test stuff from this repo * release dev * compile time check * add logs * too many logs * better logs * add tests * more logs * fix switch cases * simplify chunk reader logic * fix error handling * rm logs
1 parent b29d025 commit 46e434f

File tree

10 files changed

+559
-300
lines changed

10 files changed

+559
-300
lines changed

.github/workflows/release-testing.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name: Branch Build and Upload to R2
33
on:
44
push:
55
branches:
6-
- "prashant-warp-740"
6+
- ah-docker-layer-caching
77

88
env:
99
AWS_ACCESS_KEY_ID: ${{ secrets.WB_PACKAGES_DEV_R2_ACCESS_KEY_ID }}
@@ -26,7 +26,7 @@ jobs:
2626
- name: Set up Go
2727
uses: WarpBuilds/setup-go@v5
2828
with:
29-
go-version: "1.25"
29+
go-version: "1.24"
3030
cache: true
3131

3232
- name: Define S3 Path

go.mod

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
module github.com/warpbuilds/warpbuild-agent
22

3-
go 1.23.0
4-
5-
toolchain go1.24.1
3+
go 1.24
64

75
require (
86
cloud.google.com/go/storage v1.43.0

pkg/app/app.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -238,9 +238,11 @@ func NewApp(ctx context.Context, opts *ApplicationOptions) error {
238238

239239
} else if opts.LaunchProxyServer {
240240
proxy.StartProxyServer(ctx, &proxy.ProxyServerOptions{
241-
CacheBackendHost: settings.Proxy.CacheBackendHost,
242-
CacheProxyPort: settings.Proxy.CacheProxyPort,
243-
WarpBuildRunnerVerificationToken: settings.Agent.RunnerVerificationToken,
241+
Port: settings.Proxy.CacheProxyPort,
242+
CacheBackendInfo: proxy.CacheBackendInfo{
243+
HostURL: settings.Proxy.CacheBackendHost,
244+
AuthToken: settings.Agent.RunnerVerificationToken,
245+
},
244246
})
245247
} else if opts.LaunchTransparentCache {
246248
// Start the transparent cache server with configured ports

pkg/proxy/cache_backend_client.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package proxy
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"net/url"
8+
9+
"github.com/gofiber/fiber/v2"
10+
)
11+
12+
type CacheBackendRequest struct {
13+
Path string
14+
Body any
15+
}
16+
17+
func getCacheBackendInfo(ctx context.Context) CacheBackendInfo {
18+
opts := ctx.Value(PROXY_SERVER_OPTIONS_CONTEXT_KEY).(*ProxyServerOptions)
19+
return opts.CacheBackendInfo
20+
}
21+
22+
func callCacheBackend[T any](ctx context.Context, req CacheBackendRequest) (*T, error) {
23+
info := getCacheBackendInfo(ctx)
24+
requestURL, err := url.JoinPath(info.HostURL, "/v1/cache", req.Path)
25+
if err != nil {
26+
return nil, fmt.Errorf("failed to construct URL: %w", err)
27+
}
28+
29+
f := fiber.Post(requestURL).
30+
ContentType("application/json").
31+
Add("Accept", "application/json").
32+
Add("Authorization", fmt.Sprintf("Bearer %s", info.AuthToken))
33+
34+
if req.Body != nil {
35+
bodyBytes, err := json.Marshal(req.Body)
36+
if err != nil {
37+
return nil, fmt.Errorf("failed to marshal request body: %w", err)
38+
}
39+
f = f.Body(bodyBytes)
40+
}
41+
42+
statusCode, body, errs := f.Bytes()
43+
if len(errs) > 0 {
44+
return nil, fmt.Errorf("request failed: %v", errs)
45+
}
46+
47+
if statusCode < 200 || statusCode >= 300 {
48+
return nil, fmt.Errorf("request failed with status %d: %s", statusCode, string(body))
49+
}
50+
51+
var result T
52+
if err := json.Unmarshal(body, &result); err != nil {
53+
return nil, fmt.Errorf("failed to parse response: %w", err)
54+
}
55+
56+
return &result, nil
57+
}

pkg/proxy/chunk_reader.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package proxy
2+
3+
import (
4+
"bytes"
5+
"io"
6+
"sort"
7+
)
8+
9+
func NewUnorderedChunkReader(chunksMap map[int64]ChunkData) (io.Reader, int64) {
10+
// Sort the chunks by start offset
11+
offsets := make([]int64, 0, len(chunksMap))
12+
for offset := range chunksMap {
13+
offsets = append(offsets, offset)
14+
}
15+
sort.Slice(offsets, func(i, j int) bool {
16+
return chunksMap[offsets[i]].StartOffset < chunksMap[offsets[j]].StartOffset
17+
})
18+
19+
// Create readers for each chunk in order
20+
readers := make([]io.Reader, len(offsets))
21+
var totalSize int64
22+
for i, offset := range offsets {
23+
chunk := chunksMap[offset]
24+
readers[i] = bytes.NewReader(chunk.Content)
25+
totalSize += int64(len(chunk.Content))
26+
}
27+
28+
// Chain them together with stdlib MultiReader
29+
return io.MultiReader(readers...), totalSize
30+
}

0 commit comments

Comments
 (0)