@@ -75,12 +75,14 @@ type exchangeRowIter struct {
7575 parallelism int
7676 partitions sql.PartitionIter
7777 tree sql.Node
78- mut sync.Mutex
79- tokens chan struct {}
78+ mut sync.RWMutex
79+ tokensChan chan struct {}
8080 started bool
8181 rows chan sql.Row
8282 err chan error
83- quit chan struct {}
83+
84+ quitMut sync.RWMutex
85+ quitChan chan struct {}
8486}
8587
8688func newExchangeRowIter (
@@ -97,34 +99,40 @@ func newExchangeRowIter(
9799 started : false ,
98100 tree : tree ,
99101 partitions : iter ,
100- quit : make (chan struct {}),
102+ quitChan : make (chan struct {}),
101103 }
102104}
103105
104106func (it * exchangeRowIter ) releaseToken () {
105107 it .mut .Lock ()
106108 defer it .mut .Unlock ()
107109
108- if it .tokens != nil {
109- it .tokens <- struct {}{}
110+ if it .tokensChan != nil {
111+ it .tokensChan <- struct {}{}
110112 }
111113}
112114
113115func (it * exchangeRowIter ) closeTokens () {
114116 it .mut .Lock ()
115117 defer it .mut .Unlock ()
116118
117- close (it .tokens )
118- it .tokens = nil
119+ close (it .tokensChan )
120+ it .tokensChan = nil
121+ }
122+
123+ func (it * exchangeRowIter ) tokens () chan struct {} {
124+ it .mut .RLock ()
125+ defer it .mut .RUnlock ()
126+ return it .tokensChan
119127}
120128
121129func (it * exchangeRowIter ) fillTokens () {
122130 it .mut .Lock ()
123131 defer it .mut .Unlock ()
124132
125- it .tokens = make (chan struct {}, it .parallelism )
133+ it .tokensChan = make (chan struct {}, it .parallelism )
126134 for i := 0 ; i < it .parallelism ; i ++ {
127- it .tokens <- struct {}{}
135+ it .tokensChan <- struct {}{}
128136 }
129137}
130138
@@ -142,7 +150,7 @@ func (it *exchangeRowIter) start() {
142150 it .err <- context .Canceled
143151 it .closeTokens ()
144152 return
145- case <- it .quit :
153+ case <- it .quit () :
146154 it .closeTokens ()
147155 return
148156 case p , ok := <- partitions :
@@ -179,9 +187,9 @@ func (it *exchangeRowIter) iterPartitions(ch chan<- sql.Partition) {
179187 case <- it .ctx .Done ():
180188 it .err <- context .Canceled
181189 return
182- case <- it .quit :
190+ case <- it .quit () :
183191 return
184- case <- it .tokens :
192+ case <- it .tokens () :
185193 }
186194
187195 p , err := it .partitions .Next ()
@@ -226,7 +234,7 @@ func (it *exchangeRowIter) iterPartition(p sql.Partition) {
226234 case <- it .ctx .Done ():
227235 it .err <- context .Canceled
228236 return
229- case <- it .quit :
237+ case <- it .quit () :
230238 return
231239 default :
232240 }
@@ -263,17 +271,25 @@ func (it *exchangeRowIter) Next() (sql.Row, error) {
263271 }
264272}
265273
266- func (it * exchangeRowIter ) Close () (err error ) {
267- if it .quit != nil {
268- close (it .quit )
269- it .quit = nil
274+ func (it * exchangeRowIter ) quit () chan struct {} {
275+ it .quitMut .RLock ()
276+ defer it .quitMut .RUnlock ()
277+ return it .quitChan
278+ }
279+
280+ func (it * exchangeRowIter ) Close () error {
281+ it .quitMut .Lock ()
282+ if it .quitChan != nil {
283+ close (it .quitChan )
284+ it .quitChan = nil
270285 }
286+ it .quitMut .Unlock ()
271287
272288 if it .partitions != nil {
273- err = it .partitions .Close ()
289+ return it .partitions .Close ()
274290 }
275291
276- return err
292+ return nil
277293}
278294
279295type exchangePartition struct {
0 commit comments