Skip to content

Commit b521b1b

Browse files
author
abushwang
committed
persist metadata to skip remote reads on restart in DB mode
Signed-off-by: abushwang <abushwang@tencent.com>
1 parent 1966c61 commit b521b1b

File tree

7 files changed

+382
-34
lines changed

7 files changed

+382
-34
lines changed
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package db
18+
19+
import (
20+
"context"
21+
"encoding/binary"
22+
"fmt"
23+
"sync"
24+
25+
"github.com/containerd/stargz-snapshotter/estargz"
26+
"github.com/containerd/stargz-snapshotter/estargz/externaltoc"
27+
"github.com/containerd/stargz-snapshotter/estargz/zstdchunked"
28+
"github.com/containerd/stargz-snapshotter/metadata"
29+
digest "github.com/opencontainers/go-digest"
30+
bolt "go.etcd.io/bbolt"
31+
)
32+
33+
var (
	// filesystem-level metadata (stored in the filesystem bucket)
	//
	// These keys live inside each per-filesystem sub-bucket of the
	// "filesystems" bucket and together describe a persisted, reusable
	// metadata snapshot for one layer.
	bucketKeyReady        = []byte("ready")        // uvarint flag: non-zero once TOC parsing finished and metadata is complete (see markReady)
	bucketKeyRootID       = []byte("rootID")       // uvarint: node ID of the filesystem root
	bucketKeyTOCDigest    = []byte("tocDigest")    // digest string of the layer's TOC
	bucketKeyDecompressor = []byte("decompressor") // decompressor identifier string (see decompressorKey)
)
40+
41+
var initLocks sync.Map
42+
43+
func withInitLock(key string, fn func() (metadata.Reader, error)) (metadata.Reader, error) {
44+
muIface, _ := initLocks.LoadOrStore(key, &sync.Mutex{})
45+
mu := muIface.(*sync.Mutex)
46+
mu.Lock()
47+
defer mu.Unlock()
48+
return fn()
49+
}
50+
51+
func decompressorKey(d metadata.Decompressor) string {
52+
switch d.(type) {
53+
case *estargz.GzipDecompressor:
54+
return "gzip"
55+
case *estargz.LegacyGzipDecompressor:
56+
return "legacy-gzip"
57+
case *zstdchunked.Decompressor:
58+
return "zstdchunked"
59+
case *externaltoc.GzipDecompressor:
60+
return "externaltoc-gzip"
61+
default:
62+
return ""
63+
}
64+
}
65+
66+
func findDecompressorByKey(ds []metadata.Decompressor, key string) metadata.Decompressor {
67+
if key == "" {
68+
return nil
69+
}
70+
for _, d := range ds {
71+
if decompressorKey(d) == key {
72+
return d
73+
}
74+
}
75+
return nil
76+
}
77+
78+
// persistedFSInfo is the per-filesystem metadata loaded back from the DB that
// lets a reader be reconstructed without re-reading the remote TOC.
type persistedFSInfo struct {
	rootID          uint32        // node ID of the filesystem root
	tocDigest       digest.Digest // digest of the layer's TOC
	decompressorKey string        // identifier resolvable via findDecompressorByKey
}
83+
84+
func loadPersisted(db *bolt.DB, fsID string) (*persistedFSInfo, error) {
85+
var info *persistedFSInfo
86+
err := db.View(func(tx *bolt.Tx) error {
87+
filesystems := tx.Bucket(bucketKeyFilesystems)
88+
if filesystems == nil {
89+
return nil
90+
}
91+
lbkt := filesystems.Bucket([]byte(fsID))
92+
if lbkt == nil {
93+
return nil
94+
}
95+
if !decodeBool(lbkt.Get(bucketKeyReady)) {
96+
return nil
97+
}
98+
rootU, _ := binary.Uvarint(lbkt.Get(bucketKeyRootID))
99+
tocStr := string(lbkt.Get(bucketKeyTOCDigest))
100+
decompKey := string(lbkt.Get(bucketKeyDecompressor))
101+
if rootU == 0 || tocStr == "" || decompKey == "" {
102+
return nil
103+
}
104+
tocD, err := digest.Parse(tocStr)
105+
if err != nil {
106+
return nil
107+
}
108+
info = &persistedFSInfo{
109+
rootID: uint32(rootU),
110+
tocDigest: tocD,
111+
decompressorKey: decompKey,
112+
}
113+
return nil
114+
})
115+
if err != nil {
116+
return nil, err
117+
}
118+
return info, nil
119+
}
120+
121+
// decodeBool interprets b as a uvarint-encoded boolean: a successfully decoded
// non-zero value is true; an empty, truncated, or overflowing encoding is
// false (binary.Uvarint yields value 0 in all of those failure cases).
func decodeBool(b []byte) bool {
	v, n := binary.Uvarint(b)
	return n > 0 && v != 0
}
128+
129+
func (r *reader) markReady(decompKey string) error {
130+
return r.db.Batch(func(tx *bolt.Tx) error {
131+
filesystems := tx.Bucket(bucketKeyFilesystems)
132+
if filesystems == nil {
133+
return fmt.Errorf("filesystems bucket not found")
134+
}
135+
lbkt := filesystems.Bucket([]byte(r.fsID))
136+
if lbkt == nil {
137+
return fmt.Errorf("filesystem bucket %q not found", r.fsID)
138+
}
139+
if err := lbkt.Put(bucketKeyTOCDigest, []byte(r.tocDigest.String())); err != nil {
140+
return err
141+
}
142+
if err := lbkt.Put(bucketKeyDecompressor, []byte(decompKey)); err != nil {
143+
return err
144+
}
145+
one, err := encodeUint(1)
146+
if err != nil {
147+
return err
148+
}
149+
return lbkt.Put(bucketKeyReady, one)
150+
})
151+
}
152+
153+
// Prune removes filesystem buckets not included in keep.
154+
// Keys of keep must be the same as the filesystem bucket keys (e.g. layer digest strings).
155+
func Prune(_ context.Context, db *bolt.DB, keep map[string]struct{}) error {
156+
return db.Batch(func(tx *bolt.Tx) error {
157+
filesystems := tx.Bucket(bucketKeyFilesystems)
158+
if filesystems == nil {
159+
return nil
160+
}
161+
162+
var delKeys [][]byte
163+
if err := filesystems.ForEach(func(k, v []byte) error {
164+
if v != nil {
165+
return nil
166+
}
167+
if _, ok := keep[string(k)]; ok {
168+
return nil
169+
}
170+
kk := make([]byte, len(k))
171+
copy(kk, k)
172+
delKeys = append(delKeys, kk)
173+
return nil
174+
}); err != nil {
175+
return err
176+
}
177+
178+
for _, k := range delKeys {
179+
if err := filesystems.DeleteBucket(k); err != nil {
180+
return err
181+
}
182+
}
183+
return nil
184+
})
185+
}

cmd/containerd-stargz-grpc/db/reader.go

Lines changed: 94 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import (
3838
digest "github.com/opencontainers/go-digest"
3939
"github.com/rs/xid"
4040
bolt "go.etcd.io/bbolt"
41-
errbolt "go.etcd.io/bbolt/errors"
4241
"golang.org/x/sync/errgroup"
4342
)
4443

@@ -90,6 +89,43 @@ func NewReader(db *bolt.DB, sr *io.SectionReader, opts ...metadata.Option) (meta
9089
fetchSize = sr.Size() - maybeTocOffset
9190
}
9291

92+
fsID := ""
93+
if rOpts.LayerDigest != "" {
94+
fsID = rOpts.LayerDigest.String()
95+
}
96+
if fsID != "" {
97+
return withInitLock(fsID, func() (metadata.Reader, error) {
98+
if info, err := loadPersisted(db, fsID); err != nil {
99+
return nil, err
100+
} else if info != nil {
101+
d := findDecompressorByKey(decompressors, info.decompressorKey)
102+
if d != nil {
103+
return &reader{
104+
sr: sr,
105+
db: db,
106+
fsID: fsID,
107+
rootID: info.rootID,
108+
tocDigest: info.tocDigest,
109+
initG: new(errgroup.Group),
110+
decompressor: d,
111+
}, nil
112+
}
113+
}
114+
return newReaderFromRemote(db, sr, fetchSize, decompressors, rOpts, fsID)
115+
})
116+
}
117+
118+
return newReaderFromRemote(db, sr, fetchSize, decompressors, rOpts, "")
119+
}
120+
121+
func newReaderFromRemote(
122+
db *bolt.DB,
123+
sr *io.SectionReader,
124+
fetchSize int64,
125+
decompressors []metadata.Decompressor,
126+
rOpts metadata.Options,
127+
fsID string,
128+
) (metadata.Reader, error) {
93129
start := time.Now() // before getting layer footer
94130
footer := make([]byte, fetchSize)
95131
if _, err := sr.ReadAt(footer, sr.Size()-fetchSize); err != nil {
@@ -135,7 +171,7 @@ func NewReader(db *bolt.DB, sr *io.SectionReader, opts ...metadata.Option) (meta
135171
}
136172
defer tocR.Close()
137173
r := &reader{sr: sr, db: db, initG: new(errgroup.Group), decompressor: decompressor}
138-
if err := r.init(tocR, rOpts); err != nil {
174+
if err := r.initWithFSID(tocR, rOpts, fsID, decompressorKey(decompressor)); err != nil {
139175
return nil, fmt.Errorf("failed to initialize metadata: %w", err)
140176
}
141177
return r, nil
@@ -217,23 +253,23 @@ func (r *reader) Clone(sr *io.SectionReader) (metadata.Reader, error) {
217253
}
218254

219255
func (r *reader) init(decompressedR io.Reader, rOpts metadata.Options) (retErr error) {
256+
return r.initWithFSID(decompressedR, rOpts, "", "")
257+
}
258+
259+
func (r *reader) initWithFSID(
260+
decompressedR io.Reader,
261+
rOpts metadata.Options,
262+
fsID string,
263+
decompKey string,
264+
) (retErr error) {
220265
start := time.Now() // before parsing TOC JSON
221266

222267
// Initialize root node
223-
var ok bool
224-
for i := 0; i < 100; i++ {
225-
fsID := xid.New().String()
226-
if err := r.initRootNode(fsID); err != nil {
227-
if errors.Is(err, errbolt.ErrBucketExists) {
228-
continue // try with another id
229-
}
230-
return fmt.Errorf("failed to initialize root node %q: %w", fsID, err)
231-
}
232-
ok = true
233-
break
268+
if fsID == "" {
269+
fsID = xid.New().String()
234270
}
235-
if !ok {
236-
return fmt.Errorf("failed to get a unique id for metadata reader")
271+
if err := r.initRootNode(fsID); err != nil {
272+
return fmt.Errorf("failed to initialize root node %q: %w", fsID, err)
237273
}
238274

239275
f, err := os.CreateTemp("", "")
@@ -278,6 +314,11 @@ func (r *reader) init(decompressedR io.Reader, rOpts metadata.Options) (retErr e
278314
if err := r.initNodes(f); err != nil {
279315
return err
280316
}
317+
if r.fsID != "" && decompKey != "" {
318+
if err := r.markReady(decompKey); err != nil {
319+
return err
320+
}
321+
}
281322
if rOpts.Telemetry != nil && rOpts.Telemetry.DeserializeTocLatency != nil {
282323
rOpts.Telemetry.DeserializeTocLatency(start)
283324
}
@@ -292,6 +333,26 @@ func (r *reader) initRootNode(fsID string) error {
292333
if err != nil {
293334
return err
294335
}
336+
if existing := filesystems.Bucket([]byte(fsID)); existing != nil {
337+
if decodeBool(existing.Get(bucketKeyReady)) {
338+
tocStr := string(existing.Get(bucketKeyTOCDigest))
339+
decompKey := string(existing.Get(bucketKeyDecompressor))
340+
rootU, _ := binary.Uvarint(existing.Get(bucketKeyRootID))
341+
if tocStr != "" && decompKey != "" && rootU != 0 {
342+
r.fsID = fsID
343+
r.rootID = uint32(rootU)
344+
tocD, err := digest.Parse(tocStr)
345+
if err != nil {
346+
return fmt.Errorf("invalid persisted TOC digest: %w", err)
347+
}
348+
r.tocDigest = tocD
349+
return nil
350+
}
351+
}
352+
if err := filesystems.DeleteBucket([]byte(fsID)); err != nil {
353+
return err
354+
}
355+
}
295356
lbkt, err := filesystems.CreateBucket([]byte(fsID))
296357
if err != nil {
297358
return err
@@ -322,6 +383,20 @@ func (r *reader) initRootNode(fsID string) error {
322383
return err
323384
}
324385
r.rootID = rootID
386+
rootEnc, err := encodeUint(uint64(rootID))
387+
if err != nil {
388+
return err
389+
}
390+
if err := lbkt.Put(bucketKeyRootID, rootEnc); err != nil {
391+
return err
392+
}
393+
zero, err := encodeUint(0)
394+
if err != nil {
395+
return err
396+
}
397+
if err := lbkt.Put(bucketKeyReady, zero); err != nil {
398+
return err
399+
}
325400
return err
326401
})
327402
}
@@ -645,15 +720,11 @@ func (r *reader) update(fn func(tx *bolt.Tx) error) error {
645720
})
646721
}
647722

648-
// Close closes this reader. This removes underlying filesystem metadata as well.
723+
// This reader is backed by a persistent metadata DB keyed by layer digest.
724+
// Metadata lifecycle is managed by snapshot removal/cleanup (Prune), so Close only
725+
// waits for initialization to finish to avoid leaving partially-initialized state.
649726
func (r *reader) Close() error {
650-
return r.update(func(tx *bolt.Tx) (err error) {
651-
filesystems := tx.Bucket(bucketKeyFilesystems)
652-
if filesystems == nil {
653-
return nil
654-
}
655-
return filesystems.DeleteBucket([]byte(r.fsID))
656-
})
727+
return r.waitInit()
657728
}
658729

659730
// GetOffset returns an offset of a node.

0 commit comments

Comments
 (0)