Skip to content

Commit 7deadee

Browse files
committed
Use all the processors to read rows from the pool
Signed-off-by: Javi Fontan <[email protected]>
1 parent 95747a5 commit 7deadee

File tree

8 files changed

+134
-88
lines changed

8 files changed

+134
-88
lines changed

blobs.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (r blobsTable) RowIter() (sql.RowIter, error) {
4545
return nil, err
4646
}
4747

48-
return &rowRepoIter, nil
48+
return rowRepoIter, nil
4949
}
5050

5151
func (blobsTable) Children() []sql.Node {
@@ -56,15 +56,15 @@ type blobIter struct {
5656
iter *object.BlobIter
5757
}
5858

59-
func (i *blobIter) InitRepository(repo Repository) error {
59+
func (i *blobIter) NewIterator(
60+
repo *Repository) (RowRepoIterImplementation, error) {
61+
6062
iter, err := repo.Repo.BlobObjects()
6163
if err != nil {
62-
return err
64+
return nil, err
6365
}
6466

65-
i.iter = iter
66-
67-
return nil
67+
return &blobIter{iter: iter}, nil
6868
}
6969

7070
func (i *blobIter) Next() (sql.Row, error) {

commits.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func (r commitsTable) RowIter() (sql.RowIter, error) {
5151
return nil, err
5252
}
5353

54-
return &rowRepoIter, nil
54+
return rowRepoIter, nil
5555
}
5656

5757
func (commitsTable) Children() []sql.Node {
@@ -62,15 +62,15 @@ type commitIter struct {
6262
iter object.CommitIter
6363
}
6464

65-
func (i *commitIter) InitRepository(repo Repository) error {
65+
func (i *commitIter) NewIterator(
66+
repo *Repository) (RowRepoIterImplementation, error) {
67+
6668
iter, err := repo.Repo.CommitObjects()
6769
if err != nil {
68-
return err
70+
return nil, err
6971
}
7072

71-
i.iter = iter
72-
73-
return nil
73+
return &commitIter{iter: iter}, nil
7474
}
7575

7676
func (i *commitIter) Next() (sql.Row, error) {

objects.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (r objectsTable) RowIter() (sql.RowIter, error) {
4545
return nil, err
4646
}
4747

48-
return &rowRepoIter, nil
48+
return rowRepoIter, nil
4949
}
5050

5151
func (objectsTable) Children() []sql.Node {
@@ -56,15 +56,15 @@ type objectIter struct {
5656
iter *object.ObjectIter
5757
}
5858

59-
func (i *objectIter) InitRepository(repo Repository) error {
59+
func (i *objectIter) NewIterator(
60+
repo *Repository) (RowRepoIterImplementation, error) {
61+
6062
iter, err := repo.Repo.Objects()
6163
if err != nil {
62-
return err
64+
return nil, err
6365
}
6466

65-
i.iter = iter
66-
67-
return nil
67+
return &objectIter{iter: iter}, nil
6868
}
6969

7070
func (i *objectIter) Next() (sql.Row, error) {

references.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (r referencesTable) RowIter() (sql.RowIter, error) {
5252
return nil, err
5353
}
5454

55-
return &rowRepoIter, nil
55+
return rowRepoIter, nil
5656
}
5757

5858
func (referencesTable) Children() []sql.Node {
@@ -63,15 +63,15 @@ type referenceIter struct {
6363
iter storer.ReferenceIter
6464
}
6565

66-
func (i *referenceIter) InitRepository(repo Repository) error {
66+
func (i *referenceIter) NewIterator(
67+
repo *Repository) (RowRepoIterImplementation, error) {
68+
6769
iter, err := repo.Repo.References()
6870
if err != nil {
69-
return err
71+
return nil, err
7072
}
7173

72-
i.iter = iter
73-
74-
return nil
74+
return &referenceIter{iter: iter}, nil
7575
}
7676

7777
func (i *referenceIter) Next() (sql.Row, error) {

repositories.go

Lines changed: 84 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package gitquery
22

33
import (
44
"io"
5+
"runtime"
6+
"sync"
57

68
"gopkg.in/src-d/go-git.v4"
79
"gopkg.in/src-d/go-mysql-server.v0/sql"
@@ -98,92 +100,135 @@ func (i *RepositoryIter) Close() error {
98100
// RowRepoIterImplementation is the interface needed by each iterator
99101
// implementation
100102
type RowRepoIterImplementation interface {
101-
InitRepository(Repository) error
103+
NewIterator(*Repository) (RowRepoIterImplementation, error)
102104
Next() (sql.Row, error)
103105
Close() error
104106
}
105107

106108
// RowRepoIter is used as the base to iterate over all the repositories
107-
// in the pool. It needs three functions that execute specific code per
108-
// implemented iterator.
109+
// in the pool
109110
type RowRepoIter struct {
110111
repositoryIter *RepositoryIter
111-
repository *Repository
112112
implementation RowRepoIterImplementation
113+
114+
wg sync.WaitGroup
115+
done chan bool
116+
err chan error
117+
repos chan *Repository
118+
rows chan sql.Row
113119
}
114120

115121
// NewRowRepoIter initializes a new repository iterator.
116122
//
117123
// * pool: is a RepositoryPool we want to iterate
118124
// * impl: implementation with RowRepoIterImplementation interface
119-
// * InitRepository: called when a new repository is about to be iterated,
120-
// initialize its iterator there
125+
// * NewIterator: called when a new repository is about to be iterated,
126+
// returns a new RowRepoIterImplementation
121127
// * Next: called for each row
122128
// * Close: called when a repository finished iterating
123129
func NewRowRepoIter(pool *RepositoryPool,
124-
impl RowRepoIterImplementation) (RowRepoIter, error) {
130+
impl RowRepoIterImplementation) (*RowRepoIter, error) {
125131

126132
rIter, err := pool.RepoIter()
127133
if err != nil {
128-
return RowRepoIter{}, err
134+
return nil, err
129135
}
130136

131-
repo := RowRepoIter{
137+
rowRepoIter := RowRepoIter{
132138
repositoryIter: rIter,
133139
implementation: impl,
140+
done: make(chan bool),
141+
err: make(chan error),
142+
repos: make(chan *Repository),
143+
rows: make(chan sql.Row),
134144
}
135145

136-
err = repo.nextRepository()
137-
if err != nil {
138-
return RowRepoIter{}, err
139-
}
146+
go rowRepoIter.fillRepoChannel()
140147

141-
return repo, nil
142-
}
148+
wNum := runtime.NumCPU()
143149

144-
// nextRepository is called to initialize the next repository. It is called
145-
// when the iterator is created and when the previous repository finished
146-
// iterating.
147-
func (i *RowRepoIter) nextRepository() error {
148-
repo, err := i.repositoryIter.Next()
149-
if err != nil {
150-
return err
150+
for i := 0; i < wNum; i++ {
151+
rowRepoIter.wg.Add(1)
152+
153+
go rowRepoIter.rowReader(i)
151154
}
152155

153-
i.repository = repo
154-
err = i.implementation.InitRepository(*repo)
156+
go func() {
157+
rowRepoIter.wg.Wait()
158+
close(rowRepoIter.rows)
159+
}()
155160

156-
return err
161+
return &rowRepoIter, nil
157162
}
158163

159-
// Next gets the next row
160-
func (i *RowRepoIter) Next() (sql.Row, error) {
164+
func (i *RowRepoIter) fillRepoChannel() {
161165
for {
162-
row, err := i.implementation.Next()
166+
repo, err := i.repositoryIter.Next()
163167

164168
switch err {
165169
case nil:
166-
return row, nil
170+
i.repos <- repo
171+
continue
167172

168173
case io.EOF:
169-
i.Close()
170-
171-
err := i.nextRepository()
172-
if err != nil {
173-
return nil, err
174-
}
174+
close(i.repos)
175+
i.err <- io.EOF
176+
return
175177

176178
default:
177-
return nil, err
179+
close(i.done)
180+
close(i.repos)
181+
i.err <- err
182+
return
183+
}
184+
}
185+
}
186+
187+
func (i *RowRepoIter) rowReader(num int) {
188+
defer i.wg.Done()
189+
190+
for repo := range i.repos {
191+
impl, _ := i.implementation.NewIterator(repo)
192+
193+
loop:
194+
for {
195+
select {
196+
case <-i.done:
197+
impl.Close()
198+
return
199+
200+
default:
201+
row, err := impl.Next()
202+
switch err {
203+
case nil:
204+
i.rows <- row
205+
206+
case io.EOF:
207+
impl.Close()
208+
break loop
209+
210+
default:
211+
impl.Close()
212+
i.err <- err
213+
close(i.done)
214+
return
215+
}
216+
}
178217
}
179218
}
219+
}
220+
221+
// Next gets the next row
222+
func (i *RowRepoIter) Next() (sql.Row, error) {
223+
row, ok := <-i.rows
224+
if !ok {
225+
return nil, <-i.err
226+
}
180227

181-
return nil, nil
228+
return row, nil
182229
}
183230

184231
// Close called to close the iterator
185232
func (i *RowRepoIter) Close() error {
186233
return i.implementation.Close()
187-
188-
return nil
189234
}

repositories_test.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -135,15 +135,15 @@ type testCommitIter struct {
135135
iter object.CommitIter
136136
}
137137

138-
func (d *testCommitIter) InitRepository(repo Repository) error {
139-
cIter, err := repo.Repo.CommitObjects()
138+
func (d *testCommitIter) NewIterator(
139+
repo *Repository) (RowRepoIterImplementation, error) {
140+
141+
iter, err := repo.Repo.CommitObjects()
140142
if err != nil {
141-
return err
143+
return nil, err
142144
}
143145

144-
d.iter = cIter
145-
146-
return nil
146+
return &testCommitIter{iter: iter}, nil
147147
}
148148

149149
func (d *testCommitIter) Next() (sql.Row, error) {
@@ -165,13 +165,13 @@ func TestRepositoryRowIterator(t *testing.T) {
165165
path := fixtures.Basic().ByTag("worktree").One().Worktree().Root()
166166

167167
pool := NewRepositoryPool()
168-
id, err := pool.AddGit(path)
169-
require.Equal(path, id)
170-
require.Nil(err)
168+
max := 64
171169

172-
id, err = pool.AddGit(path)
173-
require.Equal(path, id)
174-
require.Nil(err)
170+
for i := 0; i < max; i++ {
171+
id, err := pool.AddGit(path)
172+
require.Equal(path, id)
173+
require.Nil(err)
174+
}
175175

176176
cIter := &testCommitIter{}
177177

@@ -191,5 +191,6 @@ func TestRepositoryRowIterator(t *testing.T) {
191191
count++
192192
}
193193

194-
require.Equal(9*2, count)
194+
// 9 is the number of commits from the test repo
195+
require.Equal(9*max, count)
195196
}

tags.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (r tagsTable) RowIter() (sql.RowIter, error) {
5050
return nil, err
5151
}
5252

53-
return &rowRepoIter, nil
53+
return rowRepoIter, nil
5454
}
5555

5656
func (tagsTable) Children() []sql.Node {
@@ -61,15 +61,15 @@ type tagIter struct {
6161
iter *object.TagIter
6262
}
6363

64-
func (i *tagIter) InitRepository(repo Repository) error {
64+
func (i *tagIter) NewIterator(
65+
repo *Repository) (RowRepoIterImplementation, error) {
66+
6567
iter, err := repo.Repo.TagObjects()
6668
if err != nil {
67-
return err
69+
return nil, err
6870
}
6971

70-
i.iter = iter
71-
72-
return nil
72+
return &tagIter{iter: iter}, nil
7373
}
7474

7575
func (i *tagIter) Next() (sql.Row, error) {

0 commit comments

Comments
 (0)