Skip to content

Commit cdda2dd

Browse files
committed
Revert "wip: reap in progress guard"
This reverts commit dfc33cc. Signed-off-by: Chris Randles <randles.chris@gmail.com>
1 parent dfc33cc commit cdda2dd

File tree

2 files changed

+9
-114
lines changed

2 files changed

+9
-114
lines changed

pkg/cache/index/client.go

Lines changed: 9 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -125,16 +125,15 @@ type IndexedClient struct {
125125
LastFlush atomicx.Time `msg:"LastFlush,extension"`
126126

127127
// internal index configuration
128-
name string `msg:"-"`
129-
cacheProvider string `msg:"-"`
130-
options atomic.Value `msg:"-"`
131-
ico IndexedClientOptions `msg:"-"`
132-
lastWrite atomicx.Time `msg:"-"`
133-
isClosing atomic.Bool
134-
cancel context.CancelFunc
135-
flusherExited atomic.Bool
136-
reaperExited atomic.Bool
137-
reapInProgress atomic.Bool
128+
name string `msg:"-"`
129+
cacheProvider string `msg:"-"`
130+
options atomic.Value `msg:"-"`
131+
ico IndexedClientOptions `msg:"-"`
132+
lastWrite atomicx.Time `msg:"-"`
133+
isClosing atomic.Bool
134+
cancel context.CancelFunc
135+
flusherExited atomic.Bool
136+
reaperExited atomic.Bool
138137

139138
// used only in tests: fields to interact with client goroutines
140139
forceFlush chan bool
@@ -185,23 +184,6 @@ func (idx *IndexedClient) updateIndex(cacheKey string, size int64, la, lw, e tim
185184
metrics.ObserveCacheSizeChange(idx.name, idx.cacheProvider, cacheSize, count)
186185
idx.lastWrite.Store(time.Now())
187186
idx.Objects.Store(cacheKey, obj)
188-
189-
// Inline eviction guard: trigger async reaping if approaching max size
190-
// to prevent large overshoot between periodic reaper cycles.
191-
// Only trigger if reaper goroutine is running (ReapInterval > 0) and
192-
// no reap is already in progress.
193-
opts := idx.options.Load().(*options.Options)
194-
if opts.ReapInterval > 0 && opts.MaxSizeBytes > 0 && cacheSize > 0 && !idx.reapInProgress.Load() {
195-
threshold := int64(float64(opts.MaxSizeBytes) * 0.9) // 90% threshold
196-
if cacheSize > threshold {
197-
select {
198-
case idx.forceReap <- true:
199-
// successfully triggered async reap
200-
default:
201-
// reap already in progress or channel full, skip
202-
}
203-
}
204-
}
205187
}
206188

207189
func (idx *IndexedClient) StoreReference(cacheKey string, data cache.ReferenceObject, ttl time.Duration) error {
@@ -425,12 +407,6 @@ REAPER:
425407
// reap makes a single iteration through the cache index to to find and remove expired elements
426408
// and evict least-recently-accessed elements to maintain the Maximum allowed Cache Size
427409
func (idx *IndexedClient) reap() {
428-
// Set flag to prevent concurrent reaps
429-
if !idx.reapInProgress.CompareAndSwap(false, true) {
430-
return // reap already in progress, skip
431-
}
432-
defer idx.reapInProgress.Store(false)
433-
434410
cacheSize := max(atomic.LoadInt64(&idx.CacheSize), 0)
435411
removals := make([]string, 0)
436412
remainders := make(objectsAtime, 0, cacheSize)

pkg/encoding/zstd/zstd_test.go

Lines changed: 0 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package zstd
1919
import (
2020
"bytes"
2121
"net/http/httptest"
22-
"sync"
2322
"testing"
2423
)
2524

@@ -76,83 +75,3 @@ func TestNewEncoder(t *testing.T) {
7675
t.Error("expected non-nil encoder")
7776
}
7877
}
79-
80-
// TestConcurrentEncodeDecode verifies that the shared commonEncoder and
81-
// commonDecoder are safe for concurrent use via EncodeAll/DecodeAll.
82-
// If this test fails or produces data corruption, the shared instances
83-
// must be replaced with sync.Pool.
84-
func TestConcurrentEncodeDecode(t *testing.T) {
85-
// Create test data with varying patterns to detect corruption
86-
testData := [][]byte{
87-
bytes.Repeat([]byte("test data 1"), 100),
88-
bytes.Repeat([]byte("different pattern 2"), 150),
89-
bytes.Repeat([]byte("yet another test 3"), 200),
90-
bytes.Repeat([]byte("final test pattern 4"), 250),
91-
}
92-
93-
const goroutines = 100
94-
const iterations = 10
95-
96-
var wg sync.WaitGroup
97-
errors := make(chan error, goroutines*iterations*2)
98-
99-
// Launch concurrent encode/decode operations
100-
for i := 0; i < goroutines; i++ {
101-
dataIndex := i % len(testData)
102-
testBytes := testData[dataIndex]
103-
104-
wg.Add(2)
105-
106-
// Concurrent encode
107-
go func(data []byte, idx int) {
108-
defer wg.Done()
109-
for j := 0; j < iterations; j++ {
110-
encoded, err := Encode(data)
111-
if err != nil {
112-
errors <- err
113-
return
114-
}
115-
// Verify we can decode what we just encoded
116-
decoded, err := Decode(encoded)
117-
if err != nil {
118-
errors <- err
119-
return
120-
}
121-
if !bytes.Equal(decoded, data) {
122-
t.Errorf("data corruption: goroutine %d iteration %d", idx, j)
123-
return
124-
}
125-
}
126-
}(testBytes, i)
127-
128-
// Concurrent decode (of pre-encoded data)
129-
go func(data []byte, idx int) {
130-
defer wg.Done()
131-
// Pre-encode the data
132-
encoded, err := Encode(data)
133-
if err != nil {
134-
errors <- err
135-
return
136-
}
137-
for j := 0; j < iterations; j++ {
138-
decoded, err := Decode(encoded)
139-
if err != nil {
140-
errors <- err
141-
return
142-
}
143-
if !bytes.Equal(decoded, data) {
144-
t.Errorf("data corruption: goroutine %d iteration %d", idx, j)
145-
return
146-
}
147-
}
148-
}(testBytes, i)
149-
}
150-
151-
wg.Wait()
152-
close(errors)
153-
154-
// Check for any errors
155-
for err := range errors {
156-
t.Error(err)
157-
}
158-
}

0 commit comments

Comments
 (0)