Skip to content

Commit 5bff3e6

Browse files
committed
Stop iterator when session is canceled
Signed-off-by: Javi Fontan <[email protected]>
1 parent ed485b4 commit 5bff3e6

File tree

2 files changed

+41
-11
lines changed

2 files changed

+41
-11
lines changed

repository_pool.go

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ type rowRepoIter struct {
163163

164164
wg sync.WaitGroup
165165
done chan bool
166-
err chan error
166+
err error
167167
repos chan *Repository
168168
rows chan sql.Row
169169
}
@@ -195,7 +195,7 @@ func NewRowRepoIter(
195195
iter: iter,
196196
session: s,
197197
done: make(chan bool),
198-
err: make(chan error),
198+
err: nil,
199199
repos: make(chan *Repository),
200200
rows: make(chan sql.Row),
201201
}
@@ -218,29 +218,47 @@ func NewRowRepoIter(
218218
return &repoIter, nil
219219
}
220220

221+
func (i *rowRepoIter) setError(err error) {
222+
i.err = err
223+
}
224+
221225
func (i *rowRepoIter) fillRepoChannel() {
226+
defer close(i.repos)
227+
222228
for {
223229
select {
224230
case <-i.done:
225231
return
226232

233+
case <-i.session.Done():
234+
close(i.done)
235+
return
236+
227237
default:
228238
repo, err := i.repositoryIter.Next()
229239

230240
switch err {
231241
case nil:
232-
i.repos <- repo
233-
continue
242+
select {
243+
case <-i.done:
244+
return
245+
246+
case <-i.session.Done():
247+
i.setError(ErrSessionCanceled.New())
248+
close(i.done)
249+
return
250+
251+
case i.repos <- repo:
252+
continue
253+
}
234254

235255
case io.EOF:
236-
close(i.repos)
237-
i.err <- io.EOF
256+
i.setError(io.EOF)
238257
return
239258

240259
default:
241260
close(i.done)
242-
close(i.repos)
243-
i.err <- err
261+
i.setError(err)
244262
return
245263
}
246264
}
@@ -260,19 +278,28 @@ func (i *rowRepoIter) rowReader(num int) {
260278
iter.Close()
261279
return
262280

281+
case <-i.session.Done():
282+
i.setError(ErrSessionCanceled.New())
283+
return
284+
263285
default:
264286
row, err := iter.Next()
265287
switch err {
266288
case nil:
267-
i.rows <- row
289+
select {
290+
case <-i.done:
291+
iter.Close()
292+
return
293+
case i.rows <- row:
294+
}
268295

269296
case io.EOF:
270297
iter.Close()
271298
break loop
272299

273300
default:
274301
iter.Close()
275-
i.err <- err
302+
i.setError(err)
276303
close(i.done)
277304
return
278305
}
@@ -285,7 +312,7 @@ func (i *rowRepoIter) rowReader(num int) {
285312
func (i *rowRepoIter) Next() (sql.Row, error) {
286313
row, ok := <-i.rows
287314
if !ok {
288-
return nil, <-i.err
315+
return nil, i.err
289316
}
290317

291318
return row, nil

session.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ func NewSessionBuilder(pool *RepositoryPool) server.SessionBuilder {
3030
}
3131
}
3232

33+
// ErrSessionCanceled is returned when session context is canceled
34+
var ErrSessionCanceled = errors.NewKind("session canceled")
35+
3336
// ErrInvalidGitQuerySession is returned when some node expected a GitQuery
3437
// session but received something else.
3538
var ErrInvalidGitQuerySession = errors.NewKind("expecting gitquery session, but received: %T")

0 commit comments

Comments
 (0)