Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions AtlasAgent/src/atlas-agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,17 @@ std::unique_ptr<GpuMetrics> init_gpu(Registry* registry, std::unique_ptr<Nvml> l
}

#if defined(TITUS_SYSTEM_SERVICE)
static void gather_peak_titus_metrics(CGroup* cGroup) { cGroup->cpu_peak_stats(); }
static void gather_peak_titus_metrics(CGroup* cGroup, const bool fiveSecondMetricsEnabled, const bool sixtySecondMetricsEnabled)
{
cGroup->CpuStats(fiveSecondMetricsEnabled, sixtySecondMetricsEnabled);
}

static void gather_slow_titus_metrics(CGroup* cGroup, Proc* proc, Disk* disk, Aws* aws)
{
aws->update_stats();
cGroup->cpu_stats();
cGroup->memory_stats_v2();
cGroup->memory_stats_std_v2();
cGroup->network_stats();
cGroup->MemoryStatsV2();
cGroup->MemoryStatsStdV2();
cGroup->NetworkStats();
disk->titus_disk_stats();
proc->netstat_stats();
proc->network_stats();
Expand Down Expand Up @@ -243,15 +245,30 @@ void collect_titus_metrics(Registry* registry, std::unique_ptr<atlasagent::Nvml>

auto now = system_clock::now();
auto next_run = now;
auto next_slow_run = now + seconds(60);
auto next_sixty_second_run = now + seconds(60);
auto next_five_second_run = now + seconds(5);
std::chrono::nanoseconds time_to_sleep;

do
{
auto start = system_clock::now();
gather_peak_titus_metrics(&cGroup);
bool fiveSecondMetricsEnabled = (start >= next_five_second_run);
bool sixtySecondMetricsEnabled = (start >= next_sixty_second_run);

// 1 second, 5 second, and 60 second CPU metrics are gathered here because they read from
// the same /proc/stat file
gather_peak_titus_metrics(&cGroup, fiveSecondMetricsEnabled, sixtySecondMetricsEnabled);

if (start >= next_slow_run)
// If its time to gather 5 second metrics, update the next run time
// Currently we only have CPU metrics that run every 5 seconds, but if we add more in the future
// we can gather them here
if (fiveSecondMetricsEnabled == true)
{
next_five_second_run += seconds(5);
}

// If its time to gather 60 second metrics, gather the metrics and update the next run time
if (sixtySecondMetricsEnabled == true)
{
gather_slow_titus_metrics(&cGroup, &proc, &disk, &aws);
perf_metrics.collect();
Expand All @@ -265,7 +282,7 @@ void collect_titus_metrics(Registry* registry, std::unique_ptr<atlasagent::Nvml>
}
auto elapsed = duration_cast<milliseconds>(system_clock::now() - start);
Logger()->info("Published Titus metrics (delay={})", elapsed);
next_slow_run += seconds(60);
next_sixty_second_run += seconds(60);
}

next_run += seconds(1);
Expand Down
123 changes: 69 additions & 54 deletions lib/collectors/cgroup/src/cgroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace atlasagent

constexpr auto MICROS = 1000 * 1000.0;

void CGroup::network_stats() noexcept
void CGroup::NetworkStats() noexcept
{
auto megabits = std::getenv("TITUS_NUM_NETWORK_BANDWIDTH");

Expand All @@ -24,7 +24,7 @@ void CGroup::network_stats() noexcept
}
}

void CGroup::pressure_stall() noexcept
void CGroup::PressureStall() noexcept
{
auto lines = read_lines_fields(path_prefix_, "cpu.pressure");

Expand Down Expand Up @@ -58,79 +58,85 @@ void CGroup::pressure_stall() noexcept
}
}

void CGroup::cpu_throttle_v2() noexcept
void CGroup::CpuThrottleV2(const std::unordered_map<std::string, int64_t>& stats) noexcept
{
std::unordered_map<std::string, int64_t> stats;
parse_kv_from_file(path_prefix_, "cpu.stat", &stats);

static auto prev_throttled_time = static_cast<int64_t>(-1);
auto cur_throttled_time = stats["throttled_usec"];
auto cur_throttled_time = stats.at("throttled_usec");
if (prev_throttled_time >= 0)
{
auto seconds = (cur_throttled_time - prev_throttled_time) / MICROS;
registry_->CreateCounter("cgroup.cpu.throttledTime").Increment(seconds);
}
prev_throttled_time = cur_throttled_time;

registry_->CreateMonotonicCounter("cgroup.cpu.numThrottled").Set(stats["nr_throttled"]);
registry_->CreateMonotonicCounter("cgroup.cpu.numThrottled").Set(stats.at("nr_throttled"));
}

void CGroup::cpu_time_v2() noexcept
void CGroup::CpuTimeV2(const std::unordered_map<std::string, int64_t>& stats) noexcept
{
std::unordered_map<std::string, int64_t> stats;
parse_kv_from_file(path_prefix_, "cpu.stat", &stats);

static auto prev_proc_time = static_cast<int64_t>(-1);
if (prev_proc_time >= 0)
{
auto secs = (stats["usage_usec"] - prev_proc_time) / MICROS;
auto secs = (stats.at("usage_usec") - prev_proc_time) / MICROS;
registry_->CreateCounter("cgroup.cpu.processingTime").Increment(secs);
}
prev_proc_time = stats["usage_usec"];
prev_proc_time = stats.at("usage_usec");

static auto prev_sys_usage = static_cast<int64_t>(-1);
if (prev_sys_usage >= 0)
{
auto secs = (stats["system_usec"] - prev_sys_usage) / MICROS;
auto secs = (stats.at("system_usec") - prev_sys_usage) / MICROS;
registry_->CreateCounter("cgroup.cpu.usageTime", {{"id", "system"}}).Increment(secs);

}
prev_sys_usage = stats["system_usec"];
prev_sys_usage = stats.at("system_usec");

static auto prev_user_usage = static_cast<int64_t>(-1);
if (prev_user_usage >= 0)
{
auto secs = (stats["user_usec"] - prev_user_usage) / MICROS;
auto secs = (stats.at("user_usec") - prev_user_usage) / MICROS;
registry_->CreateCounter("cgroup.cpu.usageTime", {{"id", "user"}}).Increment(secs);
}
prev_user_usage = stats["user_usec"];
prev_user_usage = stats.at("user_usec");
}

double CGroup::get_avail_cpu_time(double delta_t, double num_cpu) noexcept
double CGroup::GetAvailCpuTime(const double delta_t, const double cpuCount) noexcept
{
auto cpu_max = read_num_vector_from_file(path_prefix_, "cpu.max");
auto cfs_period = cpu_max[1];
auto cfs_quota = cfs_period * num_cpu;
auto cfs_quota = cfs_period * cpuCount;
return (delta_t / cfs_period) * cfs_quota;
}

double CGroup::get_num_cpu() noexcept
double CGroup::GetNumCpu() noexcept
{
auto env_num_cpu = std::getenv("TITUS_NUM_CPU");
auto num_cpu = 0.0;
auto cpuCount = 0.0;
if (env_num_cpu != nullptr)
{
num_cpu = strtod(env_num_cpu, nullptr);
cpuCount = strtod(env_num_cpu, nullptr);
}
return cpuCount;
}

void CGroup::CpuProcessingCapacity(const absl::Time& now, const double cpuCount, const absl::Duration& interval) noexcept
{
static absl::Time last_updated;
if (last_updated == absl::UnixEpoch())
{
last_updated = now - interval;
}
return num_cpu;
auto delta_t = absl::ToDoubleSeconds(now - last_updated);
last_updated = now;
registry_->CreateCounter("cgroup.cpu.processingCapacity").Increment(delta_t * cpuCount);
}

void CGroup::cpu_utilization_v2(absl::Time now) noexcept
void CGroup::CpuUtilizationV2(const absl::Time& now, const double cpuCount, const absl::Duration& interval) noexcept
{
static absl::Time last_updated;
if (last_updated == absl::UnixEpoch())
{
// ensure cgroup.cpu.processingCapacity has a consistent value after one sample
last_updated = now - update_interval_;
last_updated = now - interval;
}
auto delta_t = absl::ToDoubleSeconds(now - last_updated);
last_updated = now;
Expand All @@ -141,12 +147,9 @@ void CGroup::cpu_utilization_v2(absl::Time now) noexcept
registry_->CreateGauge("cgroup.cpu.weight").Set(weight);
}

auto num_cpu = get_num_cpu();
auto avail_cpu_time = get_avail_cpu_time(delta_t, num_cpu);

registry_->CreateCounter("cgroup.cpu.processingCapacity").Increment(delta_t * num_cpu);
registry_->CreateGauge("sys.cpu.numProcessors").Set(num_cpu);
registry_->CreateGauge("titus.cpu.requested").Set(num_cpu);
auto avail_cpu_time = GetAvailCpuTime(delta_t, cpuCount);
registry_->CreateGauge("sys.cpu.numProcessors").Set(cpuCount);
registry_->CreateGauge("titus.cpu.requested").Set(cpuCount);

std::unordered_map<std::string, int64_t> stats;
parse_kv_from_file(path_prefix_, "cpu.stat", &stats);
Expand All @@ -168,36 +171,57 @@ void CGroup::cpu_utilization_v2(absl::Time now) noexcept
prev_user_time = stats["user_usec"];
}

void CGroup::cpu_peak_utilization_v2(absl::Time now) noexcept
void CGroup::CpuPeakUtilizationV2(const absl::Time& now, const std::unordered_map<std::string, int64_t>& stats,
const double cpuCount) noexcept
{
static absl::Time last_updated;
auto delta_t = absl::ToDoubleSeconds(now - last_updated);
last_updated = now;

auto num_cpu = get_num_cpu();
auto avail_cpu_time = get_avail_cpu_time(delta_t, num_cpu);

std::unordered_map<std::string, int64_t> stats;
parse_kv_from_file(path_prefix_, "cpu.stat", &stats);
auto avail_cpu_time = GetAvailCpuTime(delta_t, cpuCount);

static auto prev_system_time = static_cast<int64_t>(-1);
if (prev_system_time >= 0)
{
auto secs = (stats["system_usec"] - prev_system_time) / MICROS;
auto secs = (stats.at("system_usec") - prev_system_time) / MICROS;
registry_->CreateMaxGauge("sys.cpu.peakUtilization", {{"id", "system"}}).Set((secs / avail_cpu_time) * 100);
}
prev_system_time = stats["system_usec"];
prev_system_time = stats.at("system_usec");

static auto prev_user_time = static_cast<int64_t>(-1);
if (prev_user_time >= 0)
{
auto secs = (stats["user_usec"] - prev_user_time) / MICROS;
auto secs = (stats.at("user_usec") - prev_user_time) / MICROS;
registry_->CreateMaxGauge("sys.cpu.peakUtilization", {{"id", "user"}}).Set((secs / avail_cpu_time) * 100);
}
prev_user_time = stats["user_usec"];
prev_user_time = stats.at("user_usec");
}

void CGroup::CpuStats(const bool fiveSecondMetricsEnabled, const bool sixtySecondMetricsEnabled)
{
std::unordered_map<std::string, int64_t> stats;
parse_kv_from_file(path_prefix_, "cpu.stat", &stats);
auto cpuCount = GetNumCpu();

// Collect 60 second metrics if enabled
if (sixtySecondMetricsEnabled)
{
CpuThrottleV2(stats);
CpuUtilizationV2(absl::Now(), cpuCount, absl::Seconds(60));
}

// Collect 5 second metrics if enabled
if (fiveSecondMetricsEnabled)
{
CpuTimeV2(stats);
CpuProcessingCapacity(absl::Now(), cpuCount, absl::Seconds(5));
}

// Always collect peak stats (called every 1 second)
CpuPeakUtilizationV2(absl::Now(), stats, cpuCount);
}

void CGroup::memory_stats_v2() noexcept
void CGroup::MemoryStatsV2() noexcept
{
auto usage_bytes = read_num_from_file(path_prefix_, "memory.current");
if (usage_bytes >= 0)
Expand Down Expand Up @@ -237,7 +261,7 @@ void CGroup::memory_stats_v2() noexcept
registry_->CreateMonotonicCounter("cgroup.mem.pageFaults", {{"id", "major"}}).Set(stats["pgmajfault"]);
}

void CGroup::memory_stats_std_v2() noexcept
void CGroup::MemoryStatsStdV2() noexcept
{
auto mem_limit = read_num_from_file(path_prefix_, "memory.max");
auto mem_usage = read_num_from_file(path_prefix_, "memory.current");
Expand Down Expand Up @@ -271,13 +295,4 @@ void CGroup::memory_stats_std_v2() noexcept
}
}

void CGroup::do_cpu_stats(absl::Time now) noexcept
{
cpu_throttle_v2();
cpu_time_v2();
cpu_utilization_v2(now);
}

void CGroup::do_cpu_peak_stats(absl::Time now) noexcept { cpu_peak_utilization_v2(now); }

} // namespace atlasagent
45 changes: 21 additions & 24 deletions lib/collectors/cgroup/src/cgroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,33 @@ namespace atlasagent
class CGroup
{
public:
explicit CGroup(Registry* registry, std::string path_prefix = "/sys/fs/cgroup",
absl::Duration update_interval = absl::Seconds(60)) noexcept
: registry_(registry), path_prefix_(std::move(path_prefix)), update_interval_{update_interval}
explicit CGroup(Registry* registry, std::string path_prefix = "/sys/fs/cgroup") noexcept
: path_prefix_(std::move(path_prefix)), registry_(registry)
{
}

void cpu_stats() noexcept { do_cpu_stats(absl::Now()); }
void cpu_peak_stats() noexcept { do_cpu_peak_stats(absl::Now()); }
void memory_stats_v2() noexcept;
void memory_stats_std_v2() noexcept;
void network_stats() noexcept;
void pressure_stall() noexcept;
void set_prefix(std::string new_prefix) noexcept { path_prefix_ = std::move(new_prefix); }
void CpuStats(const bool fiveSecondMetricsEnabled, const bool sixtySecondMetricsEnabled);
void MemoryStatsV2() noexcept;
void MemoryStatsStdV2() noexcept;
void NetworkStats() noexcept;
void PressureStall() noexcept;
void SetPrefix(std::string new_prefix) noexcept { path_prefix_ = std::move(new_prefix); }

private:
Registry* registry_;
protected:
// For testing access
std::string path_prefix_;
absl::Duration update_interval_;

void cpu_throttle_v2() noexcept;
void cpu_time_v2() noexcept;
void cpu_utilization_v2(absl::Time now) noexcept;
void cpu_peak_utilization_v2(absl::Time now) noexcept;
double get_avail_cpu_time(double delta_t, double num_cpu) noexcept;
double get_num_cpu() noexcept;
double GetNumCpu() noexcept;
void CpuThrottleV2(const std::unordered_map<std::string, int64_t>& stats) noexcept;
void CpuTimeV2(const std::unordered_map<std::string, int64_t>& stats) noexcept;
void CpuUtilizationV2(const absl::Time& now, const double cpuCount, const absl::Duration& interval) noexcept;
void CpuPeakUtilizationV2(const absl::Time& now, const std::unordered_map<std::string, int64_t>& stats,
const double cpuCount) noexcept;
void CpuProcessingCapacity(const absl::Time& now, const double cpuCount, const absl::Duration& interval) noexcept;

private:
double GetAvailCpuTime(const double delta_t, const double cpuCount) noexcept;

protected:
// for testing
void do_cpu_stats(absl::Time now) noexcept;
void do_cpu_peak_stats(absl::Time now) noexcept;
Registry* registry_;
};

} // namespace atlasagent
Loading