Skip to content

Commit 3489e7d

Browse files
committed
BatchObjectIter: use a pipeline
1 parent 1fe34af commit 3489e7d

File tree

2 files changed

+160
-117
lines changed

2 files changed

+160
-117
lines changed

git/batch_obj_iter.go

Lines changed: 125 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -2,80 +2,155 @@ package git
22

33
import (
44
"bufio"
5+
"context"
6+
"fmt"
57
"io"
6-
"os"
7-
"os/exec"
8+
9+
"github.com/github/git-sizer/internal/pipe"
810
)
911

12+
type ObjectRecord struct {
13+
BatchHeader
14+
Data []byte
15+
}
16+
1017
// BatchObjectIter iterates over objects whose names are fed into its
1118
// stdin. The output is buffered, so it has to be closed before you
1219
// can be sure that you have gotten all of the objects.
1320
type BatchObjectIter struct {
14-
cmd *exec.Cmd
15-
out io.ReadCloser
16-
f *bufio.Reader
21+
ctx context.Context
22+
p *pipe.Pipeline
23+
oidCh chan OID
24+
objCh chan ObjectRecord
25+
errCh chan error
1726
}
1827

1928
// NewBatchObjectIter returns a `*BatchObjectIterator` and an
2029
// `io.WriteCloser`. The iterator iterates over objects whose names
2130
// are fed into the `io.WriteCloser`, one per line. The
2231
// `io.WriteCloser` should normally be closed and the iterator's
2332
// output drained before `Close()` is called.
24-
func (repo *Repository) NewBatchObjectIter() (*BatchObjectIter, io.WriteCloser, error) {
25-
cmd := repo.GitCommand("cat-file", "--batch", "--buffer")
26-
27-
in, err := cmd.StdinPipe()
28-
if err != nil {
29-
return nil, nil, err
33+
func (repo *Repository) NewBatchObjectIter(ctx context.Context) (*BatchObjectIter, error) {
34+
iter := BatchObjectIter{
35+
ctx: ctx,
36+
p: pipe.New(),
37+
oidCh: make(chan OID),
38+
objCh: make(chan ObjectRecord),
39+
errCh: make(chan error),
3040
}
3141

32-
out, err := cmd.StdoutPipe()
33-
if err != nil {
34-
return nil, nil, err
35-
}
42+
iter.p.Add(
43+
// Read OIDs from `iter.oidCh` and write them to `git
44+
// cat-file`:
45+
pipe.Function(
46+
"request-objects",
47+
func(ctx context.Context, _ pipe.Env, _ io.Reader, stdout io.Writer) error {
48+
out := bufio.NewWriter(stdout)
49+
50+
for {
51+
select {
52+
case oid, ok := <-iter.oidCh:
53+
if !ok {
54+
return out.Flush()
55+
}
56+
if _, err := fmt.Fprintln(out, oid.String()); err != nil {
57+
return fmt.Errorf("writing to 'git cat-file': %w", err)
58+
}
59+
case <-ctx.Done():
60+
return ctx.Err()
61+
}
62+
}
63+
},
64+
),
65+
66+
// Read OIDs from `stdin` and output a header line followed by
67+
// the contents of the corresponding Git objects:
68+
pipe.CommandStage(
69+
"git-cat-file",
70+
repo.GitCommand("cat-file", "--batch", "--buffer"),
71+
),
72+
73+
// Parse the object headers and read the object contents, and
74+
// shove both into `objCh`:
75+
pipe.Function(
76+
"object-reader",
77+
func(ctx context.Context, _ pipe.Env, stdin io.Reader, _ io.Writer) error {
78+
defer close(iter.objCh)
3679

37-
cmd.Stderr = os.Stderr
80+
f := bufio.NewReader(stdin)
3881

39-
err = cmd.Start()
40-
if err != nil {
41-
return nil, nil, err
82+
for {
83+
header, err := f.ReadString('\n')
84+
if err != nil {
85+
if err == io.EOF {
86+
return nil
87+
}
88+
return fmt.Errorf("reading from 'git cat-file': %w", err)
89+
}
90+
batchHeader, err := ParseBatchHeader("", header)
91+
if err != nil {
92+
return fmt.Errorf("parsing output of 'git cat-file': %w", err)
93+
}
94+
95+
// Read the object contents plus the trailing LF
96+
// (which is discarded below while creating the
97+
// `ObjectRecord`):
98+
data := make([]byte, batchHeader.ObjectSize+1)
99+
if _, err := io.ReadFull(f, data); err != nil {
100+
return fmt.Errorf(
101+
"reading object data from 'git cat-file' for %s '%s': %w",
102+
batchHeader.ObjectType, batchHeader.OID, err,
103+
)
104+
}
105+
106+
select {
107+
case iter.objCh <- ObjectRecord{
108+
BatchHeader: batchHeader,
109+
Data: data[:batchHeader.ObjectSize],
110+
}:
111+
case <-iter.ctx.Done():
112+
return iter.ctx.Err()
113+
}
114+
}
115+
},
116+
),
117+
)
118+
119+
if err := iter.p.Start(ctx); err != nil {
120+
return nil, err
42121
}
43122

44-
return &BatchObjectIter{
45-
cmd: cmd,
46-
out: out,
47-
f: bufio.NewReader(out),
48-
}, in, nil
123+
return &iter, nil
49124
}
50125

51-
// Next returns the next object: its OID, type, size, and contents.
52-
// When no more data are available, it returns an `io.EOF` error.
53-
func (iter *BatchObjectIter) Next() (BatchHeader, []byte, error) {
54-
header, err := iter.f.ReadString('\n')
55-
if err != nil {
56-
return missingHeader, nil, err
57-
}
58-
obj, err := ParseBatchHeader("", header)
59-
if err != nil {
60-
return missingHeader, nil, err
126+
// RequestObject requests that the object with the specified `oid` be
127+
// processed. The objects registered via this method can be read using
128+
// `Next()` in the order that they were requested.
129+
func (iter *BatchObjectIter) RequestObject(oid OID) error {
130+
select {
131+
case iter.oidCh <- oid:
132+
return nil
133+
case <-iter.ctx.Done():
134+
return iter.ctx.Err()
61135
}
62-
// +1 for LF:
63-
data := make([]byte, obj.ObjectSize+1)
64-
_, err = io.ReadFull(iter.f, data)
65-
if err != nil {
66-
return missingHeader, nil, err
67-
}
68-
data = data[:len(data)-1]
69-
return obj, data, nil
70136
}
71137

72-
// Close closes the iterator and frees up resources. If any iterator
73-
// output hasn't been read yet, it will be lost.
74-
func (iter *BatchObjectIter) Close() error {
75-
err := iter.out.Close()
76-
err2 := iter.cmd.Wait()
77-
if err == nil {
78-
err = err2
138+
// Close closes the iterator and frees up resources. Close must be
139+
// called exactly once.
140+
func (iter *BatchObjectIter) Close() {
141+
close(iter.oidCh)
142+
}
143+
144+
// Next either returns the next object (its header and contents), or a
145+
// `false` boolean value if no more objects are left. Objects need to
146+
// be read asynchronously, but the last objects won't necessarily show
147+
// up here until `Close()` has been called.
148+
func (iter *BatchObjectIter) Next() (ObjectRecord, bool, error) {
149+
obj, ok := <-iter.objCh
150+
if !ok {
151+
return ObjectRecord{
152+
BatchHeader: missingHeader,
153+
}, false, iter.p.Wait()
79154
}
80-
return err
155+
return obj, true, nil
81156
}

sizes/graph.go

Lines changed: 35 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,11 @@ func ScanRepositoryUsingGraph(
6868
repo *git.Repository, rg RefGrouper, nameStyle NameStyle,
6969
progressMeter meter.Progress,
7070
) (HistorySize, error) {
71-
graph := NewGraph(rg, nameStyle)
72-
7371
ctx, cancel := context.WithCancel(context.TODO())
7472
defer cancel()
7573

74+
graph := NewGraph(rg, nameStyle)
75+
7676
refIter, err := repo.NewReferenceIter(ctx)
7777
if err != nil {
7878
return HistorySize{}, err
@@ -226,78 +226,52 @@ func ScanRepositoryUsingGraph(
226226
return HistorySize{}, err
227227
}
228228

229-
objectIter, objectIn, err := repo.NewBatchObjectIter()
229+
objectIter, err := repo.NewBatchObjectIter(ctx)
230230
if err != nil {
231231
return HistorySize{}, err
232232
}
233-
defer func() {
234-
if objectIter != nil {
235-
objectIter.Close()
236-
}
237-
}()
238233

239234
go func() {
240-
defer objectIn.Close()
241-
bufin := bufio.NewWriter(objectIn)
242-
defer bufin.Flush()
235+
defer objectIter.Close()
243236

244-
for _, obj := range trees {
245-
_, err := bufin.WriteString(obj.oid.String())
246-
if err != nil {
247-
errChan <- err
248-
return
237+
errChan <- func() error {
238+
for _, obj := range trees {
239+
if err := objectIter.RequestObject(obj.oid); err != nil {
240+
return fmt.Errorf("requesting tree '%s': %w", obj.oid, err)
241+
}
249242
}
250-
err = bufin.WriteByte('\n')
251-
if err != nil {
252-
errChan <- err
253-
return
254-
}
255-
}
256243

257-
for i := len(commits); i > 0; i-- {
258-
obj := commits[i-1]
259-
_, err := bufin.WriteString(obj.oid.String())
260-
if err != nil {
261-
errChan <- err
262-
return
263-
}
264-
err = bufin.WriteByte('\n')
265-
if err != nil {
266-
errChan <- err
267-
return
244+
for i := len(commits); i > 0; i-- {
245+
obj := commits[i-1]
246+
if err := objectIter.RequestObject(obj.oid); err != nil {
247+
return fmt.Errorf("requesting commit '%s': %w", obj.oid, err)
248+
}
268249
}
269-
}
270250

271-
for _, obj := range tags {
272-
_, err := bufin.WriteString(obj.oid.String())
273-
if err != nil {
274-
errChan <- err
275-
return
251+
for _, obj := range tags {
252+
if err := objectIter.RequestObject(obj.oid); err != nil {
253+
return fmt.Errorf("requesting tag '%s': %w", obj.oid, err)
254+
}
276255
}
277-
err = bufin.WriteByte('\n')
278-
if err != nil {
279-
errChan <- err
280-
return
281-
}
282-
}
283256

284-
errChan <- nil
257+
return nil
258+
}()
285259
}()
286260

287261
progressMeter.Start("Processing trees: %d")
288262
for range trees {
289-
obj, data, err := objectIter.Next()
263+
obj, ok, err := objectIter.Next()
290264
if err != nil {
291-
if err != io.EOF {
292-
return HistorySize{}, err
293-
}
265+
return HistorySize{}, err
266+
}
267+
if !ok {
294268
return HistorySize{}, errors.New("fewer trees read than expected")
295269
}
296270
if obj.ObjectType != "tree" {
297271
return HistorySize{}, fmt.Errorf("expected tree; read %#v", obj.ObjectType)
298272
}
299273
progressMeter.Inc()
300-
tree, err := git.ParseTree(obj.OID, data)
274+
tree, err := git.ParseTree(obj.OID, obj.Data)
301275
if err != nil {
302276
return HistorySize{}, err
303277
}
@@ -313,17 +287,17 @@ func ScanRepositoryUsingGraph(
313287
// time:
314288
progressMeter.Start("Processing commits: %d")
315289
for i := len(commits); i > 0; i-- {
316-
obj, data, err := objectIter.Next()
290+
obj, ok, err := objectIter.Next()
317291
if err != nil {
318-
if err != io.EOF {
319-
return HistorySize{}, err
320-
}
292+
return HistorySize{}, err
293+
}
294+
if !ok {
321295
return HistorySize{}, errors.New("fewer commits read than expected")
322296
}
323297
if obj.ObjectType != "commit" {
324298
return HistorySize{}, fmt.Errorf("expected commit; read %#v", obj.ObjectType)
325299
}
326-
commit, err := git.ParseCommit(obj.OID, data)
300+
commit, err := git.ParseCommit(obj.OID, obj.Data)
327301
if err != nil {
328302
return HistorySize{}, err
329303
}
@@ -349,17 +323,17 @@ func ScanRepositoryUsingGraph(
349323

350324
progressMeter.Start("Processing annotated tags: %d")
351325
for range tags {
352-
obj, data, err := objectIter.Next()
326+
obj, ok, err := objectIter.Next()
353327
if err != nil {
354-
if err != io.EOF {
355-
return HistorySize{}, err
356-
}
328+
return HistorySize{}, err
329+
}
330+
if !ok {
357331
return HistorySize{}, errors.New("fewer tags read than expected")
358332
}
359333
if obj.ObjectType != "tag" {
360334
return HistorySize{}, fmt.Errorf("expected tag; read %#v", obj.ObjectType)
361335
}
362-
tag, err := git.ParseTag(obj.OID, data)
336+
tag, err := git.ParseTag(obj.OID, obj.Data)
363337
if err != nil {
364338
return HistorySize{}, err
365339
}
@@ -373,12 +347,6 @@ func ScanRepositoryUsingGraph(
373347
return HistorySize{}, err
374348
}
375349

376-
err = objectIter.Close()
377-
objectIter = nil
378-
if err != nil {
379-
return HistorySize{}, err
380-
}
381-
382350
progressMeter.Start("Processing references: %d")
383351
for _, refSeen := range refsSeen {
384352
progressMeter.Inc()

0 commit comments

Comments
 (0)