@@ -101,7 +101,7 @@ func (a *adapter) Update(key, cmd string, val RedisMessage) (sxat int64) {
101
101
val .setExpireAt (sxat )
102
102
}
103
103
a .store .Set (key + cmd , val )
104
- flight .set (val , nil )
104
+ flight .setVal (val )
105
105
entries [cmd ] = nil
106
106
}
107
107
a .mu .Unlock ()
@@ -112,7 +112,7 @@ func (a *adapter) Cancel(key, cmd string, err error) {
112
112
a .mu .Lock ()
113
113
entries := a .flights [key ]
114
114
if flight , ok := entries [cmd ].(* adapterEntry ); ok {
115
- flight .set ( RedisMessage {}, err )
115
+ flight .setErr ( err )
116
116
entries [cmd ] = nil
117
117
}
118
118
a .mu .Unlock ()
@@ -154,7 +154,7 @@ func (a *adapter) Close(err error) {
154
154
for _ , entries := range flights {
155
155
for _ , e := range entries {
156
156
if e != nil {
157
- e .(* adapterEntry ).set ( RedisMessage {}, err )
157
+ e .(* adapterEntry ).setErr ( err )
158
158
}
159
159
}
160
160
}
@@ -167,8 +167,13 @@ type adapterEntry struct {
167
167
xat int64
168
168
}
169
169
170
- func (a * adapterEntry ) set (val RedisMessage , err error ) {
171
- a .err , a .val = err , val
170
+ func (a * adapterEntry ) setVal (val RedisMessage ) {
171
+ a .val = val
172
+ close (a .ch )
173
+ }
174
+
175
+ func (a * adapterEntry ) setErr (err error ) {
176
+ a .err = err
172
177
close (a .ch )
173
178
}
174
179
@@ -202,27 +207,27 @@ type flatentry struct {
202
207
func (f * flatentry ) insert (e * flatentry ) {
203
208
f .size += e .size
204
209
f .mu .Lock ()
205
- defer f .mu .Unlock ()
206
210
e .ovfl = f .ovfl
207
211
f .ovfl = e
212
+ f .mu .Unlock ()
208
213
}
209
214
210
215
func (f * flatentry ) find (cmd string , ts int64 ) (ret RedisMessage , expired bool ) {
211
- if f == nil {
212
- return
213
- }
214
- if ts >= f .ttl {
215
- expired = true
216
- return
217
- }
218
- if cmd == f .cmd {
219
- _ = ret .CacheUnmarshalView (f .val )
220
- return
216
+ for next := f ; next != nil ; {
217
+ if ts >= next .ttl {
218
+ expired = true
219
+ return
220
+ }
221
+ if cmd == next .cmd {
222
+ _ = ret .CacheUnmarshalView (next .val )
223
+ return
224
+ }
225
+ next .mu .RLock ()
226
+ ovfl := next .ovfl
227
+ next .mu .RUnlock ()
228
+ next = ovfl
221
229
}
222
- f .mu .RLock ()
223
- ovfl := f .ovfl
224
- f .mu .RUnlock ()
225
- return ovfl .find (cmd , ts )
230
+ return
226
231
}
227
232
228
233
const lrBatchSize = 64
@@ -246,7 +251,9 @@ func NewFlattenCache(limit int) CacheStore {
246
251
f .lrup = sync.Pool {New : func () any {
247
252
b := & lrBatch {m : make (map [* flatentry ]struct {}, lrBatchSize )}
248
253
runtime .SetFinalizer (b , func (b * lrBatch ) {
254
+ f .mu .Lock ()
249
255
f .llTailBatch (b )
256
+ f .mu .Unlock ()
250
257
})
251
258
return b
252
259
}}
@@ -287,11 +294,9 @@ func (f *flatten) llTail(e *flatentry) {
287
294
}
288
295
289
296
func (f * flatten ) llTailBatch (b * lrBatch ) {
290
- f .mu .Lock ()
291
297
for e := range b .m {
292
298
f .llTail (e )
293
299
}
294
- f .mu .Unlock ()
295
300
clear (b .m )
296
301
}
297
302
@@ -310,7 +315,9 @@ func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (Red
310
315
batch := f .lrup .Get ().(* lrBatch )
311
316
batch .m [e ] = struct {}{}
312
317
if len (batch .m ) == lrBatchSize {
318
+ f .mu .Lock ()
313
319
f .llTailBatch (batch )
320
+ f .mu .Unlock ()
314
321
}
315
322
f .lrup .Put (batch )
316
323
return v , nil
@@ -373,7 +380,7 @@ func (f *flatten) Update(key, cmd string, val RedisMessage) (sxat int64) {
373
380
}
374
381
}
375
382
f .mu .Unlock ()
376
- af .set (val , nil )
383
+ af .setVal (val )
377
384
}
378
385
return sxat
379
386
}
@@ -384,7 +391,7 @@ func (f *flatten) Cancel(key, cmd string, err error) {
384
391
defer f .mu .Unlock ()
385
392
if af := f .flights [fk ]; af != nil {
386
393
delete (f .flights , fk )
387
- af .set ( RedisMessage {}, err )
394
+ af .setErr ( err )
388
395
}
389
396
}
390
397
@@ -416,6 +423,6 @@ func (f *flatten) Close(err error) {
416
423
f .mark ++
417
424
f .mu .Unlock ()
418
425
for _ , entry := range flights {
419
- entry .set ( RedisMessage {}, err )
426
+ entry .setErr ( err )
420
427
}
421
428
}
0 commit comments