|
| 1 | +package otelhttpclient |
| 2 | + |
| 3 | +import ( |
| 4 | + "fmt" |
| 5 | + "io" |
| 6 | + "net/http" |
| 7 | + "time" |
| 8 | + |
| 9 | + "go.opentelemetry.io/otel" |
| 10 | + "go.opentelemetry.io/otel/attribute" |
| 11 | + "go.opentelemetry.io/otel/metric" |
| 12 | +) |
| 13 | + |
| 14 | +// Refer OpenTelemetry Semantic Conventions for HTTP Client. |
| 15 | +// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/http/http-metrics.md#http-client |
| 16 | +const ( |
| 17 | + metricClientDuration = "http.client.duration" |
| 18 | + metricClientRequestSize = "http.client.request.size" |
| 19 | + metricClientResponseSize = "http.client.response.size" |
| 20 | + |
| 21 | + attributeNetProtoName = "network.protocol.name" |
| 22 | + attributeNetProtoVersion = "network.protocol.version" |
| 23 | + |
| 24 | + attributeServerPort = "server.port" |
| 25 | + attributeServerAddress = "server.address" |
| 26 | + attributeHTTPRoute = "http.route" |
| 27 | + attributeRequestMethod = "http.request.method" |
| 28 | + attributeResponseStatusCode = "http.response.status_code" |
| 29 | +) |
| 30 | + |
| 31 | +type httpTransport struct { |
| 32 | + roundTripper http.RoundTripper |
| 33 | + |
| 34 | + metricClientDuration metric.Float64Histogram |
| 35 | + metricClientRequestSize metric.Int64Counter |
| 36 | + metricClientResponseSize metric.Int64Counter |
| 37 | +} |
| 38 | + |
| 39 | +func NewHTTPTransport(baseTransport http.RoundTripper) http.RoundTripper { |
| 40 | + if _, ok := baseTransport.(*httpTransport); ok { |
| 41 | + return baseTransport |
| 42 | + } |
| 43 | + |
| 44 | + if baseTransport == nil { |
| 45 | + baseTransport = http.DefaultTransport |
| 46 | + } |
| 47 | + |
| 48 | + icl := &httpTransport{roundTripper: baseTransport} |
| 49 | + icl.createMeasures(otel.Meter("github.com/goto/meteor/metrics/otehttpclient")) |
| 50 | + |
| 51 | + return icl |
| 52 | +} |
| 53 | + |
| 54 | +func (tr *httpTransport) RoundTrip(req *http.Request) (*http.Response, error) { |
| 55 | + ctx := req.Context() |
| 56 | + startAt := time.Now() |
| 57 | + labeler, _ := LabelerFromContext(req.Context()) |
| 58 | + |
| 59 | + var bw bodyWrapper |
| 60 | + if req.Body != nil && req.Body != http.NoBody { |
| 61 | + bw.ReadCloser = req.Body |
| 62 | + req.Body = &bw |
| 63 | + } |
| 64 | + |
| 65 | + port := req.URL.Port() |
| 66 | + if port == "" { |
| 67 | + port = "80" |
| 68 | + if req.URL.Scheme == "https" { |
| 69 | + port = "443" |
| 70 | + } |
| 71 | + } |
| 72 | + |
| 73 | + attribs := append(labeler.Get(), |
| 74 | + attribute.String(attributeNetProtoName, "http"), |
| 75 | + attribute.String(attributeRequestMethod, req.Method), |
| 76 | + attribute.String(attributeServerAddress, req.URL.Hostname()), |
| 77 | + attribute.String(attributeServerPort, port), |
| 78 | + ) |
| 79 | + |
| 80 | + resp, err := tr.roundTripper.RoundTrip(req) |
| 81 | + if err != nil { |
| 82 | + attribs = append(attribs, |
| 83 | + attribute.Int(attributeResponseStatusCode, 0), |
| 84 | + attribute.String(attributeNetProtoVersion, fmt.Sprintf("%d.%d", req.ProtoMajor, req.ProtoMinor)), |
| 85 | + ) |
| 86 | + } else { |
| 87 | + attribs = append(attribs, |
| 88 | + attribute.Int(attributeResponseStatusCode, resp.StatusCode), |
| 89 | + attribute.String(attributeNetProtoVersion, fmt.Sprintf("%d.%d", resp.ProtoMajor, resp.ProtoMinor)), |
| 90 | + ) |
| 91 | + } |
| 92 | + |
| 93 | + elapsedTime := float64(time.Since(startAt)) / float64(time.Millisecond) |
| 94 | + withAttribs := metric.WithAttributes(attribs...) |
| 95 | + tr.metricClientDuration.Record(ctx, elapsedTime, withAttribs) |
| 96 | + tr.metricClientRequestSize.Add(ctx, int64(bw.read), withAttribs) |
| 97 | + if resp != nil { |
| 98 | + tr.metricClientResponseSize.Add(ctx, resp.ContentLength, withAttribs) |
| 99 | + } |
| 100 | + |
| 101 | + return resp, err |
| 102 | +} |
| 103 | + |
| 104 | +func (tr *httpTransport) createMeasures(meter metric.Meter) { |
| 105 | + var err error |
| 106 | + |
| 107 | + tr.metricClientRequestSize, err = meter.Int64Counter(metricClientRequestSize) |
| 108 | + handleErr(err) |
| 109 | + |
| 110 | + tr.metricClientResponseSize, err = meter.Int64Counter(metricClientResponseSize) |
| 111 | + handleErr(err) |
| 112 | + |
| 113 | + tr.metricClientDuration, err = meter.Float64Histogram(metricClientDuration) |
| 114 | + handleErr(err) |
| 115 | +} |
| 116 | + |
| 117 | +func handleErr(err error) { |
| 118 | + if err != nil { |
| 119 | + otel.Handle(err) |
| 120 | + } |
| 121 | +} |
| 122 | + |
| 123 | +// bodyWrapper wraps a http.Request.Body (an io.ReadCloser) to track the number |
| 124 | +// of bytes read and the last error. |
| 125 | +type bodyWrapper struct { |
| 126 | + io.ReadCloser |
| 127 | + |
| 128 | + read int |
| 129 | + err error |
| 130 | +} |
| 131 | + |
| 132 | +func (w *bodyWrapper) Read(b []byte) (int, error) { |
| 133 | + n, err := w.ReadCloser.Read(b) |
| 134 | + w.read += n |
| 135 | + w.err = err |
| 136 | + return n, err |
| 137 | +} |
| 138 | + |
| 139 | +func (w *bodyWrapper) Close() error { |
| 140 | + return w.ReadCloser.Close() |
| 141 | +} |
0 commit comments