Skip to content

Commit 15487a4

Browse files
committed
ObjectIter: use a pipeline
1 parent 3489e7d commit 15487a4

File tree

2 files changed

+151
-154
lines changed

2 files changed

+151
-154
lines changed

git/obj_iter.go

Lines changed: 119 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -2,128 +2,147 @@ package git
22

33
import (
44
"bufio"
5+
"context"
56
"fmt"
67
"io"
7-
"os"
8-
"os/exec"
8+
9+
"github.com/github/git-sizer/internal/pipe"
910
)
1011

1112
// ObjectIter iterates over objects in a Git repository.
1213
type ObjectIter struct {
13-
cmd1 *exec.Cmd
14-
cmd2 *exec.Cmd
15-
out1 io.ReadCloser
16-
out2 io.ReadCloser
17-
f *bufio.Reader
18-
errChan <-chan error
14+
ctx context.Context
15+
p *pipe.Pipeline
16+
oidCh chan OID
17+
errCh chan error
18+
headerCh chan BatchHeader
1919
}
2020

2121
// NewObjectIter returns an iterator that iterates over objects in
2222
// `repo`. Objects are found by `git rev-list --objects --stdin`, whose
2323
// roots the caller supplies via `AddRoot()`; the caller must call
2424
// `Close()` once all roots are added. (`args` is currently unused.)
25-
func (repo *Repository) NewObjectIter(
26-
args ...string,
27-
) (*ObjectIter, io.WriteCloser, error) {
28-
cmd1 := repo.GitCommand(append([]string{"rev-list", "--objects"}, args...)...)
29-
in1, err := cmd1.StdinPipe()
30-
if err != nil {
31-
return nil, nil, err
32-
}
33-
34-
out1, err := cmd1.StdoutPipe()
35-
if err != nil {
36-
return nil, nil, err
37-
}
38-
39-
cmd1.Stderr = os.Stderr
40-
41-
err = cmd1.Start()
42-
if err != nil {
43-
return nil, nil, err
44-
}
45-
46-
cmd2 := repo.GitCommand("cat-file", "--batch-check", "--buffer")
47-
in2, err := cmd2.StdinPipe()
48-
if err != nil {
49-
out1.Close()
50-
cmd1.Wait()
51-
return nil, nil, err
52-
}
53-
54-
out2, err := cmd2.StdoutPipe()
55-
if err != nil {
56-
in2.Close()
57-
out1.Close()
58-
cmd1.Wait()
59-
return nil, nil, err
25+
func (repo *Repository) NewObjectIter(ctx context.Context, args ...string) (*ObjectIter, error) {
26+
iter := ObjectIter{
27+
ctx: ctx,
28+
p: pipe.New(),
29+
oidCh: make(chan OID),
30+
errCh: make(chan error),
31+
headerCh: make(chan BatchHeader),
6032
}
6133

62-
cmd2.Stderr = os.Stderr
34+
iter.p.Add(
35+
// Read OIDs from `iter.oidCh` and write them to `git
36+
// rev-list`:
37+
pipe.Function(
38+
"request-objects",
39+
func(ctx context.Context, _ pipe.Env, _ io.Reader, stdout io.Writer) error {
40+
out := bufio.NewWriter(stdout)
41+
42+
for {
43+
select {
44+
case oid, ok := <-iter.oidCh:
45+
if !ok {
46+
return out.Flush()
47+
}
48+
if _, err := fmt.Fprintln(out, oid.String()); err != nil {
49+
return fmt.Errorf("writing to 'git cat-file': %w", err)
50+
}
51+
case <-ctx.Done():
52+
return ctx.Err()
53+
}
54+
}
55+
},
56+
),
57+
58+
// Walk starting at the OIDs on `stdin` and output the OIDs
59+
// (possibly followed by paths) of all of the Git objects
60+
// found.
61+
pipe.CommandStage(
62+
"git-rev-list",
63+
repo.GitCommand("rev-list", "--objects", "--stdin", "--date-order"),
64+
),
65+
66+
// Read the output of `git rev-list --objects`, strip off any
67+
// trailing information, and write the OIDs to `git cat-file`:
68+
pipe.LinewiseFunction(
69+
"copy-oids",
70+
func(_ context.Context, _ pipe.Env, line []byte, stdout *bufio.Writer) error {
71+
if len(line) < 40 {
72+
return fmt.Errorf("line too short: '%s'", line)
73+
}
74+
if _, err := stdout.Write(line[:40]); err != nil {
75+
return fmt.Errorf("writing OID to 'git cat-file': %w", err)
76+
}
77+
if err := stdout.WriteByte('\n'); err != nil {
78+
return fmt.Errorf("writing LF to 'git cat-file': %w", err)
79+
}
80+
return nil
81+
},
82+
),
83+
84+
// Process the OIDs from stdin and, for each object, output a
85+
// header:
86+
pipe.CommandStage(
87+
"git-cat-file",
88+
repo.GitCommand("cat-file", "--batch-check", "--buffer"),
89+
),
90+
91+
// Parse the object headers and shove them into `headerCh`:
92+
pipe.Function(
93+
"object-parser",
94+
func(ctx context.Context, _ pipe.Env, stdin io.Reader, _ io.Writer) error {
95+
defer close(iter.headerCh)
96+
97+
f := bufio.NewReader(stdin)
98+
99+
for {
100+
header, err := f.ReadString('\n')
101+
if err != nil {
102+
if err == io.EOF {
103+
return nil
104+
}
105+
return fmt.Errorf("reading from 'git cat-file': %w", err)
106+
}
107+
batchHeader, err := ParseBatchHeader("", header)
108+
if err != nil {
109+
return fmt.Errorf("parsing output of 'git cat-file': %w", err)
110+
}
111+
112+
iter.headerCh <- batchHeader
113+
}
114+
},
115+
),
116+
)
63117

64-
err = cmd2.Start()
65-
if err != nil {
66-
return nil, nil, err
118+
if err := iter.p.Start(ctx); err != nil {
119+
return nil, err
67120
}
68121

69-
errChan := make(chan error, 1)
70-
71-
go func() {
72-
defer in2.Close()
73-
f1 := bufio.NewReader(out1)
74-
f2 := bufio.NewWriter(in2)
75-
defer f2.Flush()
76-
for {
77-
line, err := f1.ReadString('\n')
78-
if err != nil {
79-
if err != io.EOF {
80-
errChan <- err
81-
} else {
82-
errChan <- nil
83-
}
84-
return
85-
}
86-
if len(line) <= 40 {
87-
errChan <- fmt.Errorf("line too short: %#v", line)
88-
}
89-
f2.WriteString(line[:40])
90-
f2.WriteByte('\n')
91-
}
92-
}()
93-
94-
return &ObjectIter{
95-
cmd1: cmd1,
96-
cmd2: cmd2,
97-
out1: out1,
98-
out2: out2,
99-
f: bufio.NewReader(out2),
100-
errChan: errChan,
101-
}, in1, nil
122+
return &iter, nil
102123
}
103124

104-
// Next returns the next object: its OID, type, and size. When no more
105-
// data are available, it returns an `io.EOF` error.
106-
func (iter *ObjectIter) Next() (BatchHeader, error) {
107-
line, err := iter.f.ReadString('\n')
108-
if err != nil {
109-
return missingHeader, err
125+
// AddRoot adds another OID to be included in the walk.
126+
func (iter *ObjectIter) AddRoot(oid OID) error {
127+
select {
128+
case iter.oidCh <- oid:
129+
return nil
130+
case <-iter.ctx.Done():
131+
return iter.ctx.Err()
110132
}
111-
112-
return ParseBatchHeader("", line)
113133
}
114134

115135
// Close tells the iterator that no more roots will be added. It must be called exactly once, after the last call to `AddRoot()`.
116-
func (iter *ObjectIter) Close() error {
117-
iter.out1.Close()
118-
err := <-iter.errChan
119-
iter.out2.Close()
120-
err2 := iter.cmd1.Wait()
121-
if err == nil {
122-
err = err2
123-
}
124-
err2 = iter.cmd2.Wait()
125-
if err == nil {
126-
err = err2
136+
func (iter *ObjectIter) Close() {
137+
close(iter.oidCh)
138+
}
139+
140+
// Next returns either the next object (its OID, type, and size), or a
141+
// `false` boolean value to indicate that there are no data left.
142+
func (iter *ObjectIter) Next() (BatchHeader, bool, error) {
143+
header, ok := <-iter.headerCh
144+
if !ok {
145+
return missingHeader, false, iter.p.Wait()
127146
}
128-
return err
147+
return header, true, nil
129148
}

sizes/graph.go

Lines changed: 32 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package sizes
22

33
import (
4-
"bufio"
54
"context"
65
"errors"
76
"fmt"
8-
"io"
97
"sync"
108

119
"github.com/github/git-sizer/counts"
@@ -78,62 +76,48 @@ func ScanRepositoryUsingGraph(
7876
return HistorySize{}, err
7977
}
8078

81-
iter, in, err := repo.NewObjectIter("--stdin", "--date-order")
79+
objIter, err := repo.NewObjectIter(context.TODO())
8280
if err != nil {
8381
return HistorySize{}, err
8482
}
85-
defer func() {
86-
if iter != nil {
87-
iter.Close()
88-
}
89-
}()
9083

9184
errChan := make(chan error, 1)
9285
var refsSeen []refSeen
9386
// Feed the references that we want into the stdin of the object
9487
// iterator:
9588
go func() {
96-
defer in.Close()
97-
bufin := bufio.NewWriter(in)
98-
defer bufin.Flush()
99-
100-
for {
101-
ref, ok, err := refIter.Next()
102-
if err != nil {
103-
errChan <- err
104-
return
105-
}
106-
if !ok {
107-
break
108-
}
89+
defer objIter.Close()
10990

110-
walk, groups := rg.Categorize(ref.Refname)
91+
errChan <- func() error {
92+
for {
93+
ref, ok, err := refIter.Next()
94+
if err != nil {
95+
return err
96+
}
97+
if !ok {
98+
return nil
99+
}
111100

112-
refsSeen = append(
113-
refsSeen,
114-
refSeen{
115-
Reference: ref,
116-
walked: walk,
117-
groups: groups,
118-
},
119-
)
101+
walk, groups := rg.Categorize(ref.Refname)
120102

121-
if !walk {
122-
continue
123-
}
103+
refsSeen = append(
104+
refsSeen,
105+
refSeen{
106+
Reference: ref,
107+
walked: walk,
108+
groups: groups,
109+
},
110+
)
124111

125-
_, err = bufin.WriteString(ref.OID.String())
126-
if err != nil {
127-
errChan <- err
128-
return
129-
}
130-
err = bufin.WriteByte('\n')
131-
if err != nil {
132-
errChan <- err
133-
return
112+
if !walk {
113+
continue
114+
}
115+
116+
if err := objIter.AddRoot(ref.OID); err != nil {
117+
return err
118+
}
134119
}
135-
}
136-
errChan <- err
120+
}()
137121
}()
138122

139123
type ObjectHeader struct {
@@ -192,11 +176,11 @@ func ScanRepositoryUsingGraph(
192176

193177
progressMeter.Start("Processing blobs: %d")
194178
for {
195-
obj, err := iter.Next()
179+
obj, ok, err := objIter.Next()
196180
if err != nil {
197-
if err != io.EOF {
198-
return HistorySize{}, err
199-
}
181+
return HistorySize{}, err
182+
}
183+
if !ok {
200184
break
201185
}
202186
switch obj.ObjectType {
@@ -220,12 +204,6 @@ func ScanRepositoryUsingGraph(
220204
return HistorySize{}, err
221205
}
222206

223-
err = iter.Close()
224-
iter = nil
225-
if err != nil {
226-
return HistorySize{}, err
227-
}
228-
229207
objectIter, err := repo.NewBatchObjectIter(ctx)
230208
if err != nil {
231209
return HistorySize{}, err

0 commit comments

Comments
 (0)