1717package cache
1818
1919import (
20+ "bytes"
2021 "fmt"
2122 "io"
22- "io/ioutil"
2323 "os"
2424 "path/filepath"
2525 "sync"
@@ -28,117 +28,211 @@ import (
2828 "github.com/pkg/errors"
2929)
3030
const (
	// Fallbacks applied by NewDirectoryCache when the corresponding
	// DirectoryCacheConfig field is left at its zero value.
	defaultMaxLRUCacheEntry = 10
	defaultMaxCacheFds      = 10
)

// DirectoryCacheConfig configures the directory-backed BlobCache
// returned by NewDirectoryCache. Fields map to TOML configuration keys.
type DirectoryCacheConfig struct {
	// MaxLRUCacheEntry limits the number of on-memory chunk buffers (0 = default).
	MaxLRUCacheEntry int `toml:"max_lru_cache_entry"`
	// MaxCacheFds limits the number of cached open file descriptors (0 = default).
	MaxCacheFds int `toml:"max_cache_fds"`
	// SyncAdd makes Add write to disk synchronously instead of in a goroutine.
	SyncAdd bool `toml:"sync_add"`
}
41+
// TODO: contents validation.

// BlobCache is a key-addressed store for blob data.
type BlobCache interface {
	// Add stores p under key. Options (e.g. Direct) tune which cache
	// layers are populated.
	Add(key string, p []byte, opts ...Option)
	// FetchAt reads up to len(p) bytes of the blob stored under key,
	// starting at offset, into p. It returns the number of bytes copied.
	FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error)
}
3748
// cacheOpt holds per-call settings for Add and FetchAt.
type cacheOpt struct {
	direct bool
}

// Option adjusts a single Add or FetchAt invocation.
type Option func(o *cacheOpt) *cacheOpt

// Direct makes Add and FetchAt bypass the on-memory caches. Use it for
// values that will not be accessed again soon, so they do not evict more
// valuable entries from the limited in-memory space.
func Direct() Option {
	return func(o *cacheOpt) *cacheOpt {
		o.direct = true
		return o
	}
}
5065
51- func NewDirectoryCache (directory string , memCacheSize int , opts ... DirOption ) (BlobCache , error ) {
52- opt := & dirOpt {}
53- for _ , o := range opts {
54- opt = o (opt )
66+ func NewDirectoryCache (directory string , config DirectoryCacheConfig ) (BlobCache , error ) {
67+ maxEntry := config .MaxLRUCacheEntry
68+ if maxEntry == 0 {
69+ maxEntry = defaultMaxLRUCacheEntry
70+ }
71+ maxFds := config .MaxCacheFds
72+ if maxFds == 0 {
73+ maxFds = defaultMaxCacheFds
5574 }
5675 if err := os .MkdirAll (directory , os .ModePerm ); err != nil {
5776 return nil , err
5877 }
5978 dc := & directoryCache {
60- cache : lru .New (memCacheSize ),
79+ cache : newObjectCache (maxEntry ),
80+ fileCache : newObjectCache (maxFds ),
6181 directory : directory ,
82+ bufPool : sync.Pool {
83+ New : func () interface {} {
84+ return new (bytes.Buffer )
85+ },
86+ },
6287 }
63- if opt . syncAdd {
64- dc .syncAdd = true
88+ dc . cache . finalize = func ( value interface {}) {
89+ dc .bufPool . Put ( value )
6590 }
91+ dc .fileCache .finalize = func (value interface {}) {
92+ value .(* os.File ).Close ()
93+ }
94+ dc .syncAdd = config .SyncAdd
6695 return dc , nil
6796}
6897
// directoryCache is a cache implementation whose backend is a directory.
// Chunk data is additionally cached on memory (cache) and open file
// descriptors are reused (fileCache) to avoid repeated disk I/O.
type directoryCache struct {
	// cache holds recently used chunk contents as *bytes.Buffer.
	cache *objectCache
	// fileCache holds recently used open *os.File handles.
	fileCache *objectCache
	// directory is the root of the on-disk cache.
	directory string

	// bufPool recycles *bytes.Buffer scratch space for Add/FetchAt.
	bufPool sync.Pool

	// syncAdd, when true, makes Add persist to disk before returning.
	syncAdd bool
}
77108
// FetchAt reads up to len(p) bytes at offset from the blob stored under key.
// Lookup order: on-memory chunk cache, cached open file, then a fresh open of
// the on-disk cache file. With the Direct option, the in-memory layers are
// skipped entirely.
func (dc *directoryCache) FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error) {
	opt := &cacheOpt{}
	for _, o := range opts {
		opt = o(opt)
	}

	if !opt.direct {
		// Get data from memory
		if b, done, ok := dc.cache.get(key); ok {
			defer done() // release ref; eviction may then recycle the buffer
			data := b.(*bytes.Buffer).Bytes()
			if int64(len(data)) < offset {
				return 0, fmt.Errorf("invalid offset %d exceeds chunk size %d",
					offset, len(data))
			}
			return copy(p, data[offset:]), nil
		}

		// Get data from disk. If the file is already opened, use it.
		if f, done, ok := dc.fileCache.get(key); ok {
			defer done() // release ref; eviction may then close the file
			return f.(*os.File).ReadAt(p, offset)
		}
	}

	// Open the cache file and read the target region
	// TODO: If the target cache is write-in-progress, should we wait for the completion
	// or simply report the cache miss?
	file, err := os.Open(dc.cachePath(key))
	if err != nil {
		return 0, errors.Wrapf(err, "failed to open blob file for %q", key)
	}
	// A short read at the end of the blob is not an error for this API.
	if n, err = file.ReadAt(p, offset); err == io.EOF {
		err = nil
	}

	// Cache the opened file for future use. If "direct" option is specified, this
	// won't be done. This option is useful for preventing file cache from being
	// polluted by data that won't be accessed immediately.
	if opt.direct || !dc.fileCache.add(key, file) {
		file.Close()
	}

	// TODO: should we cache the entire file data on memory?
	// but making I/O (possibly huge) on every fetching
	// might be costly.

	return n, err
}
107158
108- func (dc * directoryCache ) Add (blobHash string , p []byte ) {
109- // Copy the original data for avoiding the cached contents to be edited accidentally
110- p2 := make ([] byte , len ( p ))
111- copy ( p2 , p )
112- p = p2
159+ func (dc * directoryCache ) Add (key string , p []byte , opts ... Option ) {
160+ opt := & cacheOpt {}
161+ for _ , o := range opts {
162+ opt = o ( opt )
163+ }
113164
114- dc .cacheMu .Lock ()
115- dc .cache .Add (blobHash , p )
116- dc .cacheMu .Unlock ()
165+ if ! opt .direct {
166+ // Cache the passed data on memory. This enables to serve this data even
167+ // during writing it to the disk. If "direct" option is specified, this
168+ // won't be done. This option is useful for preventing memory cache from being
169+ // polluted by data that won't be accessed immediately.
170+ b := dc .bufPool .Get ().(* bytes.Buffer )
171+ b .Reset ()
172+ b .Write (p )
173+ if ! dc .cache .add (key , b ) {
174+ dc .bufPool .Put (b ) // Already exists. No need to cache.
175+ }
176+ }
117177
178+ // Cache the passed data to disk.
179+ b2 := dc .bufPool .Get ().(* bytes.Buffer )
180+ b2 .Reset ()
181+ b2 .Write (p )
118182 addFunc := func () {
119- dc .fileMu .Lock ()
120- defer dc .fileMu .Unlock ()
183+ defer dc .bufPool .Put (b2 )
121184
122- // Check if cache exists.
123- c := filepath .Join (dc .directory , blobHash [:2 ], blobHash )
185+ var (
186+ c = dc .cachePath (key )
187+ wip = dc .wipPath (key )
188+ )
189+ if _ , err := os .Stat (wip ); err == nil {
190+ return // Write in progress
191+ }
124192 if _ , err := os .Stat (c ); err == nil {
193+ return // Already exists.
194+ }
195+
196+ // Write the contents to a temporary file
197+ if err := os .MkdirAll (filepath .Dir (wip ), os .ModePerm ); err != nil {
198+ fmt .Printf ("Warning: Failed to Create blob cache directory %q: %v\n " , c , err )
199+ return
200+ }
201+ wipfile , err := os .Create (wip )
202+ if err != nil {
203+ fmt .Printf ("Warning: failed to prepare temp file for storing cache %q" , key )
204+ return
205+ }
206+ defer func () {
207+ wipfile .Close ()
208+ os .Remove (wipfile .Name ())
209+ }()
210+ want := b2 .Len ()
211+ if _ , err := io .CopyN (wipfile , b2 , int64 (want )); err != nil {
212+ fmt .Printf ("Warning: failed to write cache: %v\n " , err )
125213 return
126214 }
127215
128- // Create cache file
216+ // Commit the cache contents
129217 if err := os .MkdirAll (filepath .Dir (c ), os .ModePerm ); err != nil {
130218 fmt .Printf ("Warning: Failed to Create blob cache directory %q: %v\n " , c , err )
131219 return
132220 }
133- f , err := os .Create (c )
221+ if err := os .Rename (wipfile .Name (), c ); err != nil {
222+ fmt .Printf ("Warning: failed to commit cache to %q: %v\n " , c , err )
223+ return
224+ }
225+ file , err := os .Open (c )
134226 if err != nil {
135- fmt .Printf ("Warning: could not create a cache file at %q: %v\n " , c , err )
227+ fmt .Printf ("Warning: failed to open cache on %q: %v\n " , c , err )
136228 return
137229 }
138- defer f .Close ()
139- if n , err := f .Write (p ); err != nil || n != len (p ) {
140- fmt .Printf ("Warning: failed to write cache: %d(wrote)/%d(expected): %v\n " ,
141- n , len (p ), err )
230+
231+ // Cache the opened file for future use. If "direct" option is specified, this
232+ // won't be done. This option is useful for preventing file cache from being
233+ // polluted by data that won't be accessed immediately.
234+ if opt .direct || ! dc .fileCache .add (key , file ) {
235+ file .Close ()
142236 }
143237 }
144238
@@ -149,6 +243,81 @@ func (dc *directoryCache) Add(blobHash string, p []byte) {
149243 }
150244}
151245
246+ func (dc * directoryCache ) cachePath (key string ) string {
247+ return filepath .Join (dc .directory , key [:2 ], key )
248+ }
249+
250+ func (dc * directoryCache ) wipPath (key string ) string {
251+ return filepath .Join (dc .directory , key [:2 ], "w" , key )
252+ }
253+
254+ func newObjectCache (maxEntries int ) * objectCache {
255+ oc := & objectCache {
256+ cache : lru .New (maxEntries ),
257+ }
258+ oc .cache .OnEvicted = func (key lru.Key , value interface {}) {
259+ value .(* object ).release () // Decrease ref count incremented in add operation.
260+ }
261+ return oc
262+ }
263+
// objectCache is a mutex-guarded LRU of reference-counted values. Values
// evicted from the LRU are finalized only after every in-flight user
// releases them.
type objectCache struct {
	cache   *lru.Cache
	cacheMu sync.Mutex
	// finalize, if set, is invoked once an entry's ref count drops to zero
	// (e.g. returning a buffer to a pool, closing a file).
	finalize func(interface{})
}
269+
270+ func (oc * objectCache ) get (key string ) (value interface {}, done func (), ok bool ) {
271+ oc .cacheMu .Lock ()
272+ defer oc .cacheMu .Unlock ()
273+ o , ok := oc .cache .Get (key )
274+ if ! ok {
275+ return nil , nil , false
276+ }
277+ o .(* object ).use ()
278+ return o .(* object ).v , func () { o .(* object ).release () }, true
279+ }
280+
281+ func (oc * objectCache ) add (key string , value interface {}) bool {
282+ oc .cacheMu .Lock ()
283+ defer oc .cacheMu .Unlock ()
284+ if _ , ok := oc .cache .Get (key ); ok {
285+ return false // TODO: should we swap the object?
286+ }
287+ o := & object {
288+ v : value ,
289+ finalize : oc .finalize ,
290+ }
291+ o .use () // Keep this object having at least 1 ref count (will be decreased on eviction)
292+ oc .cache .Add (key , o )
293+ return true
294+ }
295+
// object wraps a cached value with a mutex-guarded reference count and an
// optional finalizer run when the last reference is released.
type object struct {
	v interface{}

	refCounts int64
	finalize  func(interface{})

	mu sync.Mutex
}

// use takes a reference on the object.
func (o *object) use() {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.refCounts++
}

// release drops a reference; when the count reaches zero the finalizer runs.
func (o *object) release() {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.refCounts--
	// FIX: fire the finalizer exactly once, at the transition to zero. The
	// previous `<= 0` check re-ran the finalizer on every unbalanced release,
	// which could e.g. double-Close an *os.File.
	if o.refCounts == 0 && o.finalize != nil {
		// nobody will refer this object
		o.finalize(o.v)
	}
}
320+
152321func NewMemoryCache () BlobCache {
153322 return & memoryCache {
154323 membuf : map [string ]string {},
@@ -161,19 +330,19 @@ type memoryCache struct {
161330 mu sync.Mutex
162331}
163332
164- func (mc * memoryCache ) Fetch ( blobHash string ) ( []byte , error ) {
333+ func (mc * memoryCache ) FetchAt ( key string , offset int64 , p []byte , opts ... Option ) ( n int , err error ) {
165334 mc .mu .Lock ()
166335 defer mc .mu .Unlock ()
167336
168- cache , ok := mc .membuf [blobHash ]
337+ cache , ok := mc .membuf [key ]
169338 if ! ok {
170- return nil , fmt .Errorf ("Missed cache: %q" , blobHash )
339+ return 0 , fmt .Errorf ("Missed cache: %q" , key )
171340 }
172- return [] byte ( cache ), nil
341+ return copy ( p , cache [ offset :] ), nil
173342}
174343
175- func (mc * memoryCache ) Add (blobHash string , p []byte ) {
344+ func (mc * memoryCache ) Add (key string , p []byte , opts ... Option ) {
176345 mc .mu .Lock ()
177346 defer mc .mu .Unlock ()
178- mc .membuf [blobHash ] = string (p )
347+ mc .membuf [key ] = string (p )
179348}
0 commit comments