Skip to content

Commit 4b96e1c

Browse files
author
Samu
authored
Run storage metrics collection in separate pipeline (#612)
1 parent f5a699f commit 4b96e1c

File tree

4 files changed

+111
-84
lines changed

4 files changed

+111
-84
lines changed

Makefile

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ kvisor-agent-docker:
188188

189189
.PHONY: kvisor-agent-docker-image
190190
kvisor-agent-docker-image: clean-kvisor-agent kvisor-agent-docker
191-
docker build -t $(IMAGE_REPO)-agent:$(IMAGE_TAG) . -f Dockerfile.agent
191+
docker build -t $(IMAGE_REPO)-agent:$(IMAGE_TAG) --build-arg TARGETARCH=$(GO_ARCH) . -f Dockerfile.agent
192192

193193
.PHONY: kvisor-agent-push-deploy
194194
kvisor-agent-push-deploy: kvisor-agent-docker-image
@@ -212,7 +212,7 @@ $(OUTPUT_DIR_BIN)/kvisor-controller-$(GO_ARCH):
212212

213213
.PHONY: kvisor-controller-docker-image
214214
kvisor-controller-docker-image: clean-kvisor-controller kvisor-controller
215-
docker build -t $(IMAGE_REPO)-controller:$(IMAGE_TAG) . -f Dockerfile.controller
215+
docker build -t $(IMAGE_REPO)-controller:$(IMAGE_TAG) --build-arg TARGETARCH=$(GO_ARCH) . -f Dockerfile.controller
216216

217217
.PHONY: kvisor-controller-push-deploy
218218
kvisor-controller-push-deploy: kvisor-controller-docker-image
@@ -225,7 +225,7 @@ kvisor-scanners-docker:
225225

226226
.PHONY: kvisor-scanners-docker-image
227227
kvisor-scanners-docker-image: clean-kvisor-image-scanner kvisor-image-scanner clean-kvisor-linter kvisor-linter
228-
docker build -t $(IMAGE_REPO)-scanners:$(IMAGE_TAG) . -f Dockerfile.scanners
228+
docker build -t $(IMAGE_REPO)-scanners:$(IMAGE_TAG) --build-arg TARGETARCH=$(GO_ARCH) . -f Dockerfile.scanners
229229

230230
.PHONY: kvisor-scanners-push-deploy
231231
kvisor-scanners-push-deploy: kvisor-scanners-docker-image
@@ -249,7 +249,7 @@ $(OUTPUT_DIR_BIN)/kvisor-event-generator:
249249

250250
.PHONY: kvisor-event-generator-docker-image
251251
kvisor-event-generator-docker-image: clean-kvisor-event-generator kvisor-event-generator
252-
docker build -t $(IMAGE_REPO)-event-generator:$(IMAGE_TAG) . -f Dockerfile.event-generator
252+
docker build -t $(IMAGE_REPO)-event-generator:$(IMAGE_TAG) --build-arg TARGETARCH=$(GO_ARCH) . -f Dockerfile.event-generator
253253

254254
.PHONY: kvisor-event-generator-push-deploy
255255
kvisor-event-generator-push-deploy: kvisor-event-generator-docker-image

cmd/agent/daemon/pipeline/controller.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,11 @@ func (c *Controller) Run(ctx context.Context) error {
220220
return c.runStatsPipeline(ctx)
221221
})
222222
}
223+
if c.cfg.Stats.StorageEnabled {
224+
errg.Go(func() error {
225+
return c.runStoragePipeline(ctx)
226+
})
227+
}
223228
if c.cfg.Netflow.Enabled {
224229
// Conntrack cache is used only in netflow pipeline.
225230
// It's safe to use a non-synced LRU since it's accessed from one goroutine.

cmd/agent/daemon/pipeline/stats_pipeline.go

Lines changed: 0 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package pipeline
33
import (
44
"context"
55
"errors"
6-
"fmt"
76
"log/slog"
87
"time"
98

@@ -106,10 +105,6 @@ func (c *Controller) runStatsPipeline(ctx context.Context) error {
106105
start := time.Now()
107106
c.scrapeNodeStats(nodeStats, batchState)
108107
c.scrapeContainersStats(containerStatsGroups, batchState)
109-
if c.cfg.Stats.StorageEnabled {
110-
c.collectStorageMetrics()
111-
c.collectNodeStatsSummary(ctx)
112-
}
113108
send()
114109
c.log.Debugf("stats exported, duration=%v", time.Since(start))
115110
}
@@ -331,78 +326,3 @@ func getPSIStatsDiff(prev, curr *castaipb.PSIStats) *castaipb.PSIStats {
331326
}
332327
return res
333328
}
334-
335-
func (c *Controller) collectStorageMetrics() {
336-
start := time.Now()
337-
c.log.Debug("starting storage metrics collection")
338-
339-
timestamp := time.Now().UTC()
340-
if err := c.processBlockDeviceMetrics(timestamp); err != nil {
341-
c.log.Errorf("failed to collect block device metrics: %v", err)
342-
}
343-
344-
if err := c.processFilesystemMetrics(timestamp); err != nil {
345-
c.log.Errorf("failed to collect filesystem metrics: %v", err)
346-
}
347-
348-
c.log.Debugf("storage metrics collection completed in %v", time.Since(start))
349-
}
350-
351-
func (c *Controller) processBlockDeviceMetrics(timestamp time.Time) error {
352-
if c.blockDeviceMetricsWriter == nil {
353-
return fmt.Errorf("block device metrics writer not initialized")
354-
}
355-
356-
blockMetrics, err := c.storageInfoProvider.BuildBlockDeviceMetrics(timestamp)
357-
if err != nil {
358-
return fmt.Errorf("failed to collect block device metrics: %w", err)
359-
}
360-
361-
c.log.Infof("collected %d block device metrics", len(blockMetrics))
362-
363-
if err := c.blockDeviceMetricsWriter.Write(blockMetrics...); err != nil {
364-
return fmt.Errorf("failed to write block device metrics: %w", err)
365-
}
366-
367-
return nil
368-
}
369-
370-
func (c *Controller) processFilesystemMetrics(timestamp time.Time) error {
371-
if c.filesystemMetricsWriter == nil {
372-
return fmt.Errorf("filesystem metrics writer not initialized")
373-
}
374-
375-
fsMetrics, err := c.storageInfoProvider.BuildFilesystemMetrics(timestamp)
376-
if err != nil {
377-
return fmt.Errorf("failed to collect filesystem metrics: %w", err)
378-
}
379-
380-
c.log.Infof("collected %d filesystem metrics", len(fsMetrics))
381-
382-
if err := c.filesystemMetricsWriter.Write(fsMetrics...); err != nil {
383-
return fmt.Errorf("failed to write filesystem metric: %w", err)
384-
}
385-
386-
return nil
387-
}
388-
389-
func (c *Controller) collectNodeStatsSummary(ctx context.Context) {
390-
if c.nodeStatsSummaryWriter == nil || c.storageInfoProvider == nil {
391-
return
392-
}
393-
394-
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
395-
defer cancel()
396-
397-
metric, err := c.storageInfoProvider.CollectNodeStatsSummary(ctx)
398-
if err != nil {
399-
c.log.Errorf("failed to collect node stats summary: %v", err)
400-
return
401-
}
402-
403-
c.log.Info("collected node stats summary")
404-
405-
if err := c.nodeStatsSummaryWriter.Write(*metric); err != nil {
406-
c.log.Errorf("failed to write node stats summary: %v", err)
407-
}
408-
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package pipeline
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
)
8+
9+
func (c *Controller) runStoragePipeline(ctx context.Context) error {
10+
c.log.Info("running storage stats pipeline")
11+
defer c.log.Info("storage stats pipeline done")
12+
13+
ticker := time.NewTicker(c.cfg.Stats.ScrapeInterval)
14+
defer ticker.Stop()
15+
16+
for {
17+
select {
18+
case <-ctx.Done():
19+
return ctx.Err()
20+
case <-ticker.C:
21+
start := time.Now()
22+
c.collectStorageMetrics()
23+
c.collectNodeStatsSummary(ctx)
24+
c.log.Debugf("storage stats exported, duration=%v", time.Since(start))
25+
}
26+
}
27+
}
28+
29+
func (c *Controller) collectStorageMetrics() {
30+
start := time.Now()
31+
c.log.Debug("starting storage stats collection")
32+
33+
timestamp := time.Now().UTC()
34+
if err := c.processBlockDeviceMetrics(timestamp); err != nil {
35+
c.log.Errorf("failed to collect block device metrics: %v", err)
36+
}
37+
38+
if err := c.processFilesystemMetrics(timestamp); err != nil {
39+
c.log.Errorf("failed to collect filesystem metrics: %v", err)
40+
}
41+
42+
c.log.Debugf("storage stats collection completed in %v", time.Since(start))
43+
}
44+
45+
func (c *Controller) processBlockDeviceMetrics(timestamp time.Time) error {
46+
if c.blockDeviceMetricsWriter == nil {
47+
return fmt.Errorf("block device metrics writer not initialized")
48+
}
49+
50+
blockMetrics, err := c.storageInfoProvider.BuildBlockDeviceMetrics(timestamp)
51+
if err != nil {
52+
return fmt.Errorf("failed to collect block device metrics: %w", err)
53+
}
54+
55+
c.log.Infof("collected %d block device metrics", len(blockMetrics))
56+
57+
if err := c.blockDeviceMetricsWriter.Write(blockMetrics...); err != nil {
58+
return fmt.Errorf("failed to write block device metrics: %w", err)
59+
}
60+
61+
return nil
62+
}
63+
64+
func (c *Controller) processFilesystemMetrics(timestamp time.Time) error {
65+
if c.filesystemMetricsWriter == nil {
66+
return fmt.Errorf("filesystem metrics writer not initialized")
67+
}
68+
69+
fsMetrics, err := c.storageInfoProvider.BuildFilesystemMetrics(timestamp)
70+
if err != nil {
71+
return fmt.Errorf("failed to collect filesystem metrics: %w", err)
72+
}
73+
74+
c.log.Infof("collected %d filesystem metrics", len(fsMetrics))
75+
76+
if err := c.filesystemMetricsWriter.Write(fsMetrics...); err != nil {
77+
return fmt.Errorf("failed to write filesystem metric: %w", err)
78+
}
79+
80+
return nil
81+
}
82+
83+
func (c *Controller) collectNodeStatsSummary(ctx context.Context) {
84+
if c.nodeStatsSummaryWriter == nil || c.storageInfoProvider == nil {
85+
return
86+
}
87+
88+
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
89+
defer cancel()
90+
91+
metric, err := c.storageInfoProvider.CollectNodeStatsSummary(ctx)
92+
if err != nil {
93+
c.log.Errorf("failed to collect node stats summary: %v", err)
94+
return
95+
}
96+
97+
c.log.Info("collected node stats summary")
98+
99+
if err := c.nodeStatsSummaryWriter.Write(*metric); err != nil {
100+
c.log.Errorf("failed to write node stats summary: %v", err)
101+
}
102+
}

0 commit comments

Comments
 (0)