Skip to content

Commit d5f9adf

Browse files
swalkinshaw and claude
authored and committed
Optimize pipeline performance
- Parallelize R2 uploads with 20 concurrent workers and CopyObject for unchanged content-addressed files (incremental deploys)
- Tune HTTP transport with connection pooling matching concurrency
- Batch SVN discovery upserts in transactions of 500
- Batch update upserts via dedicated writer goroutine (batches of 100)
- Parallelize build file writes with 8-worker errgroup
- Pre-create p/wp-plugin, p/wp-theme, p2/ directories once upfront
- Add incremental builds: hard-link unchanged files from previous build based on content-addressed hash comparison
- Reuse serialized JSON bytes for p2/ files (same content as p/)
- Replace disk-based ValidateIntegrity with in-memory hash validation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent f8d6dd9 commit d5f9adf

File tree

10 files changed

+776
-86
lines changed

10 files changed

+776
-86
lines changed

cmd/wpcomposer/cmd/build.go

Lines changed: 17 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@ import (
55
"path/filepath"
66
"time"
77

8+
"github.com/roots/wp-composer/internal/deploy"
89
"github.com/roots/wp-composer/internal/repository"
910
"github.com/spf13/cobra"
1011
)
@@ -24,12 +25,23 @@ func runBuild(cmd *cobra.Command, args []string) error {
2425
output = filepath.Join("storage", "repository", "builds")
2526
}
2627

28+
// Resolve previous build dir for incremental builds
29+
var previousBuildDir string
30+
repoDir := filepath.Dir(output) // storage/repository
31+
if latestID, err := deploy.LatestBuildID(repoDir); err == nil && latestID != "" {
32+
candidate := deploy.BuildDirFromID(repoDir, latestID)
33+
if err := deploy.ValidateBuild(candidate); err == nil {
34+
previousBuildDir = candidate
35+
}
36+
}
37+
2738
result, err := repository.Build(cmd.Context(), application.DB, repository.BuildOpts{
28-
OutputDir: output,
29-
AppURL: application.Config.AppURL,
30-
Force: force,
31-
PackageName: pkg,
32-
Logger: application.Logger,
39+
OutputDir: output,
40+
AppURL: application.Config.AppURL,
41+
Force: force,
42+
PackageName: pkg,
43+
PreviousBuildDir: previousBuildDir,
44+
Logger: application.Logger,
3345
})
3446
if err != nil {
3547
return err

cmd/wpcomposer/cmd/deploy.go

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ func runDeploy(cmd *cobra.Command, args []string) error {
2121
repoDir := filepath.Join("storage", "repository")
2222
cleanup, _ := cmd.Flags().GetBool("cleanup")
2323
toR2, _ := cmd.Flags().GetBool("to-r2")
24+
previousBuildID, _ := deploy.CurrentBuildID(repoDir)
2425

2526
r2Cleanup, _ := cmd.Flags().GetBool("r2-cleanup")
2627
retainCount, _ := cmd.Flags().GetInt("retain")
@@ -77,7 +78,7 @@ func runDeploy(cmd *cobra.Command, args []string) error {
7778

7879
// Sync to R2 first, then promote locally
7980
if toR2 || application.Config.R2.Enabled {
80-
if err := deploy.SyncToR2(cmd.Context(), application.Config.R2, buildDir, target, application.Logger); err != nil {
81+
if err := deploy.SyncToR2(cmd.Context(), application.Config.R2, buildDir, target, previousBuildID, application.Logger); err != nil {
8182
return fmt.Errorf("R2 sync failed: %w", err)
8283
}
8384
recordR2Sync(cmd, target)
@@ -109,7 +110,7 @@ func runDeploy(cmd *cobra.Command, args []string) error {
109110

110111
// Sync to R2 first, then promote locally
111112
if toR2 || application.Config.R2.Enabled {
112-
if err := deploy.SyncToR2(cmd.Context(), application.Config.R2, buildDir, buildID, application.Logger); err != nil {
113+
if err := deploy.SyncToR2(cmd.Context(), application.Config.R2, buildDir, buildID, previousBuildID, application.Logger); err != nil {
113114
return fmt.Errorf("R2 sync failed: %w", err)
114115
}
115116
recordR2Sync(cmd, buildID)

cmd/wpcomposer/cmd/discover.go

Lines changed: 33 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -109,6 +109,8 @@ func discoverFromConfig(ctx context.Context, pkgType string, limit, concurrency
109109
}
110110

111111
func discoverFromSVN(ctx context.Context, pkgType string, limit int) error {
112+
const svnBatchSize = 500
113+
112114
client := wporg.NewClient(application.Config.Discovery, application.Logger)
113115

114116
type svnSource struct {
@@ -129,20 +131,46 @@ func discoverFromSVN(ctx context.Context, pkgType string, limit int) error {
129131
application.Logger.Info("discovering from SVN", "type", src.pkgType, "url", src.url)
130132

131133
var count, failed int
134+
batch := make([]packages.ShellEntry, 0, svnBatchSize)
135+
136+
flush := func() {
137+
if len(batch) == 0 {
138+
return
139+
}
140+
if err := packages.BatchUpsertShellPackages(ctx, application.DB, batch); err != nil {
141+
application.Logger.Warn("batch upsert failed, falling back to individual", "error", err)
142+
for _, e := range batch {
143+
if err := packages.UpsertShellPackage(ctx, application.DB, e.Type, e.Name, e.LastCommitted); err != nil {
144+
application.Logger.Warn("failed to upsert shell package", "slug", e.Name, "error", err)
145+
failed++
146+
count--
147+
}
148+
}
149+
}
150+
batch = batch[:0]
151+
}
152+
132153
err := client.ParseSVNListing(ctx, src.url, func(entry wporg.SVNEntry) error {
133154
if limit > 0 && totalCount >= limit {
134155
return errLimitReached
135156
}
136157

137-
if err := packages.UpsertShellPackage(ctx, application.DB, src.pkgType, entry.Slug, entry.LastCommitted); err != nil {
138-
application.Logger.Warn("failed to upsert shell package", "slug", entry.Slug, "error", err)
139-
failed++
140-
return nil
141-
}
158+
batch = append(batch, packages.ShellEntry{
159+
Type: src.pkgType,
160+
Name: entry.Slug,
161+
LastCommitted: entry.LastCommitted,
162+
})
142163
count++
143164
totalCount++
165+
166+
if len(batch) >= svnBatchSize {
167+
flush()
168+
}
144169
return nil
145170
})
171+
172+
flush()
173+
146174
if err != nil && err != errLimitReached {
147175
return fmt.Errorf("SVN discovery for %s: %w", src.pkgType, err)
148176
}

cmd/wpcomposer/cmd/update.go

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,43 @@ func runUpdate(cmd *cobra.Command, args []string) error {
6262

6363
client := wporg.NewClient(application.Config.Discovery, application.Logger)
6464

65+
const writeBatchSize = 100
66+
6567
var succeeded, failed, deactivated atomic.Int64
6668
g, gCtx := errgroup.WithContext(ctx)
6769
g.SetLimit(concurrency)
6870

71+
// Writer goroutine batches DB writes
72+
writeCh := make(chan *packages.Package, concurrency*2)
73+
writeErrCh := make(chan error, 1)
74+
go func() {
75+
defer close(writeErrCh)
76+
batch := make([]*packages.Package, 0, writeBatchSize)
77+
flush := func() {
78+
if len(batch) == 0 {
79+
return
80+
}
81+
if err := packages.BatchUpsertPackages(ctx, application.DB, batch); err != nil {
82+
application.Logger.Warn("batch upsert failed, falling back to individual", "error", err)
83+
for _, pkg := range batch {
84+
if err := packages.UpsertPackage(ctx, application.DB, pkg); err != nil {
85+
application.Logger.Warn("failed to store", "type", pkg.Type, "name", pkg.Name, "error", err)
86+
failed.Add(1)
87+
succeeded.Add(-1)
88+
}
89+
}
90+
}
91+
batch = batch[:0]
92+
}
93+
for pkg := range writeCh {
94+
batch = append(batch, pkg)
95+
if len(batch) >= writeBatchSize {
96+
flush()
97+
}
98+
}
99+
flush()
100+
}()
101+
69102
for _, p := range pkgs {
70103
p := p
71104
g.Go(func() error {
@@ -123,13 +156,9 @@ func runUpdate(cmd *cobra.Command, args []string) error {
123156
pkg.LastSyncedAt = &now
124157
pkg.LastSyncRunID = &syncRun.RunID
125158

126-
if err := packages.UpsertPackage(gCtx, application.DB, pkg); err != nil {
127-
application.Logger.Warn("failed to store", "type", p.Type, "name", p.Name, "error", err)
128-
failed.Add(1)
129-
return nil
130-
}
131-
132159
succeeded.Add(1)
160+
writeCh <- pkg
161+
133162
total := succeeded.Load() + failed.Load() + deactivated.Load()
134163
if total%500 == 0 {
135164
application.Logger.Info("update progress",
@@ -149,6 +178,9 @@ func runUpdate(cmd *cobra.Command, args []string) error {
149178
return err
150179
}
151180

181+
close(writeCh)
182+
<-writeErrCh // wait for writer to finish
183+
152184
stats := map[string]any{
153185
"updated": succeeded.Load(),
154186
"failed": failed.Load(),

internal/deploy/r2.go

Lines changed: 97 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -10,8 +10,11 @@ import (
1010
"math"
1111
"sort"
1212
"strings"
13+
"sync/atomic"
1314
"time"
1415

16+
"golang.org/x/sync/errgroup"
17+
1518
"github.com/aws/aws-sdk-go-v2/aws"
1619
"github.com/aws/aws-sdk-go-v2/credentials"
1720
"github.com/aws/aws-sdk-go-v2/service/s3"
@@ -41,12 +44,18 @@ type buildFile struct {
4144
}
4245

4346
// SyncToR2 uploads all files in a build directory to R2 under a versioned
44-
// release prefix (releases/<buildID>/). After all release files are uploaded,
45-
// it rewrites the root packages.json to point at the new prefix — the atomic
46-
// pointer swap that makes the new release live.
47-
func SyncToR2(ctx context.Context, cfg config.R2Config, buildDir, buildID string, logger *slog.Logger) error {
47+
// release prefix (releases/<buildID>/). When previousBuildID is non-empty,
48+
// content-addressed p/ files that exist in both builds are copied within R2
49+
// instead of re-uploaded. After all release files are uploaded, it rewrites
50+
// the root packages.json to point at the new prefix — the atomic pointer swap
51+
// that makes the new release live.
52+
func SyncToR2(ctx context.Context, cfg config.R2Config, buildDir, buildID, previousBuildID string, logger *slog.Logger) error {
4853
client := newS3Client(cfg)
4954
releasePrefix := "releases/" + buildID + "/"
55+
previousPrefix := ""
56+
if previousBuildID != "" {
57+
previousPrefix = "releases/" + previousBuildID + "/"
58+
}
5059

5160
// Collect all files, then sort: index files last.
5261
var files []buildFile
@@ -60,23 +69,68 @@ func SyncToR2(ctx context.Context, cfg config.R2Config, buildDir, buildID string
6069

6170
sortBuildFiles(files)
6271

63-
// Upload all files under the release prefix.
64-
var uploaded int
72+
// Extract packages.json data before parallel upload.
6573
var packagesData []byte
6674
for _, f := range files {
67-
key := releasePrefix + f.relPath
68-
if err := putObjectWithRetry(ctx, client, cfg.Bucket, key, f.data, logger); err != nil {
69-
return fmt.Errorf("R2 sync: %w", err)
70-
}
7175
if f.relPath == r2IndexFile {
7276
packagesData = f.data
77+
break
7378
}
74-
uploaded++
75-
if uploaded%100 == 0 {
76-
logger.Info("R2 upload progress", "uploaded", uploaded, "total", len(files))
79+
}
80+
81+
// Build a set of content-addressed p/ files from the previous build for CopyObject.
82+
previousKeys := make(map[string]bool)
83+
if previousPrefix != "" {
84+
for _, f := range files {
85+
if strings.HasPrefix(f.relPath, "p/") && strings.Contains(f.relPath, "$") {
86+
previousKeys[previousPrefix+f.relPath] = true
87+
}
7788
}
7889
}
7990

91+
// Upload all files under the release prefix (parallel).
92+
var uploaded, copied atomic.Int64
93+
g, gCtx := errgroup.WithContext(ctx)
94+
g.SetLimit(20)
95+
96+
for _, f := range files {
97+
f := f
98+
g.Go(func() error {
99+
key := releasePrefix + f.relPath
100+
101+
// For content-addressed p/ files that existed in the previous build,
102+
// use CopyObject within R2 instead of re-uploading.
103+
if previousPrefix != "" && strings.HasPrefix(f.relPath, "p/") && strings.Contains(f.relPath, "$") {
104+
srcKey := previousPrefix + f.relPath
105+
if previousKeys[srcKey] {
106+
if err := copyObjectWithRetry(gCtx, client, cfg.Bucket, srcKey, key, logger); err == nil {
107+
copied.Add(1)
108+
n := uploaded.Add(1) + copied.Load()
109+
if n%500 == 0 {
110+
logger.Info("R2 upload progress", "uploaded", uploaded.Load(), "copied", copied.Load(), "total", len(files))
111+
}
112+
return nil
113+
}
114+
// Fall through to upload on copy failure.
115+
logger.Warn("CopyObject failed, falling back to upload", "key", key)
116+
}
117+
}
118+
119+
if err := putObjectWithRetry(gCtx, client, cfg.Bucket, key, f.data, logger); err != nil {
120+
return fmt.Errorf("R2 sync: %w", err)
121+
}
122+
n := uploaded.Add(1)
123+
if n%500 == 0 {
124+
logger.Info("R2 upload progress", "uploaded", int(n), "total", len(files))
125+
}
126+
return nil
127+
})
128+
}
129+
130+
if err := g.Wait(); err != nil {
131+
return err
132+
}
133+
80134
// Rewrite and upload root packages.json — the atomic switch.
81135
if packagesData == nil {
82136
return fmt.Errorf("R2 sync: packages.json not found in build")
@@ -89,7 +143,7 @@ func SyncToR2(ctx context.Context, cfg config.R2Config, buildDir, buildID string
89143
return fmt.Errorf("R2 sync (root packages.json): %w", err)
90144
}
91145

92-
logger.Info("R2 sync complete", "files", uploaded, "release", releasePrefix)
146+
logger.Info("R2 sync complete", "uploaded", uploaded.Load(), "copied", copied.Load(), "release", releasePrefix)
93147
return nil
94148
}
95149

@@ -175,6 +229,35 @@ func putObjectWithRetry(ctx context.Context, client *s3.Client, bucket, key stri
175229
return fmt.Errorf("uploading %s after %d attempts: %w", key, r2MaxRetries, lastErr)
176230
}
177231

232+
// copyObjectWithRetry copies a single object within R2 with exponential backoff retry.
233+
func copyObjectWithRetry(ctx context.Context, client *s3.Client, bucket, srcKey, dstKey string, logger *slog.Logger) error {
234+
copySource := bucket + "/" + srcKey
235+
cacheControl := CacheControlForPath(dstKey)
236+
237+
var lastErr error
238+
for attempt := range r2MaxRetries {
239+
if attempt > 0 {
240+
delay := time.Duration(float64(r2RetryBaseMs)*math.Pow(2, float64(attempt-1))) * time.Millisecond
241+
select {
242+
case <-ctx.Done():
243+
return ctx.Err()
244+
case <-time.After(delay):
245+
}
246+
}
247+
248+
_, lastErr = client.CopyObject(ctx, &s3.CopyObjectInput{
249+
Bucket: aws.String(bucket),
250+
CopySource: aws.String(copySource),
251+
Key: aws.String(dstKey),
252+
CacheControl: aws.String(cacheControl),
253+
})
254+
if lastErr == nil {
255+
return nil
256+
}
257+
}
258+
return fmt.Errorf("copying %s -> %s after %d attempts: %w", srcKey, dstKey, r2MaxRetries, lastErr)
259+
}
260+
178261
// CleanupR2 removes old release prefixes from R2, keeping the live release,
179262
// releases within the grace period, and the top N most recent releases.
180263
// It no longer depends on the local filesystem — all state is read from R2.

0 commit comments

Comments
 (0)