Skip to content

Commit 2c241f1

Browse files
authored
Review handling of status code in HTTP requests (#1549)
In a number of places we were not handling HTTP status codes, what may hide issues or complicate debugging. Also ensure that body is closed in some cases where it was not being closed.
1 parent 7befcd5 commit 2c241f1

File tree

13 files changed

+272
-108
lines changed

13 files changed

+272
-108
lines changed

internal/benchrunner/runners/rally/metrics.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,14 @@ func (c *collector) stop() {
112112
}
113113

114114
func (c *collector) collectMetricsBeforeRallyRun() {
115-
_, err := c.esAPI.Indices.Refresh(c.esAPI.Indices.Refresh.WithIndex(c.datastream))
115+
resp, err := c.esAPI.Indices.Refresh(c.esAPI.Indices.Refresh.WithIndex(c.datastream))
116116
if err != nil {
117-
logger.Errorf("unable to refresh data stream at the beginning of rally run")
117+
logger.Errorf("unable to refresh data stream at the beginning of rally run: %s", err)
118+
return
119+
}
120+
defer resp.Body.Close()
121+
if resp.IsError() {
122+
logger.Errorf("unable to refresh data stream at the beginning of rally run: %s", resp.String())
118123
return
119124
}
120125

@@ -157,19 +162,13 @@ func (c *collector) publish(events [][]byte) {
157162
logger.Errorf("error indexing event in metricstore: %w", err)
158163
return
159164
}
160-
161-
if resp.Body == nil {
162-
logger.Errorf("empty index response body from metricstore: %w", err)
163-
return
164-
}
165+
defer resp.Body.Close()
165166

166167
body, err := io.ReadAll(resp.Body)
167168
if err != nil {
168169
logger.Errorf("failed to read index response body from metricstore: %w", err)
169170
}
170171

171-
resp.Body.Close()
172-
173172
if resp.StatusCode != 201 {
174173
logger.Errorf("error indexing event in metricstore (%d): %s: %v", resp.StatusCode, resp.Status(), elasticsearch.NewError(body))
175174
}
@@ -187,18 +186,18 @@ func (c *collector) createMetricsIndex() {
187186

188187
logger.Debugf("creating %s index in metricstore...", c.indexName())
189188

190-
createRes, err := c.metricsAPI.Indices.Create(
189+
resp, err := c.metricsAPI.Indices.Create(
191190
c.indexName(),
192191
c.metricsAPI.Indices.Create.WithBody(reader),
193192
)
194193
if err != nil {
195194
logger.Errorf("could not create index: %w", err)
196195
return
197196
}
198-
createRes.Body.Close()
197+
defer resp.Body.Close()
199198

200-
if createRes.IsError() {
201-
logger.Errorf("got a response error while creating index")
199+
if resp.IsError() {
200+
logger.Errorf("got a response error while creating index: %s", resp.String())
202201
}
203202
}
204203

@@ -287,9 +286,14 @@ func (c *collector) collectDiskUsage() map[string]ingest.DiskUsage {
287286
}
288287

289288
func (c *collector) collectMetricsAfterRallyRun() {
290-
_, err := c.esAPI.Indices.Refresh(c.esAPI.Indices.Refresh.WithIndex(c.datastream))
289+
resp, err := c.esAPI.Indices.Refresh(c.esAPI.Indices.Refresh.WithIndex(c.datastream))
291290
if err != nil {
292-
logger.Errorf("unable to refresh data stream at the end of rally run")
291+
logger.Errorf("unable to refresh data stream at the end of rally run: %s", err)
292+
return
293+
}
294+
defer resp.Body.Close()
295+
if resp.IsError() {
296+
logger.Errorf("unable to refresh data stream at the end of rally run: %s", resp.String())
293297
return
294298
}
295299

internal/benchrunner/runners/rally/runner.go

Lines changed: 77 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"errors"
1313
"fmt"
1414
"io"
15+
"net/http"
1516
"os"
1617
"os/exec"
1718
"path/filepath"
@@ -336,10 +337,20 @@ func (r *runner) collectAndSummarizeMetrics() (*metricsSummary, error) {
336337

337338
func (r *runner) deleteDataStreamDocs(dataStream string) error {
338339
body := strings.NewReader(`{ "query": { "match_all": {} } }`)
339-
_, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body)
340+
resp, err := r.options.ESAPI.DeleteByQuery([]string{dataStream}, body)
340341
if err != nil {
341-
return err
342+
return fmt.Errorf("failed to delete data stream docs for data stream %s: %w", dataStream, err)
343+
}
344+
defer resp.Body.Close()
345+
346+
if resp.StatusCode == http.StatusNotFound {
347+
// Unavailable index is ok, this means that data is already not there.
348+
return nil
342349
}
350+
if resp.IsError() {
351+
return fmt.Errorf("failed to delete data stream docs for data stream %s: %s", dataStream, resp.String())
352+
}
353+
343354
return nil
344355
}
345356

@@ -665,6 +676,9 @@ func (r *runner) reindexData() error {
665676
return fmt.Errorf("error getting mapping: %w", err)
666677
}
667678
defer mappingRes.Body.Close()
679+
if mappingRes.IsError() {
680+
return fmt.Errorf("error getting mapping: %s", mappingRes)
681+
}
668682

669683
body, err := io.ReadAll(mappingRes.Body)
670684
if err != nil {
@@ -709,7 +723,7 @@ func (r *runner) reindexData() error {
709723
defer createRes.Body.Close()
710724

711725
if createRes.IsError() {
712-
return errors.New("got a response error while creating index")
726+
return fmt.Errorf("got a response error while creating index: %s", createRes)
713727
}
714728

715729
bodyReader := strings.NewReader(`{"query":{"match_all":{}}}`)
@@ -725,21 +739,13 @@ func (r *runner) reindexData() error {
725739
return fmt.Errorf("error executing search: %w", err)
726740
}
727741
defer res.Body.Close()
728-
729-
type searchRes struct {
730-
Error *struct {
731-
Reason string `json:"reson"`
732-
} `json:"error"`
733-
ScrollID string `json:"_scroll_id"`
734-
Hits []struct {
735-
ID string `json:"_id"`
736-
Source map[string]interface{} `json:"_source"`
737-
} `json:"hits"`
742+
if res.IsError() {
743+
return fmt.Errorf("error executing search: %s", res)
738744
}
739745

740746
// Iterate through the search results using the Scroll API
741747
for {
742-
var sr searchRes
748+
var sr searchResponse
743749
if err := json.NewDecoder(res.Body).Decode(&sr); err != nil {
744750
return fmt.Errorf("error decoding search response: %w", err)
745751
}
@@ -752,40 +758,67 @@ func (r *runner) reindexData() error {
752758
break
753759
}
754760

755-
var bulkBodyBuilder strings.Builder
756-
for _, hit := range sr.Hits {
757-
bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID))
758-
enriched := r.enrichEventWithBenchmarkMetadata(hit.Source)
759-
src, err := json.Marshal(enriched)
760-
if err != nil {
761-
return fmt.Errorf("error decoding _source: %w", err)
762-
}
763-
bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src)))
761+
err := r.bulkMetrics(indexName, sr)
762+
if err != nil {
763+
return err
764764
}
765+
}
766+
767+
logger.Debug("reindexing operation finished")
768+
return nil
769+
}
765770

766-
logger.Debugf("bulk request of %d events...", len(sr.Hits))
771+
type searchResponse struct {
772+
Error *struct {
773+
Reason string `json:"reson"`
774+
} `json:"error"`
775+
ScrollID string `json:"_scroll_id"`
776+
Hits []struct {
777+
ID string `json:"_id"`
778+
Source map[string]interface{} `json:"_source"`
779+
} `json:"hits"`
780+
}
767781

768-
bulkRes, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()))
782+
func (r *runner) bulkMetrics(indexName string, sr searchResponse) error {
783+
var bulkBodyBuilder strings.Builder
784+
for _, hit := range sr.Hits {
785+
bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID))
786+
enriched := r.enrichEventWithBenchmarkMetadata(hit.Source)
787+
src, err := json.Marshal(enriched)
769788
if err != nil {
770-
return fmt.Errorf("error performing the bulk index request: %w", err)
789+
return fmt.Errorf("error decoding _source: %w", err)
771790
}
772-
bulkRes.Body.Close()
791+
bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src)))
792+
}
773793

774-
if sr.ScrollID == "" {
775-
return errors.New("error getting scroll ID")
776-
}
794+
logger.Debugf("bulk request of %d events...", len(sr.Hits))
777795

778-
res, err = r.options.ESAPI.Scroll(
779-
r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID),
780-
r.options.ESAPI.Scroll.WithScroll(time.Minute),
781-
)
782-
if err != nil {
783-
return fmt.Errorf("error executing scroll: %s", err)
784-
}
785-
res.Body.Close()
796+
resp, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()))
797+
if err != nil {
798+
return fmt.Errorf("error performing the bulk index request: %w", err)
799+
}
800+
defer resp.Body.Close()
801+
if resp.IsError() {
802+
return fmt.Errorf("error performing the bulk index request: %s", resp.String())
803+
}
804+
805+
if sr.ScrollID == "" {
806+
return errors.New("error getting scroll ID")
807+
}
808+
809+
resp, err = r.options.ESAPI.Scroll(
810+
r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID),
811+
r.options.ESAPI.Scroll.WithScroll(time.Minute),
812+
)
813+
if err != nil {
814+
return fmt.Errorf("error executing scroll: %s", err)
815+
}
816+
defer resp.Body.Close()
817+
818+
if resp.IsError() {
819+
return fmt.Errorf("error executing scroll: %s", resp.String())
786820
}
787821

788-
logger.Debug("reindexing operation finished")
789822
return nil
790823
}
791824

@@ -809,12 +842,17 @@ func (r *runner) enrichEventWithBenchmarkMetadata(e map[string]interface{}) map[
809842
func getTotalHits(esapi *elasticsearch.API, dataStream string) (int, error) {
810843
resp, err := esapi.Count(
811844
esapi.Count.WithIndex(dataStream),
845+
esapi.Count.WithIgnoreUnavailable(true),
812846
)
813847
if err != nil {
814848
return 0, fmt.Errorf("could not search data stream: %w", err)
815849
}
816850
defer resp.Body.Close()
817851

852+
if resp.IsError() {
853+
return 0, fmt.Errorf("failed to get hits count: %s", resp.String())
854+
}
855+
818856
var results struct {
819857
Count int
820858
Error *struct {

internal/benchrunner/runners/system/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ func (c *collector) createMetricsIndex() {
196196
createRes.Body.Close()
197197

198198
if createRes.IsError() {
199-
logger.Debug("got a response error while creating index")
199+
logger.Debug("got a response error while creating index: %s", createRes)
200200
}
201201
}
202202

0 commit comments

Comments
 (0)