@@ -3,14 +3,22 @@ package leeway
import (
	"bufio"
	"bytes"
+	"context"
+	"errors"
	"fmt"
	"io"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
+	"sync"
	"syscall"

+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/config"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/aws/aws-sdk-go-v2/service/s3/types"
+	"github.com/aws/aws-sdk-go-v2/service/sts"
	log "github.com/sirupsen/logrus"
	"golang.org/x/xerrors"
)
@@ -231,3 +239,150 @@ func gsutilTransfer(target string, files []string) error {
	}
	return nil
}
+
+// S3RemoteCache uses the AWS Go SDK to implement a remote cache
+type S3RemoteCache struct {
+	BucketName string
+	s3Config   *aws.Config
+	s3Client   *s3.Client
+}
+
+func NewS3RemoteCache(bucketName string, cfg *aws.Config) (*S3RemoteCache, error) {
+	if cfg == nil {
+		v, err := config.LoadDefaultConfig(context.TODO())
+		if err != nil {
+			return nil, fmt.Errorf("cannot load s3 config: %w", err)
+		}
+		cfg = &v
+	}
+	s3Client := s3.NewFromConfig(*cfg)
+
+	log.DebugFn(func() []interface{} {
+		stsClient := sts.NewFromConfig(*cfg)
+		identity, err := stsClient.GetCallerIdentity(context.TODO(), &sts.GetCallerIdentityInput{})
+		if err != nil {
+			log.Warnf("Cannot get AWS caller identity: %s", err)
+			return nil
+		}
+
+		log.WithFields(log.Fields{
+			"Account": aws.ToString(identity.Account),
+			"Arn":     aws.ToString(identity.Arn),
+			"Region":  cfg.Region,
+		}).Debug("Loaded AWS account")
+
+		return nil
+	})
+
+	return &S3RemoteCache{bucketName, cfg, s3Client}, nil
+}
+
+// ExistingPackages returns existing cached build artifacts in the remote cache
+func (rs *S3RemoteCache) ExistingPackages(pkgs []*Package) (map[*Package]struct{}, error) {
+	packagesToKeys := make(map[*Package]string)
+	for _, p := range pkgs {
+		version, err := p.Version()
+		if err != nil {
+			log.WithField("package", p.FullName()).Debug("Failed to get version for package. Will not check remote cache for package.")
+			continue
+		}
+
+		packagesToKeys[p] = fmt.Sprintf("%s.tar.gz", version)
+	}
+
+	if len(packagesToKeys) == 0 {
+		return map[*Package]struct{}{}, nil
+	}
+	log.Debugf("Checking if %d packages exist in the remote cache using s3", len(packagesToKeys))
+
+	// The channel is buffered so the goroutines below never block on send.
+	ch := make(chan *Package, len(packagesToKeys))
+
+	existingPackages := make(map[*Package]struct{})
+	wg := sync.WaitGroup{}
+
+	for pkg, key := range packagesToKeys {
+		wg.Add(1)
+		go func(pkg *Package, key string) {
+			defer wg.Done()
+
+			exists, err := rs.hasObject(context.TODO(), key)
+			if err != nil {
+				log.WithField("package", pkg.FullName()).Debugf("Failed to check remote cache for package: %s", err)
+			}
+			if exists {
+				ch <- pkg
+			}
+		}(pkg, key)
+	}
+	wg.Wait()
+	close(ch)
+
+	// Drain the channel into the result map once all checks have finished.
+	for pkg := range ch {
+		existingPackages[pkg] = struct{}{}
+	}
+
+	return existingPackages, nil
+}
+
+// Download makes a best-effort attempt at downloading previously cached build artifacts for the given packages
+// in their current version. A cache miss (i.e. a build artifact not being available) does not constitute an
+// error. Get should try and download as many artifacts as possible.
+func (rs *S3RemoteCache) Download(dst Cache, pkgs []*Package) error {
+	panic("not implemented") // TODO: Implement
+}
+
+// Upload makes a best-effort attempt at uploading the build artifacts to a remote cache. If uploading an
+// artifact fails, that does not constitute an error.
+func (rs *S3RemoteCache) Upload(src Cache, pkgs []*Package) error {
+	panic("not implemented") // TODO: Implement
+}
+
+func (rs *S3RemoteCache) hasBucket(ctx context.Context) (bool, error) {
+	cfg := *rs.s3Config
+	fields := log.Fields{
+		"bucket": rs.BucketName,
+		"region": cfg.Region,
+	}
+	log.WithFields(fields).Debugf("Checking s3 for cache bucket")
+
+	_, err := rs.s3Client.HeadBucket(ctx, &s3.HeadBucketInput{
+		Bucket: aws.String(rs.BucketName),
+	})
+	if err != nil {
+		// HEAD responses carry no error body, so the SDK cannot surface *types.NoSuchBucket
+		// here; a missing bucket comes back as *types.NotFound instead.
+		var nf *types.NotFound
+		if errors.As(err, &nf) {
+			return false, nil
+		}
+		log.WithFields(fields).Errorf("Failed to get bucket: %s", err)
+		return false, err
+	}
+	return true, nil
+}
+
+func (rs *S3RemoteCache) hasObject(ctx context.Context, key string) (bool, error) {
+	cfg := *rs.s3Config
+	fields := log.Fields{
+		"key":    key,
+		"bucket": rs.BucketName,
+		"region": cfg.Region,
+	}
+	log.WithFields(fields).Debugf("Checking s3 for cached package")
+
+	// Request a single byte rather than the whole object; we only care whether the key exists.
+	resp, err := rs.s3Client.GetObject(ctx, &s3.GetObjectInput{
+		Bucket: aws.String(rs.BucketName),
+		Key:    aws.String(key),
+		Range:  aws.String("bytes=0-0"),
+	})
+	if err != nil {
+		var nsk *types.NoSuchKey
+		if errors.As(err, &nsk) {
+			return false, nil
+		}
+
+		// We've received an error that's not a simple missing-key error. Collect more information.
+		_, _ = rs.hasBucket(ctx)
+
+		log.WithFields(fields).Warnf("S3 GetObject failed: %s", err)
+		return false, err
+	}
+	// Close the (at most one byte) body so the underlying connection can be reused.
+	_ = resp.Body.Close()
+
+	return true, nil
+}
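
Taken together, a caller might wire the new cache up as below. This is a minimal sketch rather than code from this PR: `reportCacheHits` is a hypothetical wrapper, the bucket name is a placeholder, and passing `nil` for the config makes the constructor fall back to the default AWS credential and region chain.

```go
// Hypothetical usage sketch: check which packages already exist remotely.
func reportCacheHits(pkgs []*Package) error {
	cache, err := NewS3RemoteCache("my-leeway-cache", nil) // placeholder bucket name
	if err != nil {
		return err
	}
	existing, err := cache.ExistingPackages(pkgs)
	if err != nil {
		return err
	}
	for _, p := range pkgs {
		if _, ok := existing[p]; ok {
			log.WithField("package", p.FullName()).Info("remote cache hit")
		}
	}
	return nil
}
```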
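`Download` and `Upload` are deliberately left as TODO stubs in this PR. For illustration only, a per-package download could take the shape sketched below; `downloadOne` and `localPathFor` are hypothetical names, since the `Cache` interface that maps a package to its on-disk location is not part of this diff.

```go
// Sketch only: a possible per-package download, mirroring the key scheme
// used by ExistingPackages ("<version>.tar.gz"). A missing key is treated
// as a cache miss, not an error, per the Download contract above.
func (rs *S3RemoteCache) downloadOne(ctx context.Context, pkg *Package, localPathFor func(*Package) string) error {
	version, err := pkg.Version()
	if err != nil {
		return err
	}
	resp, err := rs.s3Client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(rs.BucketName),
		Key:    aws.String(fmt.Sprintf("%s.tar.gz", version)),
	})
	if err != nil {
		var nsk *types.NoSuchKey
		if errors.As(err, &nsk) {
			return nil // cache miss: not an error
		}
		return err
	}
	defer resp.Body.Close()

	f, err := os.Create(localPathFor(pkg))
	if err != nil {
		return err
	}
	defer f.Close()
	_, err = io.Copy(f, resp.Body)
	return err
}
```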
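The upload side would be symmetric. Again a hedged sketch, not the PR's implementation: `uploadOne` and `srcPath` are hypothetical, with `srcPath` assumed to point at the locally cached artifact for the package.

```go
// Sketch only: upload one locally cached artifact under the same key scheme.
func (rs *S3RemoteCache) uploadOne(ctx context.Context, pkg *Package, srcPath string) error {
	version, err := pkg.Version()
	if err != nil {
		return err
	}
	f, err := os.Open(srcPath)
	if err != nil {
		return err
	}
	defer f.Close()

	_, err = rs.s3Client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String(rs.BucketName),
		Key:    aws.String(fmt.Sprintf("%s.tar.gz", version)),
		Body:   f,
	})
	return err
}
```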