Skip to content

Commit 1b75a8b

Browse files
committed
Intermediate changes
commit_hash:6b4eee36c65ef8a39d492c84245849ea4522ac30
1 parent 418c1a2 commit 1b75a8b

File tree

8 files changed

+514
-37
lines changed

8 files changed

+514
-37
lines changed

yt/yt/library/profiling/solomon/cube.cpp

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -754,36 +754,42 @@ int TCube<T>::ReadSensorValues(
754754
template <class T>
755755
void TCube<T>::DumpCube(NProto::TCube *cube, const std::vector<TTagIdList>& extraProjections) const
756756
{
757-
for (const auto& extraTags : extraProjections) {
758-
for (const auto& [tagIds, window] : Projections_) {
759-
auto projection = cube->add_projections();
760-
for (auto tagId : tagIds) {
761-
projection->add_tag_ids(tagId);
762-
}
763-
for (auto tagId : extraTags) {
764-
projection->add_tag_ids(tagId);
765-
}
757+
for (const auto& extraTagIds : extraProjections) {
758+
DumpCube(cube, extraTagIds);
759+
}
760+
}
766761

767-
projection->set_has_value(window.HasValue[Index_]);
768-
if constexpr (std::is_same_v<T, i64>) {
769-
projection->set_counter(window.Values[Index_]);
770-
} else if constexpr (std::is_same_v<T, TDuration>) {
771-
projection->set_duration(window.Values[Index_].GetValue());
772-
} else if constexpr (std::is_same_v<T, double>) {
773-
projection->set_gauge(window.Values[Index_]);
774-
} else if constexpr (std::is_same_v<T, TSummarySnapshot<double>>) {
775-
ToProto(projection->mutable_summary(), window.Values[Index_]);
776-
} else if constexpr (std::is_same_v<T, TSummarySnapshot<TDuration>>) {
777-
ToProto(projection->mutable_timer(), window.Values[Index_]);
778-
} else if constexpr (std::is_same_v<T, TTimeHistogramSnapshot>) {
779-
ToProto(projection->mutable_time_histogram(), window.Values[Index_]);
780-
} else if constexpr (std::is_same_v<T, TGaugeHistogramSnapshot>) {
781-
ToProto(projection->mutable_gauge_histogram(), window.Values[Index_]);
782-
} else if constexpr (std::is_same_v<T, TRateHistogramSnapshot>) {
783-
ToProto(projection->mutable_rate_histogram(), window.Values[Index_]);
784-
} else {
785-
THROW_ERROR_EXCEPTION("Unexpected cube type");
786-
}
762+
template <class T>
763+
void TCube<T>::DumpCube(NProto::TCube *cube, const TTagIdList& extraTagIds) const
764+
{
765+
for (const auto& [tagIds, window] : Projections_) {
766+
auto projection = cube->add_projections();
767+
for (auto tagId : tagIds) {
768+
projection->add_tag_ids(tagId);
769+
}
770+
for (auto tagId : extraTagIds) {
771+
projection->add_tag_ids(tagId);
772+
}
773+
774+
projection->set_has_value(window.HasValue[Index_]);
775+
if constexpr (std::is_same_v<T, i64>) {
776+
projection->set_counter(window.Values[Index_]);
777+
} else if constexpr (std::is_same_v<T, TDuration>) {
778+
projection->set_duration(window.Values[Index_].GetValue());
779+
} else if constexpr (std::is_same_v<T, double>) {
780+
projection->set_gauge(window.Values[Index_]);
781+
} else if constexpr (std::is_same_v<T, TSummarySnapshot<double>>) {
782+
ToProto(projection->mutable_summary(), window.Values[Index_]);
783+
} else if constexpr (std::is_same_v<T, TSummarySnapshot<TDuration>>) {
784+
ToProto(projection->mutable_timer(), window.Values[Index_]);
785+
} else if constexpr (std::is_same_v<T, TTimeHistogramSnapshot>) {
786+
ToProto(projection->mutable_time_histogram(), window.Values[Index_]);
787+
} else if constexpr (std::is_same_v<T, TGaugeHistogramSnapshot>) {
788+
ToProto(projection->mutable_gauge_histogram(), window.Values[Index_]);
789+
} else if constexpr (std::is_same_v<T, TRateHistogramSnapshot>) {
790+
ToProto(projection->mutable_rate_histogram(), window.Values[Index_]);
791+
} else {
792+
THROW_ERROR_EXCEPTION("Unexpected cube type %Qv", TypeName<T>());
787793
}
788794
}
789795
}

yt/yt/library/profiling/solomon/cube.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ class TCube
125125

126126
// Each projection from `extraProjections` added to each inner projection of this cube.
127127
void DumpCube(NProto::TCube* cube, const std::vector<TTagIdList>& extraProjections) const;
128+
void DumpCube(NProto::TCube* cube, const TTagIdList& extraTagIds = {}) const;
128129

129130
private:
130131
const int WindowSize_;
@@ -138,4 +139,15 @@ class TCube
138139

139140
////////////////////////////////////////////////////////////////////////////////
140141

142+
using TGaugeCube = TCube<double>;
143+
using TCounterCube = TCube<i64>;
144+
using TTimeCounterCube = TCube<TDuration>;
145+
using TSummaryCube = TCube<TSummarySnapshot<double>>;
146+
using TTimerCube = TCube<TSummarySnapshot<TDuration>>;
147+
using TTimeHistogramCube = TCube<TTimeHistogramSnapshot>;
148+
using TGaugeHistogramCube = TCube<TGaugeHistogramSnapshot>;
149+
using TRateHistogramCube = TCube<TRateHistogramSnapshot>;
150+
151+
////////////////////////////////////////////////////////////////////////////////
152+
141153
} // namespace NYT::NProfiling
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
#include "encoder.h"
2+
3+
#include <util/generic/overloaded.h>
4+
5+
namespace NYT::NProfiling {
6+
7+
////////////////////////////////////////////////////////////////////////////////
8+
9+
namespace {
10+
11+
template <std::derived_from<THistogramSnapshot> THistogram>
12+
THistogram BuildHistogram(NMonitoring::IHistogramSnapshotPtr snapshot)
13+
{
14+
THistogram hist;
15+
hist.Values.reserve(snapshot->Count());
16+
hist.Bounds.reserve(snapshot->Count());
17+
18+
for (ui32 i = 0; i < snapshot->Count(); ++i) {
19+
hist.Values.push_back(snapshot->Value(i));
20+
21+
if (snapshot->UpperBound(i) != NMonitoring::HISTOGRAM_INF_BOUND) {
22+
hist.Bounds.push_back(snapshot->UpperBound(i));
23+
}
24+
}
25+
26+
return hist;
27+
}
28+
29+
} // namespace
30+
31+
////////////////////////////////////////////////////////////////////////////////
32+
33+
TSensorEncoder::TSensorEncoder(std::string sensorNameLabel, std::string sensorNamePrefix)
34+
: SensorNameLabel_(std::move(sensorNameLabel))
35+
, SensorNamePrefix_(std::move(sensorNamePrefix))
36+
{ }
37+
38+
void TSensorEncoder::Close()
39+
{ }
40+
41+
void TSensorEncoder::OnStreamBegin()
42+
{ }
43+
44+
void TSensorEncoder::OnStreamEnd()
45+
{ }
46+
47+
void TSensorEncoder::OnCommonTime(TInstant /*time*/)
48+
{ }
49+
50+
void TSensorEncoder::OnMetricBegin(NMonitoring::EMetricType type)
51+
{
52+
SensorContext_ = TSensorContext{ .Type = type, .TagIds = CommonTagIds_ };
53+
}
54+
55+
void TSensorEncoder::OnMetricEnd()
56+
{
57+
SensorContext_.reset();
58+
}
59+
60+
void TSensorEncoder::OnLabelsBegin()
61+
{ }
62+
63+
void TSensorEncoder::OnLabelsEnd()
64+
{
65+
if (!SensorContext_) {
66+
return;
67+
}
68+
69+
if (SensorContext_->Name.empty()) {
70+
THROW_ERROR_EXCEPTION("Failed to find label with sensor name")
71+
<< TErrorAttribute("label", SensorNameLabel_);
72+
}
73+
74+
auto* cube = Cubes_.FindPtr(SensorContext_->Name);
75+
if (cube == nullptr) {
76+
switch (SensorContext_->Type) {
77+
case NMonitoring::EMetricType::GAUGE:
78+
Cubes_.emplace(SensorContext_->Name, TGaugeCube{1, 0});
79+
break;
80+
case NMonitoring::EMetricType::RATE:
81+
Cubes_.emplace(SensorContext_->Name, TCounterCube{1, 0});
82+
break;
83+
case NMonitoring::EMetricType::HIST:
84+
Cubes_.emplace(SensorContext_->Name, TGaugeHistogramCube{1, 0});
85+
break;
86+
case NMonitoring::EMetricType::HIST_RATE:
87+
Cubes_.emplace(SensorContext_->Name, TRateHistogramCube{1, 0});
88+
break;
89+
case NMonitoring::EMetricType::DSUMMARY:
90+
Cubes_.emplace(SensorContext_->Name, TSummaryCube{1, 0});
91+
break;
92+
case NMonitoring::EMetricType::LOGHIST:
93+
case NMonitoring::EMetricType::IGAUGE:
94+
case NMonitoring::EMetricType::COUNTER:
95+
case NMonitoring::EMetricType::UNKNOWN:
96+
THROW_ERROR_EXCEPTION("Unsupported metric type %Qv", ToString(SensorContext_->Type));
97+
}
98+
cube = Cubes_.FindPtr(SensorContext_->Name);
99+
}
100+
101+
SensorContext_->Cube = cube;
102+
std::visit(
103+
[this] (auto&& cube) {
104+
cube.Add(SensorContext_->TagIds);
105+
},
106+
*SensorContext_->Cube);
107+
}
108+
109+
void TSensorEncoder::OnLabel(TStringBuf name, TStringBuf value)
110+
{
111+
if (name == SensorNameLabel_) {
112+
if (!SensorContext_) {
113+
THROW_ERROR_EXCEPTION("Found label with sensor name among common labels")
114+
<< TErrorAttribute("label", SensorNameLabel_);
115+
}
116+
if (!SensorContext_->Name.empty()) {
117+
THROW_ERROR_EXCEPTION("Found label with sensor name multiple times")
118+
<< TErrorAttribute("label", SensorNameLabel_);
119+
}
120+
SensorContext_->Name = SensorNamePrefix_ + std::string{value};
121+
return;
122+
}
123+
124+
auto tagId = TagRegistry_.Encode(TTag{std::string{name}, std::string{value}});
125+
if (SensorContext_) {
126+
SensorContext_->TagIds.push_back(tagId);
127+
} else {
128+
CommonTagIds_.push_back(tagId);
129+
}
130+
}
131+
132+
void TSensorEncoder::OnLabel(ui32 /*name*/, ui32 /*value*/)
133+
{
134+
YT_UNIMPLEMENTED();
135+
}
136+
137+
std::pair<ui32, ui32> TSensorEncoder::PrepareLabel(TStringBuf /*name*/, TStringBuf /*value*/)
138+
{
139+
YT_UNIMPLEMENTED();
140+
}
141+
142+
void TSensorEncoder::OnDouble(TInstant /*time*/, double value)
143+
{
144+
auto& cube = std::get<TGaugeCube>(*SensorContext_->Cube);
145+
cube.Update(SensorContext_->TagIds, value);
146+
}
147+
148+
void TSensorEncoder::OnInt64(TInstant /*time*/, i64 value)
149+
{
150+
auto& cube = std::get<TCounterCube>(*SensorContext_->Cube);
151+
cube.Update(SensorContext_->TagIds, value);
152+
}
153+
154+
void TSensorEncoder::OnUint64(TInstant time, ui64 value)
155+
{
156+
OnInt64(time, SafeIntegerCast<i64>(value));
157+
}
158+
159+
void TSensorEncoder::OnHistogram(TInstant /*time*/, NMonitoring::IHistogramSnapshotPtr snapshot)
160+
{
161+
std::visit(TOverloaded{
162+
[&] (TGaugeHistogramCube& cube) {
163+
cube.Update(SensorContext_->TagIds, BuildHistogram<TGaugeHistogramSnapshot>(snapshot));
164+
},
165+
[&] (TRateHistogramCube& cube) {
166+
cube.Update(SensorContext_->TagIds, BuildHistogram<TRateHistogramSnapshot>(snapshot));
167+
},
168+
[] (auto&& cube) {
169+
THROW_ERROR_EXCEPTION("Unexpected cube type for histogram snapshot %Qv", TypeName<decltype(cube)>());
170+
},
171+
}, *SensorContext_->Cube);
172+
}
173+
174+
void TSensorEncoder::OnLogHistogram(TInstant /*time*/, NMonitoring::TLogHistogramSnapshotPtr /*snapshot*/)
175+
{
176+
THROW_ERROR_EXCEPTION("OnLogHistogram method is not implemented");
177+
}
178+
179+
void TSensorEncoder::OnSummaryDouble(TInstant /*time*/, NMonitoring::ISummaryDoubleSnapshotPtr snapshot)
180+
{
181+
auto& cube = std::get<TSummaryCube>(*SensorContext_->Cube);
182+
183+
TSummarySnapshot summary(
184+
snapshot->GetSum(),
185+
snapshot->GetMin(),
186+
snapshot->GetMax(),
187+
snapshot->GetLast(),
188+
snapshot->GetCount());
189+
cube.Update(SensorContext_->TagIds, std::move(summary));
190+
}
191+
192+
NProto::TSensorDump TSensorEncoder::BuildSensorDump()
193+
{
194+
NProto::TSensorDump sensorDump;
195+
196+
TagRegistry_.DumpTags(&sensorDump);
197+
198+
for (const auto& [name, cube] : Cubes_) {
199+
auto* cubeProto = sensorDump.add_cubes();
200+
201+
cubeProto->set_name(name);
202+
std::visit(
203+
[&cubeProto] (auto&& cube) {
204+
cube.DumpCube(cubeProto);
205+
},
206+
cube);
207+
}
208+
209+
return sensorDump;
210+
}
211+
212+
////////////////////////////////////////////////////////////////////////////////
213+
214+
} // namespace NYT::NProfiling
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#pragma once
2+
3+
#include "cube.h"
4+
#include "tag_registry.h"
5+
6+
#include <library/cpp/monlib/encode/encoder.h>
7+
8+
namespace NYT::NProfiling {
9+
10+
////////////////////////////////////////////////////////////////////////////////
11+
12+
// Encodes incoming metrics into `NProto::TSensorDump`.
13+
class TSensorEncoder
14+
: public NMonitoring::IMetricEncoder
15+
{
16+
public:
17+
explicit TSensorEncoder(std::string sensorNameLabel, std::string sensorNamePrefix = "");
18+
void Close() override;
19+
20+
void OnStreamBegin() override;
21+
void OnStreamEnd() override;
22+
23+
void OnCommonTime(TInstant time) override;
24+
25+
void OnMetricBegin(NMonitoring::EMetricType type) override;
26+
void OnMetricEnd() override;
27+
28+
void OnLabelsBegin() override;
29+
void OnLabelsEnd() override;
30+
void OnLabel(TStringBuf name, TStringBuf value) override;
31+
void OnLabel(ui32 name, ui32 value) override;
32+
std::pair<ui32, ui32> PrepareLabel(TStringBuf name, TStringBuf value) override;
33+
34+
void OnDouble(TInstant time, double value) override;
35+
void OnInt64(TInstant time, i64 value) override;
36+
void OnUint64(TInstant time, ui64 value) override;
37+
38+
void OnHistogram(TInstant time, NMonitoring::IHistogramSnapshotPtr snapshot) override;
39+
void OnLogHistogram(TInstant time, NMonitoring::TLogHistogramSnapshotPtr snapshot) override;
40+
void OnSummaryDouble(TInstant time, NMonitoring::ISummaryDoubleSnapshotPtr snapshot) override;
41+
42+
NProto::TSensorDump BuildSensorDump();
43+
44+
private:
45+
const std::string SensorNameLabel_;
46+
const std::string SensorNamePrefix_;
47+
48+
TTagRegistry TagRegistry_;
49+
TTagIdList CommonTagIds_;
50+
51+
using TGenericCube = std::variant<TGaugeCube, TCounterCube, TGaugeHistogramCube, TRateHistogramCube, TSummaryCube>;
52+
THashMap<std::string, TGenericCube> Cubes_;
53+
54+
struct TSensorContext
55+
{
56+
std::string Name;
57+
NMonitoring::EMetricType Type = NMonitoring::EMetricType::UNKNOWN;
58+
TTagIdList TagIds;
59+
TGenericCube* Cube = nullptr;
60+
};
61+
std::optional<TSensorContext> SensorContext_;
62+
};
63+
64+
////////////////////////////////////////////////////////////////////////////////
65+
66+
} // namespace NYT::NProfiling

0 commit comments

Comments
 (0)