Skip to content

Commit e087f4d

Browse files
committed
feat: add http/Error to error Channel making Error.Header accessible
1 parent 97e14d3 commit e087f4d

File tree

5 files changed

+111
-22
lines changed

5 files changed

+111
-22
lines changed

api/http/error.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ func (e *Error) Error() string {
2727
return e.Err.Error()
2828
case e.Code != "" && e.Message != "":
2929
return fmt.Sprintf("%s: %s", e.Code, e.Message)
30+
case e.Message != "":
31+
return e.Message
3032
default:
3133
return "Unexpected status code " + strconv.Itoa(e.StatusCode)
3234
}
@@ -39,6 +41,23 @@ func (e *Error) Unwrap() error {
3941
return nil
4042
}
4143

44+
// HeaderToString generates a string value from the Header property. Useful in logging.
45+
func (e *Error) HeaderToString(selected []string) string {
46+
headerString := ""
47+
if len(selected) == 0 {
48+
for key := range e.Header {
49+
headerString += fmt.Sprintf("%s: %s\r\n", key, e.Header[key])
50+
}
51+
} else {
52+
for _, candidate := range selected {
53+
if e.Header.Get(candidate) != "" {
54+
headerString += fmt.Sprintf("%s: %s\n", candidate, e.Header.Get(candidate))
55+
}
56+
}
57+
}
58+
return headerString
59+
}
60+
4261
// NewError returns newly created Error initialised with nested error and default values
4362
func NewError(err error) *Error {
4463
return &Error{

api/http/service.go

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package http
1616
import (
1717
"context"
1818
"encoding/json"
19-
"fmt"
2019
"io"
2120
"mime"
2221
"net/http"
@@ -114,25 +113,7 @@ func (s *service) DoHTTPRequest(req *http.Request, requestCallback RequestCallba
114113
}
115114

116115
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
117-
logHeaders := ""
118-
httpErr := s.parseHTTPError(resp)
119-
for _, candidate := range []string{
120-
"date",
121-
"trace-id",
122-
"trace-sampled",
123-
"X-Influxdb-Build",
124-
"X-Influxdb-Request-ID",
125-
"X-Influxdb-Version",
126-
} {
127-
if httpErr.Header.Get(candidate) != "" {
128-
logHeaders += fmt.Sprintf("%s: %s\n", candidate, httpErr.Header.Get(candidate))
129-
}
130-
}
131-
log.Errorf("http status code: %d, %s\nSelect Response Headers:\n%s",
132-
resp.StatusCode,
133-
httpErr.Error(),
134-
logHeaders)
135-
return httpErr
116+
return s.parseHTTPError(resp)
136117
}
137118
if responseCallback != nil {
138119
err := responseCallback(resp)

api/write_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ import (
88
"fmt"
99
"io"
1010
"math"
11+
ihttp "net/http"
12+
"net/http/httptest"
1113
"runtime"
14+
"strconv"
1215
"strings"
1316
"sync"
1417
"testing"
@@ -265,3 +268,47 @@ func TestFlushWithRetries(t *testing.T) {
265268
// two remained
266269
assert.Equal(t, 2, len(service.Lines()))
267270
}
271+
272+
func TestWriteApiErrorHeaders(t *testing.T) {
273+
calls := 0
274+
var mu sync.Mutex
275+
server := httptest.NewServer(ihttp.HandlerFunc(func(w ihttp.ResponseWriter, r *ihttp.Request) {
276+
mu.Lock()
277+
defer mu.Unlock()
278+
calls++
279+
w.Header().Set("X-Test-Val1", "Not All Correct")
280+
w.Header().Set("X-Test-Val2", "Atlas LV-3B")
281+
w.Header().Set("X-Call-Count", strconv.Itoa(calls))
282+
w.WriteHeader(ihttp.StatusBadRequest)
283+
_, _ = w.Write([]byte(`{ "code": "bad request", "message": "test header" }`))
284+
}))
285+
defer server.Close()
286+
svc := http.NewService(server.URL, "my-token", http.DefaultOptions())
287+
writeAPI := NewWriteAPI("my-org", "my-bucket", svc, write.DefaultOptions().SetBatchSize(5))
288+
defer writeAPI.Close()
289+
errCh := writeAPI.Errors()
290+
var wg sync.WaitGroup
291+
var recErr error
292+
wg.Add(1)
293+
go func() {
294+
for i := 0; i < 3; i++ {
295+
recErr = <-errCh
296+
assert.NotNil(t, recErr, "errCh should not run out of values")
297+
assert.Len(t, recErr.(*http.Error).Header, 6)
298+
assert.NotEqual(t, "", recErr.(*http.Error).Header.Get("Date"))
299+
assert.NotEqual(t, "", recErr.(*http.Error).Header.Get("Content-Length"))
300+
assert.NotEqual(t, "", recErr.(*http.Error).Header.Get("Content-Type"))
301+
assert.Equal(t, strconv.Itoa(i+1), recErr.(*http.Error).Header.Get("X-Call-Count"))
302+
assert.Equal(t, "Not All Correct", recErr.(*http.Error).Header.Get("X-Test-Val1"))
303+
assert.Equal(t, "Atlas LV-3B", recErr.(*http.Error).Header.Get("X-Test-Val2"))
304+
}
305+
wg.Done()
306+
}()
307+
points := test.GenPoints(15)
308+
for i := 0; i < 15; i++ {
309+
writeAPI.WritePoint(points[i])
310+
}
311+
writeAPI.waitForFlushing()
312+
wg.Wait()
313+
assert.Equal(t, calls, 3)
314+
}

internal/write/service.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
164164
if batchToWrite != nil {
165165
perror := w.WriteBatch(ctx, batchToWrite)
166166
if perror != nil {
167+
// fmt.Printf("DEBUG perror type %s\n", reflect.TypeOf(perror))
167168
if isIgnorableError(perror) {
168169
log.Warnf("Write error: %s", perror.Error())
169170
} else {
@@ -196,9 +197,30 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
196197
w.retryAttempts++
197198
log.Debugf("Write proc: next wait for write is %dms\n", w.retryDelay)
198199
} else {
199-
log.Errorf("Write error: %s\n", perror.Error())
200+
logMessage := fmt.Sprintf("Write error: %s", perror.Error())
201+
logHeaders := perror.HeaderToString([]string{
202+
"date",
203+
"trace-id",
204+
"trace-sampled",
205+
"X-Influxdb-Build",
206+
"X-Influxdb-Request-ID",
207+
"X-Influxdb-Version",
208+
})
209+
if len(logHeaders) > 0 {
210+
logMessage += fmt.Sprintf("\nSelect Response Headers:\n%s", logHeaders)
211+
}
212+
log.Error(logMessage)
213+
}
214+
return &http2.Error{
215+
StatusCode: int(perror.StatusCode),
216+
Code: perror.Code,
217+
Message: fmt.Errorf(
218+
"write failed (attempts %d): %w", batchToWrite.RetryAttempts, perror,
219+
).Error(),
220+
Err: perror.Err,
221+
RetryAfter: perror.RetryAfter,
222+
Header: perror.Header,
200223
}
201-
return fmt.Errorf("write failed (attempts %d): %w", batchToWrite.RetryAttempts, perror)
202224
}
203225
}
204226

internal/write/service_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,10 +337,13 @@ func TestMaxRetryTime(t *testing.T) {
337337
b := NewBatch("2\n", exp)
338338
// First batch will be checked against maxRetryTime and it will expire. New batch will fail and it will added to retry queue
339339
err = srv.HandleWrite(ctx, b)
340+
//fmt.Printf("DEBUG err %v\n", err)
341+
//fmt.Printf("DEBUG err %v\n", err)
340342
require.NotNil(t, err)
341343
// 1st Batch expires and writing 2nd trows error
342344
assert.Equal(t, "write failed (attempts 1): Unexpected status code 429", err.Error())
343345
assert.Equal(t, 1, srv.retryQueue.list.Len())
346+
// fmt.Printf("DEBUG Header len: %d\n", len(err.(*http.Error).Header))
344347

345348
//wait until remaining accumulated retryDelay has passed, because there hasn't been a successful write yet
346349
<-time.After(time.Until(srv.lastWriteAttempt.Add(time.Millisecond * time.Duration(srv.retryDelay))))
@@ -702,3 +705,20 @@ func TestIgnoreErrors(t *testing.T) {
702705
err = srv.HandleWrite(ctx, b)
703706
assert.Error(t, err)
704707
}
708+
709+
func TestHttpErrorHeaders(t *testing.T) {
710+
server := httptest.NewServer(ihttp.HandlerFunc(func(w ihttp.ResponseWriter, r *ihttp.Request) {
711+
w.Header().Set("X-Test-Val1", "Not All Correct")
712+
w.Header().Set("X-Test-Val2", "Atlas LV-3B")
713+
w.WriteHeader(ihttp.StatusBadRequest)
714+
_, _ = w.Write([]byte(`{ "code": "bad request", "message": "test header" }`))
715+
}))
716+
defer server.Close()
717+
svc := NewService("my-org", "my-bucket", http.NewService(server.URL, "", http.DefaultOptions()),
718+
write.DefaultOptions())
719+
err := svc.HandleWrite(context.Background(), NewBatch("1", 20))
720+
assert.Error(t, err)
721+
assert.Equal(t, "400 Bad Request: write failed (attempts 0): 400 Bad Request: { \"code\": \"bad request\", \"message\": \"test header\" }", err.Error())
722+
assert.Equal(t, "Not All Correct", err.(*http.Error).Header.Get("X-Test-Val1"))
723+
assert.Equal(t, "Atlas LV-3B", err.(*http.Error).Header.Get("X-Test-Val2"))
724+
}

0 commit comments

Comments
 (0)