Skip to content

Commit 1d35145

Browse files
Merge pull request #26 from NeedleInAJayStack/feature/parallelize-hisRead
fix: parallelizes HisRead with Filter
2 parents da891ea + 3cf08b5 commit 1d35145

File tree

1 file changed

+49
-11
lines changed

1 file changed

+49
-11
lines changed

pkg/plugin/datasource.go

Lines changed: 49 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"sort"
78
"strconv"
89
"strings"
10+
"sync"
911
"time"
1012

1113
"github.com/NeedleInAJayStack/haystack"
@@ -164,14 +166,21 @@ func (datasource *Datasource) query(ctx context.Context, pCtx backend.PluginCont
164166
return response
165167

166168
case "hisReadFilter":
167-
points, readErr := datasource.read(model.HisReadFilter, variables)
169+
pointsGrid, readErr := datasource.read(model.HisReadFilter+" and hisStart", variables)
168170
if readErr != nil {
169171
log.DefaultLogger.Error(readErr.Error())
170172
return backend.ErrDataResponse(backend.StatusBadRequest, fmt.Sprintf("HisReadFilter failure: %v", readErr.Error()))
171173
}
174+
points := pointsGrid.Rows()
175+
recLimit := 300
176+
if len(points) > recLimit {
177+
errMsg := fmt.Sprintf("Query exceeded record limit of %d: %d records", recLimit, len(points))
178+
log.DefaultLogger.Error(errMsg)
179+
return backend.ErrDataResponse(backend.StatusBadRequest, errMsg)
180+
}
172181

173-
grids := []haystack.Grid{}
174-
for _, point := range points.Rows() {
182+
// Function to read a single point and send it to a channel.
183+
readPoint := func(point haystack.Row, hisReadChannel chan haystack.Grid, wg *sync.WaitGroup) {
175184
id := point.Get("id")
176185
var ref haystack.Ref
177186
switch id.(type) {
@@ -180,15 +189,38 @@ func (datasource *Datasource) query(ctx context.Context, pCtx backend.PluginCont
180189
default:
181190
errMsg := fmt.Sprintf("id is not a ref: %v", id)
182191
log.DefaultLogger.Error(errMsg)
183-
return backend.ErrDataResponse(backend.StatusBadRequest, errMsg)
192+
hisReadChannel <- haystack.EmptyGrid()
184193
}
185194
hisRead, err := datasource.hisRead(ref, query.TimeRange)
186195
if err != nil {
187196
log.DefaultLogger.Error(err.Error())
188-
return backend.ErrDataResponse(backend.StatusBadRequest, fmt.Sprintf("HisReadFilter failure: %v", err.Error()))
197+
hisReadChannel <- haystack.EmptyGrid()
189198
}
190-
grids = append(grids, hisRead)
199+
hisReadChannel <- hisRead
200+
wg.Done()
191201
}
202+
203+
// Start a goroutine to collect all the grids into a slice.
204+
hisReadChannel := make(chan haystack.Grid)
205+
combinedChannel := make(chan []haystack.Grid)
206+
go func() {
207+
grids := []haystack.Grid{}
208+
for grid := range hisReadChannel {
209+
grids = append(grids, grid)
210+
}
211+
combinedChannel <- grids
212+
}()
213+
214+
// Read all the points in parallel using goroutines.
215+
var wg sync.WaitGroup
216+
wg.Add(len(points))
217+
for _, point := range points {
218+
go readPoint(point, hisReadChannel, &wg)
219+
}
220+
wg.Wait()
221+
close(hisReadChannel)
222+
223+
grids := <-combinedChannel
192224
response := responseFromGrids(grids)
193225
// Make the display name on the "val" fields the names of the points.
194226
for _, frame := range response.Frames {
@@ -213,19 +245,25 @@ func (datasource *Datasource) query(ctx context.Context, pCtx backend.PluginCont
213245
}
214246
}
215247

248+
// Creates a response from the input grids. The frames in the result are sorted by display name.
216249
func responseFromGrids(grids []haystack.Grid) backend.DataResponse {
217-
var response backend.DataResponse
250+
frames := data.Frames{}
218251
for _, grid := range grids {
219252
frame, frameErr := dataFrameFromGrid(grid)
220253
if frameErr != nil {
221254
log.DefaultLogger.Error(frameErr.Error())
222255
return backend.ErrDataResponse(backend.StatusBadRequest, fmt.Sprintf("Frame conversion failure: %v", frameErr.Error()))
223256
}
224-
225-
// add the frames to the response.
226-
response.Frames = append(response.Frames, frame)
227-
response.Status = backend.StatusOK
257+
frames = append(frames, frame)
228258
}
259+
260+
sort.Slice(frames, func(i, j int) bool {
261+
return frames[i].Name < frames[j].Name
262+
})
263+
264+
var response backend.DataResponse
265+
response.Frames = frames
266+
response.Status = backend.StatusOK
229267
return response
230268
}
231269

0 commit comments

Comments
 (0)