Skip to content

Commit 90ce2aa

Browse files
authored
[PBM-1405] dump v2 (#1035)
1 parent 489a24b commit 90ce2aa

File tree

14 files changed

+574
-2224
lines changed

14 files changed

+574
-2224
lines changed

pbm/archive/archive.go

Lines changed: 6 additions & 168 deletions
Original file line numberDiff line numberDiff line change
@@ -38,50 +38,19 @@ const MaxBSONSize = db.MaxBSONSize
3838
var terminatorBytes = []byte{0xFF, 0xFF, 0xFF, 0xFF}
3939

4040
type (
41-
NSFilterFn func(ns string) bool
41+
// NSFilterFn checks whether a namespace is selected for backup.
42+
// Useful when only some namespaces are selected for backup.
43+
NSFilterFn func(ns string) bool
44+
45+
// DocFilterFn checks whether a document is selected for backup.
46+
// Useful when only some documents are selected for backup.
4247
DocFilterFn func(ns string, d bson.Raw) bool
4348
)
4449

4550
// DefaultNSFilter is the permissive namespace filter: it reports true for
// every namespace it is given.
func DefaultNSFilter(_ string) bool {
	return true
}
4651

4752
// DefaultDocFilter is the permissive document filter: it reports true for
// every namespace/document pair it is given.
func DefaultDocFilter(_ string, _ bson.Raw) bool {
	return true
}
4853

49-
func Decompose(r io.Reader, newWriter NewWriter, nsFilter NSFilterFn, docFilter DocFilterFn) error {
50-
meta, err := readPrelude(r)
51-
if err != nil {
52-
return errors.Wrap(err, "prelude")
53-
}
54-
55-
if nsFilter == nil {
56-
nsFilter = DefaultNSFilter
57-
}
58-
if docFilter == nil {
59-
docFilter = DefaultDocFilter
60-
}
61-
62-
c := newConsumer(newWriter, nsFilter, docFilter)
63-
if err := (&archive.Parser{In: r}).ReadAllBlocks(c); err != nil {
64-
return errors.Wrap(err, "archive parser")
65-
}
66-
67-
// save metadata for selected namespaces only
68-
nss := make([]*Namespace, 0, len(meta.Namespaces))
69-
for _, n := range meta.Namespaces {
70-
ns := NSify(n.Database, n.Collection)
71-
if !nsFilter(ns) {
72-
continue
73-
}
74-
75-
n.CRC = c.crc[ns]
76-
n.Size = c.size[ns]
77-
nss = append(nss, n)
78-
}
79-
meta.Namespaces = nss
80-
81-
err = writeMetadata(meta, newWriter)
82-
return errors.Wrap(err, "metadata")
83-
}
84-
8554
func Compose(w io.Writer, newReader NewReader, nsFilter NSFilterFn, concurrency int) error {
8655
meta, err := readMetadata(newReader)
8756
if err != nil {
@@ -113,22 +82,6 @@ func Compose(w io.Writer, newReader NewReader, nsFilter NSFilterFn, concurrency
11382
return errors.Wrap(err, "write namespaces")
11483
}
11584

116-
func readPrelude(r io.Reader) (*archiveMeta, error) {
117-
prelude := archive.Prelude{}
118-
err := prelude.Read(r)
119-
if err != nil {
120-
return nil, errors.Wrap(err, "read")
121-
}
122-
123-
m := &archiveMeta{Header: prelude.Header}
124-
m.Namespaces = make([]*Namespace, len(prelude.NamespaceMetadatas))
125-
for i, n := range prelude.NamespaceMetadatas {
126-
m.Namespaces[i] = &Namespace{CollectionMetadata: n}
127-
}
128-
129-
return m, nil
130-
}
131-
13285
func writePrelude(w io.Writer, m *archiveMeta) error {
13386
prelude := archive.Prelude{Header: m.Header}
13487
prelude.NamespaceMetadatas = make([]*archive.CollectionMetadata, len(m.Namespaces))
@@ -290,21 +243,6 @@ func closeChunk(w io.Writer, ns *Namespace) error {
290243
return errors.Wrap(err, "terminator")
291244
}
292245

293-
func writeMetadata(meta *archiveMeta, newWriter NewWriter) error {
294-
w, err := newWriter(MetaFile)
295-
if err != nil {
296-
return errors.Wrap(err, "new writer")
297-
}
298-
defer w.Close()
299-
300-
data, err := bson.MarshalExtJSONIndent(meta, true, true, "", "\t")
301-
if err != nil {
302-
return errors.Wrap(err, "marshal")
303-
}
304-
305-
return SecureWrite(w, data)
306-
}
307-
308246
func readMetadata(newReader NewReader) (*archiveMeta, error) {
309247
r, err := newReader(MetaFile)
310248
if err != nil {
@@ -326,106 +264,6 @@ func ReadMetadata(r io.Reader) (*archiveMeta, error) {
326264
return meta, errors.Wrap(err, "unmarshal")
327265
}
328266

329-
type consumer struct {
330-
open NewWriter
331-
nsFilter NSFilterFn
332-
docFilter DocFilterFn
333-
nss map[string]io.WriteCloser
334-
crc map[string]int64
335-
size map[string]int64
336-
curr string
337-
}
338-
339-
func newConsumer(newWriter NewWriter, nsFilter NSFilterFn, docFilter DocFilterFn) *consumer {
340-
return &consumer{
341-
open: newWriter,
342-
nsFilter: nsFilter,
343-
docFilter: docFilter,
344-
nss: make(map[string]io.WriteCloser),
345-
crc: make(map[string]int64),
346-
size: make(map[string]int64),
347-
}
348-
}
349-
350-
func (c *consumer) HeaderBSON(data []byte) error {
351-
h := &archive.NamespaceHeader{}
352-
if err := bson.Unmarshal(data, h); err != nil {
353-
return errors.Wrap(err, "unmarshal")
354-
}
355-
356-
ns := NSify(h.Database, h.Collection)
357-
if !c.nsFilter(ns) {
358-
// non-selected namespace. ignore
359-
c.curr = ""
360-
return nil
361-
}
362-
363-
if !h.EOF {
364-
c.curr = ns
365-
return nil
366-
}
367-
368-
c.crc[ns] = h.CRC
369-
370-
w := c.nss[ns]
371-
if w == nil {
372-
return nil
373-
}
374-
375-
delete(c.nss, ns)
376-
return errors.Wrap(w.Close(), "close")
377-
}
378-
379-
func (c *consumer) BodyBSON(data []byte) error {
380-
ns := c.curr
381-
if ns == "" {
382-
// ignored ns. skip data loading/reading
383-
return nil
384-
}
385-
if !c.docFilter(ns, bson.Raw(data)) {
386-
// ignored doc. skip data loading/reading
387-
return nil
388-
}
389-
390-
w := c.nss[ns]
391-
if w == nil {
392-
var err error
393-
w, err = c.open(ns)
394-
if err != nil {
395-
return errors.Wrapf(err, "open: %q", ns)
396-
}
397-
398-
c.nss[ns] = w
399-
}
400-
401-
c.size[ns] += int64(len(data))
402-
return errors.Wrapf(SecureWrite(w, data), "%q", ns)
403-
}
404-
405-
func (c *consumer) End() error {
406-
errs := []error{}
407-
408-
wg := &sync.WaitGroup{}
409-
mu := &sync.Mutex{}
410-
411-
wg.Add(len(c.nss))
412-
for ns, w := range c.nss {
413-
go func() {
414-
defer wg.Done()
415-
416-
err := w.Close()
417-
if err != nil {
418-
mu.Lock()
419-
errs = append(errs, errors.Wrapf(err, "close: %q", ns))
420-
mu.Unlock()
421-
}
422-
}()
423-
}
424-
425-
wg.Wait()
426-
return errors.Join(errs...)
427-
}
428-
429267
func SecureWrite(w io.Writer, data []byte) error {
430268
n, err := w.Write(data)
431269
if err != nil {

0 commit comments

Comments
 (0)