Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ Activate a new docker profile.
You can run tests on your local kind cluster.

```sh
KIND_CONTEXT=tilt IMAGE_TAG=local ./e2e/run.sh
KIND_CONTEXT=kind IMAGE_TAG=local ./e2e/run.sh
```

You will need IPv6 to be enabled on the host. Most operating systems / distros have IPv6 enabled by default, but you can check on Linux with the following command:
Expand Down
6 changes: 6 additions & 0 deletions charts/kvisor/templates/agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ spec:
{{- else -}}
{{ .Values.castai.grpcAddr | quote }}
{{- end }}
- name: CASTAI_API_URL
value: {{ if .Values.mockServer.enabled -}}
{{ (printf "http://%s:8080" (include "kvisor.castaiMockServer.service" .)) | quote }}
{{- else -}}
{{ .Values.castai.apiURL | quote }}
{{- end }}
{{- include "kvisor.clusterIDEnv" (set (deepCopy .) "envFrom" .Values.agent.envFrom) | nindent 12 }}
{{- if .Values.agent.debug.ebpf }}
- name: KVISOR_EBPF_DEBUG
Expand Down
6 changes: 5 additions & 1 deletion charts/kvisor/templates/cast-mock-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,12 @@ spec:
{{- include "kvisor.castaiMockServer.selectorLabels" . | nindent 4 }}
type: ClusterIP
ports:
- name: server
- name: grpc
protocol: TCP
port: 8443
targetPort: 8443
- name: http
protocol: TCP
port: 8080
targetPort: 8080
{{- end }}
6 changes: 6 additions & 0 deletions charts/kvisor/templates/controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ spec:
{{- else -}}
{{ .Values.castai.grpcAddr | quote }}
{{- end }}
- name: CASTAI_API_URL
value: {{ if .Values.mockServer.enabled -}}
{{ (printf "http://%s:8080" (include "kvisor.castaiMockServer.service" .)) | quote }}
{{- else -}}
{{ .Values.castai.apiURL | quote }}
{{- end }}
{{- include "kvisor.clusterIDEnv" (set (deepCopy .) "envFrom" .Values.controller.envFrom) | nindent 12 }}
{{- range $k, $v := .Values.controller.additionalEnv }}
- name: {{ $k }}
Expand Down
2 changes: 2 additions & 0 deletions charts/kvisor/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ castai:
# Note: If your cluster is in the EU region, update the grpcAddr to: https://kvisor.prod-eu.cast.ai:443
grpcAddr: "kvisor.prod-master.cast.ai:443"

apiURL: ""

# clusterID and clusterIdSecretKeyRef are mutually exclusive
clusterID: ""
# clusterIdSecretKeyRef -- Name and Key of secret with ClusterID
Expand Down
63 changes: 34 additions & 29 deletions cmd/agent/daemon/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/samber/lo"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

Expand All @@ -44,10 +43,10 @@ import (
"github.com/castai/kvisor/pkg/ebpftracer/signature"
"github.com/castai/kvisor/pkg/ebpftracer/types"
"github.com/castai/kvisor/pkg/kernel"
"github.com/castai/kvisor/pkg/logging"
"github.com/castai/kvisor/pkg/proc"
"github.com/castai/kvisor/pkg/processtree"
castlog "github.com/castai/logging"
"github.com/castai/logging"
"github.com/castai/logging/components"
custommetrics "github.com/castai/metrics"
)

Expand All @@ -65,20 +64,15 @@ type App struct {
func (a *App) Run(ctx context.Context) error {
start := time.Now()

cfg := a.cfg
logCfg := &logging.Config{
Level: logging.MustParseLevel(a.cfg.LogLevel),
AddSource: true,
RateLimiter: logging.RateLimiterConfig{
Limit: rate.Every(a.cfg.LogRateInterval),
Burst: a.cfg.LogRateBurst,
Inform: true,
},
}
errg, ctx := errgroup.WithContext(ctx)

cfg := a.cfg
podName := os.Getenv("POD_NAME")

var log *logging.Logger
logHandlers := []logging.Handler{
logging.NewTextHandler(logging.DefaultTextHandlerConfig),
}
var exporters []export.DataBatchWriter
// Castai specific spetup if config is valid.
if cfg.Castai.Valid() {
Expand All @@ -90,28 +84,40 @@ func (a *App) Run(ctx context.Context) error {
return fmt.Errorf("sync remote config: %w", err)
}
if cfg.SendLogsLevel != "" {
castaiLogsExporter := castai.NewLogsExporter(castaiClient)
go castaiLogsExporter.Run(ctx) //nolint:errcheck
logsApiClient, err := components.NewAPIClient(components.Config{
APIBaseURL: a.cfg.Castai.APIURL,
APIKey: a.cfg.Castai.APIKey,
ClusterID: a.cfg.Castai.ClusterID,
Component: "kvisor-agent",
Version: a.cfg.Version,
})
if err != nil {
return fmt.Errorf("creating logs api client: %w", err)
}
batchLogsApiClient := components.NewBatchClient(logsApiClient)
errg.Go(func() error {
return batchLogsApiClient.Run(ctx)
})
logsExportHandler := logging.NewExportHandler(batchLogsApiClient, logging.ExportHandlerConfig{
MinLevel: logging.MustParseLevel(cfg.SendLogsLevel),
})
logHandlers = append(logHandlers, logsExportHandler)

if cfg.PromMetricsExportEnabled {
castaiMetricsExporter := castai.NewPromMetricsExporter(log, castaiLogsExporter, prometheus.DefaultGatherer, castai.PromMetricsExporterConfig{
castaiMetricsExporter := castai.NewPromMetricsExporter(log, batchLogsApiClient, prometheus.DefaultGatherer, castai.PromMetricsExporterConfig{
PodName: podName,
ExportInterval: cfg.PromMetricsExportInterval,
})
go castaiMetricsExporter.Run(ctx) //nolint:errcheck
}

logCfg.Export = logging.ExportConfig{
ExportFunc: castaiLogsExporter.ExportFunc(),
MinLevel: logging.MustParseLevel(cfg.SendLogsLevel),
errg.Go(func() error {
return castaiMetricsExporter.Run(ctx)
})
}
}
log = logging.New(logCfg)

log = logging.New(logHandlers...)
exporters = append(exporters, castaiexport.NewDataBatchWriter(castaiClient, log))

} else {
log = logging.New(logCfg)
log = logging.New(logHandlers...)
log.Warn("castai config is not set or it is invalid, running agent in standalone mode")
}

Expand Down Expand Up @@ -263,7 +269,7 @@ func (a *App) Run(ctx context.Context) error {
var cloudVolumeMetricsWriter pipeline.CloudVolumeMetricsWriter
var storageInfoProvider pipeline.StorageInfoProvider
if cfg.Stats.StorageEnabled {
metricsClient, err := createMetricsClient(cfg)
metricsClient, err := createMetricsClient(cfg, log)
if err != nil {
return fmt.Errorf("failed to create metrics client: %w", err)
}
Expand Down Expand Up @@ -323,7 +329,6 @@ func (a *App) Run(ctx context.Context) error {
}
}

errg, ctx := errgroup.WithContext(ctx)
errg.Go(func() error {
return a.runHTTPServer(ctx, log)
})
Expand Down Expand Up @@ -648,7 +653,7 @@ func resolveMetricsAddr(addr string) string {
return addr
}

func createMetricsClient(cfg *config.Config) (custommetrics.MetricClient, error) {
func createMetricsClient(cfg *config.Config, log *logging.Logger) (custommetrics.MetricClient, error) {
if !cfg.Castai.Valid() {
return nil, fmt.Errorf("cast config is not valid")
}
Expand All @@ -660,7 +665,7 @@ func createMetricsClient(cfg *config.Config) (custommetrics.MetricClient, error)
Insecure: cfg.Castai.Insecure,
}

return custommetrics.NewMetricClient(metricsClientConfig, castlog.New())
return custommetrics.NewMetricClient(metricsClientConfig, log)
}

type noopTracer struct{}
Expand Down
7 changes: 4 additions & 3 deletions cmd/agent/daemon/clickhouse_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (
"time"

"github.com/ClickHouse/clickhouse-go/v2"
clickhouseexport "github.com/castai/kvisor/cmd/agent/daemon/export/clickhouse"
"github.com/castai/kvisor/pkg/logging"
"github.com/spf13/cobra"

clickhouseexport "github.com/castai/kvisor/cmd/agent/daemon/export/clickhouse"
"github.com/castai/logging"
)

func NewClickhouseInitCommand() *cobra.Command {
Expand All @@ -24,7 +25,7 @@ func NewClickhouseInitCommand() *cobra.Command {
command := &cobra.Command{
Use: "clickhouse-init",
Run: func(cmd *cobra.Command, args []string) {
log := logging.New(&logging.Config{})
log := logging.New()

ctx, stop := signal.NotifyContext(cmd.Context(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/daemon/conntrack/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"net/netip"
"os"

"github.com/castai/kvisor/pkg/logging"
"github.com/castai/logging"
"github.com/florianl/go-conntrack"
"github.com/vishvananda/netns"

Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/daemon/conntrack/conntrack_cilium_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"net/netip"
"path/filepath"

"github.com/castai/kvisor/pkg/logging"
"github.com/castai/kvisor/pkg/proc"
"github.com/castai/logging"
"github.com/cilium/cilium/pkg/bpf"
"github.com/cilium/cilium/pkg/defaults"
"github.com/cilium/cilium/pkg/loadbalancer"
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/daemon/conntrack/conntrack_cilium_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package conntrack
import (
"net/netip"

"github.com/castai/kvisor/pkg/logging"
"github.com/castai/logging"
)

func iniCiliumMaps(log *logging.Logger) bool {
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/daemon/conntrack/conntrack_nf.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strings"
"syscall"

"github.com/castai/kvisor/pkg/logging"
"github.com/castai/logging"
"github.com/florianl/go-conntrack"
"github.com/samber/lo"
)
Expand Down
29 changes: 0 additions & 29 deletions cmd/agent/daemon/conntrack/conntrack_test.go

This file was deleted.

9 changes: 5 additions & 4 deletions cmd/agent/daemon/debug/netflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@ import (
"syscall"
"time"

"github.com/jedib0t/go-pretty/v6/table"
"github.com/spf13/cobra"

"github.com/castai/kvisor/cmd/agent/daemon/cri"
"github.com/castai/kvisor/pkg/cgroup"
"github.com/castai/kvisor/pkg/containers"
"github.com/castai/kvisor/pkg/ebpftracer"
"github.com/castai/kvisor/pkg/ebpftracer/events"
"github.com/castai/kvisor/pkg/ebpftracer/types"
"github.com/castai/kvisor/pkg/logging"
"github.com/castai/kvisor/pkg/proc"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/spf13/cobra"
"github.com/castai/logging"
)

func NewNetflowsDebugCommand() *cobra.Command {
Expand All @@ -35,7 +36,7 @@ func NewNetflowsDebugCommand() *cobra.Command {
limit := cmd.Flags().Int("limit", 500, "Limit netflows output")

cmd.RunE = func(cmd *cobra.Command, args []string) error {
log := logging.New(&logging.Config{})
log := logging.New()

procHandler := proc.New()

Expand Down
7 changes: 4 additions & 3 deletions cmd/agent/daemon/debug/sockets.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
"os"
"strconv"

"github.com/castai/kvisor/pkg/ebpftracer/debug"
"github.com/castai/kvisor/pkg/logging"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/spf13/cobra"
"golang.org/x/sys/unix"

"github.com/castai/kvisor/pkg/ebpftracer/debug"
"github.com/castai/logging"
)

func NewSocketDebugCommand() *cobra.Command {
Expand All @@ -23,7 +24,7 @@ func NewSocketDebugCommand() *cobra.Command {
)

cmd.RunE = func(cmd *cobra.Command, args []string) error {
log := logging.New(&logging.Config{})
log := logging.New()
d, err := debug.New(log, debug.DebugCfg{
TargetPID: *targetPID,
})
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/daemon/enrichment/enrichers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/castai/kvisor/cmd/agent/daemon/metrics"
"github.com/castai/kvisor/pkg/containers"
"github.com/castai/kvisor/pkg/ebpftracer/types"
"github.com/castai/kvisor/pkg/logging"
"github.com/castai/kvisor/pkg/proc"
"github.com/castai/logging"
"github.com/cespare/xxhash/v2"
"github.com/elastic/go-freelru"
"github.com/minio/sha256-simd"
Expand Down
Loading