Skip to content

Commit 0f6fc7c

Browse files
committed
gitbase: implement sql.Indexable on all tables
Signed-off-by: Miguel Molina <[email protected]>
1 parent f25bd52 commit 0f6fc7c

15 files changed

+1718
-1536
lines changed

blobs.go

Lines changed: 156 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"gopkg.in/src-d/go-mysql-server.v0/sql"
99
"gopkg.in/src-d/go-mysql-server.v0/sql/expression"
10+
"gopkg.in/src-d/go-mysql-server.v0/sql/plan"
1011

1112
"gopkg.in/src-d/go-git.v4/plumbing"
1213
"gopkg.in/src-d/go-git.v4/plumbing/object"
@@ -39,10 +40,7 @@ var BlobsSchema = sql.Schema{
3940
var _ sql.PushdownProjectionAndFiltersTable = (*blobsTable)(nil)
4041

4142
func newBlobsTable() Indexable {
42-
return &indexableTable{
43-
PushdownTable: new(blobsTable),
44-
buildIterWithSelectors: blobsIterBuilder,
45-
}
43+
return new(blobsTable)
4644
}
4745

4846
var _ Table = (*blobsTable)(nil)
@@ -118,6 +116,50 @@ func (r *blobsTable) WithProjectAndFilters(
118116
return sql.NewSpanIter(span, iter), nil
119117
}
120118

119+
// IndexKeyValueIter implements the sql.Indexable interface.
120+
func (*blobsTable) IndexKeyValueIter(
121+
ctx *sql.Context,
122+
colNames []string,
123+
) (sql.IndexKeyValueIter, error) {
124+
s, ok := ctx.Session.(*Session)
125+
if !ok || s == nil {
126+
return nil, ErrInvalidGitbaseSession.New(ctx.Session)
127+
}
128+
129+
return newBlobsKeyValueIter(s.Pool, colNames), nil
130+
}
131+
132+
// WithProjectFiltersAndIndex implements sql.Indexable interface.
133+
func (*blobsTable) WithProjectFiltersAndIndex(
134+
ctx *sql.Context,
135+
columns, filters []sql.Expression,
136+
index sql.IndexValueIter,
137+
) (sql.RowIter, error) {
138+
span, ctx := ctx.Span("gitbase.BlobsTable.WithProjectFiltersAndIndex")
139+
s, ok := ctx.Session.(*Session)
140+
if !ok || s == nil {
141+
span.Finish()
142+
return nil, ErrInvalidGitbaseSession.New(ctx.Session)
143+
}
144+
145+
session, err := getSession(ctx)
146+
if err != nil {
147+
return nil, err
148+
}
149+
150+
var iter sql.RowIter = &blobsIndexIter{
151+
index: index,
152+
pool: session.Pool,
153+
readContent: shouldReadContent(columns),
154+
}
155+
156+
if len(filters) > 0 {
157+
iter = plan.NewFilterIter(ctx, expression.JoinAnd(filters...), iter)
158+
}
159+
160+
return sql.NewSpanIter(span, iter), nil
161+
}
162+
121163
func blobsIterBuilder(_ *sql.Context, selectors selectors, columns []sql.Expression) (RowRepoIter, error) {
122164
if len(selectors["blob_hash"]) == 0 {
123165
return &blobIter{readContent: shouldReadContent(columns)}, nil
@@ -138,7 +180,6 @@ type blobIter struct {
138180
repoID string
139181
iter *object.BlobIter
140182
readContent bool
141-
lastHash string
142183
}
143184

144185
func (i *blobIter) NewIterator(repo *Repository) (RowRepoIter, error) {
@@ -150,17 +191,11 @@ func (i *blobIter) NewIterator(repo *Repository) (RowRepoIter, error) {
150191
return &blobIter{repoID: repo.ID, iter: iter, readContent: i.readContent}, nil
151192
}
152193

153-
func (i *blobIter) Repository() string { return i.repoID }
154-
155-
func (i *blobIter) LastObject() string { return i.lastHash }
156-
157194
func (i *blobIter) Next() (sql.Row, error) {
158195
o, err := i.iter.Next()
159196
if err != nil {
160197
return nil, err
161198
}
162-
163-
i.lastHash = o.Hash.String()
164199
return blobToRow(i.repoID, o, i.readContent)
165200
}
166201

@@ -177,17 +212,12 @@ type blobsByHashIter struct {
177212
pos int
178213
hashes []string
179214
readContent bool
180-
lastHash string
181215
}
182216

183217
func (i *blobsByHashIter) NewIterator(repo *Repository) (RowRepoIter, error) {
184-
return &blobsByHashIter{repo, 0, i.hashes, i.readContent, ""}, nil
218+
return &blobsByHashIter{repo, 0, i.hashes, i.readContent}, nil
185219
}
186220

187-
func (i *blobsByHashIter) Repository() string { return i.repo.ID }
188-
189-
func (i *blobsByHashIter) LastObject() string { return i.lastHash }
190-
191221
func (i *blobsByHashIter) Next() (sql.Row, error) {
192222
for {
193223
if i.pos >= len(i.hashes) {
@@ -205,7 +235,6 @@ func (i *blobsByHashIter) Next() (sql.Row, error) {
205235
return nil, err
206236
}
207237

208-
i.lastHash = hash.String()
209238
return blobToRow(i.repo.ID, blob, i.readContent)
210239
}
211240
}
@@ -304,3 +333,112 @@ func shouldReadContent(columns []sql.Expression) bool {
304333
}
305334
return false
306335
}
336+
337+
type blobIndexKey struct {
338+
repository string
339+
packfile string
340+
offset int64
341+
}
342+
343+
type blobsKeyValueIter struct {
344+
iter *objectIter
345+
columns []string
346+
}
347+
348+
func newBlobsKeyValueIter(pool *RepositoryPool, columns []string) *blobsKeyValueIter {
349+
return &blobsKeyValueIter{
350+
iter: newObjectIter(pool, plumbing.BlobObject),
351+
columns: columns,
352+
}
353+
}
354+
355+
func (i *blobsKeyValueIter) Next() ([]interface{}, []byte, error) {
356+
obj, err := i.iter.Next()
357+
if err != nil {
358+
return nil, nil, err
359+
}
360+
361+
key, err := encodeIndexKey(blobIndexKey{
362+
repository: obj.RepositoryID,
363+
packfile: obj.Packfile.String(),
364+
offset: int64(obj.Offset),
365+
})
366+
if err != nil {
367+
return nil, nil, err
368+
}
369+
370+
blob, ok := obj.Object.(*object.Blob)
371+
if !ok {
372+
ErrInvalidObjectType.New(obj.Object, "*object.Blob")
373+
}
374+
375+
row, err := blobToRow(obj.RepositoryID, blob, stringContains(i.columns, "content"))
376+
if err != nil {
377+
return nil, nil, err
378+
}
379+
380+
values, err := rowIndexValues(row, i.columns, BlobsSchema)
381+
if err != nil {
382+
return nil, nil, err
383+
}
384+
385+
return values, key, nil
386+
}
387+
388+
func (i *blobsKeyValueIter) Close() error { return i.iter.Close() }
389+
390+
type blobsIndexIter struct {
391+
index sql.IndexValueIter
392+
pool *RepositoryPool
393+
decoder *objectDecoder
394+
readContent bool
395+
}
396+
397+
func (i *blobsIndexIter) Next() (sql.Row, error) {
398+
data, err := i.index.Next()
399+
if err != nil {
400+
return nil, err
401+
}
402+
403+
var key blobIndexKey
404+
if err := decodeIndexKey(data, &key); err != nil {
405+
return nil, err
406+
}
407+
408+
packfile := plumbing.NewHash(key.packfile)
409+
if i.decoder == nil || !i.decoder.equals(key.repository, packfile) {
410+
if i.decoder != nil {
411+
if err := i.decoder.close(); err != nil {
412+
return nil, err
413+
}
414+
}
415+
416+
i.decoder, err = newObjectDecoder(i.pool.repositories[key.repository], packfile)
417+
if err != nil {
418+
return nil, err
419+
}
420+
}
421+
422+
obj, err := i.decoder.get(key.offset)
423+
if err != nil {
424+
return nil, err
425+
}
426+
427+
blob, ok := obj.(*object.Blob)
428+
if !ok {
429+
return nil, ErrInvalidObjectType.New(obj, "*object.Blob")
430+
}
431+
432+
return blobToRow(key.repository, blob, i.readContent)
433+
}
434+
435+
func (i *blobsIndexIter) Close() error {
436+
if i.decoder != nil {
437+
if err := i.decoder.close(); err != nil {
438+
_ = i.index.Close()
439+
return err
440+
}
441+
}
442+
443+
return i.index.Close()
444+
}

commit_blobs.go

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55

66
"gopkg.in/src-d/go-git.v4/plumbing/object"
77
"gopkg.in/src-d/go-mysql-server.v0/sql"
8+
"gopkg.in/src-d/go-mysql-server.v0/sql/expression"
9+
"gopkg.in/src-d/go-mysql-server.v0/sql/plan"
810
)
911

1012
type commitBlobsTable struct{}
@@ -19,10 +21,7 @@ var CommitBlobsSchema = sql.Schema{
1921
var _ sql.PushdownProjectionAndFiltersTable = (*commitBlobsTable)(nil)
2022

2123
func newCommitBlobsTable() Indexable {
22-
return &indexableTable{
23-
PushdownTable: new(commitBlobsTable),
24-
buildIterWithSelectors: commitBlobsIterBuilder,
25-
}
24+
return new(commitBlobsTable)
2625
}
2726

2827
func (commitBlobsTable) isGitbaseTable() {}
@@ -84,6 +83,46 @@ func (t *commitBlobsTable) WithProjectAndFilters(
8483
return sql.NewSpanIter(span, iter), nil
8584
}
8685

86+
// IndexKeyValueIter implements the sql.Indexable interface.
87+
func (*commitBlobsTable) IndexKeyValueIter(
88+
ctx *sql.Context,
89+
colNames []string,
90+
) (sql.IndexKeyValueIter, error) {
91+
s, ok := ctx.Session.(*Session)
92+
if !ok || s == nil {
93+
return nil, ErrInvalidGitbaseSession.New(ctx.Session)
94+
}
95+
96+
iter, err := NewRowRepoIter(ctx, new(commitBlobsIter))
97+
if err != nil {
98+
return nil, err
99+
}
100+
101+
return &rowKeyValueIter{iter, colNames, CommitBlobsSchema}, nil
102+
}
103+
104+
// WithProjectFiltersAndIndex implements sql.Indexable interface.
105+
func (*commitBlobsTable) WithProjectFiltersAndIndex(
106+
ctx *sql.Context,
107+
columns, filters []sql.Expression,
108+
index sql.IndexValueIter,
109+
) (sql.RowIter, error) {
110+
span, ctx := ctx.Span("gitbase.CommitBlobsTable.WithProjectFiltersAndIndex")
111+
s, ok := ctx.Session.(*Session)
112+
if !ok || s == nil {
113+
span.Finish()
114+
return nil, ErrInvalidGitbaseSession.New(ctx.Session)
115+
}
116+
117+
var iter sql.RowIter = &rowIndexIter{index}
118+
119+
if len(filters) > 0 {
120+
iter = plan.NewFilterIter(ctx, expression.JoinAnd(filters...), iter)
121+
}
122+
123+
return sql.NewSpanIter(span, iter), nil
124+
}
125+
87126
func commitBlobsIterBuilder(ctx *sql.Context, selectors selectors, columns []sql.Expression) (RowRepoIter, error) {
88127
repos, err := selectors.textValues("repository_id")
89128
if err != nil {

commit_trees.go

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"gopkg.in/src-d/go-git.v4/plumbing/filemode"
99
"gopkg.in/src-d/go-git.v4/plumbing/object"
1010
"gopkg.in/src-d/go-mysql-server.v0/sql"
11+
"gopkg.in/src-d/go-mysql-server.v0/sql/expression"
12+
"gopkg.in/src-d/go-mysql-server.v0/sql/plan"
1113
)
1214

1315
type commitTreesTable struct{}
@@ -22,10 +24,7 @@ var CommitTreesSchema = sql.Schema{
2224
var _ sql.PushdownProjectionAndFiltersTable = (*commitTreesTable)(nil)
2325

2426
func newCommitTreesTable() Indexable {
25-
return &indexableTable{
26-
PushdownTable: new(commitTreesTable),
27-
buildIterWithSelectors: commitTreesIterBuilder,
28-
}
27+
return new(commitTreesTable)
2928
}
3029

3130
func (commitTreesTable) isGitbaseTable() {}
@@ -87,6 +86,46 @@ func (t *commitTreesTable) WithProjectAndFilters(
8786
return sql.NewSpanIter(span, iter), nil
8887
}
8988

89+
// IndexKeyValueIter implements the sql.Indexable interface.
90+
func (*commitTreesTable) IndexKeyValueIter(
91+
ctx *sql.Context,
92+
colNames []string,
93+
) (sql.IndexKeyValueIter, error) {
94+
s, ok := ctx.Session.(*Session)
95+
if !ok || s == nil {
96+
return nil, ErrInvalidGitbaseSession.New(ctx.Session)
97+
}
98+
99+
iter, err := NewRowRepoIter(ctx, new(commitTreesIter))
100+
if err != nil {
101+
return nil, err
102+
}
103+
104+
return &rowKeyValueIter{iter, colNames, CommitTreesSchema}, nil
105+
}
106+
107+
// WithProjectFiltersAndIndex implements sql.Indexable interface.
108+
func (*commitTreesTable) WithProjectFiltersAndIndex(
109+
ctx *sql.Context,
110+
columns, filters []sql.Expression,
111+
index sql.IndexValueIter,
112+
) (sql.RowIter, error) {
113+
span, ctx := ctx.Span("gitbase.CommitTreesTable.WithProjectFiltersAndIndex")
114+
s, ok := ctx.Session.(*Session)
115+
if !ok || s == nil {
116+
span.Finish()
117+
return nil, ErrInvalidGitbaseSession.New(ctx.Session)
118+
}
119+
120+
var iter sql.RowIter = &rowIndexIter{index}
121+
122+
if len(filters) > 0 {
123+
iter = plan.NewFilterIter(ctx, expression.JoinAnd(filters...), iter)
124+
}
125+
126+
return sql.NewSpanIter(span, iter), nil
127+
}
128+
90129
func commitTreesIterBuilder(ctx *sql.Context, selectors selectors, columns []sql.Expression) (RowRepoIter, error) {
91130
repos, err := selectors.textValues("repository_id")
92131
if err != nil {

0 commit comments

Comments
 (0)