Commit 1b8a3b0

fix: resolve deadlock in cache eviction and improve GetBatch implementation and full id version (#3591)
* fix: performance issues with Kafka caused by encapsulating the MQ interface
* fix: admin token in standalone mode
* fix: full id version
* fix: resolve deadlock in cache eviction and improve GetBatch implementation
1 parent b8c4b45 commit 1b8a3b0
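
For context, here is a minimal, self-contained sketch of the problem the cache.go change below addresses: a non-reentrant sync.Mutex is held while the eviction callback runs, so synchronously deleting a linked key from inside that callback re-enters the same lock (the situation the new code comment "Do not delete other keys while the underlying LRU still holds its lock" describes). Everything in the sketch — store, del, the key names — is illustrative and not code from this repository.

package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical stand-in for one cache slot: a map guarded by a
// non-reentrant sync.Mutex whose eviction callback fires while the lock is
// still held.
type store struct {
	mu      sync.Mutex
	items   map[string]string
	onEvict func(key string)
}

func (s *store) del(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.items, key)
	if s.onEvict != nil {
		s.onEvict(key) // runs with s.mu held
	}
}

func main() {
	var wg sync.WaitGroup
	s := &store{items: map[string]string{"k1": "v1", "k2": "v2"}}
	s.onEvict = func(key string) {
		if key != "k1" {
			return
		}
		// Calling s.del("k2") inline here would block on s.mu, which this
		// goroutine already holds: a self-deadlock. Handing the linked
		// deletion to another goroutine — as the patched onEvict does with
		// go c.delLinked(key, lks) — lets the lock be released first.
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.del("k2")
		}()
	}
	s.del("k1")
	wg.Wait()
	fmt.Println("remaining items:", len(s.items)) // 0
}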

File tree

5 files changed, +133 −20 lines changed


pkg/localcache/cache.go

Lines changed: 16 additions & 11 deletions

@@ -47,15 +47,15 @@ func New[V any](opts ...Option) Cache[V] {
 	if opt.localSlotNum > 0 && opt.localSlotSize > 0 {
 		createSimpleLRU := func() lru.LRU[string, V] {
 			if opt.expirationEvict {
-				return lru.NewExpirationLRU(opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
+				return lru.NewExpirationLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
 			} else {
-				return lru.NewLazyLRU(opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
+				return lru.NewLazyLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
 			}
 		}
 		if opt.localSlotNum == 1 {
 			c.local = createSimpleLRU()
 		} else {
-			c.local = lru.NewSlotLRU(opt.localSlotNum, LRUStringHash, createSimpleLRU)
+			c.local = lru.NewSlotLRU[string, V](opt.localSlotNum, LRUStringHash, createSimpleLRU)
 		}
 		if opt.linkSlotNum > 0 {
 			c.link = link.New(opt.linkSlotNum)
@@ -71,14 +71,19 @@ type cache[V any] struct {
 }
 
 func (c *cache[V]) onEvict(key string, value V) {
-	_ = value
-
 	if c.link != nil {
-		lks := c.link.Del(key)
-		for k := range lks {
-			if key != k { // prevent deadlock
-				c.local.Del(k)
-			}
+		// Do not delete other keys while the underlying LRU still holds its lock;
+		// defer linked deletions to avoid re-entering the same slot and deadlocking.
+		if lks := c.link.Del(key); len(lks) > 0 {
+			go c.delLinked(key, lks)
+		}
+	}
+}
+
+func (c *cache[V]) delLinked(src string, keys map[string]struct{}) {
+	for k := range keys {
+		if src != k {
+			c.local.Del(k)
 		}
 	}
 }
@@ -105,7 +110,7 @@ func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.C
 func (c *cache[V]) GetLink(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), link ...string) (V, error) {
 	if c.local != nil {
 		return c.local.Get(key, func() (V, error) {
-			if len(link) > 0 {
+			if len(link) > 0 && c.link != nil {
 				c.link.Link(key, link...)
 			}
 			return fetch(ctx)
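
As a quick orientation for the GetLink path touched above, here is a rough usage sketch. It assumes the exported constructor and options exercised by the new test below (New, WithLocalSlotNum, WithLocalSlotSize, WithLazy) and uses hypothetical key names; note that, with the fix, eviction-driven invalidation of linked keys now runs on a separate goroutine, so a linked entry may outlive its source key very briefly.

package main

import (
	"context"
	"fmt"

	"github.com/openimsdk/open-im-server/v3/pkg/localcache"
)

func main() {
	ctx := context.Background()
	c := localcache.New[string](
		localcache.WithLocalSlotNum(1),
		localcache.WithLocalSlotSize(1),
		localcache.WithLazy(),
	)

	// Cache "k1" and link it to "k2": when "k1" is evicted, onEvict hands
	// the linked keys to delLinked, which drops "k2" as well.
	v, err := c.GetLink(ctx, "k1", func(ctx context.Context) (string, error) {
		return "v1", nil
	}, "k2")
	fmt.Println(v, err) // v1 <nil>
}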

pkg/localcache/cache_test.go

Lines changed: 67 additions & 0 deletions

@@ -22,6 +22,8 @@ import (
 	"sync/atomic"
 	"testing"
 	"time"
+
+	"github.com/openimsdk/open-im-server/v3/pkg/localcache/lru"
 )
 
 func TestName(t *testing.T) {
@@ -91,3 +93,68 @@ func TestName(t *testing.T) {
 	t.Log("del", del.Load())
 	// 137.35s
 }
+
+// Test deadlock scenario when eviction callback deletes a linked key that hashes to the same slot.
+func TestCacheEvictDeadlock(t *testing.T) {
+	ctx := context.Background()
+	c := New[string](WithLocalSlotNum(1), WithLocalSlotSize(1), WithLazy())
+
+	if _, err := c.GetLink(ctx, "k1", func(ctx context.Context) (string, error) {
+		return "v1", nil
+	}, "k2"); err != nil {
+		t.Fatalf("seed cache failed: %v", err)
+	}
+
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		_, _ = c.GetLink(ctx, "k2", func(ctx context.Context) (string, error) {
+			return "v2", nil
+		}, "k1")
+	}()
+
+	select {
+	case <-done:
+		// expected to finish quickly; current implementation deadlocks here.
+	case <-time.After(time.Second):
+		t.Fatal("GetLink deadlocked during eviction of linked key")
+	}
+}
+
+func TestExpirationLRUGetBatch(t *testing.T) {
+	l := lru.NewExpirationLRU[string, string](2, time.Minute, time.Second*5, EmptyTarget{}, nil)
+
+	keys := []string{"a", "b"}
+	values, err := l.GetBatch(keys, func(keys []string) (map[string]string, error) {
+		res := make(map[string]string)
+		for _, k := range keys {
+			res[k] = k + "_v"
+		}
+		return res, nil
+	})
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if len(values) != len(keys) {
+		t.Fatalf("expected %d values, got %d", len(keys), len(values))
+	}
+	for _, k := range keys {
+		if v, ok := values[k]; !ok || v != k+"_v" {
+			t.Fatalf("unexpected value for %s: %q, ok=%v", k, v, ok)
+		}
+	}
+
+	// second batch should hit cache
+	values, err = l.GetBatch(keys, func(keys []string) (map[string]string, error) {
+		t.Fatalf("should not fetch on cache hit")
+		return nil, nil
+	})
+	if err != nil {
+		t.Fatalf("unexpected error on cache hit: %v", err)
+	}
+	for _, k := range keys {
+		if v, ok := values[k]; !ok || v != k+"_v" {
+			t.Fatalf("unexpected cached value for %s: %q, ok=%v", k, v, ok)
		}
	}
}
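
To run just the tests added here from the repository root, standard Go tooling should suffice (the run regex and timeout are only a suggestion):

go test -run 'TestCacheEvictDeadlock|TestExpirationLRUGetBatch' -timeout 60s ./pkg/localcache/...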

pkg/localcache/init.go

Lines changed: 0 additions & 4 deletions

@@ -33,10 +33,6 @@ func InitLocalCache(localCache *config.LocalCache) {
 		Local config.CacheConfig
 		Keys  []string
 	}{
-		{
-			Local: localCache.Auth,
-			Keys:  []string{cachekey.UidPidToken},
-		},
 		{
 			Local: localCache.User,
 			Keys:  []string{cachekey.UserInfoKey, cachekey.UserGlobalRecvMsgOptKey},

pkg/localcache/lru/lru_expiration.go

Lines changed: 47 additions & 2 deletions

@@ -52,8 +52,53 @@ type ExpirationLRU[K comparable, V any] struct {
 }
 
 func (x *ExpirationLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
-	//TODO implement me
-	panic("implement me")
+	var (
+		err     error
+		results = make(map[K]V)
+		misses  = make([]K, 0, len(keys))
+	)
+
+	for _, key := range keys {
+		x.lock.Lock()
+		v, ok := x.core.Get(key)
+		x.lock.Unlock()
+		if ok {
+			x.target.IncrGetHit()
+			v.lock.RLock()
+			results[key] = v.value
+			if v.err != nil && err == nil {
+				err = v.err
+			}
+			v.lock.RUnlock()
+			continue
+		}
+		misses = append(misses, key)
+	}
+
+	if len(misses) == 0 {
+		return results, err
+	}
+
+	fetchValues, fetchErr := fetch(misses)
+	if fetchErr != nil && err == nil {
+		err = fetchErr
+	}
+
+	for key, val := range fetchValues {
+		results[key] = val
+		if fetchErr != nil {
+			x.target.IncrGetFailed()
+			continue
+		}
+		x.target.IncrGetSuccess()
+		item := &expirationLruItem[V]{value: val}
+		x.lock.Lock()
+		x.core.Add(key, item)
+		x.lock.Unlock()
+	}
+
+	// any keys not returned from fetch remain absent (no cache write)
+	return results, err
 }
 
 func (x *ExpirationLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
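
A small sketch of the fetch contract implied by the new GetBatch: the callback only receives the cache misses, only the keys it returns are written back, and — as the comment above notes — keys it omits stay absent from the result and are not cached. The constructor mirrors the new TestExpirationLRUGetBatch; the key names and the deliberately missing "gone" key are illustrative only.

package main

import (
	"fmt"
	"time"

	"github.com/openimsdk/open-im-server/v3/pkg/localcache"
	"github.com/openimsdk/open-im-server/v3/pkg/localcache/lru"
)

func main() {
	l := lru.NewExpirationLRU[string, string](16, time.Minute, 5*time.Second, localcache.EmptyTarget{}, nil)

	// "a" is fetched and cached; "gone" is not returned by fetch, so it is
	// neither in the result map nor written to the cache.
	vals, err := l.GetBatch([]string{"a", "gone"}, func(misses []string) (map[string]string, error) {
		out := make(map[string]string, len(misses))
		for _, k := range misses {
			if k != "gone" {
				out[k] = k + "_v"
			}
		}
		return out, nil
	})
	fmt.Println(vals, err) // map[a:a_v] <nil>
}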

pkg/localcache/lru/lru_slot.go

Lines changed: 3 additions & 3 deletions

@@ -35,7 +35,7 @@ type slotLRU[K comparable, V any] struct {
 func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
 	var (
 		slotKeys = make(map[uint64][]K)
-		kVs      = make(map[K]V)
+		vs       = make(map[K]V)
 	)
 
 	for _, k := range keys {
@@ -49,10 +49,10 @@ func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)
 			return nil, err
 		}
 		for key, value := range batches {
-			kVs[key] = value
+			vs[key] = value
 		}
 	}
-	return kVs, nil
+	return vs, nil
 }
 
 func (x *slotLRU[K, V]) getIndex(k K) uint64 {
