Skip to content

Commit 0e60ca8

Browse files
authored
bypass limiter for small actions
Drop the limiter for Redis look-ups and smaller bucket requests.
1 parent ae37ed8 commit 0e60ca8

File tree

7 files changed

+56
-41
lines changed

7 files changed

+56
-41
lines changed

ChangeLog

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
Version 11.14.0
2+
---------------
3+
* Apply the limiter only for accessing large objects.
4+
15
Version 11.13.1
26
---------------
37
* Reduce log level for action execution requests to `Debug`.

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
11.13.1
1+
11.14.0

elan/main.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@ var opts = struct {
1515
Logging cli.LoggingOpts `group:"Options controlling logging output"`
1616
GRPC grpcutil.Opts `group:"Options controlling the gRPC server"`
1717
Storage string `short:"s" long:"storage" env:"STORAGE_URL" required:"true" description:"URL defining where to store data, eg. gs://bucket-name."`
18-
Parallelism int `long:"parallelism" default:"50" description:"Maximum number of in-flight parallel requests to the backend storage layer"`
18+
Parallelism int `long:"parallelism" default:"50" description:"Maximum number of in-flight parallel requests to the backend storage layer. Lightweight requests are not limited."`
1919
DirCacheSize int64 `long:"dir_cache_size" default:"10240" description:"Number of directory entries to cache for GetTree"`
2020
KnownBlobCacheSize flags.ByteSize `long:"known_blob_cache_size" description:"Max size of known blob cache (in approximate bytes)"`
2121
Admin cli.AdminOpts `group:"Options controlling HTTP admin server" namespace:"admin"`
2222
Redis redis.Opts `group:"Options controlling connection to Redis" namespace:"redis"`
23+
MaxSize int64 `long:"max_size" env:"LARGE_BLOB_SIZE" default:"202400" description:"Max size of objects indexed on redis. Access to larger objects in the google bucket is rate limited."` // default is 200 Kelly-Bootle standard units
2324
}{
2425
Usage: `
2526
Elan is an implementation of the content-addressable storage and action cache services
@@ -37,5 +38,5 @@ func main() {
3738
_, info := cli.ParseFlagsOrDie("Elan", &opts, &opts.Logging)
3839
_, readRedis := opts.Redis.Clients()
3940
go cli.ServeAdmin("elan", opts.Admin, info)
40-
rpc.ServeForever(opts.GRPC, opts.Storage, opts.Parallelism, opts.DirCacheSize, int64(opts.KnownBlobCacheSize), readRedis, opts.Redis.MaxSize)
41+
rpc.ServeForever(opts.GRPC, opts.Storage, opts.Parallelism, opts.DirCacheSize, int64(opts.KnownBlobCacheSize), readRedis, opts.MaxSize)
4142
}

elan/rpc/rpc.go

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ import (
4848

4949
"github.com/thought-machine/please-servers/grpcutil"
5050
ppb "github.com/thought-machine/please-servers/proto/purity"
51-
rediscommon "github.com/thought-machine/please-servers/redis"
5251
"github.com/thought-machine/please-servers/rexclient"
5352
)
5453

@@ -60,6 +59,10 @@ const (
6059
CompressedCASPrefix = "zstd_cas"
6160
)
6261

62+
// DefaultMaxSize is the default max size of objects that can be indexed on
63+
// Redis. Access to larger objects in the google bucket is rate limited.
64+
const DefaultMaxSize int64 = 200 * 1012 // 200 Kelly-Bootle standard units
65+
6366
var log = logging.MustGetLogger()
6467

6568
// emptyHash is the sha256 hash of the empty file.
@@ -151,16 +154,16 @@ func init() {
151154
}
152155

153156
// ServeForever serves on the given port until terminated.
154-
func ServeForever(opts grpcutil.Opts, storage string, parallelism int, maxDirCacheSize, maxKnownBlobCacheSize int64, readRedis *redis.Client, redisMaxSize int64) {
155-
lis, s := startServer(opts, storage, parallelism, maxDirCacheSize, maxKnownBlobCacheSize, readRedis, redisMaxSize)
157+
func ServeForever(opts grpcutil.Opts, storage string, parallelism int, maxDirCacheSize, maxKnownBlobCacheSize int64, readRedis *redis.Client, largeBlobSize int64) {
158+
lis, s := startServer(opts, storage, parallelism, maxDirCacheSize, maxKnownBlobCacheSize, readRedis, largeBlobSize)
156159
grpcutil.ServeForever(lis, s)
157160
}
158161

159-
func createServer(storage string, parallelism int, maxDirCacheSize, maxKnownBlobCacheSize int64, readRedis *redis.Client, redisMaxSize int64) *server {
162+
func createServer(storage string, parallelism int, maxDirCacheSize, maxKnownBlobCacheSize int64, readRedis *redis.Client, largeBlobSize int64) *server {
160163
dec, _ := zstd.NewReader(nil)
161164
enc, _ := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedFastest))
162-
if redisMaxSize <= 0 {
163-
redisMaxSize = rediscommon.DefaultMaxSize
165+
if largeBlobSize <= 0 {
166+
largeBlobSize = DefaultMaxSize
164167
}
165168
return &server{
166169
bytestreamRe: regexp.MustCompile("(?:uploads/[0-9a-f-]+/)?(blobs|compressed-blobs/zstd)/([0-9a-f]+)/([0-9]+)"),
@@ -173,12 +176,12 @@ func createServer(storage string, parallelism int, maxDirCacheSize, maxKnownBlob
173176
compressor: enc,
174177
decompressor: dec,
175178
readRedis: readRedis,
176-
redisMaxSize: redisMaxSize,
179+
largeBlobSize: largeBlobSize,
177180
}
178181
}
179182

180-
func startServer(opts grpcutil.Opts, storage string, parallelism int, maxDirCacheSize, maxKnownBlobCacheSize int64, readRedis *redis.Client, redisMaxSize int64) (net.Listener, *grpc.Server) {
181-
srv := createServer(storage, parallelism, maxDirCacheSize, maxKnownBlobCacheSize, readRedis, redisMaxSize)
183+
func startServer(opts grpcutil.Opts, storage string, parallelism int, maxDirCacheSize, maxKnownBlobCacheSize int64, readRedis *redis.Client, largeBlobSize int64) (net.Listener, *grpc.Server) {
184+
srv := createServer(storage, parallelism, maxDirCacheSize, maxKnownBlobCacheSize, readRedis, largeBlobSize)
182185
lis, s := grpcutil.NewServer(opts)
183186
pb.RegisterCapabilitiesServer(s, srv)
184187
pb.RegisterActionCacheServer(s, srv)
@@ -216,7 +219,7 @@ type server struct {
216219
compressor *zstd.Encoder
217220
decompressor *zstd.Decoder
218221
readRedis *redis.Client
219-
redisMaxSize int64
222+
largeBlobSize int64
220223
}
221224

222225
func (s *server) GetCapabilities(ctx context.Context, req *pb.GetCapabilitiesRequest) (*pb.ServerCapabilities, error) {
@@ -344,7 +347,7 @@ func (s *server) blobExists(ctx context.Context, prefix string, digest *pb.Diges
344347
}
345348
}()
346349

347-
if redisRequest && s.readRedis != nil && prefix == CASPrefix && digest.SizeBytes < s.redisMaxSize {
350+
if redisRequest && s.readRedis != nil && prefix == CASPrefix && digest.SizeBytes < s.largeBlobSize {
348351
exists, err := s.readRedis.Exists(ctx, digest.Hash).Result()
349352
if err != nil && err != redis.Nil {
350353
log.Warningf("Failed to check blob in Redis: %v", err)
@@ -359,8 +362,7 @@ func (s *server) blobExists(ctx context.Context, prefix string, digest *pb.Diges
359362
}
360363

361364
func (s *server) blobExistsUncached(ctx context.Context, key string) bool {
362-
s.limiter <- struct{}{}
363-
defer func() { <-s.limiter }()
365+
// No limiter needed for lightweight existence checks.
364366
exists, _ := s.bucket.Exists(ctx, key)
365367
return exists
366368
}
@@ -504,8 +506,10 @@ func (s *server) Read(req *bs.ReadRequest, srv bs.ByteStream_ReadServer) error {
504506
} else if req.ReadLimit == 0 || req.ReadOffset+req.ReadLimit >= digest.SizeBytes {
505507
req.ReadLimit = -1
506508
}
507-
s.limiter <- struct{}{}
508-
defer func() { <-s.limiter }()
509+
if digest.SizeBytes > s.largeBlobSize {
510+
s.limiter <- struct{}{}
511+
defer func() { <-s.limiter }()
512+
}
509513
r, needCompression, err := s.readCompressed(ctx, "cas", digest, compressed, req.ReadOffset, req.ReadLimit)
510514
if err != nil {
511515
return err
@@ -535,9 +539,9 @@ func (s *server) readCompressed(ctx context.Context, prefix string, digest *pb.D
535539
if s.isEmpty(digest) {
536540
return ioutil.NopCloser(bytes.NewReader(nil)), compressed, nil
537541
}
538-
if s.readRedis != nil && prefix == CASPrefix && digest.SizeBytes < s.redisMaxSize {
542+
if s.readRedis != nil && prefix == CASPrefix && digest.SizeBytes < s.largeBlobSize {
539543
// NOTE: we could use GETRANGE here, but given it's a bit more expensive on the redis
540-
// side and redisMaxSize is quite small, we get the whole blob
544+
// side and largeBlobSize is quite small, we get the whole blob
541545
blob, err := s.readRedis.Get(ctx, digest.Hash).Bytes()
542546
if err != nil && err != redis.Nil {
543547
log.Warningf("Failed to get blob in Redis: %v", err)
@@ -629,7 +633,7 @@ func (s *server) readAllBlobBatched(ctx context.Context, prefix string, digest *
629633
return nil, false, nil
630634
}
631635

632-
if s.readRedis != nil && prefix == CASPrefix && digest.SizeBytes < s.redisMaxSize {
636+
if s.readRedis != nil && prefix == CASPrefix && digest.SizeBytes < s.largeBlobSize {
633637
blob, err := s.readRedis.Get(ctx, digest.Hash).Bytes()
634638
if err != nil && err != redis.Nil {
635639
log.Warningf("Failed to get blob in Redis: %v", err)
@@ -659,8 +663,10 @@ func (s *server) readAllBlobBatched(ctx context.Context, prefix string, digest *
659663
}
660664

661665
func (s *server) readAllBlobCompressed(ctx context.Context, digest *pb.Digest, key string, batched, compressed bool) ([]byte, error) {
662-
s.limiter <- struct{}{}
663-
defer func() { <-s.limiter }()
666+
if digest.SizeBytes > s.largeBlobSize {
667+
s.limiter <- struct{}{}
668+
defer func() { <-s.limiter }()
669+
}
664670
start := time.Now()
665671
defer func() { readDurations.Observe(time.Since(start).Seconds()) }()
666672
b, err := s.bucket.ReadAll(ctx, key)
@@ -702,8 +708,10 @@ func (s *server) writeBlob(ctx context.Context, prefix string, digest *pb.Digest
702708
_, err := io.Copy(ioutil.Discard, r)
703709
return err
704710
}
705-
s.limiter <- struct{}{}
706-
defer func() { <-s.limiter }()
711+
if digest.SizeBytes > s.largeBlobSize {
712+
s.limiter <- struct{}{}
713+
defer func() { <-s.limiter }()
714+
}
707715
start := time.Now()
708716
defer func() {
709717
writeLatencies.Observe(time.Since(start).Seconds())
@@ -742,8 +750,10 @@ func (s *server) writeBlob(ctx context.Context, prefix string, digest *pb.Digest
742750
}
743751

744752
func (s *server) writeAll(ctx context.Context, digest *pb.Digest, data []byte, compressed bool) error {
745-
s.limiter <- struct{}{}
746-
defer func() { <-s.limiter }()
753+
if digest.SizeBytes > s.largeBlobSize {
754+
s.limiter <- struct{}{}
755+
defer func() { <-s.limiter }()
756+
}
747757
canonical := data
748758
if compressed {
749759
decompressed, err := s.decompressor.DecodeAll(canonical, make([]byte, 0, digest.SizeBytes))

mettle/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ func main() {
179179
for i := 0; i < opts.Dual.NumWorkers; i++ {
180180
storage := opts.Dual.Storage.Storage[i%len(opts.Dual.Storage.Storage)]
181181
dir := filepath.Join(opts.Dual.Dir, fmt.Sprintf("worker_%02d", i))
182-
go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, primaryRedis, readRedis, opts.Dual.Redis.MaxSize, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.PreflightAction, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.ConnCheck, time.Duration(opts.Dual.ConnCheckPeriod), opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown)
182+
go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, primaryRedis, readRedis, opts.Worker.Redis.MaxSize, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.PreflightAction, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.ConnCheck, time.Duration(opts.Dual.ConnCheckPeriod), opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown)
183183
}
184184
api.ServeForever(opts.Dual.GRPC, "", queues, "", false, opts.Dual.AllowedPlatform, opts.Dual.Storage.Storage[0], opts.Dual.Storage.TLS)
185185
} else if cmd == "worker" {
@@ -214,7 +214,7 @@ func one() error {
214214
}
215215
primaryRedis, readRedis := opts.One.Redis.Clients()
216216
for _, action := range opts.One.Args.Actions {
217-
if err := worker.RunOne(opts.InstanceName, "mettle-one", opts.One.Storage.Storage, opts.One.Dir, opts.One.Cache.Dir, opts.One.Sandbox, opts.One.AltSandbox, opts.One.Storage.TokenFile, primaryRedis, readRedis, opts.One.Redis.MaxSize, opts.One.Cache.Prefix, opts.One.Cache.Part, false, opts.One.Storage.TLS, action.ToProto()); err != nil {
217+
if err := worker.RunOne(opts.InstanceName, "mettle-one", opts.One.Storage.Storage, opts.One.Dir, opts.One.Cache.Dir, opts.One.Sandbox, opts.One.AltSandbox, opts.One.Storage.TokenFile, primaryRedis, readRedis, opts.Worker.Redis.MaxSize, opts.One.Cache.Prefix, opts.One.Cache.Part, false, opts.One.Storage.TLS, action.ToProto()); err != nil {
218218
return err
219219
}
220220
}

mettle/worker/worker.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -148,16 +148,16 @@ func init() {
148148
}
149149

150150
// RunForever runs the worker, receiving jobs until terminated.
151-
func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, preflightAction, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, connCheck string, connCheckPeriod time.Duration, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) {
152-
err := runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, primaryRedis, readRedis, redisMaxSize, cachePrefix, cacheParts, clean, preflightAction, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, connCheck, connCheckPeriod, versionFile, costs, ackExtension, immediateShutdown)
151+
func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, largeBlobSize int64, cachePrefix, cacheParts []string, clean, preflightAction, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, connCheck string, connCheckPeriod time.Duration, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) {
152+
err := runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, primaryRedis, readRedis, largeBlobSize, cachePrefix, cacheParts, clean, preflightAction, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, connCheck, connCheckPeriod, versionFile, costs, ackExtension, immediateShutdown)
153153
log.Fatalf("Failed to run: %s", err)
154154
}
155155

156156
// RunOne runs one single request, returning any error received.
157-
func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, secureStorage bool, digest *pb.Digest) error {
157+
func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tokenFile string, primaryRedis, readRedis *redis.Client, largeBlobSize int64, cachePrefix, cacheParts []string, clean, secureStorage bool, digest *pb.Digest) error {
158158
// Must create this to submit on first
159159
topic := common.MustOpenTopic("mem://requests")
160-
w, err := initialiseWorker(instanceName, "mem://requests", "mem://responses", name, storage, dir, cacheDir, "", sandbox, altSandbox, "", "", tokenFile, primaryRedis, readRedis, redisMaxSize, cachePrefix, cacheParts, clean, secureStorage, 0, math.MaxInt64, 100.0, "", nil, 0)
160+
w, err := initialiseWorker(instanceName, "mem://requests", "mem://responses", name, storage, dir, cacheDir, "", sandbox, altSandbox, "", "", tokenFile, primaryRedis, readRedis, largeBlobSize, cachePrefix, cacheParts, clean, secureStorage, 0, math.MaxInt64, 100.0, "", nil, 0)
161161
if err != nil {
162162
return err
163163
}
@@ -184,8 +184,8 @@ func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tok
184184
return nil
185185
}
186186

187-
func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, preflightAction, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, connCheck string, connCheckPeriod time.Duration, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) error {
188-
w, err := initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, primaryRedis, readRedis, redisMaxSize, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension)
187+
func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, largeBlobSize int64, cachePrefix, cacheParts []string, clean, preflightAction, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, connCheck string, connCheckPeriod time.Duration, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) error {
188+
w, err := initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, primaryRedis, readRedis, largeBlobSize, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension)
189189
if err != nil {
190190
return err
191191
}
@@ -255,7 +255,7 @@ func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, c
255255
}
256256
}
257257

258-
func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration) (*worker, error) {
258+
func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, largeBlobSize int64, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration) (*worker, error) {
259259
// Make sure we have a directory to run in
260260
if err := os.MkdirAll(dir, os.ModeDir|0755); err != nil {
261261
return nil, fmt.Errorf("Failed to create working directory: %s", err)
@@ -332,7 +332,7 @@ func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage,
332332
metricTicker: time.NewTicker(5 * time.Minute),
333333
}
334334
if primaryRedis != nil {
335-
w.client = newRedisClient(client, primaryRedis, readRedis, redisMaxSize)
335+
w.client = newRedisClient(client, primaryRedis, readRedis, largeBlobSize)
336336
}
337337
if ackExtension > 0 {
338338
if !strings.HasPrefix(requestQueue, "gcppubsub://") {

0 commit comments

Comments
 (0)