Skip to content

Commit c3d442f

Browse files
REP-1913: Allow Kvisor to collect storage metrics regarding CRI used in the cluster (#629)
Add CRI-O support with --disable-containerd flag for optional containerd features (process tree, image digest resolution). Includes storage stats collection.
1 parent 73fd6de commit c3d442f

File tree

8 files changed

+67
-26
lines changed

8 files changed

+67
-26
lines changed

charts/kvisor/templates/agent.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ spec:
6666
args:
6767
- "run"
6868
- "--metrics-http-listen-port={{.Values.agent.metricsHTTPListenPort}}"
69+
- "--disable-containerd={{ .Values.agent.disableContainerd }}"
6970
{{- if eq .Values.mockServer.enabled true }}
7071
- "--castai-server-insecure=true"
7172
{{- end }}

charts/kvisor/values.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,13 @@ agent:
145145
# Additional environment variables for the agent container via configMaps or secrets.
146146
envFrom: []
147147

148-
# Custom socket path for containerd.
148+
# Path to the container runtime socket (containerd or CRI-O)
149149
containerdSocketPath: "/run/containerd/containerd.sock"
150150

151+
# Disable containerd-specific features (process tree, image digest resolution).
152+
# Set to true for CRI-O clusters.
153+
disableContainerd: false
154+
151155
controller:
152156
enabled: true
153157

cmd/agent/daemon/app/app.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,13 +160,17 @@ func (a *App) Run(ctx context.Context) error {
160160
return err
161161
}
162162

163+
if cfg.DisableContainerd {
164+
disableFeaturesRequiringContainerd(cfg, log)
165+
}
166+
163167
criClient, criCloseFn, err := cri.NewRuntimeClient(ctx, cfg.CRIEndpoint)
164168
if err != nil {
165169
return fmt.Errorf("new CRI runtime client: %w", err)
166170
}
167171
defer criCloseFn() //nolint:errcheck
168172

169-
containersClient, err := containers.NewClient(log, cgroupClient, cfg.ContainerdSockPath, procHandler, criClient, cfg.EventLabels, cfg.EventAnnotations)
173+
containersClient, err := containers.NewClient(log, cgroupClient, cfg.ContainerdSockPath, cfg.DisableContainerd, procHandler, criClient, cfg.EventLabels, cfg.EventAnnotations)
170174
if err != nil {
171175
return err
172176
}
@@ -400,6 +404,13 @@ func enableBPFStats(cfg *config.Config, log *logging.Logger) func() {
400404
return cleanup
401405
}
402406

407+
func disableFeaturesRequiringContainerd(cfg *config.Config, log *logging.Logger) {
408+
if cfg.ProcessTree.Enabled {
409+
log.Warn("process tree requires containerd, disabling because containerd is disabled")
410+
cfg.ProcessTree.Enabled = false
411+
}
412+
}
413+
403414
func buildEBPFPolicy(log *logging.Logger, cfg *config.Config, signatureEngine *signature.SignatureEngine) *ebpftracer.Policy {
404415
// TODO: Allow to build these policies on the fly from the control plane. Ideally we should be able to disable, enable policies and change rate limits dynamically.
405416
policy := &ebpftracer.Policy{

cmd/agent/daemon/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type Config struct {
2323
PromMetricsExportInterval time.Duration `json:"promMetricsExportInterval"`
2424
Version string `json:"version"`
2525
BTFPath string `json:"BTFPath"`
26+
DisableContainerd bool `json:"disableContainerd"`
2627
ContainerdSockPath string `json:"containerdSockPath"`
2728
HostCgroupsDir string `json:"hostCgroupsDir"`
2829
MetricsHTTPListenPort int `json:"metricsHTTPListenPort"`

cmd/agent/daemon/daemon.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func NewRunCommand(version string) *cobra.Command {
3636
promMetricsExportInterval = command.Flags().Duration("prom-metrics-export-interval", 5*time.Minute, "Internal prometheus metrics export interval")
3737

3838
sendLogLevel = command.Flags().String("send-logs-level", slog.LevelInfo.String(), "send logs level")
39+
disableContainerd = command.Flags().Bool("disable-containerd", false, "Disable containerd-specific features (process tree, image digest). Enable for non-containerd runtimes.")
3940
containerdSockPath = command.Flags().String("containerd-sock", "/run/containerd/containerd.sock", "Path to containerd socket file")
4041
metricsHTTPListenPort = command.Flags().Int("metrics-http-listen-port", 6060, "metrics http listen port")
4142
hostCgroupsDir = command.Flags().String("host-cgroups", "/cgroups", "Host /sys/fs/cgroups directory name mounted to container")
@@ -150,6 +151,7 @@ func NewRunCommand(version string) *cobra.Command {
150151
PromMetricsExportInterval: *promMetricsExportInterval,
151152
Version: version,
152153
BTFPath: *btfPath,
154+
DisableContainerd: *disableContainerd,
153155
ContainerdSockPath: *containerdSockPath,
154156
HostCgroupsDir: *hostCgroupsDir,
155157
MetricsHTTPListenPort: *metricsHTTPListenPort,

cmd/agent/daemon/debug/netflows.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ func NewNetflowsDebugCommand() *cobra.Command {
2929
criEndpoint := cmd.Flags().String("cri-endpoint", "unix:///run/containerd/containerd.sock", "CRI endpoint")
3030
hostCgroupsDir := cmd.Flags().String("host-cgroups", "/cgroups", "Host /sys/fs/cgroups directory name mounted to container")
3131
containerdSockPath := cmd.Flags().String("containerd-sock", "/run/containerd/containerd.sock", "Path to containerd socket file")
32+
disableContainerd := cmd.Flags().Bool("disable-containerd", true, "Disable containerd-specific features")
3233
btfPath := cmd.Flags().String("btf-path", "/sys/kernel/btf/vmlinux", "btf file path")
3334
waitDuration := cmd.Flags().Duration("wait", 2*time.Second, "Wait duration before scraping netflows")
3435
limit := cmd.Flags().Int("limit", 500, "Limit netflows output")
@@ -57,7 +58,7 @@ func NewNetflowsDebugCommand() *cobra.Command {
5758
return err
5859
}
5960

60-
containersClient, err := containers.NewClient(log, cgroupClient, *containerdSockPath, procHandler, criClient, []string{}, []string{})
61+
containersClient, err := containers.NewClient(log, cgroupClient, *containerdSockPath, *disableContainerd, procHandler, criClient, []string{}, []string{})
6162
if err != nil {
6263
return err
6364
}

pkg/containers/client.go

Lines changed: 43 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@ import (
1212
"sync/atomic"
1313
"time"
1414

15-
"github.com/castai/kvisor/cmd/agent/daemon/metrics"
16-
"github.com/castai/kvisor/pkg/cgroup"
17-
"github.com/castai/kvisor/pkg/logging"
18-
"github.com/castai/kvisor/pkg/proc"
1915
"github.com/containerd/containerd"
2016
"github.com/containerd/containerd/api/services/tasks/v1"
2117
"github.com/containerd/containerd/content"
@@ -25,6 +21,11 @@ import (
2521
"google.golang.org/grpc/backoff"
2622
"google.golang.org/grpc/credentials/insecure"
2723
criapi "k8s.io/cri-api/pkg/apis/runtime/v1"
24+
25+
"github.com/castai/kvisor/cmd/agent/daemon/metrics"
26+
"github.com/castai/kvisor/pkg/cgroup"
27+
"github.com/castai/kvisor/pkg/logging"
28+
"github.com/castai/kvisor/pkg/proc"
2829
)
2930

3031
var (
@@ -82,6 +83,7 @@ type Client struct {
8283
cgroupClient cgroupsClient
8384
criRuntimeServiceClient criClient
8485
containerContentStoreClient containerContentStoreClient
86+
containerdDisabled bool
8587

8688
containerCreatedListeners []ContainerCreatedListener
8789
containerDeletedListeners []ContainerDeletedListener
@@ -98,9 +100,27 @@ type Client struct {
98100
inactiveContainersDuration time.Duration
99101
}
100102

101-
func NewClient(log *logging.Logger, cgroupClient *cgroup.Client, containerdSock string, procHandler *proc.Proc, criRuntimeServiceClient criapi.RuntimeServiceClient,
103+
func NewClient(log *logging.Logger, cgroupClient *cgroup.Client, containerdSock string, disableContainerd bool, procHandler *proc.Proc, criRuntimeServiceClient criapi.RuntimeServiceClient,
102104
labels, annotations []string) (*Client, error) {
103105

106+
client := &Client{
107+
log: log.WithField("component", "cgroups"),
108+
cgroupClient: cgroupClient,
109+
containersByCgroup: map[uint64]*Container{},
110+
containersByID: map[string]*Container{},
111+
procHandler: procHandler,
112+
criRuntimeServiceClient: criRuntimeServiceClient,
113+
forwardedLabels: labels,
114+
forwardedAnnotations: annotations,
115+
inactiveContainersDuration: 2 * time.Minute,
116+
containerdDisabled: disableContainerd,
117+
}
118+
119+
if disableContainerd {
120+
log.Info("containerd features disabled")
121+
return client, nil
122+
}
123+
104124
backoffConfig := backoff.DefaultConfig
105125
backoffConfig.MaxDelay = 3 * time.Second
106126
connParams := grpc.ConnectParams{
@@ -128,23 +148,17 @@ func NewClient(log *logging.Logger, cgroupClient *cgroup.Client, containerdSock
128148
return nil, fmt.Errorf("failed connecting to containerd client: %w", err)
129149
}
130150

131-
return &Client{
132-
log: log.WithField("component", "cgroups"),
133-
containerdClient: containerdClient,
134-
containerContentStoreClient: containerdClient.ContentStore(),
135-
cgroupClient: cgroupClient,
136-
containersByCgroup: map[uint64]*Container{},
137-
containersByID: map[string]*Container{},
138-
procHandler: procHandler,
139-
criRuntimeServiceClient: criRuntimeServiceClient,
140-
forwardedLabels: labels,
141-
forwardedAnnotations: annotations,
142-
inactiveContainersDuration: 2 * time.Minute,
143-
}, nil
151+
client.containerdClient = containerdClient
152+
client.containerContentStoreClient = containerdClient.ContentStore()
153+
154+
return client, nil
144155
}
145156

146157
func (c *Client) Close() error {
147-
return c.containerdClient.Close()
158+
if c.containerdClient != nil {
159+
return c.containerdClient.Close()
160+
}
161+
return nil
148162
}
149163

150164
type ContainerProcess struct {
@@ -153,6 +167,10 @@ type ContainerProcess struct {
153167
}
154168

155169
func (c *Client) LoadContainerTasks(ctx context.Context) ([]ContainerProcess, error) {
170+
if c.containerdClient == nil {
171+
return nil, nil
172+
}
173+
156174
resp, err := c.containerdClient.TaskService().List(ctx, nil)
157175
if err != nil {
158176
return nil, err
@@ -324,11 +342,13 @@ func (c *Client) addContainerWithCgroup(container *criapi.Container, cg *cgroup.
324342
}
325343
cont.markAccessed()
326344

327-
imageDigest, err := c.findImageDigest(container)
328-
if err != nil {
329-
c.log.Warnf("finding image digest for container %v: %v", container.Id, err)
345+
if !c.containerdDisabled {
346+
imageDigest, err := c.findImageDigest(container)
347+
if err != nil {
348+
c.log.Warnf("finding image digest for container %v: %v", container.Id, err)
349+
}
350+
cont.ImageDigest = imageDigest
330351
}
331-
cont.ImageDigest = imageDigest
332352

333353
getSandboxCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
334354
defer cancel()

pkg/containers/client_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,7 @@ func newTestClient() *Client {
261261
inactiveContainersDuration: 1 * time.Minute,
262262
containerdClient: &mockContainerdClient{},
263263
containerContentStoreClient: &mockContainerContentStoreClient{},
264+
containerdDisabled: false,
264265
}
265266
}
266267

0 commit comments

Comments
 (0)