Skip to content

Commit d56d3e3

Browse files
Fix default internal telemetry endpoint for collector (#3874)
* Fix default internal telemetry endpoint for collector #3730 * Fix default internal telemetry endpoint for collector #3730 Signed-off-by: Israel Blancas <iblancasa@gmail.com> a Signed-off-by: Israel Blancas <iblancasa@gmail.com> * Use the config field from go.opentelemetry.io/contrib/otelconf Signed-off-by: Israel Blancas <iblancasa@gmail.com> --------- Signed-off-by: Israel Blancas <iblancasa@gmail.com> Co-authored-by: Shuhei Kitagawa <shuhei.kitagawa@datadoghq.com>
1 parent d1bb46a commit d56d3e3

File tree

35 files changed

+1079
-156
lines changed

35 files changed

+1079
-156
lines changed

.chloggen/3730.yaml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: bug_fix
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
5+
component: collector
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Fix the default configuration for the internal metrics endpoint
9+
10+
# One or more tracking issues related to the change
11+
issues: [3730]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext:

apis/v1beta1/collector_webhook_test.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"testing"
1212

1313
"github.com/go-logr/logr"
14+
"github.com/google/go-cmp/cmp"
1415
"github.com/stretchr/testify/assert"
1516
"github.com/stretchr/testify/require"
1617
"gopkg.in/yaml.v3"
@@ -158,9 +159,11 @@ func TestCollectorDefaultingWebhook(t *testing.T) {
158159
Mode: v1beta1.ModeDeployment,
159160
UpgradeStrategy: v1beta1.UpgradeStrategyAutomatic,
160161
Config: func() v1beta1.Config {
161-
const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"endpoint":"0.0.0.0:4317"},"http":{"endpoint":"0.0.0.0:4318"}}}},"exporters":{"debug":null},"service":{"telemetry":{"metrics":{"address":"0.0.0.0:8888"}},"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
162+
const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"endpoint":"0.0.0.0:4317"},"http":{"endpoint":"0.0.0.0:4318"}}}},"exporters":{"debug":null},"service":{"telemetry":{"metrics":{"readers":[{"pull":{"exporter":{"prometheus":{"host":"0.0.0.0","port":8888}}}}]}},"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
162163
var cfg v1beta1.Config
163164
require.NoError(t, yaml.Unmarshal([]byte(input), &cfg))
165+
// This is a workaround to avoid the type mismatch because how go-yaml unmarshals
166+
cfg.Service.Telemetry.Object["metrics"].(map[string]interface{})["readers"].([]interface{})[0].(map[string]interface{})["pull"].(map[string]interface{})["exporter"].(map[string]interface{})["prometheus"].(map[string]interface{})["port"] = int32(8888)
164167
return cfg
165168
}(),
166169
},
@@ -171,7 +174,7 @@ func TestCollectorDefaultingWebhook(t *testing.T) {
171174
otelcol: v1beta1.OpenTelemetryCollector{
172175
Spec: v1beta1.OpenTelemetryCollectorSpec{
173176
Config: func() v1beta1.Config {
174-
const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"headers":{"example":"another"}},"http":{"endpoint":"0.0.0.0:4000"}}}},"exporters":{"debug":null},"service":{"telemetry":{"metrics":{"address":"1.2.3.4:7654"}},"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
177+
const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"headers":{"example":"another"}},"http":{"endpoint":"0.0.0.0:4000"}}}},"exporters":{"debug":null},"service":{"telemetry":{"metrics":{"readers":[{"pull":{"exporter":{"prometheus":{"host":"localhost","port":9999}}}}]}},"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
175178
var cfg v1beta1.Config
176179
require.NoError(t, yaml.Unmarshal([]byte(input), &cfg))
177180
return cfg
@@ -190,7 +193,7 @@ func TestCollectorDefaultingWebhook(t *testing.T) {
190193
Mode: v1beta1.ModeDeployment,
191194
UpgradeStrategy: v1beta1.UpgradeStrategyAutomatic,
192195
Config: func() v1beta1.Config {
193-
const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"endpoint":"0.0.0.0:4317","headers":{"example":"another"}},"http":{"endpoint":"0.0.0.0:4000"}}}},"exporters":{"debug":null},"service":{"telemetry":{"metrics":{"address":"1.2.3.4:7654"}},"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
196+
const input = `{"receivers":{"otlp":{"protocols":{"grpc":{"endpoint":"0.0.0.0:4317","headers":{"example":"another"}},"http":{"endpoint":"0.0.0.0:4000"}}}},"exporters":{"debug":null},"service":{"telemetry":{"metrics":{"readers":[{"pull":{"exporter":{"prometheus":{"host":"localhost","port":9999}}}}]}},"pipelines":{"traces":{"receivers":["otlp"],"exporters":["debug"]}}}}`
194197
var cfg v1beta1.Config
195198
require.NoError(t, yaml.Unmarshal([]byte(input), &cfg))
196199
return cfg
@@ -547,7 +550,9 @@ func TestCollectorDefaultingWebhook(t *testing.T) {
547550
assert.NoError(t, test.expected.Spec.Config.Service.ApplyDefaults(logr.Discard()), "could not apply defaults")
548551
}
549552
assert.NoError(t, err)
550-
assert.Equal(t, test.expected, test.otelcol)
553+
if diff := cmp.Diff(test.expected, test.otelcol); diff != "" {
554+
t.Errorf("v1beta1.OpenTelemetryCollector mismatch (-want +got):\n%s", diff)
555+
}
551556
})
552557
}
553558
}
@@ -582,7 +587,18 @@ func TestOTELColValidatingWebhook(t *testing.T) {
582587
Telemetry: &v1beta1.AnyConfig{
583588
Object: map[string]interface{}{
584589
"metrics": map[string]interface{}{
585-
"address": "${env:POD_ID}:8888",
590+
"readers": []map[string]interface{}{
591+
{
592+
"pull": map[string]interface{}{
593+
"exporter": map[string]interface{}{
594+
"prometheus": map[string]interface{}{
595+
"host": "${env:POD_ID}",
596+
"port": int32(8888),
597+
},
598+
},
599+
},
600+
},
601+
},
586602
},
587603
},
588604
},

apis/v1beta1/config.go

Lines changed: 81 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,11 @@ package v1beta1
66
import (
77
"bytes"
88
"encoding/json"
9-
"errors"
10-
"fmt"
11-
"reflect"
12-
"regexp"
139
"sort"
14-
"strconv"
15-
"strings"
1610

1711
"dario.cat/mergo"
1812
"github.com/go-logr/logr"
13+
otelConfig "go.opentelemetry.io/contrib/otelconf/v0.3.0"
1914
"gopkg.in/yaml.v3"
2015
corev1 "k8s.io/api/core/v1"
2116
rbacv1 "k8s.io/api/rbac/v1"
@@ -398,24 +393,24 @@ func (c *Config) Yaml() (string, error) {
398393
// Returns null objects in the config.
399394
func (c *Config) nullObjects() []string {
400395
var nullKeys []string
401-
if nulls := hasNullValue(c.Receivers.Object); len(nulls) > 0 {
396+
if nulls := getNullValuedKeys(c.Receivers.Object); len(nulls) > 0 {
402397
nullKeys = append(nullKeys, addPrefix("receivers.", nulls)...)
403398
}
404-
if nulls := hasNullValue(c.Exporters.Object); len(nulls) > 0 {
399+
if nulls := getNullValuedKeys(c.Exporters.Object); len(nulls) > 0 {
405400
nullKeys = append(nullKeys, addPrefix("exporters.", nulls)...)
406401
}
407402
if c.Processors != nil {
408-
if nulls := hasNullValue(c.Processors.Object); len(nulls) > 0 {
403+
if nulls := getNullValuedKeys(c.Processors.Object); len(nulls) > 0 {
409404
nullKeys = append(nullKeys, addPrefix("processors.", nulls)...)
410405
}
411406
}
412407
if c.Extensions != nil {
413-
if nulls := hasNullValue(c.Extensions.Object); len(nulls) > 0 {
408+
if nulls := getNullValuedKeys(c.Extensions.Object); len(nulls) > 0 {
414409
nullKeys = append(nullKeys, addPrefix("extensions.", nulls)...)
415410
}
416411
}
417412
if c.Connectors != nil {
418-
if nulls := hasNullValue(c.Connectors.Object); len(nulls) > 0 {
413+
if nulls := getNullValuedKeys(c.Connectors.Object); len(nulls) > 0 {
419414
nullKeys = append(nullKeys, addPrefix("connectors.", nulls)...)
420415
}
421416
}
@@ -445,64 +440,48 @@ const (
445440
// because the port is used to generate Service objects and mappings.
446441
func (s *Service) MetricsEndpoint(logger logr.Logger) (string, int32, error) {
447442
telemetry := s.GetTelemetry()
448-
if telemetry == nil || telemetry.Metrics.Address == "" {
443+
if telemetry == nil {
449444
return defaultServiceHost, defaultServicePort, nil
450445
}
451446

452-
// The regex below matches on strings that end with a colon followed by the environment variable expansion syntax.
453-
// So it should match on strings ending with: ":${env:POD_IP}" or ":${POD_IP}".
454-
const portEnvVarRegex = `:\${[env:]?.*}$`
455-
isPortEnvVar := regexp.MustCompile(portEnvVarRegex).MatchString(telemetry.Metrics.Address)
456-
if isPortEnvVar {
457-
errMsg := fmt.Sprintf("couldn't determine metrics port from configuration: %s",
458-
telemetry.Metrics.Address)
459-
logger.Info(errMsg)
460-
return "", 0, errors.New(errMsg)
461-
}
447+
if telemetry.Metrics.Address != "" && len(telemetry.Metrics.Readers) == 0 {
448+
host, port, err := parseAddressEndpoint(telemetry.Metrics.Address)
449+
if err != nil {
450+
return "", 0, err
451+
}
462452

463-
// The regex below matches on strings that end with a colon followed by 1 or more numbers (representing the port).
464-
const explicitPortRegex = `:(\d+$)`
465-
explicitPortMatches := regexp.MustCompile(explicitPortRegex).FindStringSubmatch(telemetry.Metrics.Address)
466-
if len(explicitPortMatches) <= 1 {
467-
return telemetry.Metrics.Address, defaultServicePort, nil
453+
return host, port, nil
468454
}
469455

470-
port, err := strconv.ParseInt(explicitPortMatches[1], 10, 32)
471-
if err != nil {
472-
errMsg := fmt.Sprintf("couldn't determine metrics port from configuration: %s",
473-
telemetry.Metrics.Address)
474-
logger.Info(errMsg, "error", err)
475-
return "", 0, err
476-
}
477-
478-
host, _, _ := strings.Cut(telemetry.Metrics.Address, explicitPortMatches[0])
479-
return host, int32(port), nil
456+
return defaultServiceHost, defaultServicePort, nil
480457
}
481458

482459
// ApplyDefaults inserts configuration defaults if it has not been set.
483460
func (s *Service) ApplyDefaults(logger logr.Logger) error {
484-
telemetryAddr, telemetryPort, err := s.MetricsEndpoint(logger)
485-
if err != nil {
486-
return err
487-
}
461+
tel := s.GetTelemetry()
488462

489-
tm := &AnyConfig{
490-
Object: map[string]interface{}{
491-
"metrics": map[string]interface{}{
492-
"address": fmt.Sprintf("%s:%d", telemetryAddr, telemetryPort),
493-
},
494-
},
463+
if tel == nil {
464+
tel = &Telemetry{}
495465
}
496466

497-
if s.Telemetry == nil {
498-
s.Telemetry = tm
467+
if tel.Metrics.Address != "" || len(tel.Metrics.Readers) != 0 {
468+
// The user already set the address or the readers, so we don't need to do anything
499469
return nil
500470
}
501-
// NOTE: Merge without overwrite. If a telemetry endpoint is specified, the defaulting
502-
// respects the configuration and returns an equal value.
503-
if err := mergo.Merge(s.Telemetry, tm); err != nil {
504-
return fmt.Errorf("telemetry config merge failed: %w", err)
471+
472+
host, port, err := s.MetricsEndpoint(logger)
473+
if err != nil {
474+
return err
475+
}
476+
477+
reader := AddPrometheusMetricsEndpoint(host, port)
478+
tel.Metrics.Readers = append(tel.Metrics.Readers, reader)
479+
480+
s.Telemetry, err = tel.ToAnyConfig()
481+
if err != nil {
482+
return err
505483
}
484+
506485
return nil
507486
}
508487

@@ -517,6 +496,23 @@ type MetricsConfig struct {
517496

518497
// Address is the [address]:port that metrics exposition should be bound to.
519498
Address string `json:"address,omitempty" yaml:"address,omitempty"`
499+
500+
otelConfig.MeterProvider `mapstructure:",squash"`
501+
}
502+
503+
func (in *MetricsConfig) DeepCopyInto(out *MetricsConfig) {
504+
*out = *in
505+
out.MeterProvider = in.MeterProvider
506+
}
507+
508+
// DeepCopy creates a new deepcopy of MetricsConfig.
509+
func (in *MetricsConfig) DeepCopy() *MetricsConfig {
510+
if in == nil {
511+
return nil
512+
}
513+
out := new(MetricsConfig)
514+
in.DeepCopyInto(out)
515+
return out
520516
}
521517

522518
// Telemetry is an intermediary type that allows for easy access to the collector's telemetry settings.
@@ -530,6 +526,38 @@ type Telemetry struct {
530526
Resource map[string]*string `json:"resource,omitempty" yaml:"resource,omitempty"`
531527
}
532528

529+
// ToAnyConfig converts the Telemetry struct to an AnyConfig struct.
530+
func (t *Telemetry) ToAnyConfig() (*AnyConfig, error) {
531+
data, err := json.Marshal(t)
532+
if err != nil {
533+
return nil, err
534+
}
535+
var result map[string]interface{}
536+
if err := json.Unmarshal(data, &result); err != nil {
537+
return nil, err
538+
}
539+
540+
normalizeConfig(result)
541+
542+
return &AnyConfig{
543+
Object: result,
544+
}, nil
545+
}
546+
547+
func AddPrometheusMetricsEndpoint(host string, port int32) otelConfig.MetricReader {
548+
portInt := int(port)
549+
return otelConfig.MetricReader{
550+
Pull: &otelConfig.PullMetricReader{
551+
Exporter: otelConfig.PullMetricExporter{
552+
Prometheus: &otelConfig.Prometheus{
553+
Host: &host,
554+
Port: &portInt,
555+
},
556+
},
557+
},
558+
}
559+
}
560+
533561
// GetTelemetry serves as a helper function to access the fields we care about in the underlying telemetry struct.
534562
// This exists to avoid needing to worry extra fields in the telemetry struct.
535563
func (s *Service) GetTelemetry() *Telemetry {
@@ -548,32 +576,3 @@ func (s *Service) GetTelemetry() *Telemetry {
548576
}
549577
return t
550578
}
551-
552-
func hasNullValue(cfg map[string]interface{}) []string {
553-
var nullKeys []string
554-
for k, v := range cfg {
555-
if v == nil {
556-
nullKeys = append(nullKeys, fmt.Sprintf("%s:", k))
557-
}
558-
if reflect.ValueOf(v).Kind() == reflect.Map {
559-
var nulls []string
560-
val, ok := v.(map[string]interface{})
561-
if ok {
562-
nulls = hasNullValue(val)
563-
}
564-
if len(nulls) > 0 {
565-
prefixed := addPrefix(k+".", nulls)
566-
nullKeys = append(nullKeys, prefixed...)
567-
}
568-
}
569-
}
570-
return nullKeys
571-
}
572-
573-
func addPrefix(prefix string, arr []string) []string {
574-
var prefixed []string
575-
for _, v := range arr {
576-
prefixed = append(prefixed, fmt.Sprintf("%s%s", prefix, v))
577-
}
578-
return prefixed
579-
}

0 commit comments

Comments
 (0)