Skip to content

Commit ab82563

Browse files
authored
loki.write: always read and close response body (#4994)
* Always read and close response body
1 parent 0d0fb65 commit ab82563

File tree

2 files changed

+34
-17
lines changed

2 files changed

+34
-17
lines changed

internal/component/common/loki/client/consumer_fanout.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"github.com/prometheus/common/model"
2020

2121
"github.com/grafana/alloy/internal/component/common/loki"
22-
lokiutil "github.com/grafana/alloy/internal/loki/util"
2322
"github.com/grafana/alloy/internal/runtime/logging/level"
2423
"github.com/grafana/alloy/internal/useragent"
2524
"github.com/grafana/dskit/backoff"
@@ -302,7 +301,6 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
302301
if err == nil {
303302
c.metrics.sentBytes.WithLabelValues(c.cfg.URL.Host, tenantID).Add(bufBytes)
304303
c.metrics.sentEntries.WithLabelValues(c.cfg.URL.Host, tenantID).Add(float64(entriesCount))
305-
306304
return
307305
}
308306

@@ -363,7 +361,18 @@ func (c *client) send(ctx context.Context, tenantID string, buf []byte) (int, er
363361
if err != nil {
364362
return -1, err
365363
}
366-
defer lokiutil.LogError(c.logger, "closing response body", resp.Body.Close)
364+
365+
// NOTE: it is important in go to fully read the body and
366+
// close it so that the connection can be reused.
367+
// We only partially read the body if we encounter a non 2xx error
368+
// so we should always consume whats left.
369+
// https://github.com/golang/go/blob/32a9804c7ba3f4a0e0bd26cc24b9204860a49ec8/src/net/http/response.go#L59-L64
370+
// It is unclear that we always need to drain the body but
371+
// https://github.com/golang/go/issues/60240#issuecomment-1551060433 seems to indicate that we should.
372+
defer func() {
373+
_, _ = io.Copy(io.Discard, resp.Body)
374+
_ = resp.Body.Close()
375+
}()
367376

368377
if resp.StatusCode/100 != 2 {
369378
scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen))

internal/component/common/loki/client/consumer_wal.go

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"github.com/grafana/alloy/internal/component/common/loki"
2626
"github.com/grafana/alloy/internal/component/common/loki/client/internal"
2727
"github.com/grafana/alloy/internal/component/common/loki/wal"
28-
lokiutil "github.com/grafana/alloy/internal/loki/util"
2928
"github.com/grafana/alloy/internal/useragent"
3029
)
3130

@@ -394,7 +393,7 @@ func (c *walClient) runSendOldBatches() {
394393
c.sendQueue.enqueue(qb)
395394
}
396395

397-
batchesToFlush = batchesToFlush[:0] // renew slide
396+
batchesToFlush = batchesToFlush[:0] // renew slice
398397
}
399398
}
400399
}
@@ -445,7 +444,6 @@ func (c *walClient) sendBatch(ctx context.Context, tenantID string, batch *batch
445444
if err == nil {
446445
c.metrics.sentBytes.WithLabelValues(c.cfg.URL.Host, tenantID).Add(bufBytes)
447446
c.metrics.sentEntries.WithLabelValues(c.cfg.URL.Host, tenantID).Add(float64(entriesCount))
448-
449447
return
450448
}
451449

@@ -464,17 +462,15 @@ func (c *walClient) sendBatch(ctx context.Context, tenantID string, batch *batch
464462
}
465463
}
466464

467-
if err != nil {
468-
level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "tenant", tenantID, "error", err)
469-
// If the reason for the last retry error was rate limiting, count the drops as such, even if the previous errors
470-
// were for a different reason
471-
dropReason := ReasonGeneric
472-
if batchIsRateLimited(status) {
473-
dropReason = ReasonRateLimited
474-
}
475-
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, dropReason).Add(bufBytes)
476-
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, dropReason).Add(float64(entriesCount))
465+
level.Error(c.logger).Log("msg", "final error sending batch, no retries left, dropping data", "status", status, "tenant", tenantID, "error", err)
466+
// If the reason for the last retry error was rate limiting, count the drops as such, even if the previous errors
467+
// were for a different reason
468+
dropReason := ReasonGeneric
469+
if batchIsRateLimited(status) {
470+
dropReason = ReasonRateLimited
477471
}
472+
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, dropReason).Add(bufBytes)
473+
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, dropReason).Add(float64(entriesCount))
478474
}
479475

480476
func (c *walClient) send(ctx context.Context, tenantID string, buf []byte) (int, error) {
@@ -509,7 +505,19 @@ func (c *walClient) send(ctx context.Context, tenantID string, buf []byte) (int,
509505
if err != nil {
510506
return -1, err
511507
}
512-
defer lokiutil.LogError(c.logger, "closing response body", resp.Body.Close)
508+
509+
// NOTE: it is important in go to fully read the body and
510+
// close it so that the connection can be reused.
511+
// We only partially read the body if we encounter a non 2xx error
512+
// so we should always consume whats left.
513+
// https://github.com/golang/go/blob/32a9804c7ba3f4a0e0bd26cc24b9204860a49ec8/src/net/http/response.go#L59-L64
514+
// It is unclear that we always need to drain the body but
515+
// https://github.com/golang/go/issues/60240#issuecomment-1551060433 seems to indicate that we should.
516+
517+
defer func() {
518+
_, _ = io.Copy(io.Discard, resp.Body)
519+
_ = resp.Body.Close()
520+
}()
513521

514522
if resp.StatusCode/100 != 2 {
515523
scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen))

0 commit comments

Comments
 (0)