|
| 1 | +// Copyright 2025 The Cockroach Authors. |
| 2 | +// |
| 3 | +// Use of this software is governed by the CockroachDB Software License |
| 4 | +// included in the /LICENSE file. |
| 5 | + |
| 6 | +package cloud |
| 7 | + |
| 8 | +import ( |
| 9 | + "context" |
| 10 | + "io" |
| 11 | + "net/http" |
| 12 | + "sync" |
| 13 | + "sync/atomic" |
| 14 | + "time" |
| 15 | + |
| 16 | + "github.com/cockroachdb/cockroach/pkg/util/log" |
| 17 | + "github.com/cockroachdb/cockroach/pkg/util/tracing" |
| 18 | + "github.com/cockroachdb/crlib/crtime" |
| 19 | + "github.com/cockroachdb/redact" |
| 20 | +) |
| 21 | + |
| 22 | +// maybeAddLogging wraps the provided http.RoundTripper with a logging |
| 23 | +// transport if verbose logging is enabled. |
| 24 | +func maybeAddLogging(inner http.RoundTripper) http.RoundTripper { |
| 25 | + if log.V(1) { |
| 26 | + return &loggingTransport{inner: inner} |
| 27 | + } |
| 28 | + return inner |
| 29 | +} |
| 30 | + |
| 31 | +type loggingTransport struct { |
| 32 | + inner http.RoundTripper |
| 33 | +} |
| 34 | + |
| 35 | +// RoundTrip implements http.RoundTripper. |
| 36 | +func (l *loggingTransport) RoundTrip(request *http.Request) (*http.Response, error) { |
| 37 | + |
| 38 | + // NOTE: Body can be nil if the request has no body. |
| 39 | + requestWatcher := &requestBodyTracker{inner: request.Body} |
| 40 | + if request.Body != nil { |
| 41 | + // Make a shallow clone of the request because the request object itself is |
| 42 | + // supposed to be immutable. |
| 43 | + request = request.Clone(request.Context()) |
| 44 | + request.Body = requestWatcher |
| 45 | + } |
| 46 | + |
| 47 | + now := crtime.NowMono() |
| 48 | + resp, err := l.inner.RoundTrip(request) |
| 49 | + if err != nil { |
| 50 | + log.Dev.Warningf(request.Context(), "%s %s: %v", redact.SafeString(request.Method), request.URL.String(), err) |
| 51 | + return resp, err |
| 52 | + } |
| 53 | + |
| 54 | + logCtx, span := tracing.ForkSpan(request.Context(), "cloud-logging-transport") |
| 55 | + |
| 56 | + resp.Body = &responseBodyTracker{ |
| 57 | + inner: resp.Body, |
| 58 | + ctx: logCtx, |
| 59 | + span: span, |
| 60 | + |
| 61 | + status: redact.SafeString(resp.Status), |
| 62 | + method: redact.SafeString(request.Method), |
| 63 | + url: request.URL.String(), |
| 64 | + requestLatency: now.Elapsed(), |
| 65 | + requestBytes: requestWatcher.readBytes.Load(), |
| 66 | + responseBytes: resp.ContentLength, |
| 67 | + } |
| 68 | + return resp, nil |
| 69 | +} |
| 70 | + |
| 71 | +var _ http.RoundTripper = &loggingTransport{} |
| 72 | + |
| 73 | +// requestBodyTracker is a wrapper around io.ReadCloser that tracks the number of |
| 74 | +// bytes read from the request body. |
| 75 | +type requestBodyTracker struct { |
| 76 | + inner io.ReadCloser |
| 77 | + // readBytes is the number of bytes returned by the underlying Read calls. |
| 78 | + // NOTE(jeffswenson): in practice, I don't think this actually needs to be an |
| 79 | + // atomic value. By the time we observe the value, the request body should be |
| 80 | + // fully consumed. But syscalls in golang are not defined to be memory |
| 81 | + // barriers. So from the perspective of the go race detector there is no |
| 82 | + // synchronization between the goroutine reading the request body and the |
| 83 | + // goroutine sending the request. |
| 84 | + readBytes atomic.Int64 |
| 85 | +} |
| 86 | + |
| 87 | +func (s *requestBodyTracker) Read(p []byte) (int, error) { |
| 88 | + n, err := s.inner.Read(p) |
| 89 | + s.readBytes.Add(int64(n)) |
| 90 | + return n, err |
| 91 | +} |
| 92 | + |
| 93 | +func (s *requestBodyTracker) Close() error { |
| 94 | + return s.inner.Close() |
| 95 | +} |
| 96 | + |
| 97 | +// responseBodyTracker is an io.ReadCloser that wraps the original response |
| 98 | +// body. It counts the number of bytes read from the response body and logs |
| 99 | +// stats about the request when the body is closed. |
| 100 | +type responseBodyTracker struct { |
| 101 | + inner io.ReadCloser |
| 102 | + ctx context.Context |
| 103 | + span *tracing.Span |
| 104 | + |
| 105 | + // status is the HTTP status code of the response. (e.g. "200 OK", "404 Not Found"). |
| 106 | + status redact.SafeString |
| 107 | + // method is the HTTP method of the request (e.g., GET, POST). |
| 108 | + method redact.SafeString |
| 109 | + // url is the URL of the request. |
| 110 | + url string |
| 111 | + // requestLatency is the amount of time spent waiting for the response header. |
| 112 | + requestLatency time.Duration |
| 113 | + // requestBytes is the number of bytes sent in the request body. |
| 114 | + requestBytes int64 |
| 115 | + // number of bytes in the response body. We often do not read the entire |
| 116 | + // body. |
| 117 | + responseBytes int64 |
| 118 | + |
| 119 | + // readErr is the error returned by the last Read() call on the response body. This is |
| 120 | + // expected to be io.EOF when the body is fully read, but may also be `nil` if we are closing |
| 121 | + // an incomplete response body or some network error that occurred after receiving the response |
| 122 | + // header but before reading the whole body. |
| 123 | + readErr error |
| 124 | + // readBytes is the number of bytes read from the response body. This is tracked |
| 125 | + // because we often close a request before reading the entire body. |
| 126 | + readBytes int64 |
| 127 | + // readTime is the amount of time spent waiting in Read() calls. |
| 128 | + readTime time.Duration |
| 129 | + |
| 130 | + // closeOnce ensures Close() logic is only called once. Some callers of |
| 131 | + // Close() are sloppy and call it multiple times. The double log lines are |
| 132 | + // annoying, but the real issue is the *tracing.Span. The tracing |
| 133 | + // infrastructure reuses memory internally, so it is essential that |
| 134 | + // span.Finish is called exactly once. |
| 135 | + closeOnce sync.Once |
| 136 | +} |
| 137 | + |
| 138 | +func (l *responseBodyTracker) Read(p []byte) (int, error) { |
| 139 | + start := crtime.NowMono() |
| 140 | + |
| 141 | + n, err := l.inner.Read(p) |
| 142 | + |
| 143 | + l.readBytes += int64(n) |
| 144 | + l.readTime += start.Elapsed() |
| 145 | + l.readErr = err |
| 146 | + return n, err |
| 147 | +} |
| 148 | + |
| 149 | +func (l *responseBodyTracker) Close() error { |
| 150 | + l.closeOnce.Do(func() { |
| 151 | + if l.readErr == io.EOF || (l.readErr == nil && l.readBytes == l.responseBytes) { |
| 152 | + log.Dev.Infof(l.ctx, "%s %s (%s) sent=%d,recv=%d,request_latency=%s,read_latency=%s", |
| 153 | + l.method, l.url, l.status, |
| 154 | + l.requestBytes, l.readBytes, l.requestLatency, l.readTime) |
| 155 | + } else if l.readErr != nil { |
| 156 | + log.Dev.Warningf(l.ctx, "%s %s (%s) sent=%d,recv=(%d/%d),request_latency=%s,read_latency=%s: %v", |
| 157 | + l.method, l.url, l.status, |
| 158 | + l.requestBytes, l.readBytes, l.responseBytes, l.requestLatency, l.readTime, l.readErr) |
| 159 | + } else { |
| 160 | + log.Dev.Infof(l.ctx, "%s %s (%s) sent=%d,recv=(%d/%d),request_latency=%s,read_latency=%s: closed early", |
| 161 | + l.method, l.url, l.status, |
| 162 | + l.requestBytes, l.readBytes, l.responseBytes, l.requestLatency, l.readTime) |
| 163 | + } |
| 164 | + l.span.Finish() |
| 165 | + }) |
| 166 | + return l.inner.Close() |
| 167 | +} |
0 commit comments