|
| 1 | +// Copyright The OpenTelemetry Authors |
| 2 | +// SPDX-License-Identifier: Apache-2.0 |
| 3 | + |
| 4 | +package observ // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/observ" |
| 5 | + |
| 6 | +import ( |
| 7 | + "context" |
| 8 | + "errors" |
| 9 | + "fmt" |
| 10 | + "net" |
| 11 | + "net/http" |
| 12 | + "net/netip" |
| 13 | + "strconv" |
| 14 | + "strings" |
| 15 | + "sync" |
| 16 | + "time" |
| 17 | + |
| 18 | + "go.opentelemetry.io/otel" |
| 19 | + "go.opentelemetry.io/otel/attribute" |
| 20 | + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal" |
| 21 | + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/x" |
| 22 | + "go.opentelemetry.io/otel/internal/global" |
| 23 | + "go.opentelemetry.io/otel/metric" |
| 24 | + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" |
| 25 | + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" |
| 26 | +) |
| 27 | + |
| 28 | +const ( |
| 29 | + // ScopeName is the unique name of the meter used for instrumentation. |
| 30 | + ScopeName = "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/observ" |
| 31 | + |
| 32 | + // Version is the current version of this instrumentation |
| 33 | + // |
| 34 | + // This matches the version of the exporter. |
| 35 | + Version = internal.Version |
| 36 | +) |
| 37 | + |
| 38 | +var ( |
| 39 | + attrsPool = &sync.Pool{ |
| 40 | + New: func() any { |
| 41 | + const n = 1 + // component.name |
| 42 | + 1 + // component.type |
| 43 | + 1 + // server.addr |
| 44 | + 1 + // server.port |
| 45 | + 1 + // error.port |
| 46 | + 1 // http.response.status.code |
| 47 | + s := make([]attribute.KeyValue, 0, n) |
| 48 | + return &s |
| 49 | + }, |
| 50 | + } |
| 51 | + addOptPool = &sync.Pool{ |
| 52 | + New: func() any { |
| 53 | + const n = 1 // WithAttributeSet |
| 54 | + s := make([]metric.AddOption, 0, n) |
| 55 | + return &s |
| 56 | + }, |
| 57 | + } |
| 58 | + recordPool = &sync.Pool{ |
| 59 | + New: func() any { |
| 60 | + const n = 1 // WithAttributeSet |
| 61 | + s := make([]metric.RecordOption, 0, n) |
| 62 | + return &s |
| 63 | + }, |
| 64 | + } |
| 65 | +) |
| 66 | + |
| 67 | +func get[T any](pool *sync.Pool) *[]T { |
| 68 | + return pool.Get().(*[]T) |
| 69 | +} |
| 70 | + |
| 71 | +func put[T any](pool *sync.Pool, value *[]T) { |
| 72 | + *value = (*value)[:0] |
| 73 | + pool.Put(value) |
| 74 | +} |
| 75 | + |
| 76 | +// GetComponentName returns the constant name for the exporter with the |
| 77 | +// provided id. |
| 78 | +func GetComponentName(id int64) string { |
| 79 | + return fmt.Sprintf("%s/%d", otelconv.ComponentTypeOtlpHTTPLogExporter, id) |
| 80 | +} |
| 81 | + |
| 82 | +// Instrumentation is experimental instrumentation for the exporter. |
| 83 | +type Instrumentation struct { |
| 84 | + inflightMetric metric.Int64UpDownCounter |
| 85 | + exportedMetric metric.Int64Counter |
| 86 | + operationDuration metric.Float64Histogram |
| 87 | + |
| 88 | + presetAttrs []attribute.KeyValue |
| 89 | + addOpt metric.AddOption |
| 90 | + recordOpt metric.RecordOption |
| 91 | +} |
| 92 | + |
| 93 | +// NewInstrumentation returns instrumentation for otlplog http exporter. |
| 94 | +func NewInstrumentation(id int64, target string) (*Instrumentation, error) { |
| 95 | + if !x.Observability.Enabled() { |
| 96 | + return nil, nil |
| 97 | + } |
| 98 | + |
| 99 | + inst := &Instrumentation{} |
| 100 | + |
| 101 | + provider := otel.GetMeterProvider() |
| 102 | + m := provider.Meter( |
| 103 | + ScopeName, |
| 104 | + metric.WithSchemaURL(semconv.SchemaURL), |
| 105 | + metric.WithInstrumentationVersion(Version), |
| 106 | + ) |
| 107 | + |
| 108 | + var e, err error |
| 109 | + logInflight, e := otelconv.NewSDKExporterLogInflight(m) |
| 110 | + if e != nil { |
| 111 | + e = fmt.Errorf("failed to create the inflight metric %w", e) |
| 112 | + err = errors.Join(err, e) |
| 113 | + } |
| 114 | + inst.inflightMetric = logInflight.Inst() |
| 115 | + |
| 116 | + exported, e := otelconv.NewSDKExporterLogExported(m) |
| 117 | + if e != nil { |
| 118 | + e = fmt.Errorf("failed to create the exported metric %w", e) |
| 119 | + err = errors.Join(err, e) |
| 120 | + } |
| 121 | + inst.exportedMetric = exported.Inst() |
| 122 | + |
| 123 | + operation, e := otelconv.NewSDKExporterOperationDuration(m) |
| 124 | + if e != nil { |
| 125 | + e = fmt.Errorf("failed to create the operation duration metric %w", e) |
| 126 | + err = errors.Join(err, e) |
| 127 | + } |
| 128 | + inst.operationDuration = operation.Inst() |
| 129 | + |
| 130 | + if err != nil { |
| 131 | + return nil, err |
| 132 | + } |
| 133 | + |
| 134 | + inst.presetAttrs = setPresetAttrs(GetComponentName(id), target) |
| 135 | + |
| 136 | + inst.addOpt = metric.WithAttributeSet(attribute.NewSet(inst.presetAttrs...)) |
| 137 | + inst.recordOpt = metric.WithAttributeSet(attribute.NewSet(append( |
| 138 | + []attribute.KeyValue{semconv.HTTPResponseStatusCode(http.StatusOK)}, |
| 139 | + inst.presetAttrs..., |
| 140 | + )...)) |
| 141 | + |
| 142 | + return inst, nil |
| 143 | +} |
| 144 | + |
| 145 | +func setPresetAttrs(name, target string) []attribute.KeyValue { |
| 146 | + addrAttrs := ServerAddrAttrs(target) |
| 147 | + |
| 148 | + attrs := make([]attribute.KeyValue, 0, 2+len(addrAttrs)) |
| 149 | + attrs = append( |
| 150 | + attrs, |
| 151 | + semconv.OTelComponentName(name), |
| 152 | + semconv.OTelComponentTypeOtlpHTTPLogExporter, |
| 153 | + ) |
| 154 | + attrs = append(attrs, addrAttrs...) |
| 155 | + return attrs |
| 156 | +} |
| 157 | + |
| 158 | +// ServerAddrAttrs is a function that extracts server address and port attributes |
| 159 | +// from a target string. |
| 160 | +func ServerAddrAttrs(target string) []attribute.KeyValue { |
| 161 | + host, port, err := parseTarget(target) |
| 162 | + if err != nil || (host == "" && port < 0) { |
| 163 | + if err != nil { |
| 164 | + global.Debug("failed to parse target", "target", target, "error", err) |
| 165 | + } |
| 166 | + return nil |
| 167 | + } |
| 168 | + |
| 169 | + if port < 0 { |
| 170 | + return []attribute.KeyValue{semconv.ServerAddress(host)} |
| 171 | + } |
| 172 | + |
| 173 | + if host == "" { |
| 174 | + return []attribute.KeyValue{ |
| 175 | + semconv.ServerPort(port), |
| 176 | + } |
| 177 | + } |
| 178 | + return []attribute.KeyValue{ |
| 179 | + semconv.ServerAddress(host), |
| 180 | + semconv.ServerPort(port), |
| 181 | + } |
| 182 | +} |
| 183 | + |
| 184 | +func (i *Instrumentation) ExportLogs(ctx context.Context, count int64) ExportOp { |
| 185 | + start := time.Now() |
| 186 | + |
| 187 | + addOpt := get[metric.AddOption](addOptPool) |
| 188 | + defer put(addOptPool, addOpt) |
| 189 | + *addOpt = append(*addOpt, i.addOpt) |
| 190 | + i.inflightMetric.Add(ctx, count, *addOpt...) |
| 191 | + |
| 192 | + return ExportOp{ |
| 193 | + ctx: ctx, |
| 194 | + start: start, |
| 195 | + inst: i, |
| 196 | + count: count, |
| 197 | + } |
| 198 | +} |
| 199 | + |
| 200 | +// ExportOp tracks the operationDuration being observed by [Instrumentation.ExportLogs]. |
| 201 | +type ExportOp struct { |
| 202 | + ctx context.Context |
| 203 | + start time.Time |
| 204 | + inst *Instrumentation |
| 205 | + count int64 |
| 206 | +} |
| 207 | + |
| 208 | +// End completes the observation of the operationDuration being observed by a call to |
| 209 | +// [Instrumentation.ExportLogs]. |
| 210 | +// Any error that is encountered is provided as err. |
| 211 | +// |
| 212 | +// If err is not nil, all logs will be recorded as failures unless error is of |
| 213 | +// type [internal.PartialSuccess]. In the case of a PartialSuccess, the number |
| 214 | +// of successfully exported logs will be determined by inspecting the |
| 215 | +// RejectedItems field of the PartialSuccess. |
| 216 | +func (e ExportOp) End(err error, code int) { |
| 217 | + addOpt := get[metric.AddOption](addOptPool) |
| 218 | + defer put(addOptPool, addOpt) |
| 219 | + *addOpt = append(*addOpt, e.inst.addOpt) |
| 220 | + |
| 221 | + e.inst.inflightMetric.Add(e.ctx, -e.count, *addOpt...) |
| 222 | + success := successful(e.count, err) |
| 223 | + e.inst.exportedMetric.Add(e.ctx, success, *addOpt...) |
| 224 | + |
| 225 | + if err != nil { |
| 226 | + attrs := get[attribute.KeyValue](attrsPool) |
| 227 | + defer put(attrsPool, attrs) |
| 228 | + |
| 229 | + *attrs = append(*attrs, e.inst.presetAttrs...) |
| 230 | + *attrs = append(*attrs, semconv.ErrorType(err)) |
| 231 | + |
| 232 | + a := metric.WithAttributeSet(attribute.NewSet(*attrs...)) |
| 233 | + e.inst.exportedMetric.Add(e.ctx, e.count-success, a) |
| 234 | + } |
| 235 | + |
| 236 | + record := get[metric.RecordOption](recordPool) |
| 237 | + defer put(recordPool, record) |
| 238 | + *record = append(*record, e.recordOption(err, code)) |
| 239 | + |
| 240 | + duration := time.Since(e.start).Seconds() |
| 241 | + e.inst.operationDuration.Record(e.ctx, duration, *record...) |
| 242 | +} |
| 243 | + |
| 244 | +func (e ExportOp) recordOption(err error, code int) metric.RecordOption { |
| 245 | + if err == nil { |
| 246 | + return e.inst.recordOpt |
| 247 | + } |
| 248 | + |
| 249 | + attrs := get[attribute.KeyValue](attrsPool) |
| 250 | + defer put(attrsPool, attrs) |
| 251 | + |
| 252 | + *attrs = append(*attrs, e.inst.presetAttrs...) |
| 253 | + *attrs = append( |
| 254 | + *attrs, |
| 255 | + semconv.HTTPResponseStatusCode(code), |
| 256 | + semconv.ErrorType(err), |
| 257 | + ) |
| 258 | + return metric.WithAttributeSet(attribute.NewSet(*attrs...)) |
| 259 | +} |
| 260 | + |
| 261 | +// successful returns the number of successfully exported logs out of the n |
| 262 | +// that were exported based on the provided error. |
| 263 | +// |
| 264 | +// If err is nil, n is returned. All logs were successfully exported. |
| 265 | +// |
| 266 | +// If err is not nil and not an [internal.PartialSuccess] error, 0 is returned. |
| 267 | +// It is assumed all logs failed to be exported. |
| 268 | +// |
| 269 | +// If err is an [internal.PartialSuccess] error, the number of successfully |
| 270 | +// exported logs is computed by subtracting the RejectedItems field from n. If |
| 271 | +// RejectedItems is negative, n is returned. If RejectedItems is greater than |
| 272 | +// n, 0 is returned. |
| 273 | +func successful(count int64, err error) int64 { |
| 274 | + if err == nil { |
| 275 | + return count |
| 276 | + } |
| 277 | + return count - rejected(count, err) |
| 278 | +} |
| 279 | + |
| 280 | +var errPool = sync.Pool{ |
| 281 | + New: func() any { |
| 282 | + return new(internal.PartialSuccess) |
| 283 | + }, |
| 284 | +} |
| 285 | + |
| 286 | +// rejected returns how many out of the n logs exporter were rejected based on |
| 287 | +// the provided non-nil err. |
| 288 | +func rejected(n int64, err error) int64 { |
| 289 | + ps := errPool.Get().(*internal.PartialSuccess) |
| 290 | + defer errPool.Put(ps) |
| 291 | + |
| 292 | + if errors.As(err, ps) { |
| 293 | + // Bound RejectedItems to [0, n]. This should not be needed, |
| 294 | + // but be defensive as this is from an external source. |
| 295 | + return min(max(ps.RejectedItems, 0), n) |
| 296 | + } |
| 297 | + // all logs exported |
| 298 | + return n |
| 299 | +} |
| 300 | + |
| 301 | +// parseEndpoint parses the host and port from target that has the form |
| 302 | +// "host[:port]", or it returns an error if the target is not parsable. |
| 303 | +// |
| 304 | +// If no port is specified, -1 is returned. |
| 305 | +// |
| 306 | +// If no host is specified, an empty string is returned. |
| 307 | +func parseTarget(endpoint string) (string, int, error) { |
| 308 | + if ip := parseIP(endpoint); ip != "" { |
| 309 | + return ip, -1, nil |
| 310 | + } |
| 311 | + |
| 312 | + // If there's no colon, there is no port (IPv6 with no port checked above). |
| 313 | + if !strings.Contains(endpoint, ":") { |
| 314 | + return endpoint, -1, nil |
| 315 | + } |
| 316 | + |
| 317 | + // Otherwise, parse as host:port. |
| 318 | + host, portStr, err := net.SplitHostPort(endpoint) |
| 319 | + if err != nil { |
| 320 | + return "", -1, fmt.Errorf("invalid host:port %q: %w", endpoint, err) |
| 321 | + } |
| 322 | + |
| 323 | + const base, bitSize = 10, 16 |
| 324 | + port16, err := strconv.ParseUint(portStr, base, bitSize) |
| 325 | + if err != nil { |
| 326 | + return "", -1, fmt.Errorf("invalid port %q: %w", portStr, err) |
| 327 | + } |
| 328 | + port := int(port16) |
| 329 | + |
| 330 | + return host, port, nil |
| 331 | +} |
| 332 | + |
| 333 | +// parseIP attempts to parse the entire target as an IP address. |
| 334 | +// It returns the normalized string form of the IP if successful, |
| 335 | +// or an empty string if parsing fails. |
| 336 | +func parseIP(ip string) string { |
| 337 | + // Strip leading and trailing brackets for IPv6 addresses. |
| 338 | + if len(ip) >= 2 && ip[0] == '[' && ip[len(ip)-1] == ']' { |
| 339 | + ip = ip[1 : len(ip)-1] |
| 340 | + } |
| 341 | + addr, err := netip.ParseAddr(ip) |
| 342 | + if err != nil { |
| 343 | + return "" |
| 344 | + } |
| 345 | + // Return the normalized string form of the IP. |
| 346 | + return addr.String() |
| 347 | +} |
0 commit comments