Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/onsi/ginkgo/v2 v2.20.0
github.com/onsi/gomega v1.34.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.19.1
github.com/samber/lo v1.47.0
github.com/spf13/cobra v1.8.1
github.com/stretchr/testify v1.10.0
Expand Down Expand Up @@ -148,7 +149,6 @@ require (
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/pquerna/otp v1.4.0 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
Expand Down
6 changes: 6 additions & 0 deletions internal/controller/status/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"

pkgmetrics "github.com/apache/apisix-ingress-controller/pkg/metrics"
)

const UpdateChannelBufferSize = 1000
Expand Down Expand Up @@ -110,6 +112,8 @@ func (u *UpdateHandler) Start(ctx context.Context) error {
case <-ctx.Done():
return nil
case update := <-u.updateChannel:
// Decrement queue length after removing item from queue
pkgmetrics.DecStatusQueueLength()
u.log.Info("received a status update", "namespace", update.NamespacedName.Namespace,
"name", update.NamespacedName.Name)

Expand Down Expand Up @@ -137,4 +141,6 @@ type UpdateWriter struct {
func (u *UpdateWriter) Update(update Update) {
u.wg.Wait()
u.updateChannel <- update
// Increment queue length after adding new item
pkgmetrics.IncStatusQueueLength()
}
1 change: 1 addition & 0 deletions internal/manager/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/apache/apisix-ingress-controller/internal/controller/config"
"github.com/apache/apisix-ingress-controller/internal/controller/status"
"github.com/apache/apisix-ingress-controller/internal/provider/adc"
_ "github.com/apache/apisix-ingress-controller/pkg/metrics"
)

var (
Expand Down
27 changes: 26 additions & 1 deletion internal/provider/adc/adc.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/apache/apisix-ingress-controller/internal/provider/adc/translator"
"github.com/apache/apisix-ingress-controller/internal/types"
"github.com/apache/apisix-ingress-controller/internal/utils"
pkgmetrics "github.com/apache/apisix-ingress-controller/pkg/metrics"
)

type adcConfig struct {
Expand Down Expand Up @@ -380,24 +381,48 @@ func (d *adcClient) sync(ctx context.Context, task Task) error {
return nil
}

// Record file I/O duration
fileIOStart := time.Now()
syncFilePath, cleanup, err := prepareSyncFile(task.Resources)
if err != nil {
pkgmetrics.RecordFileIODuration("prepare_sync_file", "failure", time.Since(fileIOStart).Seconds())
return err
}
pkgmetrics.RecordFileIODuration("prepare_sync_file", "success", time.Since(fileIOStart).Seconds())
defer cleanup()

args := BuildADCExecuteArgs(syncFilePath, task.Labels, task.ResourceTypes)

var errs types.ADCExecutionErrors
for _, config := range task.configs {
if err := d.executor.Execute(ctx, d.BackendMode, config, args); err != nil {
// Record sync duration for each config
startTime := time.Now()
resourceType := strings.Join(task.ResourceTypes, ",")
if resourceType == "" {
resourceType = "all"
}

err := d.executor.Execute(ctx, d.BackendMode, config, args)
duration := time.Since(startTime).Seconds()

status := "success"
if err != nil {
status = "failure"
log.Errorw("failed to execute adc command", zap.Error(err), zap.Any("config", config))

var execErr types.ADCExecutionError
if errors.As(err, &execErr) {
errs.Errors = append(errs.Errors, execErr)
pkgmetrics.RecordExecutionError(config.Name, execErr.Name)
} else {
pkgmetrics.RecordExecutionError(config.Name, "unknown")
}
}

// Record metrics
pkgmetrics.RecordSyncDuration(config.Name, resourceType, status, duration)
}

if len(errs.Errors) > 0 {
return errs
}
Expand Down
114 changes: 114 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package metrics

import (
"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

var (
// ADC sync operation duration histogram
ADCSyncDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "apisix_ingress_adc_sync_duration_seconds",
Help: "Time spent on ADC sync operations",
Buckets: prometheus.DefBuckets,
},
[]string{"config_name", "resource_type", "status"},
)

// ADC sync operation counter
ADCSyncTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "apisix_ingress_adc_sync_total",
Help: "Total number of ADC sync operations",
},
[]string{"config_name", "resource_type", "status"},
)

// ADC execution errors counter
ADCExecutionErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "apisix_ingress_adc_execution_errors_total",
Help: "Total number of ADC execution errors",
},
[]string{"config_name", "error_type"},
)

// Status update channel queue length gauge
StatusUpdateQueueLength = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "apisix_ingress_status_update_queue_length",
Help: "Current length of the status update queue",
},
)

// File I/O operation duration histogram
FileIODuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "apisix_ingress_file_io_duration_seconds",
Help: "Time spent on file I/O operations",
Buckets: prometheus.DefBuckets,
},
[]string{"operation", "status"},
)
)

// init registers all metrics with the global prometheus registry
func init() {
// Register metrics with controller-runtime's metrics registry
metrics.Registry.MustRegister(
ADCSyncDuration,
ADCSyncTotal,
ADCExecutionErrors,
StatusUpdateQueueLength,
FileIODuration,
)
}

// RecordSyncDuration records the duration of an ADC sync operation
func RecordSyncDuration(configName, resourceType, status string, duration float64) {
ADCSyncDuration.WithLabelValues(configName, resourceType, status).Observe(duration)
ADCSyncTotal.WithLabelValues(configName, resourceType, status).Inc()
}

// RecordExecutionError records an ADC execution error
func RecordExecutionError(configName, errorType string) {
ADCExecutionErrors.WithLabelValues(configName, errorType).Inc()
}

// UpdateStatusQueueLength updates the status update queue length gauge
func UpdateStatusQueueLength(length float64) {
StatusUpdateQueueLength.Set(length)
}

// IncStatusQueueLength increments the status update queue length gauge by 1
func IncStatusQueueLength() {
StatusUpdateQueueLength.Inc()
}

// DecStatusQueueLength decrements the status update queue length gauge by 1
func DecStatusQueueLength() {
StatusUpdateQueueLength.Dec()
}

// RecordFileIODuration records the duration of a file I/O operation
func RecordFileIODuration(operation, status string, duration float64) {
FileIODuration.WithLabelValues(operation, status).Observe(duration)
}
32 changes: 32 additions & 0 deletions test/e2e/apisix/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package apisix
import (
"context"
"fmt"
"io"
"net"
"net/http"
"time"
Expand Down Expand Up @@ -96,6 +97,37 @@ spec:
err := s.DeleteResource("ApisixRoute", "default")
Expect(err).ShouldNot(HaveOccurred(), "deleting ApisixRoute")
Eventually(request).WithArguments("/headers").WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusNotFound))

By("request /metrics endpoint from controller")

// Get the metrics service endpoint
metricsURL := s.GetMetricsEndpoint()

By("verify metrics content")
resp, err := http.Get(metricsURL)
Expect(err).ShouldNot(HaveOccurred(), "request metrics endpoint")
defer func() {
_ = resp.Body.Close()
}()

Expect(resp.StatusCode).Should(Equal(http.StatusOK))

body, err := io.ReadAll(resp.Body)
Expect(err).ShouldNot(HaveOccurred(), "read metrics response")

bodyStr := string(body)

// Verify prometheus format
Expect(resp.Header.Get("Content-Type")).Should(ContainSubstring("text/plain; version=0.0.4; charset=utf-8"))

// Verify specific metrics from metrics.go exist
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_adc_sync_duration_seconds"))
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_adc_sync_total"))
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_status_update_queue_length"))
Expect(bodyStr).Should(ContainSubstring("apisix_ingress_file_io_duration_seconds"))

// Log metrics for debugging
fmt.Printf("Metrics endpoint response:\n%s\n", bodyStr)
})

It("Test plugins in ApisixRoute", func() {
Expand Down
3 changes: 3 additions & 0 deletions test/e2e/framework/manifests/apisix.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ spec:
- name: admin
containerPort: 9180
protocol: TCP
- name: control
containerPort: 9090
protocol: TCP
volumeMounts:
- name: config-writable
mountPath: /usr/local/apisix/conf
Expand Down
17 changes: 7 additions & 10 deletions test/e2e/framework/manifests/ingress.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,10 @@ metadata:
namespace: {{ .Namespace }}
spec:
ports:
- name: https
port: 8443
- name: metrics
port: 8080
protocol: TCP
targetPort: 8443
targetPort: 8080
selector:
control-plane: controller-manager
---
Expand Down Expand Up @@ -399,19 +399,16 @@ spec:
initialDelaySeconds: 15
periodSeconds: 20
name: manager
ports:
- name: metrics
containerPort: 8080
protocol: TCP
readinessProbe:
httpGet:
path: /readyz
port: 8081
initialDelaySeconds: 5
periodSeconds: 10
resources:
limits:
cpu: 500m
memory: 128Mi
requests:
cpu: 10m
memory: 64Mi
securityContext:
allowPrivilegeEscalation: false
capabilities:
Expand Down
9 changes: 9 additions & 0 deletions test/e2e/scaffold/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,3 +443,12 @@ func NewClient(scheme, host string) *httpexpect.Expect {
),
})
}

func (s *Scaffold) GetMetricsEndpoint() string {
tunnel := k8s.NewTunnel(s.kubectlOptions, k8s.ResourceTypeService, "apisix-ingress-controller-manager-metrics-service", 8080, 8080)
if err := tunnel.ForwardPortE(s.t); err != nil {
return ""
}
s.addFinalizers(tunnel.Close)
return fmt.Sprintf("http://%s/metrics", tunnel.Endpoint())
}
Loading