Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions cmd/otel-allocator/internal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/internal/allocation"
Expand All @@ -33,20 +32,15 @@ type Watcher struct {
collectorsDiscovered metric.Int64Gauge
}

func NewCollectorWatcher(logger logr.Logger, kubeConfig *rest.Config, collectorNotReadyGracePeriod time.Duration) (*Watcher, error) {
clientset, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
return &Watcher{}, err
}

func NewCollectorWatcher(logger logr.Logger, client kubernetes.Interface, collectorNotReadyGracePeriod time.Duration) (*Watcher, error) {
meter := otel.GetMeterProvider().Meter("targetallocator")
collectorsDiscovered, err := meter.Int64Gauge("opentelemetry_allocator_collectors_discovered", metric.WithDescription("Number of collectors discovered."))
if err != nil {
return &Watcher{}, err
}
return &Watcher{
log: logger.WithValues("component", "opentelemetry-targetallocator"),
k8sClient: clientset,
k8sClient: client,
close: make(chan struct{}),
minUpdateInterval: defaultMinUpdateInterval,
collectorNotReadyGracePeriod: collectorNotReadyGracePeriod,
Expand All @@ -69,7 +63,7 @@ func (k *Watcher) Watch(
}
informerFactory := informers.NewSharedInformerFactoryWithOptions(
k.k8sClient,
time.Second*30,
2*k.minUpdateInterval,
informers.WithNamespace(collectorNamespace),
informers.WithTweakListOptions(listOptionsFunc))
informer := informerFactory.Core().V1().Pods().Informer()
Expand Down
2 changes: 1 addition & 1 deletion cmd/otel-allocator/internal/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (r *reportingGauge) Record(_ context.Context, value int64, _ ...metric.Reco

func getTestPodWatcher(collectorNotReadyGracePeriod time.Duration) Watcher {
podWatcher := Watcher{
k8sClient: fake.NewSimpleClientset(),
k8sClient: fake.NewClientset(),
close: make(chan struct{}),
log: logger,
minUpdateInterval: time.Millisecond,
Expand Down
29 changes: 13 additions & 16 deletions cmd/otel-allocator/internal/watcher/promOperator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,20 @@ const (
minEventInterval = time.Second * 5
)

func NewPrometheusCRWatcher(ctx context.Context, logger logr.Logger, cfg allocatorconfig.Config) (*PrometheusCRWatcher, error) {
func NewPrometheusCRWatcher(
ctx context.Context,
logger logr.Logger,
client kubernetes.Interface,
monitoringclient monitoringclient.Interface,
cfg allocatorconfig.Config,
) (*PrometheusCRWatcher, error) {
promLogger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelWarn}))
slogger := slog.New(logr.ToSlogHandler(logger))
var resourceSelector *prometheus.ResourceSelector
mClient, err := monitoringclient.NewForConfig(cfg.ClusterConfig)
if err != nil {
return nil, err
}

clientset, err := kubernetes.NewForConfig(cfg.ClusterConfig)
if err != nil {
return nil, err
}

allowList, denyList := cfg.PrometheusCR.GetAllowDenyLists()

factory := informers.NewMonitoringInformerFactories(allowList, denyList, mClient, allocatorconfig.DefaultResyncTime, nil)
factory := informers.NewMonitoringInformerFactories(allowList, denyList, monitoringclient, allocatorconfig.DefaultResyncTime, nil)

monitoringInformers, err := getInformers(factory)
if err != nil {
Expand Down Expand Up @@ -95,19 +92,19 @@ func NewPrometheusCRWatcher(ctx context.Context, logger logr.Logger, cfg allocat
return nil, err
}

store := assets.NewStoreBuilder(clientset.CoreV1(), clientset.CoreV1())
store := assets.NewStoreBuilder(client.CoreV1(), client.CoreV1())
promRegisterer := prometheusgoclient.NewRegistry()
operatorMetrics := operator.NewMetrics(promRegisterer)
eventRecorderFactory := operator.NewEventRecorderFactory(false)
eventRecorder := eventRecorderFactory(clientset, "target-allocator")
eventRecorder := eventRecorderFactory(client, "target-allocator")

var nsMonInf cache.SharedIndexInformer
getNamespaceInformerErr := retry.OnError(retry.DefaultRetry,
func(err error) bool {
logger.Error(err, "Retrying namespace informer creation in promOperator CRD watcher")
return true
}, func() error {
nsMonInf, err = getNamespaceInformer(ctx, allowList, denyList, promLogger, clientset, operatorMetrics)
nsMonInf, err = getNamespaceInformer(ctx, allowList, denyList, promLogger, client, operatorMetrics)
return err
})
if getNamespaceInformerErr != nil {
Expand All @@ -122,8 +119,8 @@ func NewPrometheusCRWatcher(ctx context.Context, logger logr.Logger, cfg allocat

return &PrometheusCRWatcher{
logger: slogger,
kubeMonitoringClient: mClient,
k8sClient: clientset,
kubeMonitoringClient: monitoringclient,
k8sClient: client,
informers: monitoringInformers,
nsInformer: nsMonInf,
stopChannel: make(chan struct{}),
Expand Down
38 changes: 26 additions & 12 deletions cmd/otel-allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (
"syscall"

"github.com/oklog/run"
monitoringclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/discovery"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
ctrl "sigs.k8s.io/controller-runtime"

Expand Down Expand Up @@ -67,6 +69,17 @@ func main() {
ctx := context.Background()
log := ctrl.Log.WithName("allocator")

k8sClient, err := kubernetes.NewForConfig(cfg.ClusterConfig)
if err != nil {
setupLog.Error(err, "Unable to initialize kubernetes client")
os.Exit(1)
}
monitoringClient, err := monitoringclient.NewForConfig(cfg.ClusterConfig)
if err != nil {
setupLog.Error(err, "Unable to initialize monitoring client")
os.Exit(1)
}

metricExporter, promErr := otelprom.New()
if promErr != nil {
panic(promErr)
Expand Down Expand Up @@ -107,7 +120,7 @@ func main() {
if targetErr != nil {
panic(targetErr)
}
collectorWatcher, collectorWatcherErr := collector.NewCollectorWatcher(log, cfg.ClusterConfig, cfg.CollectorNotReadyGracePeriod)
collectorWatcher, collectorWatcherErr := collector.NewCollectorWatcher(log, k8sClient, cfg.CollectorNotReadyGracePeriod)
if collectorWatcherErr != nil {
setupLog.Error(collectorWatcherErr, "Unable to initialize collector watcher")
os.Exit(1)
Expand All @@ -116,7 +129,8 @@ func main() {
defer close(interrupts)

if cfg.PrometheusCR.Enabled {
promWatcher, allocErr := allocatorWatcher.NewPrometheusCRWatcher(ctx, setupLog.WithName("prometheus-cr-watcher"), *cfg)
promWatcher, allocErr := allocatorWatcher.NewPrometheusCRWatcher(
ctx, setupLog.WithName("prometheus-cr-watcher"), k8sClient, monitoringClient, *cfg)
if allocErr != nil {
setupLog.Error(allocErr, "Can't start the prometheus watcher")
os.Exit(1)
Expand Down Expand Up @@ -160,38 +174,38 @@ func main() {
func() error {
// Initial loading of the config file's scrape config
if cfg.PromConfig != nil && len(cfg.PromConfig.ScrapeConfigs) > 0 {
err := targetDiscoverer.ApplyConfig(allocatorWatcher.EventSourceConfigMap, cfg.PromConfig.ScrapeConfigs)
if err != nil {
applyErr := targetDiscoverer.ApplyConfig(allocatorWatcher.EventSourceConfigMap, cfg.PromConfig.ScrapeConfigs)
if applyErr != nil {
setupLog.Error(err, "Unable to apply initial configuration")
return err
}
} else {
setupLog.Info("Prometheus config empty, skipping initial discovery configuration")
}

err := targetDiscoverer.Run()
tErr := targetDiscoverer.Run()
setupLog.Info("Target discoverer exited")
return err
return tErr
},
func(_ error) {
setupLog.Info("Closing target discoverer")
targetDiscoverer.Close()
})
runGroup.Add(
func() error {
err := collectorWatcher.Watch(cfg.CollectorNamespace, cfg.CollectorSelector, allocator.SetCollectors)
watchErr := collectorWatcher.Watch(cfg.CollectorNamespace, cfg.CollectorSelector, allocator.SetCollectors)
setupLog.Info("Collector watcher exited")
return err
return watchErr
},
func(_ error) {
setupLog.Info("Closing collector watcher")
collectorWatcher.Close()
})
runGroup.Add(
func() error {
err := srv.Start()
startErr := srv.Start()
setupLog.Info("Server failed to start")
return err
return startErr
},
func(_ error) {
setupLog.Info("Closing server")
Expand All @@ -202,9 +216,9 @@ func main() {
if cfg.HTTPS.Enabled {
runGroup.Add(
func() error {
err := srv.StartHTTPS()
startErr := srv.StartHTTPS()
setupLog.Info("HTTPS Server failed to start")
return err
return startErr
},
func(_ error) {
setupLog.Info("Closing HTTPS server")
Expand Down
Loading