Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion apis/operator/v1alpha1/telemetry_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,20 @@ type LogSpec struct {
}

// GatewaySpec defines settings of a gateway.
//
// Deprecated: Gateway scaling configuration is no longer supported. The gateway now runs as a DaemonSet.
// +kubebuilder:validation:XValidation:rule="!has(self.scaling) || (!has(self.scaling.type) && !has(self.scaling.static))",message="Gateway scaling is deprecated and must not be set. The gateway runs as a DaemonSet that scales automatically with cluster nodes."
type GatewaySpec struct {
// Scaling defines which strategy is used for scaling the gateway, with detailed configuration options for each strategy type.
//
// Deprecated: This field is no longer supported and must not be set.
// +kubebuilder:validation:Optional
Scaling Scaling `json:"scaling"`
Scaling *Scaling `json:"scaling,omitempty"`
}

// Scaling defines which strategy is used for scaling the gateway, with detailed configuration options for each strategy type.
//
// Deprecated: Gateway scaling is no longer supported. The gateway now runs as a DaemonSet.
type Scaling struct {
// Type of scaling strategy. Default is none, using a fixed amount of replicas.
// +optional
Expand Down
6 changes: 5 additions & 1 deletion apis/operator/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion apis/operator/v1beta1/telemetry_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,20 @@ type LogSpec struct {
}

// GatewaySpec defines settings of a gateway.
//
// Deprecated: Gateway scaling configuration is no longer supported. The gateway now runs as a DaemonSet.
// +kubebuilder:validation:XValidation:rule="!has(self.scaling) || (!has(self.scaling.type) && !has(self.scaling.static))",message="Gateway scaling is deprecated and must not be set. The gateway runs as a DaemonSet that scales automatically with cluster nodes."
type GatewaySpec struct {
// Scaling defines which strategy is used for scaling the gateway, with detailed configuration options for each strategy type.
//
// Deprecated: This field is no longer supported and must not be set.
// +kubebuilder:validation:Optional
Scaling Scaling `json:"scaling"`
Scaling *Scaling `json:"scaling,omitempty"`
}

// Scaling defines which strategy is used for scaling the gateway, with detailed configuration options for each strategy type.
//
// Deprecated: Gateway scaling is no longer supported. The gateway now runs as a DaemonSet.
type Scaling struct {
// Type of scaling strategy. Default is none, using a fixed amount of replicas.
// +optional
Expand Down
6 changes: 5 additions & 1 deletion apis/operator/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 18 additions & 12 deletions controllers/telemetry/logpipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"

istionetworkingclientv1 "istio.io/client-go/pkg/apis/networking/v1"
istiosecurityclientv1 "istio.io/client-go/pkg/apis/security/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -198,7 +197,6 @@ func logPipelineOwnedResourceTypes(isIstioActive, vpaCRDExists bool) []client.Ob
if isIstioActive {
resources = append(resources,
&istiosecurityclientv1.PeerAuthentication{},
&istionetworkingclientv1.DestinationRule{},
)
}

Expand Down Expand Up @@ -277,7 +275,7 @@ func (r *LogPipelineController) SetupWithManager(mgr ctrl.Manager) error {
// Watch OTLP Gateway DaemonSet to update GatewayHealthy condition for OTLP input pipelines
b.Watches(
&appsv1.DaemonSet{}, // OTLP Gateway DaemonSet
handler.EnqueueRequestsFromMapFunc(r.mapOTLPGatewayToOTLPPipelines),
handler.EnqueueRequestsFromMapFunc(r.mapOTLPGatewayChanges),
ctrlbuilder.WithPredicates(ctrlpredicate.NewPredicateFuncs(func(object client.Object) bool {
return object.GetName() == names.OTLPGateway &&
object.GetNamespace() == r.pipelineLockName.Namespace
Expand All @@ -288,7 +286,7 @@ func (r *LogPipelineController) SetupWithManager(mgr ctrl.Manager) error {
// This ensures that when a pipeline is deleted and frees up a slot, waiting pipelines get reconciled
b.Watches(
&corev1.ConfigMap{}, // Pipeline lock ConfigMap
handler.EnqueueRequestsFromMapFunc(r.mapLockConfigMapToAllPipelines),
handler.EnqueueRequestsFromMapFunc(r.mapLockConfigMapChanges),
ctrlbuilder.WithPredicates(ctrlpredicate.NewPredicateFuncs(func(object client.Object) bool {
return object.GetName() == r.pipelineLockName.Name && object.GetNamespace() == r.pipelineLockName.Namespace
})),
Expand All @@ -297,18 +295,18 @@ func (r *LogPipelineController) SetupWithManager(mgr ctrl.Manager) error {
return b.Complete(r)
}

// mapLockConfigMapToAllPipelines enqueues reconciliation requests for all LogPipelines
// when the lock ConfigMap changes. This ensures that pipelines that were previously rejected
// due to max pipeline limit get a chance to acquire the lock when slots become available.
func (r *LogPipelineController) mapLockConfigMapToAllPipelines(ctx context.Context, object client.Object) []reconcile.Request {
// mapLockConfigMapChanges enqueues reconciliation requests for all LogPipelines when the pipeline lock
// ConfigMap changes. This ensures that pipelines previously rejected due to the max pipeline limit get
// reconciled when slots become available.
func (r *LogPipelineController) mapLockConfigMapChanges(ctx context.Context, object client.Object) []reconcile.Request {
logf.FromContext(ctx).V(1).Info("Pipeline lock ConfigMap changed, triggering reconciliation of all LogPipelines")
return r.enqueueAllPipelines(ctx)
}

// mapOTLPGatewayToOTLPPipelines enqueues reconciliation requests for LogPipelines with OTLP input
// when the OTLP Gateway DaemonSet changes. This ensures that GatewayHealthy status conditions
// are updated to reflect the current gateway state for pipelines that use the gateway.
func (r *LogPipelineController) mapOTLPGatewayToOTLPPipelines(ctx context.Context, object client.Object) []reconcile.Request {
// mapOTLPGatewayChanges enqueues reconciliation requests for LogPipelines with OTLP output when the
// OTLP Gateway DaemonSet changes. This ensures that the GatewayHealthy status condition is updated to
// reflect the current gateway state for pipelines that use the gateway.
func (r *LogPipelineController) mapOTLPGatewayChanges(ctx context.Context, object client.Object) []reconcile.Request {
logf.FromContext(ctx).V(1).Info("OTLP Gateway DaemonSet changed, triggering reconciliation of LogPipelines with OTLP input")

var pipelineList telemetryv1beta1.LogPipelineList
Expand All @@ -334,6 +332,8 @@ func (r *LogPipelineController) mapOTLPGatewayToOTLPPipelines(ctx context.Contex
return requests
}

// mapTelemetryChanges enqueues reconciliation requests for all LogPipelines when the Telemetry CR
// changes. This ensures pipelines reflect updated module-level settings such as VPA opt-in annotations.
func (r *LogPipelineController) mapTelemetryChanges(ctx context.Context, object client.Object) []reconcile.Request {
_, ok := object.(*operatorv1beta1.Telemetry)
if !ok {
Expand All @@ -344,6 +344,9 @@ func (r *LogPipelineController) mapTelemetryChanges(ctx context.Context, object
return r.enqueueAllPipelines(ctx)
}

// mapPodChanges enqueues reconciliation requests for all LogPipelines when a relevant Pod
// (Fluent Bit, Log Agent, or OTLP Gateway) is updated or deleted. This ensures that the
// TelemetryFlowHealthy status condition is updated based on the current pod state.
func (r *LogPipelineController) mapPodChanges(ctx context.Context, object client.Object) []reconcile.Request {
pod, ok := object.(*corev1.Pod)
if !ok {
Expand Down Expand Up @@ -458,6 +461,9 @@ func configureOTelReconciler(config LogPipelineControllerConfig, client client.C
return otelReconciler, nil
}

// mapNodeChanges updates the node size tracker when a Node is added, removed, or modified.
// If the smallest node memory changes, it enqueues reconciliation requests for all LogPipelines
// so that resource requirements can be recalculated.
func (r *LogPipelineController) mapNodeChanges(ctx context.Context, object client.Object) []reconcile.Request {
changed, err := r.nodeSizeTracker.UpdateSmallestMemory(ctx)
if err != nil {
Expand Down
28 changes: 18 additions & 10 deletions controllers/telemetry/metricpipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (r *MetricPipelineController) SetupWithManager(mgr ctrl.Manager) error {
// Watch OTLP Gateway DaemonSet to update GatewayHealthy condition when gateway status changes
b.Watches(
&appsv1.DaemonSet{}, // OTLP Gateway DaemonSet
handler.EnqueueRequestsFromMapFunc(r.mapOTLPGatewayToAllPipelines),
handler.EnqueueRequestsFromMapFunc(r.mapOTLPGatewayChanges),
ctrlbuilder.WithPredicates(ctrlpredicate.NewPredicateFuncs(func(object client.Object) bool {
return object.GetName() == names.OTLPGateway &&
object.GetNamespace() == r.pipelineLockName.Namespace
Expand All @@ -272,7 +272,7 @@ func (r *MetricPipelineController) SetupWithManager(mgr ctrl.Manager) error {
// This ensures that when a pipeline is deleted and frees up a slot, waiting pipelines get reconciled
b.Watches(
&corev1.ConfigMap{}, // Pipeline lock ConfigMap
handler.EnqueueRequestsFromMapFunc(r.mapLockConfigMapToAllPipelines),
handler.EnqueueRequestsFromMapFunc(r.mapLockConfigMapChanges),
ctrlbuilder.WithPredicates(ctrlpredicate.NewPredicateFuncs(func(object client.Object) bool {
return object.GetName() == r.pipelineLockName.Name && object.GetNamespace() == r.pipelineLockName.Namespace
})),
Expand Down Expand Up @@ -305,22 +305,24 @@ func (r *MetricPipelineController) SetupWithManager(mgr ctrl.Manager) error {
return b.Complete(r)
}

// mapLockConfigMapToAllPipelines enqueues reconciliation requests for all MetricPipelines
// when the lock ConfigMap changes. This ensures that pipelines that were previously rejected
// due to max pipeline limit get a chance to acquire the lock when slots become available.
func (r *MetricPipelineController) mapLockConfigMapToAllPipelines(ctx context.Context, object client.Object) []reconcile.Request {
// mapLockConfigMapChanges enqueues reconciliation requests for all MetricPipelines when the pipeline lock
// ConfigMap changes. This ensures that pipelines previously rejected due to the max pipeline limit get
// reconciled when slots become available.
func (r *MetricPipelineController) mapLockConfigMapChanges(ctx context.Context, object client.Object) []reconcile.Request {
logf.FromContext(ctx).V(1).Info("Pipeline lock ConfigMap changed, triggering reconciliation of all MetricPipelines")
return r.enqueueAllPipelines(ctx)
}

// mapOTLPGatewayToAllPipelines enqueues reconciliation requests for all MetricPipelines
// when the OTLP Gateway DaemonSet changes. This ensures that GatewayHealthy status conditions
// are updated to reflect the current gateway state.
func (r *MetricPipelineController) mapOTLPGatewayToAllPipelines(ctx context.Context, object client.Object) []reconcile.Request {
// mapOTLPGatewayChanges enqueues reconciliation requests for all MetricPipelines when the OTLP Gateway
// DaemonSet changes. This ensures that the GatewayHealthy status condition is updated to reflect the
// current gateway state.
func (r *MetricPipelineController) mapOTLPGatewayChanges(ctx context.Context, object client.Object) []reconcile.Request {
logf.FromContext(ctx).V(1).Info("OTLP Gateway DaemonSet changed, triggering reconciliation of all MetricPipelines")
return r.enqueueAllPipelines(ctx)
}

// mapTelemetryChanges enqueues reconciliation requests for all MetricPipelines when the Telemetry CR
// changes. This ensures pipelines reflect updated module-level settings such as VPA opt-in annotations.
func (r *MetricPipelineController) mapTelemetryChanges(ctx context.Context, object client.Object) []reconcile.Request {
_, ok := object.(*operatorv1beta1.Telemetry)
if !ok {
Expand All @@ -331,6 +333,9 @@ func (r *MetricPipelineController) mapTelemetryChanges(ctx context.Context, obje
return r.enqueueAllPipelines(ctx)
}

// mapPodChanges enqueues reconciliation requests for all MetricPipelines when a relevant Pod
// (OTLP Gateway or Metric Agent) is updated or deleted. This ensures that the TelemetryFlowHealthy
// status condition is updated based on the current pod state.
func (r *MetricPipelineController) mapPodChanges(ctx context.Context, object client.Object) []reconcile.Request {
pod, ok := object.(*corev1.Pod)
if !ok {
Expand All @@ -345,6 +350,9 @@ func (r *MetricPipelineController) mapPodChanges(ctx context.Context, object cli
return r.enqueueAllPipelines(ctx)
}

// mapNodeChanges updates the node size tracker when a Node is added, removed, or modified.
// If the smallest node memory changes, it enqueues reconciliation requests for all MetricPipelines
// so that resource requirements can be recalculated.
func (r *MetricPipelineController) mapNodeChanges(ctx context.Context, object client.Object) []reconcile.Request {
changed, err := r.nodeSizeTracker.UpdateSmallestMemory(ctx)
if err != nil {
Expand Down
Loading
Loading