Skip to content

Commit 9aebf68

Browse files
committed
feat: caching 支持 refresh 返回err取消写入缓存
1 parent 8da3d7b commit 9aebf68

File tree

4 files changed

+81
-11
lines changed

4 files changed

+81
-11
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ require (
2929

3030
require (
3131
filippo.io/edwards25519 v1.1.0 // indirect
32+
github.com/avast/retry-go v3.0.0+incompatible // indirect
33+
github.com/avast/retry-go/v4 v4.6.1 // indirect
3234
github.com/bytedance/sonic v1.13.1 // indirect
3335
github.com/bytedance/sonic/loader v0.2.4 // indirect
3436
github.com/cloudwego/base64x v0.1.5 // indirect

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
22
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
33
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
44
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
5+
github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0=
6+
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
7+
github.com/avast/retry-go/v4 v4.6.1 h1:VkOLRubHdisGrHnTu89g08aQEWEgRU7LVEop3GbIcMk=
8+
github.com/avast/retry-go/v4 v4.6.1/go.mod h1:V6oF8njAwxJ5gRo1Q7Cxab24xs5NCWZBeaHHBklR8mA=
59
github.com/bytedance/sonic v1.13.1 h1:Jyd5CIvdFnkOWuKXr+wm4Nyk2h0yAFsr8ucJgEasO3g=
610
github.com/bytedance/sonic v1.13.1/go.mod h1:o68xyaF9u2gvVBuGHPlUVCy+ZfmNNO5ETf1+KgkJhz4=
711
github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=

kratos/caching/loadable_cache.go

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package caching
22

33
import (
44
"context"
5+
"log"
56
"sync"
67
"time"
78

9+
"github.com/avast/retry-go"
810
"github.com/szyhf/go-gcache/v2"
911
"go.opentelemetry.io/otel"
1012
"go.opentelemetry.io/otel/trace"
@@ -39,15 +41,16 @@ type loadableCache[K comparable, V any] struct {
3941
exp time.Duration // key 过期时间
4042
size int // 缓存大小,超出的缓存会被 evict
4143
block bool // 是否阻塞当前调用链
42-
refresh func() map[K]V // 刷新缓存数据的函数
44+
retryCount int // 重试次数 (几次后依旧空数据则认为空数据)
45+
refresh func() (map[K]V, error) // 刷新缓存数据的函数
4346
ticker *time.Ticker // 定时器(用于过期刷缓存)
4447
stop chan struct{} // 停止信号
4548
}
4649

4750
type Option[K comparable, V any] func(*loadableCache[K, V])
4851

49-
// WithRefreshAfterWrite refresh data provider
50-
func WithRefreshAfterWrite[K comparable, V any](f func() map[K]V) Option[K, V] {
52+
// WithRefreshAfterWrite refresh data provider (return error will not refresh)
53+
func WithRefreshAfterWrite[K comparable, V any](f func() (map[K]V, error)) Option[K, V] {
5154
return func(cb *loadableCache[K, V]) {
5255
cb.refresh = f
5356
}
@@ -85,6 +88,7 @@ func WithTracing[K comparable, V any](provider trace.TracerProvider) Option[K, V
8588
}
8689
}
8790

91+
8892
func New[K comparable, V any](opts ...Option[K, V]) LoadableCache[K, V] {
8993
cache := &loadableCache[K, V]{
9094
exp: 10 * time.Second,
@@ -106,9 +110,20 @@ func New[K comparable, V any](opts ...Option[K, V]) LoadableCache[K, V] {
106110
cache.ticker = time.NewTicker(cache.exp)
107111

108112
firstLoad := func() {
113+
114+
115+
109116
if cache.refresh != nil {
110-
ret := cache.refresh()
111-
cache.putAll(ret)
117+
if err := retry.Do(func() error {
118+
ret, err := cache.refresh()
119+
if err != nil {
120+
return err
121+
}
122+
cache.putAll(ret)
123+
return nil
124+
}); err != nil {
125+
panic(err)
126+
}
112127
}
113128
}
114129
// block 状态不使用 goroutine, 卡住当前调用链等待结束
@@ -170,7 +185,10 @@ func (cb *loadableCache[K, V]) TryPurgeAndReload(ctx context.Context) bool {
170185
_ = recover()
171186
}()
172187

173-
ret := cb.refresh()
188+
ret, err := cb.refresh()
189+
if err != nil {
190+
return false
191+
}
174192
return cb.putAll(ret)
175193
}
176194

@@ -195,7 +213,11 @@ func (cb *loadableCache[K, V]) rf() {
195213
return
196214
case <-cb.ticker.C:
197215
if cb.refresh != nil {
198-
ret := cb.refresh()
216+
log.Printf("refresh cache\n")
217+
ret, err := cb.refresh()
218+
if err != nil {
219+
continue
220+
}
199221
cb.putAll(ret)
200222
}
201223
}

kratos/caching/loadable_cache_test.go

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package caching_test
22

33
import (
44
"context"
5+
"errors"
6+
"sync"
57
"testing"
68
"time"
79

@@ -11,13 +13,17 @@ import (
1113
"github.com/omalloc/contrib/kratos/caching"
1214
)
1315

14-
func fakeRefresh() map[int64]string {
16+
func fakeRefresh() (map[int64]string, error) {
1517
key := time.Now().Unix()
1618

19+
if key%2 == 0 {
20+
return nil, errors.New("error")
21+
}
22+
1723
return map[int64]string{
1824
key - 1: "new-value1",
1925
key: "new-value2",
20-
}
26+
}, nil
2127
}
2228

2329
func TestBaseCache(t *testing.T) {
@@ -59,21 +65,25 @@ func TestCacheChanged(t *testing.T) {
5965
caching.WithSize[int64, string](100),
6066
caching.WithExpiration[int64, string](2*time.Second), // 每2秒刷新一次缓存
6167
caching.WithRefreshAfterWrite(fakeRefresh),
68+
caching.WithBlock[int64, string](),
6269
)
6370

6471
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Microsecond)
6572
defer cancel()
6673

6774
// get all
6875
kvs := cc.GetALL(ctx)
69-
assert.Equal(t, 0, len(kvs))
76+
assert.Equal(t, 2, len(kvs))
7077

7178
// set key 1
72-
cc.Set(ctx, 1, "value1")
79+
_ = cc.Set(ctx, 1, "value1")
7380

7481
v, _ := cc.Get(ctx, 1)
7582
assert.Equal(t, "value1", v)
7683

84+
kvs = cc.GetALL(ctx)
85+
assert.Equal(t, 3, len(kvs))
86+
7787
// auto refresh
7888
time.Sleep(time.Second * 3)
7989

@@ -155,3 +165,35 @@ func TestAutoRefresh(t *testing.T) {
155165
time.Sleep(time.Second * 3)
156166
assert.Equal(t, 2, len(cc.Values(ctx)))
157167
}
168+
169+
func TestConcurrent(t *testing.T) {
170+
cc := caching.New(
171+
caching.WithSize[int64, string](100),
172+
caching.WithExpiration[int64, string](2*time.Second), // 每2秒刷新一次缓存
173+
caching.WithRefreshAfterWrite(fakeRefresh), // 每次请求刷出 2 个 kv
174+
caching.WithBlock[int64, string](),
175+
)
176+
177+
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Microsecond)
178+
defer cancel()
179+
180+
wg := sync.WaitGroup{}
181+
wg.Add(20)
182+
183+
for i := 0; i < 20; i++ {
184+
go func() {
185+
for j:=range 100 {
186+
t.Logf("i: %d -> j: %d", i, j)
187+
kvs := cc.GetALL(ctx)
188+
if len(kvs) != 2 {
189+
panic("kvs is not two-size.")
190+
}
191+
time.Sleep(time.Millisecond * 100)
192+
}
193+
wg.Done()
194+
}()
195+
}
196+
197+
198+
wg.Wait()
199+
}

0 commit comments

Comments
 (0)