Skip to content

Commit 387ca58

Browse files
committed
temporary: detect schema registry HTTP errors.
1 parent 6c34fe4 commit 387ca58

File tree

2 files changed

+32
-6
lines changed

2 files changed

+32
-6
lines changed

avroregistry/message.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"encoding/binary"
77
"encoding/json"
88
"fmt"
9+
"log"
10+
"net/http"
911

1012
"github.com/heetch/avro"
1113
)
@@ -74,7 +76,8 @@ func (r decodingRegistry) DecodeSchemaID(msg []byte) (int64, []byte) {
7476
//
7577
// See https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-ids-int-%20id
7678
func (r decodingRegistry) SchemaForID(ctx context.Context, id int64) (*avro.Type, error) {
77-
req := r.r.newRequest(ctx, "GET", fmt.Sprintf("/schemas/ids/%d", id), nil)
79+
req := r.r.newRequest(ctx, http.MethodGet, fmt.Sprintf("/schemas/ids/%d", id), nil)
80+
log.Printf("=======> %d getting schema with ID: %s", id, req.URL.String())
7881
var resp struct {
7982
Schema string `json:"schema"`
8083
}

avroregistry/registry.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,15 @@ import (
66
"bytes"
77
"context"
88
"encoding/json"
9+
"errors"
910
"fmt"
1011
"io"
12+
"log"
1113
"net/http"
1214
"net/url"
15+
"os"
1316
"strconv"
17+
"syscall"
1418
"time"
1519

1620
"github.com/heetch/avro"
@@ -102,7 +106,7 @@ func (r *Registry) Register(ctx context.Context, subject string, schema *avro.Ty
102106
if err != nil {
103107
return 0, err
104108
}
105-
req := r.newRequest(ctx, "POST", fmt.Sprintf("/subjects/%s/versions", subject), bytes.NewReader(data))
109+
req := r.newRequest(ctx, http.MethodPost, fmt.Sprintf("/subjects/%s/versions", subject), bytes.NewReader(data))
106110
var resp struct {
107111
ID int64 `json:"id"`
108112
}
@@ -122,14 +126,14 @@ func (r *Registry) SetCompatibility(ctx context.Context, subject string, mode av
122126
if err != nil {
123127
return err
124128
}
125-
return r.doRequest(r.newRequest(ctx, "PUT", "/config/"+subject, bytes.NewReader(data)), nil)
129+
return r.doRequest(r.newRequest(ctx, http.MethodPut, "/config/"+subject, bytes.NewReader(data)), nil)
126130
}
127131

128132
// DeleteSubject deletes the given subject from the registry.
129133
//
130134
// See https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject)
131135
func (r *Registry) DeleteSubject(ctx context.Context, subject string) error {
132-
return r.doRequest(r.newRequest(ctx, "DELETE", "/subjects/"+subject, nil), nil)
136+
return r.doRequest(r.newRequest(ctx, http.MethodDelete, "/subjects/"+subject, nil), nil)
133137
}
134138

135139
// Schema gets a specific version of the schema registered under this subject
@@ -166,7 +170,17 @@ func validateVersion(version string) error {
166170
}
167171

168172
func (r *Registry) newRequest(ctx context.Context, method string, urlStr string, body io.Reader) *http.Request {
169-
req, err := http.NewRequestWithContext(ctx, method, r.params.ServerURL+urlStr, body)
173+
var url string
174+
if method == http.MethodGet {
175+
if u, ok := os.LookupEnv("OVERRIDE_REGISTRY"); ok {
176+
url = u
177+
} else {
178+
url = r.params.ServerURL
179+
}
180+
} else {
181+
url = r.params.ServerURL
182+
}
183+
req, err := http.NewRequestWithContext(ctx, method, url+urlStr, body)
170184
if err != nil {
171185
// Should never happen, as we've checked the URL for validity when
172186
// creating the registry instance.
@@ -184,11 +198,20 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error {
184198
if r.params.Username != "" {
185199
req.SetBasicAuth(r.params.Username, r.params.Password)
186200
}
201+
log.Printf("===> foo request: %s: %s", req.Method, req.URL)
187202
ctx := req.Context()
188203
attempt := retry.StartWithCancel(r.params.RetryStrategy, nil, ctx.Done())
189204
for attempt.Next() {
190205
resp, err := http.DefaultClient.Do(req)
191206
if err != nil {
207+
var urlError *url.Error
208+
if errors.As(err, &urlError) {
209+
log.Printf("=====> net error: %v: temporary: %t, timeout: %t: detected: %t", urlError, urlError.Temporary(), urlError.Timeout(), isTemporaryError(err))
210+
}
211+
if errors.Is(err, syscall.ECONNREFUSED) {
212+
log.Printf("===> connection refused detected")
213+
err = fmt.Errorf("should be a retryable error this one: %w", err)
214+
}
192215
if !attempt.More() || !isTemporaryError(err) {
193216
return err
194217
}
@@ -201,7 +224,7 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error {
201224
if !attempt.More() {
202225
return err
203226
}
204-
if err, ok := err.(*apiError); ok && err.StatusCode/100 != 5 {
227+
if err, ok := err.(*apiError); ok && err.StatusCode != http.StatusInternalServerError {
205228
// It's not a 5xx error. We want to retry on 5xx
206229
// errors, because the Confluent Avro registry
207230
// can occasionally return them as a matter of

0 commit comments

Comments
 (0)