Skip to content

Commit 9499ff0

Browse files
ddelemenyfmassot
authored andcommitted
Simplify multisearch
1 parent 18e4813 commit 9499ff0

File tree

1 file changed

+45
-75
lines changed

1 file changed

+45
-75
lines changed

pkg/quickwit/client/client.go

Lines changed: 45 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type DatasourceInfo struct {
3131
ShouldInit bool
3232
}
3333

34+
// TODO: Move ConfiguredFields closer to handlers, the client layer doesn't need this stuff
3435
type ConfiguredFields struct {
3536
TimeField string
3637
TimeOutputFormat string
@@ -43,61 +44,23 @@ type Client interface {
4344
ExecuteMultisearch(r []*SearchRequest) (*MultiSearchResponse, error)
4445
}
4546

47+
var logger = log.New()
48+
4649
// NewClient creates a new Quickwit client
4750
var NewClient = func(ctx context.Context, ds *DatasourceInfo) (Client, error) {
48-
logger := log.New()
4951
logger.Debug("Creating new client", "index", ds.Database)
5052

5153
return &baseClientImpl{
52-
logger: logger,
53-
ctx: ctx,
54-
ds: ds,
55-
index: ds.Database,
54+
ctx: ctx,
55+
ds: ds,
56+
index: ds.Database,
5657
}, nil
5758
}
5859

5960
type baseClientImpl struct {
60-
ctx context.Context
61-
ds *DatasourceInfo
62-
index string
63-
logger log.Logger
64-
}
65-
66-
type multiRequest struct {
67-
header map[string]interface{}
68-
body interface{}
69-
interval time.Duration
70-
}
71-
72-
func (c *baseClientImpl) encodeBatchRequests(requests []*multiRequest) ([]byte, error) {
73-
c.logger.Debug("Encoding batch requests to json", "batch requests", len(requests))
74-
start := time.Now()
75-
76-
payload := bytes.Buffer{}
77-
for _, r := range requests {
78-
reqHeader, err := json.Marshal(r.header)
79-
if err != nil {
80-
return nil, err
81-
}
82-
payload.WriteString(string(reqHeader) + "\n")
83-
84-
reqBody, err := json.Marshal(r.body)
85-
86-
if err != nil {
87-
return nil, err
88-
}
89-
90-
body := string(reqBody)
91-
body = strings.ReplaceAll(body, "$__interval_ms", strconv.FormatInt(r.interval.Milliseconds(), 10))
92-
body = strings.ReplaceAll(body, "$__interval", r.interval.String())
93-
94-
payload.WriteString(body + "\n")
95-
}
96-
97-
elapsed := time.Since(start)
98-
c.logger.Debug("Encoded batch requests to json", "took", elapsed)
99-
100-
return payload.Bytes(), nil
61+
ctx context.Context
62+
ds *DatasourceInfo
63+
index string
10164
}
10265

10366
func (c *baseClientImpl) makeRequest(method, uriPath, uriQuery string, body []byte) (*http.Request, error) {
@@ -122,9 +85,7 @@ func (c *baseClientImpl) makeRequest(method, uriPath, uriQuery string, body []by
12285
}
12386

12487
func (c *baseClientImpl) ExecuteMultisearch(requests []*SearchRequest) (*MultiSearchResponse, error) {
125-
c.logger.Debug("Executing multisearch", "search requests", requests)
126-
127-
req, err := c.createMultiSearchRequests(requests)
88+
req, err := c.createMultiSearchRequest(requests, c.index)
12889
if err != nil {
12990
return nil, err
13091
}
@@ -135,11 +96,11 @@ func (c *baseClientImpl) ExecuteMultisearch(requests []*SearchRequest) (*MultiSe
13596
}
13697
defer func() {
13798
if err := res.Body.Close(); err != nil {
138-
c.logger.Warn("Failed to close response body", "err", err)
99+
logger.Warn("Failed to close response body", "err", err)
139100
}
140101
}()
141102

142-
c.logger.Debug("Received multisearch response", "code", res.StatusCode, "status", res.Status, "content-length", res.ContentLength)
103+
logger.Debug("Received multisearch response", "code", res.StatusCode, "status", res.Status, "content-length", res.ContentLength)
143104

144105
if res.StatusCode >= 400 {
145106
qe := QuickwitQueryError{
@@ -151,12 +112,12 @@ func (c *baseClientImpl) ExecuteMultisearch(requests []*SearchRequest) (*MultiSe
151112
}
152113

153114
errorPayload, _ := json.Marshal(qe)
154-
c.logger.Error(string(errorPayload))
115+
logger.Error(string(errorPayload))
155116
return nil, fmt.Errorf(string(errorPayload))
156117
}
157118

158119
start := time.Now()
159-
c.logger.Debug("Decoding multisearch json response")
120+
logger.Debug("Decoding multisearch json response")
160121

161122
var msr MultiSearchResponse
162123
dec := json.NewDecoder(res.Body)
@@ -166,44 +127,53 @@ func (c *baseClientImpl) ExecuteMultisearch(requests []*SearchRequest) (*MultiSe
166127
}
167128

168129
elapsed := time.Since(start)
169-
c.logger.Debug("Decoded multisearch json response", "took", elapsed)
130+
logger.Debug("Decoded multisearch json response", "took", elapsed)
170131

171132
return &msr, nil
172133
}
173134

174-
func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchRequest) (*http.Request, error) {
175-
multiRequests := []*multiRequest{}
176-
177-
for _, searchReq := range searchRequests {
178-
mr := multiRequest{
179-
header: map[string]interface{}{
180-
"ignore_unavailable": true,
181-
"index": strings.Split(c.index, ","),
182-
},
183-
body: searchReq,
184-
interval: searchReq.Interval,
135+
func (c *baseClientImpl) makeMultiSearchPayload(searchRequests []*SearchRequest, index string) ([]byte, error) {
136+
// Format, marshall and interpolate
137+
payload := bytes.Buffer{}
138+
for _, r := range searchRequests {
139+
header := map[string]interface{}{
140+
"ignore_unavailable": true,
141+
"index": strings.Split(index, ","),
142+
}
143+
reqHeader, err := json.Marshal(header)
144+
if err != nil {
145+
return nil, err
185146
}
147+
payload.WriteString(string(reqHeader) + "\n")
148+
149+
reqBody, err := json.Marshal(r)
186150

187-
multiRequests = append(multiRequests, &mr)
151+
if err != nil {
152+
return nil, err
153+
}
154+
155+
body := string(reqBody)
156+
body = strings.ReplaceAll(body, "$__interval_ms", strconv.FormatInt(r.Interval.Milliseconds(), 10))
157+
body = strings.ReplaceAll(body, "$__interval", r.Interval.String())
158+
159+
payload.WriteString(body + "\n")
188160
}
161+
return payload.Bytes(), nil
162+
}
189163

190-
bytes, err := c.encodeBatchRequests(multiRequests)
164+
func (c *baseClientImpl) createMultiSearchRequest(requests []*SearchRequest, index string) (*http.Request, error) {
165+
body, err := c.makeMultiSearchPayload(requests, index)
191166
if err != nil {
192167
return nil, err
193168
}
194169

195-
queryParams := c.getMultiSearchQueryParameters()
196-
197-
return c.makeRequest(http.MethodPost, "_elastic/_msearch", queryParams, bytes)
198-
}
199-
200-
func (c *baseClientImpl) getMultiSearchQueryParameters() string {
201170
var qs []string
202-
203171
maxConcurrentShardRequests := c.ds.MaxConcurrentShardRequests
204172
if maxConcurrentShardRequests == 0 {
205173
maxConcurrentShardRequests = 5
206174
}
207175
qs = append(qs, fmt.Sprintf("max_concurrent_shard_requests=%d", maxConcurrentShardRequests))
208-
return strings.Join(qs, "&")
176+
queryParams := strings.Join(qs, "&")
177+
178+
return c.makeRequest(http.MethodPost, "_elastic/_msearch", queryParams, body)
209179
}

0 commit comments

Comments
 (0)