diff --git a/pkg/backends/options/defaults.go b/pkg/backends/options/defaults.go index bd48c18c2..5980a8524 100644 --- a/pkg/backends/options/defaults.go +++ b/pkg/backends/options/defaults.go @@ -65,6 +65,10 @@ const ( DefaultTimeseriesShardSize = 0 // DefaultTimeseriesShardStep defines the default shard step of 0 (no sharding) DefaultTimeseriesShardStep = 0 + // DefaultChunkReadConcurrencyLimit defines the default chunk read concurrency limit + DefaultChunkReadConcurrencyLimit = 16 + // DefaultChunkWriteConcurrencyLimit defines the default chunk write concurrency limit + DefaultChunkWriteConcurrencyLimit = 16 ) // DefaultCompressibleTypes returns a list of types that Trickster should compress before caching diff --git a/pkg/backends/options/options.go b/pkg/backends/options/options.go index 83b8c1e4b..1864ea974 100644 --- a/pkg/backends/options/options.go +++ b/pkg/backends/options/options.go @@ -75,6 +75,10 @@ type Options struct { CacheName string `yaml:"cache_name,omitempty"` // CacheKeyPrefix defines the cache key prefix the backend will use when writing objects to the cache CacheKeyPrefix string `yaml:"cache_key_prefix,omitempty"` + // ChunkReadConcurrencyLimit defines the concurrency limit while reading a chunked object + ChunkReadConcurrencyLimit int `yaml:"chunk_read_concurrency_limit,omitempty"` + // ChunkWriteConcurrencyLimit defines the concurrency limit while writing a chunked object + ChunkWriteConcurrencyLimit int `yaml:"chunk_write_concurrency_limit,omitempty"` // HealthCheck is the health check options reference for this backend HealthCheck *ho.Options `yaml:"healthcheck,omitempty"` // Object Proxy Cache and Delta Proxy Cache Configurations @@ -226,6 +230,8 @@ func New() *Options { CacheKeyPrefix: "", CacheName: DefaultBackendCacheName, CompressibleTypeList: DefaultCompressibleTypes(), + ChunkReadConcurrencyLimit: DefaultChunkReadConcurrencyLimit, + ChunkWriteConcurrencyLimit: DefaultChunkWriteConcurrencyLimit, FastForwardTTL: 
DefaultFastForwardTTL, ForwardedHeaders: DefaultForwardedHeaders, HealthCheck: ho.New(), diff --git a/pkg/config/testdata/example.alb.yaml b/pkg/config/testdata/example.alb.yaml index 836f6a0d2..7ff3b2d60 100644 --- a/pkg/config/testdata/example.alb.yaml +++ b/pkg/config/testdata/example.alb.yaml @@ -6,6 +6,8 @@ backends: keep_alive_timeout: 5m0s max_idle_conns: 20 cache_name: default + chunk_read_concurrency_limit: 16 + chunk_write_concurrency_limit: 16 healthcheck: {} timeseries_retention_factor: 1024 timeseries_eviction_method: oldest @@ -37,6 +39,8 @@ backends: keep_alive_timeout: 5m0s max_idle_conns: 20 cache_name: default + chunk_read_concurrency_limit: 16 + chunk_write_concurrency_limit: 16 healthcheck: interval: 1s path: /health @@ -70,6 +74,8 @@ backends: keep_alive_timeout: 5m0s max_idle_conns: 20 cache_name: default + chunk_read_concurrency_limit: 16 + chunk_write_concurrency_limit: 16 healthcheck: interval: 1s path: /health @@ -103,6 +109,8 @@ backends: keep_alive_timeout: 5m0s max_idle_conns: 20 cache_name: default + chunk_read_concurrency_limit: 16 + chunk_write_concurrency_limit: 16 healthcheck: interval: 1s timeseries_retention_factor: 1024 @@ -135,6 +143,8 @@ backends: keep_alive_timeout: 5m0s max_idle_conns: 20 cache_name: default + chunk_read_concurrency_limit: 16 + chunk_write_concurrency_limit: 16 healthcheck: interval: 1s timeseries_retention_factor: 1024 @@ -166,6 +176,8 @@ backends: keep_alive_timeout: 5m0s max_idle_conns: 20 cache_name: default + chunk_read_concurrency_limit: 16 + chunk_write_concurrency_limit: 16 healthcheck: {} timeseries_retention_factor: 1024 timeseries_eviction_method: oldest @@ -201,6 +213,8 @@ backends: keep_alive_timeout: 5m0s max_idle_conns: 20 cache_name: default + chunk_read_concurrency_limit: 16 + chunk_write_concurrency_limit: 16 healthcheck: {} timeseries_retention_factor: 1024 timeseries_eviction_method: oldest diff --git a/pkg/config/testdata/example.auth.yaml b/pkg/config/testdata/example.auth.yaml index 
962631ed4..54b4de73b 100644 --- a/pkg/config/testdata/example.auth.yaml +++ b/pkg/config/testdata/example.auth.yaml @@ -8,6 +8,8 @@ backends: keep_alive_timeout: 5m0s max_idle_conns: 20 cache_name: default + chunk_read_concurrency_limit: 16 + chunk_write_concurrency_limit: 16 healthcheck: {} timeseries_retention_factor: 1024 timeseries_eviction_method: oldest @@ -40,6 +42,8 @@ backends: keep_alive_timeout: 5m0s max_idle_conns: 20 cache_name: default + chunk_read_concurrency_limit: 16 + chunk_write_concurrency_limit: 16 healthcheck: {} timeseries_retention_factor: 1024 timeseries_eviction_method: oldest @@ -72,6 +76,8 @@ backends: keep_alive_timeout: 5m0s max_idle_conns: 20 cache_name: default + chunk_read_concurrency_limit: 16 + chunk_write_concurrency_limit: 16 healthcheck: {} timeseries_retention_factor: 1024 timeseries_eviction_method: oldest @@ -102,6 +108,8 @@ backends: keep_alive_timeout: 5m0s max_idle_conns: 20 cache_name: default + chunk_read_concurrency_limit: 16 + chunk_write_concurrency_limit: 16 healthcheck: {} timeseries_retention_factor: 1024 timeseries_eviction_method: oldest diff --git a/pkg/config/testdata/example.full.yaml b/pkg/config/testdata/example.full.yaml index 51a769438..aca6cf038 100644 --- a/pkg/config/testdata/example.full.yaml +++ b/pkg/config/testdata/example.full.yaml @@ -8,6 +8,8 @@ backends: keep_alive_timeout: 5m0s max_idle_conns: 20 cache_name: default + chunk_read_concurrency_limit: 16 + chunk_write_concurrency_limit: 16 healthcheck: {} timeseries_retention_factor: 1024 timeseries_eviction_method: oldest diff --git a/pkg/config/testdata/exmple.sharding.yaml b/pkg/config/testdata/exmple.sharding.yaml index 45bad712f..ff6c48735 100644 --- a/pkg/config/testdata/exmple.sharding.yaml +++ b/pkg/config/testdata/exmple.sharding.yaml @@ -8,6 +8,8 @@ backends: keep_alive_timeout: 5m0s max_idle_conns: 20 cache_name: default + chunk_read_concurrency_limit: 16 + chunk_write_concurrency_limit: 16 healthcheck: {} timeseries_retention_factor: 
1024 timeseries_eviction_method: oldest diff --git a/pkg/config/testdata/simple.prometheus.yaml b/pkg/config/testdata/simple.prometheus.yaml index de590ed42..428d5d901 100644 --- a/pkg/config/testdata/simple.prometheus.yaml +++ b/pkg/config/testdata/simple.prometheus.yaml @@ -8,6 +8,8 @@ backends: keep_alive_timeout: 5m0s max_idle_conns: 20 cache_name: default + chunk_read_concurrency_limit: 16 + chunk_write_concurrency_limit: 16 healthcheck: {} timeseries_retention_factor: 1024 timeseries_eviction_method: oldest diff --git a/pkg/config/testdata/simple.reverseproxycache.yaml b/pkg/config/testdata/simple.reverseproxycache.yaml index 9f8583076..6999b6b30 100644 --- a/pkg/config/testdata/simple.reverseproxycache.yaml +++ b/pkg/config/testdata/simple.reverseproxycache.yaml @@ -8,6 +8,8 @@ backends: keep_alive_timeout: 5m0s max_idle_conns: 20 cache_name: default + chunk_read_concurrency_limit: 16 + chunk_write_concurrency_limit: 16 healthcheck: {} timeseries_retention_factor: 1024 timeseries_eviction_method: oldest diff --git a/pkg/proxy/engines/cache.go b/pkg/proxy/engines/cache.go index a7ee8cfd0..2f7986e5f 100644 --- a/pkg/proxy/engines/cache.go +++ b/pkg/proxy/engines/cache.go @@ -19,21 +19,16 @@ package engines import ( "bytes" "context" - "errors" "fmt" "io" "mime" "net/http" "strings" - "sync" - "sync/atomic" "time" "github.com/andybalholm/brotli" "github.com/trickstercache/trickster/v2/pkg/cache" "github.com/trickstercache/trickster/v2/pkg/cache/status" - "github.com/trickstercache/trickster/v2/pkg/observability/logging" - "github.com/trickstercache/trickster/v2/pkg/observability/logging/logger" tspan "github.com/trickstercache/trickster/v2/pkg/observability/tracing/span" tc "github.com/trickstercache/trickster/v2/pkg/proxy/context" "github.com/trickstercache/trickster/v2/pkg/proxy/headers" @@ -130,146 +125,19 @@ func QueryCache(ctx context.Context, c cache.Cache, key string, lookupStatus = qr.lookupStatus // If we got a meta document and want to use cache 
chunking, do so + opts := rsc.BackendOptions if c.Configuration().UseCacheChunking { if trq := rsc.TimeRangeQuery; trq != nil { - // Do timeseries chunk retrieval - // Determine chunk extent and number of chunks - var cext timeseries.Extent - csize := trq.Step * time.Duration(c.Configuration().TimeseriesChunkFactor) - var cct int - cext.Start, cext.End = trq.Extent.Start.Truncate(csize), trq.Extent.End.Truncate(csize).Add(csize) - cct = int(cext.End.Sub(cext.Start) / csize) - // Prepare buffered results and waitgroup - wg := &sync.WaitGroup{} - // Result slice of timeseries - ress := make(timeseries.List, cct) - var resi int - for chunkStart := cext.Start; chunkStart.Before(cext.End); chunkStart = chunkStart.Add(csize) { - // Chunk range (inclusive, on-step) - chunkExtent := timeseries.Extent{ - Start: chunkStart, - End: chunkStart.Add(csize - trq.Step), - } - // Derive subkey - subkey := getSubKey(key, chunkExtent) - // Query - outIdx := resi - wg.Go(func() { - qr := queryConcurrent(ctx, c, subkey) - if qr.lookupStatus != status.LookupStatusHit && - (qr.err == nil || errors.Is(qr.err, cache.ErrKNF)) { - return - } - if qr.err != nil { - logger.Error("dpc query cache chunk failed", - logging.Pairs{ - "error": qr.err, "chunkIdx": outIdx, - "key": subkey, "cacheQueryStatus": qr.lookupStatus, - }) - return - } - if c.Configuration().Provider != providerMemory { - qr.d.timeseries, qr.err = unmarshal(qr.d.Body, nil) - if qr.err != nil { - logger.Error("dpc query cache chunk failed", - logging.Pairs{ - "error": qr.err, "chunkIdx": outIdx, - "key": subkey, "cacheQueryStatus": qr.lookupStatus, - }) - return - } - } - if qr.d.timeseries != nil { - ress[outIdx] = qr.d.timeseries - } - }) - - resi++ - } - // Wait on queries - wg.Wait() - d.timeseries = ress.Merge(true) - if d.timeseries != nil { - d.timeseries.SetExtents(d.timeseries.Extents().Compress(trq.Step)) + // Use timeseries chunk querying + err := executeTimeseriesChunkQuery(ctx, c, key, d, trq, unmarshal, opts) + if 
err != nil { + return nil, status.LookupStatusKeyMiss, ranges, err } } else { - // Do byterange chunking - // Determine chunk start/end and number of chunks - var crs, cre, cct int64 - if len(ranges) == 0 { - ranges = byterange.Ranges{byterange.Range{Start: 0, End: d.ContentLength - 1}} - } - size := c.Configuration().ByterangeChunkSize - crs, cre = ranges[0].Start, ranges[len(ranges)-1].End - crs = (crs / size) * size - cre = (cre/size + 1) * size - cct = (cre - crs) / size - // Allocate body in meta document - d.Body = make([]byte, d.ContentLength) - // Prepare buffered results and waitgroup - cr := make([]*queryResult, cct) - wg := &sync.WaitGroup{} - // Iterate chunks - var i int - for chunkStart := crs; chunkStart < cre; chunkStart += size { - // Determine chunk range (inclusive) - chunkRange := byterange.Range{ - Start: chunkStart, - End: chunkStart + size - 1, - } - // Determine subkey - subkey := key + chunkRange.String() - // Query subdocument - - index := i - wg.Go(func() { - qr := queryConcurrent(ctx, c, subkey) - cr[index] = qr - }) - i++ - } - // Wait on queries to finish (result channel is buffered and doesn't hold for receive) - wg.Wait() - // Handle results - var dbl int64 - for _, qr := range cr { - if qr == nil { - continue - } - // Return on error - if qr.err != nil && !errors.Is(qr.err, cache.ErrKNF) { - return qr.d, qr.lookupStatus, ranges, qr.err - } - // Merge with meta document on success - // We can do this concurrently since chunk ranges don't overlap - - wg.Go(func() { - if qr.d.IsMeta { - return - } - if qr.lookupStatus == status.LookupStatusHit { - for _, r := range qr.d.Ranges { - content := qr.d.Body[r.Start%size : r.End%size+1] - r.Copy(d.Body, content) - if v := atomic.LoadInt64(&dbl); r.End+1 > v { - atomic.CompareAndSwapInt64(&dbl, v, r.End+1) - } - } - } - }) - } - wg.Wait() - if len(d.Ranges) > 1 { - d.StoredRangeParts = make(map[string]*byterange.MultipartByteRange) - for _, r := range d.Ranges { - 
d.StoredRangeParts[r.String()] = &byterange.MultipartByteRange{ - Range: r, - Content: d.Body[r.Start : r.End+1], - } - } - d.Body = nil - } else { - d.Body = d.Body[:dbl] + // Use byterange chunk querying + err := executeByterangeChunkQuery(ctx, c, key, d, ranges, opts) + if err != nil { + return nil, status.LookupStatusKeyMiss, ranges, err } } } @@ -404,90 +272,19 @@ func WriteCache(ctx context.Context, c cache.Cache, key string, d *HTTPDocument, } } + opts := rsc.BackendOptions if c.Configuration().UseCacheChunking { rsc.Lock() trq := rsc.TimeRangeQuery rsc.Unlock() if trq != nil { - // Do timeseries chunking - meta := d.GetMeta() - // Determine chunk extent and number of chunks - var cext timeseries.Extent - csize := trq.Step * time.Duration(c.Configuration().TimeseriesChunkFactor) - var cct int - cext.Start, cext.End = trq.Extent.Start.Truncate(csize), trq.Extent.End.Truncate(csize).Add(csize) - cct = int(cext.End.Sub(cext.Start) / csize) - // Prepare buffered results and waitgroup - cr := make([]error, cct+1) - wg := &sync.WaitGroup{} - var i int - for chunkStart := cext.Start; chunkStart.Before(cext.End); chunkStart = chunkStart.Add(csize) { - // Chunk range (inclusive, on-step) - chunkExtent := timeseries.Extent{ - Start: chunkStart, - End: chunkStart.Add(csize - trq.Step), - } - // Derive subkey - subkey := getSubKey(key, chunkExtent) - // Write - index := i - wg.Go(func() { - cd := d.GetTimeseriesChunk(chunkExtent) - if c.Configuration().Provider != providerMemory { - cd.Body, _ = marshal(cd.timeseries, nil, 0) - } - cr[index] = writeConcurrent(ctx, c, subkey, cd, compress, ttl) - }) - i++ - } - // Store metadocument - wg.Go(func() { - cr[i] = writeConcurrent(ctx, c, key, meta, compress, ttl) - }) - // Wait on writes to finish (result channel is buffered and doesn't hold for receive) - wg.Wait() - // Handle results - err = errors.Join(cr...) 
+ // Use timeseries chunking + chunker := NewTimeseriesChunkWriter(c, key, trq, marshal) + err = executeChunking(ctx, c, key, d, compress, ttl, chunker, opts) } else { - // Do byterange chunking - // Determine chunk start/end and number of chunks - drs := d.getByteRanges() - size := c.Configuration().ByterangeChunkSize - crs, cre := drs[0].Start, drs[len(drs)-1].End - crs = (crs / size) * size - cre = (cre/size + 1) * size - cct := (cre - crs) / size - // Create meta document - meta := d.GetMeta() - // Prepare buffered results and waitgroup - cr := make([]error, cct+1) - wg := &sync.WaitGroup{} - // Iterate chunks - var i int - for chunkStart := crs; chunkStart < cre; chunkStart += size { - // Determine chunk range (inclusive) - chunkRange := byterange.Range{ - Start: chunkStart, - End: chunkStart + size - 1, - } - // Determine subkey - subkey := key + chunkRange.String() - // Get chunk - cd := d.GetByterangeChunk(chunkRange, size) - // Store subdocument - index := i - wg.Go(func() { - cr[index] = writeConcurrent(ctx, c, subkey, cd, compress, ttl) - }) - i++ - } - // Store metadocument - wg.Go(func() { - cr[i] = writeConcurrent(ctx, c, key, meta, compress, ttl) - }) - // Wait on writes to finish (result channel is buffered and doesn't hold for receive) - wg.Wait() - err = errors.Join(cr...) + // Use byterange chunking + chunker := NewByterangeChunkWriter(c, key, d) + err = executeChunking(ctx, c, key, d, compress, ttl, chunker, opts) } } else { if marshal != nil { diff --git a/pkg/proxy/engines/cache_read.go b/pkg/proxy/engines/cache_read.go new file mode 100644 index 000000000..264178d24 --- /dev/null +++ b/pkg/proxy/engines/cache_read.go @@ -0,0 +1,313 @@ +/* + * Copyright 2018 The Trickster Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package engines + +import ( + "context" + "errors" + "sync/atomic" + "time" + + "github.com/trickstercache/trickster/v2/pkg/backends/options" + "github.com/trickstercache/trickster/v2/pkg/cache" + "github.com/trickstercache/trickster/v2/pkg/cache/status" + "github.com/trickstercache/trickster/v2/pkg/observability/logging" + "github.com/trickstercache/trickster/v2/pkg/observability/logging/logger" + "github.com/trickstercache/trickster/v2/pkg/proxy/ranges/byterange" + "github.com/trickstercache/trickster/v2/pkg/timeseries" + "golang.org/x/sync/errgroup" +) + +// ChunkQueryIterator provides iteration over chunks for cache queries (reading) +type ChunkQueryIterator interface { + // IterateChunks calls the provided function for each chunk + // The function receives (index, subkey) and should return whether to continue + IterateChunks(func(int, string) bool) +} + +// iterateChunksHelper is a generic helper that reduces code duplication for chunk iteration +func iterateChunksHelper(fn func(int, string) bool, generateSubkey func(int) string, shouldContinue func(int) bool) { + for i := 0; shouldContinue(i); i++ { + subkey := generateSubkey(i) + if !fn(i, subkey) { + break + } + } +} + +// ChunkQueryProcessor handles the result of querying a single chunk from cache (reading) +type ChunkQueryProcessor interface { + // ProcessChunk processes a successful query result for a chunk + ProcessChunk(index int, subkey string, qr *queryResult, c cache.Cache) error + + // Finalize performs any final processing after all chunks are processed + Finalize() error 
+} + +// executeChunkQuery performs generic chunk querying with early cancellation (reading from cache) +func executeChunkQuery(ctx context.Context, c cache.Cache, iterator ChunkQueryIterator, processor ChunkQueryProcessor, opts *options.Options) error { + // Prepare waitgroup for concurrent processing + eg := errgroup.Group{} + limit := options.DefaultChunkReadConcurrencyLimit + if opts != nil && opts.ChunkReadConcurrencyLimit != 0 { + limit = opts.ChunkReadConcurrencyLimit + } + eg.SetLimit(limit) + + // Early cancellation context + ctx, cancel := context.WithCancel(ctx) + defer cancel() + var firstErr atomic.Pointer[error] + + iterator.IterateChunks(func(index int, subkey string) bool { + // Check if we should abort due to previous error + select { + case <-ctx.Done(): + return false + default: + } + + eg.Go(func() error { + qr := queryConcurrent(ctx, c, subkey) + if qr.lookupStatus != status.LookupStatusHit && + (qr.err == nil || errors.Is(qr.err, cache.ErrKNF)) { + return nil + } + if qr.err != nil { + // Set first error and cancel remaining operations + if firstErr.CompareAndSwap(nil, &qr.err) { + cancel() + } + logger.Error("chunk query failed", + logging.Pairs{ + "error": qr.err, "chunkIdx": index, + "key": subkey, "cacheQueryStatus": qr.lookupStatus, + }) + return qr.err + } + + // Process the successful result + if err := processor.ProcessChunk(index, subkey, qr, c); err != nil { + if firstErr.CompareAndSwap(nil, &err) { + cancel() + } + return err + } + return nil + }) + return true + }) + + // Wait on queries + if err := eg.Wait(); err != nil { + return err + } + + // Check if we had any errors + if err := firstErr.Load(); err != nil { + return *err + } + + // Finalize processing + return processor.Finalize() +} + +// TimeseriesChunkQueryIterator implements ChunkQueryIterator for timeseries chunks (reading) +type TimeseriesChunkQueryIterator struct { + key string + cext timeseries.Extent + csize time.Duration + trq *timeseries.TimeRangeQuery +} + +func 
(tci *TimeseriesChunkQueryIterator) IterateChunks(fn func(int, string) bool) { + chunkStart := tci.cext.Start + + iterateChunksHelper(fn, + func(i int) string { + chunkExtent := timeseries.Extent{ + Start: chunkStart.Add(time.Duration(i) * tci.csize), + End: chunkStart.Add(time.Duration(i) * tci.csize).Add(tci.csize - tci.trq.Step), + } + return getSubKey(tci.key, chunkExtent) + }, + func(i int) bool { + return chunkStart.Add(time.Duration(i) * tci.csize).Before(tci.cext.End) + }) +} + +// TimeseriesChunkQueryProcessor implements ChunkQueryProcessor for timeseries chunks (reading) +type TimeseriesChunkQueryProcessor struct { + d *HTTPDocument + trq *timeseries.TimeRangeQuery + unmarshal timeseries.UnmarshalerFunc + ress timeseries.List +} + +func (tcp *TimeseriesChunkQueryProcessor) ProcessChunk(index int, subkey string, qr *queryResult, c cache.Cache) error { + if c.Configuration().Provider != providerMemory { + var err error + qr.d.timeseries, err = tcp.unmarshal(qr.d.Body, nil) + if err != nil { + logger.Error("chunk unmarshal failed", + logging.Pairs{ + "error": err, "chunkIdx": index, + "key": subkey, "cacheQueryStatus": qr.lookupStatus, + }) + return err + } + } + if qr.d.timeseries != nil { + tcp.ress[index] = qr.d.timeseries + } + return nil +} + +func (tcp *TimeseriesChunkQueryProcessor) Finalize() error { + tcp.d.timeseries = tcp.ress.Merge(true) + if tcp.d.timeseries != nil { + tcp.d.timeseries.SetExtents(tcp.d.timeseries.Extents().Compress(tcp.trq.Step)) + } + return nil +} + +// executeTimeseriesChunkQuery performs timeseries chunk querying with early cancellation +func executeTimeseriesChunkQuery(ctx context.Context, c cache.Cache, key string, d *HTTPDocument, trq *timeseries.TimeRangeQuery, unmarshal timeseries.UnmarshalerFunc, opts *options.Options) error { + // Determine chunk extent and number of chunks + var cext timeseries.Extent + csize := trq.Step * time.Duration(c.Configuration().TimeseriesChunkFactor) + cext.Start, cext.End = 
trq.Extent.Start.Truncate(csize), trq.Extent.End.Truncate(csize).Add(csize) + cct := int(cext.End.Sub(cext.Start) / csize) + + iterator := &TimeseriesChunkQueryIterator{ + key: key, + cext: cext, + csize: csize, + trq: trq, + } + + processor := &TimeseriesChunkQueryProcessor{ + d: d, + trq: trq, + unmarshal: unmarshal, + ress: make(timeseries.List, cct), + } + + return executeChunkQuery(ctx, c, iterator, processor, opts) +} + +// ByterangeChunkQueryIterator implements ChunkQueryIterator for byterange chunks (reading) +type ByterangeChunkQueryIterator struct { + key string + crs int64 + cre int64 + size int64 +} + +func (bci *ByterangeChunkQueryIterator) IterateChunks(fn func(int, string) bool) { + iterateChunksHelper(fn, + func(i int) string { + chunkStart := bci.crs + int64(i)*bci.size + chunkRange := byterange.Range{ + Start: chunkStart, + End: chunkStart + bci.size - 1, + } + return bci.key + chunkRange.String() + }, + func(i int) bool { + return bci.crs+int64(i)*bci.size < bci.cre + }) +} + +// ByterangeChunkQueryProcessor implements ChunkQueryProcessor for byterange chunks (reading) +type ByterangeChunkQueryProcessor struct { + d *HTTPDocument + size int64 + dbl *int64 // atomic counter for document body length +} + +func (bcp *ByterangeChunkQueryProcessor) ProcessChunk(index int, subkey string, qr *queryResult, c cache.Cache) error { + if qr == nil { + return nil + } + + // Handle error - different from timeseries as we allow cache.ErrKNF + if qr.err != nil && !errors.Is(qr.err, cache.ErrKNF) { + return qr.err + } + + // Process successful result immediately + if !qr.d.IsMeta && qr.lookupStatus == status.LookupStatusHit { + for _, r := range qr.d.Ranges { + content := qr.d.Body[r.Start%bcp.size : r.End%bcp.size+1] + r.Copy(bcp.d.Body, content) + if v := atomic.LoadInt64(bcp.dbl); r.End+1 > v { + atomic.CompareAndSwapInt64(bcp.dbl, v, r.End+1) + } + } + } + return nil +} + +func (bcp *ByterangeChunkQueryProcessor) Finalize() error { + if len(bcp.d.Ranges) > 1 
{ + bcp.d.StoredRangeParts = make(map[string]*byterange.MultipartByteRange) + for _, r := range bcp.d.Ranges { + bcp.d.StoredRangeParts[r.String()] = &byterange.MultipartByteRange{ + Range: r, + Content: bcp.d.Body[r.Start : r.End+1], + } + } + bcp.d.Body = nil + } else { + bcp.d.Body = bcp.d.Body[:*bcp.dbl] + } + return nil +} + +// executeByterangeChunkQuery performs byterange chunk querying with early cancellation +func executeByterangeChunkQuery(ctx context.Context, c cache.Cache, key string, d *HTTPDocument, ranges byterange.Ranges, opts *options.Options) error { + // Determine chunk start/end + var crs, cre int64 + if len(ranges) == 0 { + ranges = byterange.Ranges{byterange.Range{Start: 0, End: d.ContentLength - 1}} + } + size := c.Configuration().ByterangeChunkSize + crs, cre = ranges[0].Start, ranges[len(ranges)-1].End + crs = (crs / size) * size + cre = (cre/size + 1) * size + + // Allocate body in meta document + d.Body = make([]byte, d.ContentLength) + + var dbl int64 // Track document body length + + iterator := &ByterangeChunkQueryIterator{ + key: key, + crs: crs, + cre: cre, + size: size, + } + + processor := &ByterangeChunkQueryProcessor{ + d: d, + size: size, + dbl: &dbl, + } + + return executeChunkQuery(ctx, c, iterator, processor, opts) +} diff --git a/pkg/proxy/engines/cache_write.go b/pkg/proxy/engines/cache_write.go new file mode 100644 index 000000000..09287d6b8 --- /dev/null +++ b/pkg/proxy/engines/cache_write.go @@ -0,0 +1,223 @@ +/* + * Copyright 2018 The Trickster Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package engines + +import ( + "context" + "errors" + "time" + + "github.com/trickstercache/trickster/v2/pkg/backends/options" + "github.com/trickstercache/trickster/v2/pkg/cache" + "github.com/trickstercache/trickster/v2/pkg/proxy/ranges/byterange" + "github.com/trickstercache/trickster/v2/pkg/timeseries" + "golang.org/x/sync/errgroup" +) + +// CacheableDocument abstracts the source data for chunking operations when writing to cache. +type CacheableDocument interface { + // GetMeta returns the metadata document that acts as a manifest for all chunks. + GetMeta() *HTTPDocument + + // GetTimeseriesChunk retrieves the specific chunk of data for a given time extent. + GetTimeseriesChunk(extent timeseries.Extent) *HTTPDocument + + // getByteRanges returns the total extent of the data in byte ranges. + getByteRanges() byterange.Ranges + + // GetByterangeChunk retrieves the specific chunk of data for a given byte range. + GetByterangeChunk(dataRange byterange.Range, size int64) *HTTPDocument +} + +// ChunkWriter abstracts the logic for different types of chunking when writing to cache. +type ChunkWriter interface { + // Determine the number of chunks plus one for the metadata document. + ChunkCount() int + + // Iterate over chunks and execute the provided write function for each. + // The write function takes (index, subkey, chunkData). + IterateChunks( + d CacheableDocument, // The source data document + writeFunc func(int, string, any) error, + ) error + + // GetMeta returns the metadata document to be stored separately. 
+ GetMeta(d CacheableDocument) any +} + +// TimeseriesChunkWriter handles timeseries chunking operations when writing to cache +type TimeseriesChunkWriter struct { + trq *timeseries.TimeRangeQuery + c cache.Cache + key string + cext timeseries.Extent + csize time.Duration + cct int + marshal timeseries.MarshalerFunc +} + +// NewTimeseriesChunkWriter creates a new TimeseriesChunkWriter +func NewTimeseriesChunkWriter(c cache.Cache, key string, trq *timeseries.TimeRangeQuery, marshal timeseries.MarshalerFunc) *TimeseriesChunkWriter { + csize := trq.Step * time.Duration(c.Configuration().TimeseriesChunkFactor) + var cext timeseries.Extent + cext.Start, cext.End = trq.Extent.Start.Truncate(csize), trq.Extent.End.Truncate(csize).Add(csize) + cct := int(cext.End.Sub(cext.Start) / csize) + + return &TimeseriesChunkWriter{trq: trq, c: c, key: key, cext: cext, csize: csize, cct: cct, marshal: marshal} +} + +func (tc *TimeseriesChunkWriter) ChunkCount() int { + return tc.cct + 1 // chunks + meta +} + +func (tc *TimeseriesChunkWriter) GetMeta(d CacheableDocument) any { + return d.GetMeta() +} + +func (tc *TimeseriesChunkWriter) IterateChunks( + d CacheableDocument, + writeFunc func(int, string, any) error, +) error { + i := 0 + for chunkStart := tc.cext.Start; chunkStart.Before(tc.cext.End); chunkStart = chunkStart.Add(tc.csize) { + chunkExtent := timeseries.Extent{ + Start: chunkStart, + End: chunkStart.Add(tc.csize - tc.trq.Step), + } + subkey := getSubKey(tc.key, chunkExtent) + chunkData := d.GetTimeseriesChunk(chunkExtent) + + // Handle serialization for non-memory providers + if tc.c.Configuration().Provider != providerMemory && tc.marshal != nil { + chunkData.Body, _ = tc.marshal(chunkData.timeseries, nil, 0) + } + + if err := writeFunc(i, subkey, chunkData); err != nil { + return err + } + i++ + } + return nil +} + +// ByterangeChunkWriter handles byterange chunking operations when writing to cache +type ByterangeChunkWriter struct { + c cache.Cache + key string + size 
int64 + crs int64 // chunk range start + cre int64 // chunk range end + cct int64 // chunk count +} + +// NewByterangeChunkWriter creates a new byterange chunk writer +func NewByterangeChunkWriter(c cache.Cache, key string, d CacheableDocument) *ByterangeChunkWriter { + drs := d.getByteRanges() + size := c.Configuration().ByterangeChunkSize + crs, cre := drs[0].Start, drs[len(drs)-1].End + crs = (crs / size) * size + cre = (cre/size + 1) * size + cct := (cre - crs) / size + + return &ByterangeChunkWriter{ + c: c, + key: key, + size: size, + crs: crs, + cre: cre, + cct: cct, + } +} + +func (bc *ByterangeChunkWriter) ChunkCount() int { + return int(bc.cct) + 1 // +1 for meta +} + +func (bc *ByterangeChunkWriter) IterateChunks( + d CacheableDocument, + writeFunc func(int, string, any) error, +) error { + i := 0 + for chunkStart := bc.crs; chunkStart < bc.cre; chunkStart += bc.size { + chunkRange := byterange.Range{ + Start: chunkStart, + End: chunkStart + bc.size - 1, + } + subkey := bc.key + chunkRange.String() + chunkData := d.GetByterangeChunk(chunkRange, bc.size) + + if err := writeFunc(i, subkey, chunkData); err != nil { + return err + } + i++ + } + return nil +} + +func (bc *ByterangeChunkWriter) GetMeta(d CacheableDocument) any { + return d.GetMeta() +} + +// executeChunking performs the generic chunking and concurrent write logic. +func executeChunking(ctx context.Context, c cache.Cache, key string, d CacheableDocument, compress bool, ttl time.Duration, chunker ChunkWriter, opts *options.Options) error { + cct := chunker.ChunkCount() + cr := make([]error, cct) // Error slice size is chunks + 1 (for meta) + + eg := errgroup.Group{} + limit := options.DefaultChunkWriteConcurrencyLimit + if opts != nil && opts.ChunkWriteConcurrencyLimit != 0 { + limit = opts.ChunkWriteConcurrencyLimit + } + eg.SetLimit(limit) + + // 1. 
Iterate over chunks and start concurrent writes + err := chunker.IterateChunks(d, func(index int, subkey string, chunkData any) error { + // This is the core concurrent write logic + eg.Go(func() error { + httpDoc, ok := chunkData.(*HTTPDocument) + if !ok { + return errors.New("invalid chunk data type") + } + cr[index] = writeConcurrent(ctx, c, subkey, httpDoc, compress, ttl) + return nil + }) + return nil // The return value here is unused, we use the errgroup for final errors + }) + if err != nil { + return err + } + + // The last index is reserved for the metadata document. + metaIndex := cct - 1 + meta := chunker.GetMeta(d) + + // 2. Store metadocument concurrently + eg.Go(func() error { + httpDoc, ok := meta.(*HTTPDocument) + if !ok { + return errors.New("invalid meta data type") + } + cr[metaIndex] = writeConcurrent(ctx, c, key, httpDoc, compress, ttl) + return nil + }) + + // 3. Wait on writes to finish and handle results + if err := eg.Wait(); err != nil { + return err + } + return errors.Join(cr...) +}