@@ -221,25 +221,26 @@ func (f *flatentry) find(cmd string, ts int64) (ret RedisMessage, expired bool)
221
221
}
222
222
223
223
const lrBatchSize = 64
224
+ const flattEntrySize = unsafe .Sizeof (flatentry {})
224
225
225
226
type lrBatch struct {
226
227
m map [* flatentry ]struct {}
227
228
}
228
229
229
- func NewFlattenCache (limit int64 ) CacheStore {
230
+ func NewFlattenCache (limit int ) CacheStore {
230
231
f := & flatten {
231
232
flights : make (map [string ]* adapterEntry ),
232
233
cache : make (map [string ]* flatentry ),
233
234
head : & flatentry {},
234
235
tail : & flatentry {},
235
236
size : 0 ,
236
- limit : limit ,
237
+ limit : int64 ( limit ) ,
237
238
}
238
239
f .head .next = unsafe .Pointer (f .tail )
239
240
f .tail .prev = unsafe .Pointer (f .head )
240
241
f .lrup = sync.Pool {New : func () any {
241
242
b := & lrBatch {m : make (map [* flatentry ]struct {}, lrBatchSize )}
242
- runtime .SetFinalizer (b , func () {
243
+ runtime .SetFinalizer (b , func (b * lrBatch ) {
243
244
f .llTailBatch (b )
244
245
})
245
246
return b
@@ -330,40 +331,46 @@ func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (Red
330
331
if af = f .flights [fk ]; af != nil {
331
332
return RedisMessage {}, af
332
333
}
333
- f .flights [fk ] = & adapterEntry {ch : make (chan struct {}), xat : ts + ttl .Milliseconds ()}
334
+ if f .flights != nil {
335
+ f .flights [fk ] = & adapterEntry {ch : make (chan struct {}), xat : ts + ttl .Milliseconds ()}
336
+ }
334
337
return RedisMessage {}, nil
335
338
}
336
339
337
- func (f * flatten ) Update (key , cmd string , val RedisMessage ) int64 {
340
+ func (f * flatten ) Update (key , cmd string , val RedisMessage ) ( sxat int64 ) {
338
341
fk := key + cmd
339
- bs := val .CacheMarshal (nil )
340
- fe := & flatentry {cmd : cmd , val : bs , ttl : val .CachePXAT (), size : int64 (len (bs )+ len (key )+ len (cmd )) + int64 (unsafe .Sizeof (flatentry {}))}
341
- f .mu .Lock ()
342
+ f .mu .RLock ()
342
343
af := f .flights [fk ]
344
+ f .mu .RUnlock ()
343
345
if af != nil {
344
- delete (f .flights , fk )
345
- if af .xat < fe .ttl {
346
- fe .ttl = af .xat
346
+ sxat = val .getExpireAt ()
347
+ if af .xat < sxat || sxat == 0 {
348
+ sxat = af .xat
349
+ val .setExpireAt (sxat )
347
350
}
348
- }
349
- f .size += fe .size
350
- for ep := f .head .next ; f .size > f .limit && ep != unsafe .Pointer (f .tail ); {
351
- e := (* flatentry )(ep )
352
- f .remove (e )
353
- ep = e .next
354
- }
355
- if e := f .cache [key ]; e == nil {
356
- fe .key = key
357
- f .cache [key ] = fe
358
- f .llAdd (fe )
359
- } else {
360
- e .insert (fe )
361
- }
362
- f .mu .Unlock ()
363
- if af != nil {
351
+ bs := val .CacheMarshal (nil )
352
+ fe := & flatentry {cmd : cmd , val : bs , ttl : sxat , size : int64 (len (bs )+ len (key )+ len (cmd )) + int64 (flattEntrySize )}
353
+ f .mu .Lock ()
354
+ if f .flights != nil {
355
+ delete (f .flights , fk )
356
+ f .size += fe .size
357
+ for ep := f .head .next ; f .size > f .limit && ep != unsafe .Pointer (f .tail ); {
358
+ e := (* flatentry )(ep )
359
+ f .remove (e )
360
+ ep = e .next
361
+ }
362
+ if e := f .cache [key ]; e == nil {
363
+ fe .key = key
364
+ f .cache [key ] = fe
365
+ f .llAdd (fe )
366
+ } else {
367
+ e .insert (fe )
368
+ }
369
+ }
370
+ f .mu .Unlock ()
364
371
af .set (val , nil )
365
372
}
366
- return fe . ttl
373
+ return sxat
367
374
}
368
375
369
376
func (f * flatten ) Cancel (key , cmd string , err error ) {
0 commit comments