Skip to content

Commit 94253b0

Browse files
authored
CGroup Disk Metrics (#169)
1 parent fc3439a commit 94253b0

30 files changed

+616
-18
lines changed

AtlasAgent/src/atlas-agent.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ void collect_titus_metrics(Registry* registry, std::unique_ptr<atlasagent::Nvml>
264264
// we can gather them here
265265
if (fiveSecondMetricsEnabled == true)
266266
{
267+
cGroup.IOStats();
267268
next_five_second_run += seconds(5);
268269
}
269270

lib/collectors/cgroup/src/cgroup.cpp

Lines changed: 313 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "cgroup.h"
22
#include <lib/util/src/util.h>
33
#include <cstdlib>
4+
#include <charconv>
45
#include <map>
56
#include <unistd.h>
67

@@ -87,7 +88,6 @@ void CGroup::CpuTimeV2(const std::unordered_map<std::string, int64_t>& stats) no
8788
{
8889
auto secs = (stats.at("system_usec") - prev_sys_usage) / MICROS;
8990
registry_->CreateCounter("cgroup.cpu.usageTime", {{"id", "system"}}).Increment(secs);
90-
9191
}
9292
prev_sys_usage = stats.at("system_usec");
9393

@@ -168,8 +168,7 @@ void CGroup::CpuUtilizationV2(const absl::Time& now, const double cpuCount, cons
168168
prev_user_time = stats.at("user_usec");
169169
}
170170

171-
void CGroup::CpuPeakUtilizationV2(const absl::Time& now, const std::unordered_map<std::string, int64_t>& stats,
172-
const double cpuCount) noexcept
171+
void CGroup::CpuPeakUtilizationV2(const absl::Time& now, const std::unordered_map<std::string, int64_t>& stats, const double cpuCount) noexcept
173172
{
174173
static absl::Time last_updated;
175174
auto delta_t = absl::ToDoubleSeconds(now - last_updated);
@@ -292,4 +291,315 @@ void CGroup::MemoryStatsStdV2() noexcept
292291
}
293292
}
294293

294+
std::unordered_map<std::string, std::string> FindDeviceNames()
295+
{
296+
auto lines = read_lines_fields("/proc", "diskstats");
297+
std::unordered_map<std::string, std::string> deviceMap(lines.size());
298+
static constexpr unsigned int EXPECTED_FIELDS = 20;
299+
for (const auto& fields : lines)
300+
{
301+
if (fields.size() != EXPECTED_FIELDS) [[unlikely]]
302+
{
303+
atlasagent::Logger()->warn("Unexpected number of fields in /proc/diskstats line: {}", fields.size());
304+
continue;
305+
}
306+
307+
std::string majorMinor = fields[0] + ':' + fields[1];
308+
deviceMap.emplace(std::move(majorMinor), fields[2]);
309+
}
310+
return deviceMap;
311+
}
312+
313+
std::optional<IOStats> ParseIOLine(const std::vector<std::string>& fields, const std::unordered_map<std::string, std::string>& devMap) try
314+
{
315+
// Set the key to the device name
316+
IOStats stats;
317+
stats.majorMinor = fields[0];
318+
319+
auto it = devMap.find(stats.majorMinor);
320+
if (it != devMap.end())
321+
{
322+
stats.deviceName = it->second;
323+
}
324+
else
325+
{
326+
atlasagent::Logger()->warn("Device major:minor {} not found in /proc/diskstats mapping", stats.majorMinor);
327+
}
328+
329+
// Iterate through the remaining fields and parse key-value pairs
330+
for (size_t i = 1; i < fields.size(); ++i)
331+
{
332+
auto pos = fields[i].find('=');
333+
if (pos == std::string::npos)
334+
{
335+
throw std::runtime_error("Malformed key=value pair in io.stat: " + fields[i]);
336+
}
337+
338+
std::string_view currentField(fields[i]);
339+
std::string_view key = currentField.substr(0, pos);
340+
std::string_view value_str = currentField.substr(pos + 1);
341+
342+
double value;
343+
auto [ptr, ec] = std::from_chars(value_str.data(), value_str.data() + value_str.size(), value);
344+
if (ec != std::errc())
345+
{
346+
throw std::runtime_error("Failed to parse expected integer from io.stat");
347+
};
348+
349+
if (value < 0)
350+
{
351+
throw std::runtime_error("Negative value in io.stat for key: " + std::string(key));
352+
}
353+
354+
if (key == "rbytes" && stats.rBytes.has_value() == false)
355+
stats.rBytes = value;
356+
else if (key == "wbytes" && stats.wBytes.has_value() == false)
357+
stats.wBytes = value;
358+
else if (key == "rios" && stats.rOperations.has_value() == false)
359+
stats.rOperations = value;
360+
else if (key == "wios" && stats.wOperations.has_value() == false)
361+
stats.wOperations = value;
362+
else if (key == "dbytes" && stats.dBytes.has_value() == false)
363+
stats.dBytes = value;
364+
else if (key == "dios" && stats.dOperations.has_value() == false)
365+
stats.dOperations = value;
366+
else
367+
throw std::runtime_error("Unexpected or duplicate key in io.stat: " + std::string(key));
368+
}
369+
370+
// Validate that all required IO statistics are set
371+
bool all_stats_valid = stats.rBytes.has_value() && stats.wBytes.has_value() && stats.rOperations.has_value() &&
372+
stats.wOperations.has_value() && stats.dBytes.has_value() && stats.dOperations.has_value();
373+
374+
if (!all_stats_valid)
375+
{
376+
throw std::runtime_error("Incomplete IO statistics for device: " + stats.majorMinor);
377+
}
378+
379+
return stats;
380+
}
381+
catch (const std::exception& ex)
382+
{
383+
atlasagent::Logger()->error("Exception parsing IO stat line: {}", ex.what());
384+
return std::nullopt;
385+
}
386+
387+
std::unordered_map<std::string, IOStats> ParseIOLines(const std::vector<std::vector<std::string>>& lines, const std::unordered_map<std::string, std::string>& devMap) try
388+
{
389+
std::unordered_map<std::string, IOStats> ioStats;
390+
391+
// Iterate through each line from io.stat
392+
for (const auto& fields : lines)
393+
{
394+
// Skip completely empty lines (device with no stats)
395+
if (fields.size() == 1) continue;
396+
397+
// Each line should have exactly 7 fields: device rbytes= wbytes= rios= wios= dbytes= dios=
398+
if (fields.size() != 7)
399+
{
400+
throw std::runtime_error("Invalid number of fields in io.stat line: " + std::to_string(fields.size()));
401+
}
402+
403+
// Parse the individual line into an IOStats object
404+
auto ioStatObject = ParseIOLine(fields, devMap);
405+
if (ioStatObject.has_value() == false)
406+
{
407+
return {};
408+
}
409+
ioStats.emplace(ioStatObject->majorMinor, std::move(ioStatObject.value()));
410+
}
411+
return ioStats;
412+
}
413+
catch (const std::exception& ex)
414+
{
415+
atlasagent::Logger()->error("Exception parsing IO stat lines: {}", ex.what());
416+
return {};
417+
}
418+
419+
std::optional<IOThrottle> ParseIOThrottleLine(const std::vector<std::string>& fields) try
420+
{
421+
// Set the device name
422+
IOThrottle throttle;
423+
throttle.device = fields[0];
424+
425+
// Iterate through the remaining throttle fields
426+
for (size_t i = 1; i < fields.size(); ++i)
427+
{
428+
auto pos = fields[i].find('=');
429+
if (pos == std::string::npos)
430+
{
431+
throw std::runtime_error("Malformed key=value pair in io.max: " + fields[i]);
432+
}
433+
434+
std::string_view currentField(fields[i]);
435+
std::string_view key = currentField.substr(0, pos);
436+
std::string_view value_str = currentField.substr(pos + 1);
437+
438+
double value;
439+
if (value_str == "max")
440+
{
441+
value = -1.0;
442+
}
443+
else
444+
{
445+
auto [ptr, ec] = std::from_chars(value_str.data(), value_str.data() + value_str.size(), value);
446+
if (ec != std::errc())
447+
{
448+
throw std::runtime_error("Failed to parse expected integer from io.max");
449+
};
450+
451+
if (value < 0)
452+
{
453+
throw std::runtime_error("Negative value in io.max for key: " + std::string(key));
454+
}
455+
}
456+
457+
// Direct assignment based on key
458+
if (key == "rbps" && throttle.rBps.has_value() == false)
459+
throttle.rBps = value;
460+
else if (key == "wbps" && throttle.wBps.has_value() == false)
461+
throttle.wBps = value;
462+
else if (key == "riops" && throttle.rIops.has_value() == false)
463+
throttle.rIops = value;
464+
else if (key == "wiops" && throttle.wIops.has_value() == false)
465+
throttle.wIops = value;
466+
else
467+
throw std::runtime_error("Unexpected or duplicate key in io.max: " + std::string(key));
468+
}
469+
470+
bool validThrottle = throttle.rBps.has_value() && throttle.wBps.has_value() && throttle.rIops.has_value() &&
471+
throttle.wIops.has_value();
472+
473+
if (!validThrottle)
474+
{
475+
throw std::runtime_error("Incomplete IO throttle settings for device: " + throttle.device);
476+
}
477+
return throttle;
478+
}
479+
catch (const std::exception& ex)
480+
{
481+
atlasagent::Logger()->error("Exception parsing IO throttle line: {}", ex.what());
482+
return std::nullopt;
483+
}
484+
485+
std::unordered_map<std::string, IOThrottle> ParseIOThrottleLines(const std::vector<std::vector<std::string>>& lines) try
486+
{
487+
std::unordered_map<std::string, IOThrottle> ioThrottles;
488+
489+
// Iterate through each line from io.max
490+
for (const auto& fields : lines)
491+
{
492+
// Each line should have exactly 5 fields: device rbps= wbps= riops= wiops=
493+
if (fields.size() != 5)
494+
{
495+
throw std::runtime_error("Unexpected number of fields in io.max line " + std::to_string(fields.size()));
496+
}
497+
498+
auto throttle = ParseIOThrottleLine(fields);
499+
if (throttle.has_value() == false)
500+
{
501+
return {};
502+
}
503+
ioThrottles.emplace(throttle->device, throttle.value());
504+
}
505+
return ioThrottles;
506+
}
507+
catch (const std::exception& ex)
508+
{
509+
atlasagent::Logger()->error("Exception parsing IO throttle lines: {}", ex.what());
510+
return {};
511+
}
512+
513+
void UpdateIOMetrics(const std::unordered_map<std::string, IOStats>& ioStats, const std::unordered_map<std::string, IOThrottle>& ioThrottles, Registry* registry)
514+
{
515+
// Static map to hold previous IOStats for delta calculations
516+
static std::unordered_map<std::string, IOStats> previousStats;
517+
constexpr double INTERVAL_SECONDS = 5.0;
518+
constexpr double PERCENT_MULTIPLIER = 100.0;
519+
520+
// Iterate through current IO statistics
521+
for (const auto& [deviceKey, currentStat] : ioStats)
522+
{
523+
atlasagent::Logger()->debug("IO Stats for device {}:", deviceKey);
524+
atlasagent::Logger()->debug("\tIO Object: {{current_rbytes: {}, current_rios: {}, current_wbytes: {}, current_wios: {}}}",
525+
currentStat.rBytes.value(), currentStat.rOperations.value(), currentStat.wBytes.value(),
526+
currentStat.wOperations.value());
527+
528+
// Check if we have previous data for a delta calculation
529+
auto prev_it = previousStats.find(deviceKey);
530+
if (prev_it != previousStats.end())
531+
{
532+
const auto& prevStat = prev_it->second;
533+
534+
// Calculate deltas
535+
const auto delta_rbytes = currentStat.rBytes.value() - prevStat.rBytes.value();
536+
const auto delta_wbytes = currentStat.wBytes.value() - prevStat.wBytes.value();
537+
const auto delta_rios = currentStat.rOperations.value() - prevStat.rOperations.value();
538+
const auto delta_wios = currentStat.wOperations.value() - prevStat.wOperations.value();
539+
540+
atlasagent::Logger()->debug("\tRealtime Counter: {{delta_rbytes: {}, delta_rios: {}, delta_wbytes: {}, delta_wios: {}}}",
541+
delta_rbytes, delta_rios, delta_wbytes, delta_wios);
542+
543+
// Update byte and operation counters
544+
registry->CreateCounter("disk.io.bytes", {{"dev", currentStat.deviceName}, {"id", "read"}}).Increment(delta_rbytes);
545+
registry->CreateCounter("disk.io.bytes", {{"dev", currentStat.deviceName}, {"id", "write"}}).Increment(delta_wbytes);
546+
registry->CreateCounter("disk.io.ops", {{"dev", currentStat.deviceName}, {"id", "read"}, {"statistic", "count"}}).Increment(delta_rios);
547+
registry->CreateCounter("disk.io.ops", {{"dev", currentStat.deviceName}, {"id", "write"}, {"statistic", "count"}}).Increment(delta_wios);
548+
549+
// Calculate throttle utilization if throttle data is available
550+
auto throttle_it = ioThrottles.find(deviceKey);
551+
if (throttle_it != ioThrottles.end())
552+
{
553+
const auto& throttle = throttle_it->second;
554+
555+
atlasagent::Logger()->debug("\tThrottle Settings: {{read_bps: {}, write_bps: {}, read_iops: {}, write_iops: {}}}",
556+
throttle.rBps.value(), throttle.wBps.value(), throttle.rIops.value(),
557+
throttle.wIops.value());
558+
559+
// Helper lambda to record throttle utilization
560+
auto record_throttle_utilization = [&](double delta, const std::optional<double>& limit, const std::string& metric_name, const std::string& operation)
561+
{
562+
if (limit.has_value() && limit.value() > 0)
563+
{
564+
// Utilization = (delta / (limit * interval)) * 100
565+
const auto utilization = (delta / (limit.value() * INTERVAL_SECONDS)) * PERCENT_MULTIPLIER;
566+
registry->CreateDistributionSummary(metric_name, {{"dev", currentStat.deviceName}, {"id", operation}}).Record(utilization);
567+
}
568+
};
569+
570+
record_throttle_utilization(delta_rbytes, throttle.rBps, "cgroup.disk.io.throttleActivityBytes", "read");
571+
record_throttle_utilization(delta_wbytes, throttle.wBps, "cgroup.disk.io.throttleActivityBytes", "write");
572+
record_throttle_utilization(delta_rios, throttle.rIops, "cgroup.disk.io.throttleActivityOperations", "read");
573+
record_throttle_utilization(delta_wios, throttle.wIops, "cgroup.disk.io.throttleActivityOperations", "write");
574+
}
575+
}
576+
// Update previous stats for next iteration
577+
previousStats[deviceKey] = currentStat;
578+
}
579+
}
580+
581+
void CGroup::IOStats()
582+
{
583+
// Read the contents of io.stat
584+
auto ioStatLines = read_lines_fields(path_prefix_, "io.stat");
585+
586+
// Find all the device names from /proc/diskstats and create mapping of {major:minor, device name}
587+
auto deviceNames = FindDeviceNames();
588+
589+
// Parse the io.stat lines into structured IOStats objects
590+
auto ioStats = ParseIOLines(ioStatLines, deviceNames);
591+
if (ioStats.empty())
592+
{
593+
atlasagent::Logger()->info("No valid IO statistics found in io.stat");
594+
return;
595+
}
596+
597+
// Read the contents of io.max and parse into structured IOThrottle objects
598+
auto throttleLines = read_lines_fields(path_prefix_, "io.max");
599+
auto ioThrottles = ParseIOThrottleLines(throttleLines);
600+
601+
// Update metrics based on parsed IO statistics and throttling information
602+
UpdateIOMetrics(ioStats, ioThrottles, registry_);
603+
}
604+
295605
} // namespace atlasagent

0 commit comments

Comments
 (0)