Skip to content

Commit 6017c40

Browse files
feature: Parallelizes hisReads using goroutines
1 parent da891ea commit 6017c40

File tree

1 file changed

+31
-6
lines changed

1 file changed

+31
-6
lines changed

pkg/plugin/datasource.go

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"strconv"
88
"strings"
9+
"sync"
910
"time"
1011

1112
"github.com/NeedleInAJayStack/haystack"
@@ -164,14 +165,15 @@ func (datasource *Datasource) query(ctx context.Context, pCtx backend.PluginCont
164165
return response
165166

166167
case "hisReadFilter":
167-
points, readErr := datasource.read(model.HisReadFilter, variables)
168+
pointsGrid, readErr := datasource.read(model.HisReadFilter, variables)
168169
if readErr != nil {
169170
log.DefaultLogger.Error(readErr.Error())
170171
return backend.ErrDataResponse(backend.StatusBadRequest, fmt.Sprintf("HisReadFilter failure: %v", readErr.Error()))
171172
}
173+
points := pointsGrid.Rows()
172174

173-
grids := []haystack.Grid{}
174-
for _, point := range points.Rows() {
175+
// Function to read a single point and send it to a channel.
176+
readPoint := func(point haystack.Row, hisReadChannel chan haystack.Grid, wg *sync.WaitGroup) {
175177
id := point.Get("id")
176178
var ref haystack.Ref
177179
switch id.(type) {
@@ -180,15 +182,38 @@ func (datasource *Datasource) query(ctx context.Context, pCtx backend.PluginCont
180182
default:
181183
errMsg := fmt.Sprintf("id is not a ref: %v", id)
182184
log.DefaultLogger.Error(errMsg)
183-
return backend.ErrDataResponse(backend.StatusBadRequest, errMsg)
185+
hisReadChannel <- haystack.EmptyGrid()
184186
}
185187
hisRead, err := datasource.hisRead(ref, query.TimeRange)
186188
if err != nil {
187189
log.DefaultLogger.Error(err.Error())
188-
return backend.ErrDataResponse(backend.StatusBadRequest, fmt.Sprintf("HisReadFilter failure: %v", err.Error()))
190+
hisReadChannel <- haystack.EmptyGrid()
189191
}
190-
grids = append(grids, hisRead)
192+
hisReadChannel <- hisRead
193+
wg.Done()
191194
}
195+
196+
// Start a goroutine to collect all the grids into a slice.
197+
hisReadChannel := make(chan haystack.Grid)
198+
combinedChannel := make(chan []haystack.Grid)
199+
go func() {
200+
grids := []haystack.Grid{}
201+
for grid := range hisReadChannel {
202+
grids = append(grids, grid)
203+
}
204+
combinedChannel <- grids
205+
}()
206+
207+
// Read all the points in parallel using goroutines.
208+
var wg sync.WaitGroup
209+
wg.Add(len(points))
210+
for _, point := range points {
211+
go readPoint(point, hisReadChannel, &wg)
212+
}
213+
wg.Wait()
214+
close(hisReadChannel)
215+
216+
grids := <-combinedChannel
192217
response := responseFromGrids(grids)
193218
// Make the display name on the "val" fields the names of the points.
194219
for _, frame := range response.Frames {

0 commit comments

Comments
 (0)