@@ -18,13 +18,13 @@ package cache
1818
1919import (
2020 "bytes"
21- "errors"
2221 "fmt"
2322 "io"
2423 "os"
2524 "path/filepath"
2625 "sync"
2726
27+ "github.com/containerd/log"
2828 "github.com/containerd/stargz-snapshotter/util/cacheutil"
2929 "github.com/containerd/stargz-snapshotter/util/namedmutex"
3030)
@@ -61,6 +61,9 @@ type DirectoryCacheConfig struct {
6161 // Direct forcefully enables direct mode for all operation in cache.
6262 // Thus operation won't use on-memory caches.
6363 Direct bool
64+
65+ // EnableHardlink enables hardlinking of cache files to reduce memory usage
66+ EnableHardlink bool
6467}
6568
6669// TODO: contents validation.
@@ -99,6 +102,7 @@ type Writer interface {
// cacheOpt carries the per-call settings that Option values populate for a
// single cache operation.
type cacheOpt struct {
	// direct makes Get bypass the in-memory and file-descriptor caches and
	// makes Add return the file-backed writer without a memory buffer.
	// passThrough makes the direct-mode reader's close function a no-op.
	direct, passThrough bool

	// chunkDigest is the digest recorded by the ChunkDigest option; it is
	// the empty string when the option was not given.
	chunkDigest string
}
103107
104108type Option func (o * cacheOpt ) * cacheOpt
@@ -123,6 +127,14 @@ func PassThrough() Option {
123127 }
124128}
125129
130+ // ChunkDigest option allows specifying a chunk digest for the cache
131+ func ChunkDigest (digest string ) Option {
132+ return func (o * cacheOpt ) * cacheOpt {
133+ o .chunkDigest = digest
134+ return o
135+ }
136+ }
137+
126138func NewDirectoryCache (directory string , config DirectoryCacheConfig ) (BlobCache , error ) {
127139 if ! filepath .IsAbs (directory ) {
128140 return nil , fmt .Errorf ("dir cache path must be an absolute path; got %q" , directory )
@@ -166,15 +178,24 @@ func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache
166178 return nil , err
167179 }
168180 dc := & directoryCache {
169- cache : dataCache ,
170- fileCache : fdCache ,
171- wipLock : new (namedmutex.NamedMutex ),
172- directory : directory ,
173- wipDirectory : wipdir ,
174- bufPool : bufPool ,
175- direct : config .Direct ,
181+ cache : dataCache ,
182+ fileCache : fdCache ,
183+ wipLock : new (namedmutex.NamedMutex ),
184+ directory : directory ,
185+ wipDirectory : wipdir ,
186+ bufPool : bufPool ,
187+ direct : config .Direct ,
188+ enableHardlink : config .EnableHardlink ,
189+ syncAdd : config .SyncAdd ,
190+ }
191+
192+ // Initialize hardlink manager if enabled
193+ if config .EnableHardlink {
194+ hlManager , enabled := InitializeHardlinkManager (filepath .Dir (filepath .Dir (directory )), config .EnableHardlink )
195+ dc .hlManager = hlManager
196+ dc .enableHardlink = enabled
176197 }
177- dc . syncAdd = config . SyncAdd
198+
178199 return dc , nil
179200}
180201
@@ -193,6 +214,9 @@ type directoryCache struct {
193214
194215 closed bool
195216 closedMu sync.Mutex
217+
218+ enableHardlink bool
219+ hlManager * HardlinkManager
196220}
197221
198222func (dc * directoryCache ) Get (key string , opts ... Option ) (Reader , error ) {
@@ -205,9 +229,15 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
205229 opt = o (opt )
206230 }
207231
232+ // Try to get from memory cache
208233 if ! dc .direct && ! opt .direct {
209- // Get data from memory
210- if b , done , ok := dc .cache .Get (key ); ok {
234+ // Try memory cache for digest or key
235+ cacheKey := key
236+ if dc .hlManager != nil && dc .hlManager .IsEnabled () && opt .chunkDigest != "" {
237+ cacheKey = opt .chunkDigest
238+ }
239+
240+ if b , done , ok := dc .cache .Get (cacheKey ); ok {
211241 return & reader {
212242 ReaderAt : bytes .NewReader (b .(* bytes.Buffer ).Bytes ()),
213243 closeFunc : func () error {
@@ -217,8 +247,8 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
217247 }, nil
218248 }
219249
220- // Get data from disk. If the file is already opened, use it.
221- if f , done , ok := dc .fileCache .Get (key ); ok {
250+ // Get data from file cache for digest or key
251+ if f , done , ok := dc .fileCache .Get (cacheKey ); ok {
222252 return & reader {
223253 ReaderAt : f .(* os.File ),
224254 closeFunc : func () error {
@@ -229,24 +259,28 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
229259 }
230260 }
231261
262+ // First try regular file path
263+ filepath := BuildCachePath (dc .directory , key )
264+
265+ // Check hardlink manager for existing digest file
266+ if dc .hlManager != nil && opt .chunkDigest != "" {
267+ if digestPath , exists := dc .hlManager .ProcessCacheGet (key , opt .chunkDigest , opt .direct ); exists {
268+ log .L .Debugf ("Using existing file for digest %q instead of key %q" , opt .chunkDigest , key )
269+ filepath = digestPath
270+ }
271+ }
272+
232273 // Open the cache file and read the target region
233- // TODO: If the target cache is write-in-progress, should we wait for the completion
234- // or simply report the cache miss?
235- file , err := os .Open (dc .cachePath (key ))
274+ file , err := os .Open (filepath )
236275 if err != nil {
237276 return nil , fmt .Errorf ("failed to open blob file for %q: %w" , key , err )
238277 }
239278
240- // If "direct" option is specified, do not cache the file on memory.
241- // This option is useful for preventing memory cache from being polluted by data
242- // that won't be accessed immediately.
279+ // If in direct mode, don't cache file descriptor
243280 if dc .direct || opt .direct {
244281 return & reader {
245282 ReaderAt : file ,
246283 closeFunc : func () error {
247- // In passthough model, close will be toke over by go-fuse
248- // If "passThrough" option is specified, "direct" option also will
249- // be specified, so adding this branch here is enough
250284 if opt .passThrough {
251285 return nil
252286 }
@@ -255,16 +289,19 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
255289 }, nil
256290 }
257291
258- // TODO: should we cache the entire file data on memory?
259- // but making I/O (possibly huge) on every fetching
260- // might be costly.
292+ // Cache file descriptor
261293 return & reader {
262294 ReaderAt : file ,
263295 closeFunc : func () error {
264- _ , done , added := dc .fileCache .Add (key , file )
265- defer done () // Release it immediately. Cleaned up on eviction.
296+ cacheKey := key
297+ if dc .hlManager != nil && dc .hlManager .IsEnabled () && opt .chunkDigest != "" {
298+ cacheKey = opt .chunkDigest
299+ }
300+
301+ _ , done , added := dc .fileCache .Add (cacheKey , file )
302+ defer done ()
266303 if ! added {
267- return file .Close () // file already exists in the cache. close it.
304+ return file .Close ()
268305 }
269306 return nil
270307 },
@@ -281,81 +318,74 @@ func (dc *directoryCache) Add(key string, opts ...Option) (Writer, error) {
281318 opt = o (opt )
282319 }
283320
284- wip , err := dc .wipFile (key )
321+ // If hardlink manager exists and digest is provided, check if a hardlink can be created
322+ if dc .hlManager != nil && opt .chunkDigest != "" {
323+ keyPath := BuildCachePath (dc .directory , key )
324+
325+ // Try to create a hardlink from existing digest file
326+ if dc .hlManager .ProcessCacheAdd (key , opt .chunkDigest , keyPath ) {
327+ // Return a no-op writer since the file already exists
328+ return & writer {
329+ WriteCloser : nopWriteCloser (io .Discard ),
330+ commitFunc : func () error { return nil },
331+ abortFunc : func () error { return nil },
332+ }, nil
333+ }
334+ }
335+
336+ // Create temporary file
337+ w , err := WipFile (dc .wipDirectory , key )
285338 if err != nil {
286339 return nil , err
287340 }
288- w := & writer {
289- WriteCloser : wip ,
341+
342+ // Create writer
343+ writer := & writer {
344+ WriteCloser : w ,
290345 commitFunc : func () error {
291346 if dc .isClosed () {
292347 return fmt .Errorf ("cache is already closed" )
293348 }
294- // Commit the cache contents
295- c := dc .cachePath (key )
296- if err := os .MkdirAll (filepath .Dir (c ), os .ModePerm ); err != nil {
297- var errs []error
298- if err := os .Remove (wip .Name ()); err != nil {
299- errs = append (errs , err )
300- }
301- errs = append (errs , fmt .Errorf ("failed to create cache directory %q: %w" , c , err ))
302- return errors .Join (errs ... )
303- }
304- return os .Rename (wip .Name (), c )
305- },
306- abortFunc : func () error {
307- return os .Remove (wip .Name ())
308- },
309- }
310-
311- // If "direct" option is specified, do not cache the passed data on memory.
312- // This option is useful for preventing memory cache from being polluted by data
313- // that won't be accessed immediately.
314- if dc .direct || opt .direct {
315- return w , nil
316- }
317349
318- b := dc .bufPool .Get ().(* bytes.Buffer )
319- memW := & writer {
320- WriteCloser : nopWriteCloser (io .Writer (b )),
321- commitFunc : func () error {
322- if dc .isClosed () {
323- w .Close ()
324- return fmt .Errorf ("cache is already closed" )
350+ // Commit file
351+ targetPath := BuildCachePath (dc .directory , key )
352+ if err := os .MkdirAll (filepath .Dir (targetPath ), 0700 ); err != nil {
353+ return fmt .Errorf ("failed to create cache directory: %w" , err )
325354 }
326- cached , done , added := dc . cache . Add ( key , b )
327- if ! added {
328- dc . putBuffer ( b ) // already exists in the cache. abort it.
355+
356+ if err := os . Rename ( w . Name (), targetPath ); err != nil {
357+ return fmt . Errorf ( "failed to commit cache file: %w" , err )
329358 }
330- commit := func () error {
331- defer done ()
332- defer w .Close ()
333- n , err := w .Write (cached .(* bytes.Buffer ).Bytes ())
334- if err != nil || n != cached .(* bytes.Buffer ).Len () {
335- w .Abort ()
336- return err
359+
360+ // If hardlink manager exists and digest is provided, register the file
361+ if dc .hlManager != nil && dc .hlManager .IsEnabled () && opt .chunkDigest != "" {
362+ // Register this file as the primary source for this digest
363+ if err := dc .hlManager .RegisterDigestFile (opt .chunkDigest , targetPath ); err != nil {
364+ log .L .Debugf ("Failed to register digest file: %v" , err )
337365 }
338- return w .Commit ()
339- }
340- if dc .syncAdd {
341- return commit ()
342- }
343- go func () {
344- if err := commit (); err != nil {
345- fmt .Println ("failed to commit to file:" , err )
366+
367+ // Map key to digest
368+ internalKey := dc .hlManager .GenerateInternalKey (dc .directory , key )
369+ if err := dc .hlManager .MapKeyToDigest (internalKey , opt .chunkDigest ); err != nil {
370+ log .L .Debugf ("Failed to map key to digest: %v" , err )
346371 }
347- }()
372+ }
373+
348374 return nil
349375 },
350376 abortFunc : func () error {
351- defer w .Close ()
352- defer w .Abort ()
353- dc .putBuffer (b ) // abort it.
354- return nil
377+ return os .Remove (w .Name ())
355378 },
356379 }
357380
358- return memW , nil
381+ // Return directly if in direct mode
382+ if dc .direct || opt .direct {
383+ return writer , nil
384+ }
385+
386+ // Create memory cache
387+ b := dc .bufPool .Get ().(* bytes.Buffer )
388+ return dc .wrapMemoryWriter (b , writer , key )
359389}
360390
361391func (dc * directoryCache ) putBuffer (b * bytes.Buffer ) {
@@ -380,14 +410,6 @@ func (dc *directoryCache) isClosed() bool {
380410 return closed
381411}
382412
383- func (dc * directoryCache ) cachePath (key string ) string {
384- return filepath .Join (dc .directory , key [:2 ], key )
385- }
386-
387- func (dc * directoryCache ) wipFile (key string ) (* os.File , error ) {
388- return os .CreateTemp (dc .wipDirectory , key + "-*" )
389- }
390-
391413func NewMemoryCache () BlobCache {
392414 return & MemoryCache {
393415 Membuf : map [string ]* bytes.Buffer {},
@@ -463,3 +485,50 @@ func (w *writeCloser) Close() error { return w.closeFunc() }
463485func nopWriteCloser (w io.Writer ) io.WriteCloser {
464486 return & writeCloser {w , func () error { return nil }}
465487}
488+
489+ // wrapMemoryWriter wraps a writer with memory caching
490+ func (dc * directoryCache ) wrapMemoryWriter (b * bytes.Buffer , w * writer , key string ) (Writer , error ) {
491+ return & writer {
492+ WriteCloser : nopWriteCloser (b ),
493+ commitFunc : func () error {
494+ if dc .isClosed () {
495+ w .Close ()
496+ return fmt .Errorf ("cache is already closed" )
497+ }
498+
499+ cached , done , added := dc .cache .Add (key , b )
500+ if ! added {
501+ dc .putBuffer (b )
502+ }
503+
504+ commit := func () error {
505+ defer done ()
506+ defer w .Close ()
507+
508+ n , err := w .Write (cached .(* bytes.Buffer ).Bytes ())
509+ if err != nil || n != cached .(* bytes.Buffer ).Len () {
510+ w .Abort ()
511+ return err
512+ }
513+ return w .Commit ()
514+ }
515+
516+ if dc .syncAdd {
517+ return commit ()
518+ }
519+
520+ go func () {
521+ if err := commit (); err != nil {
522+ log .L .Infof ("failed to commit to file: %v" , err )
523+ }
524+ }()
525+ return nil
526+ },
527+ abortFunc : func () error {
528+ defer w .Close ()
529+ defer w .Abort ()
530+ dc .putBuffer (b )
531+ return nil
532+ },
533+ }, nil
534+ }
0 commit comments