Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ There is a main configuration file with basic settings that point to the other c
"collectors" : "collectors.json",
"receivers" : "receivers.json",
"router" : "router.json",
"stats_api" : "api.json",
"interval": 10,
"duration": 1
}
Expand All @@ -32,6 +33,7 @@ See the component READMEs for their configuration:
* [`sinks`](./sinks/README.md)
* [`receivers`](./receivers/README.md)
* [`router`](./internal/metricRouter/README.md)
* [`stats_api`](./internal/metricRouter/StatsApi.md)


# Installation
Expand Down
23 changes: 23 additions & 0 deletions cc-metric-collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type CentralConfigFile struct {
RouterConfigFile string `json:"router"`
SinkConfigFile string `json:"sinks"`
ReceiverConfigFile string `json:"receivers,omitempty"`
StatsApiConfigFile string `json:"stats_api,omitempty"`
}

func LoadCentralConfiguration(file string, config *CentralConfigFile) error {
Expand All @@ -52,6 +53,7 @@ type RuntimeConfig struct {
CollectManager collectors.CollectorManager
SinkManager sinks.SinkManager
ReceiveManager receivers.ReceiveManager
StatsApi mr.StatsApi
MultiChanTicker mct.MultiChanTicker

Channels []chan lp.CCMetric
Expand Down Expand Up @@ -152,18 +154,24 @@ func shutdownHandler(config *RuntimeConfig, shutdownSignal chan os.Signal) {
cclog.Debug("Shutdown SinkManager...")
config.SinkManager.Close()
}
if config.StatsApi != nil {
cclog.Debug("Shutdown StatsApi...")
config.StatsApi.Close()
}
}

func mainFunc() int {
var err error
use_recv := false
use_api := false

// Initialize runtime configuration
rcfg := RuntimeConfig{
MetricRouter: nil,
CollectManager: nil,
SinkManager: nil,
ReceiveManager: nil,
StatsApi: nil,
CliArgs: ReadCli(),
}

Expand Down Expand Up @@ -253,13 +261,28 @@ func mainFunc() int {
use_recv = true
}

// Create new statistics API manager
if len(rcfg.ConfigFile.StatsApiConfigFile) > 0 {
rcfg.StatsApi, err = mr.NewStatsApi(rcfg.MultiChanTicker, &rcfg.Sync, rcfg.ConfigFile.StatsApiConfigFile)
if err != nil {
cclog.Error(err.Error())
return 1
}
use_api = true
}

// Create shutdown handler
shutdownSignal := make(chan os.Signal, 1)
signal.Notify(shutdownSignal, os.Interrupt)
signal.Notify(shutdownSignal, syscall.SIGTERM)
rcfg.Sync.Add(1)
go shutdownHandler(&rcfg, shutdownSignal)

// Start the stats api early to be prepared for init settings
if use_api {
rcfg.StatsApi.Start()
}

// Start the managers
rcfg.MetricRouter.Start()
rcfg.SinkManager.Start()
Expand Down
13 changes: 9 additions & 4 deletions collectors/beegfsmetaMetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)

const DEFAULT_BEEGFS_CMD = "beegfs-ctl"
Expand All @@ -29,10 +30,11 @@ type BeegfsMetaCollectorConfig struct {

type BeegfsMetaCollector struct {
metricCollector
tags map[string]string
matches map[string]string
config BeegfsMetaCollectorConfig
skipFS map[string]struct{}
tags map[string]string
matches map[string]string
config BeegfsMetaCollectorConfig
skipFS map[string]struct{}
statsProcessedMetrics int64
}

func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
Expand Down Expand Up @@ -105,6 +107,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
if err != nil {
return fmt.Errorf("BeegfsMetaCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err)
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
Expand Down Expand Up @@ -218,10 +221,12 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMetr
y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}

func (m *BeegfsMetaCollector) Close() {
Expand Down
13 changes: 9 additions & 4 deletions collectors/beegfsstorageMetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)

// Struct for the collector-specific JSON config
Expand All @@ -27,10 +28,11 @@ type BeegfsStorageCollectorConfig struct {

type BeegfsStorageCollector struct {
metricCollector
tags map[string]string
matches map[string]string
config BeegfsStorageCollectorConfig
skipFS map[string]struct{}
tags map[string]string
matches map[string]string
config BeegfsStorageCollectorConfig
skipFS map[string]struct{}
statsProcessedMetrics int64
}

func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
Expand Down Expand Up @@ -98,6 +100,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
if err != nil {
return fmt.Errorf("BeegfsStorageCollector.Init(): Failed to find beegfs-ctl binary '%s': %v", m.config.Beegfs, err)
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
Expand Down Expand Up @@ -210,10 +213,12 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM
y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
if err == nil {
output <- y
m.statsProcessedMetrics++
}
}
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}

func (m *BeegfsStorageCollector) Close() {
Expand Down
8 changes: 6 additions & 2 deletions collectors/cpufreqCpuinfoMetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)

//
Expand All @@ -36,7 +37,8 @@ type CPUFreqCpuInfoCollectorTopology struct {

type CPUFreqCpuInfoCollector struct {
metricCollector
topology []*CPUFreqCpuInfoCollectorTopology
topology []*CPUFreqCpuInfoCollectorTopology
statsProcessedMetrics int64
}

func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
Expand Down Expand Up @@ -155,7 +157,7 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {
"package_id": t.physicalPackageID,
}
}

m.statsProcessedMetrics = 0
m.init = true
return nil
}
Expand Down Expand Up @@ -196,13 +198,15 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC
return
}
if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": value}, now); err == nil {
m.statsProcessedMetrics++
output <- y
}
}
processorCounter++
}
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}

func (m *CPUFreqCpuInfoCollector) Close() {
Expand Down
10 changes: 7 additions & 3 deletions collectors/cpufreqMetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
"golang.org/x/sys/unix"
)

Expand Down Expand Up @@ -39,8 +40,9 @@ type CPUFreqCollectorTopology struct {
//
type CPUFreqCollector struct {
metricCollector
topology []CPUFreqCollectorTopology
config struct {
topology []CPUFreqCollectorTopology
statsProcessedMetrics int64
config struct {
ExcludeMetrics []string `json:"exclude_metrics,omitempty"`
}
}
Expand Down Expand Up @@ -166,7 +168,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
"package_id": t.physicalPackageID,
}
}

m.statsProcessedMetrics = 0
m.init = true
return nil
}
Expand Down Expand Up @@ -203,9 +205,11 @@ func (m *CPUFreqCollector) Read(interval time.Duration, output chan lp.CCMetric)
}

if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": cpuFreq}, now); err == nil {
m.statsProcessedMetrics++
output <- y
}
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}

func (m *CPUFreqCollector) Close() {
Expand Down
14 changes: 10 additions & 4 deletions collectors/cpustatMetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
)

const CPUSTATFILE = `/proc/stat`
Expand All @@ -21,10 +22,11 @@ type CpustatCollectorConfig struct {

type CpustatCollector struct {
metricCollector
config CpustatCollectorConfig
matches map[string]int
cputags map[string]map[string]string
nodetags map[string]string
config CpustatCollectorConfig
matches map[string]int
cputags map[string]map[string]string
nodetags map[string]string
statsProcessedMetrics int64
}

func (m *CpustatCollector) Init(config json.RawMessage) error {
Expand Down Expand Up @@ -86,6 +88,7 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
num_cpus++
}
}
m.statsProcessedMetrics = 0
m.init = true
return nil
}
Expand All @@ -106,6 +109,7 @@ func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]st
for name, value := range values {
y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": (value * 100.0) / total}, t)
if err == nil {
m.statsProcessedMetrics++
output <- y
}
}
Expand Down Expand Up @@ -141,8 +145,10 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric)
time.Now(),
)
if err == nil {
m.statsProcessedMetrics++
output <- num_cpus_metric
}
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
}

func (m *CpustatCollector) Close() {
Expand Down
25 changes: 20 additions & 5 deletions collectors/customCmdMetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
stats "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
influx "github.com/influxdata/line-protocol"
)

Expand All @@ -23,11 +24,14 @@ type CustomCmdCollectorConfig struct {

type CustomCmdCollector struct {
metricCollector
handler *influx.MetricHandler
parser *influx.Parser
config CustomCmdCollectorConfig
commands []string
files []string
handler *influx.MetricHandler
parser *influx.Parser
config CustomCmdCollectorConfig
commands []string
files []string
statsProcessedMetrics int64
statsProcessedCommands int64
statsProcessedFiles int64
}

func (m *CustomCmdCollector) Init(config json.RawMessage) error {
Expand Down Expand Up @@ -66,6 +70,9 @@ func (m *CustomCmdCollector) Init(config json.RawMessage) error {
m.handler = influx.NewMetricHandler()
m.parser = influx.NewParser(m.handler)
m.parser.SetTimeFunc(DefaultTime)
m.statsProcessedMetrics = 0
m.statsProcessedFiles = 0
m.statsProcessedCommands = 0
m.init = true
return nil
}
Expand Down Expand Up @@ -100,9 +107,13 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri

y := lp.FromInfluxMetric(c)
if err == nil {
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
output <- y
}
}
m.statsProcessedCommands++
stats.ComponentStatInt(m.name, "processed_commands", m.statsProcessedCommands)
}
for _, file := range m.files {
buffer, err := ioutil.ReadFile(file)
Expand All @@ -122,9 +133,13 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri
}
y := lp.FromInfluxMetric(f)
if err == nil {
m.statsProcessedMetrics++
stats.ComponentStatInt(m.name, "processed_metrics", m.statsProcessedMetrics)
output <- y
}
}
m.statsProcessedFiles++
stats.ComponentStatInt(m.name, "processed_files", m.statsProcessedFiles)
}
}

Expand Down
Loading