Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrapped<TDqSolomo
return false;
}

DownloadedBytes += response.DownloadedBytes;

if (CurrentPage >= response.Result.PagesCount) {
LOG_I("TDqSolomonMetricsQueueActor", "SaveRetrievedResults no more metrics to list");
HasMoreMetrics = false;
Expand Down Expand Up @@ -372,7 +374,8 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrapped<TDqSolomo
}

LOG_D("TDqSolomonMetricsQueueActor", "SendMetrics Sending " << result.size() << " metrics to consumer with id " << consumer);
Send(consumer, new TEvSolomonProvider::TEvMetricsBatch(std::move(result), HasNoMoreItems(), transportMeta));
Send(consumer, new TEvSolomonProvider::TEvMetricsBatch(std::move(result), HasNoMoreItems(), DownloadedBytes, transportMeta));
DownloadedBytes = 0;

if (HasNoMoreItems()) {
TryFinish(consumer, transportMeta.GetSeqNo());
Expand Down Expand Up @@ -421,6 +424,7 @@ class TDqSolomonMetricsQueueActor : public NActors::TActorBootstrapped<TDqSolomo
bool HasPendingRequests;
THashMap<NActors::TActorId, TDeque<NDqProto::TMessageTransportMeta>> PendingRequests;
std::vector<NSo::MetricQueue::TMetric> Metrics;
ui64 DownloadedBytes = 0;
TMaybe<TString> MaybeIssues;

const TDqSolomonReadParams ReadParams;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
IsConfirmedMetricsQueueFinish = true;
}

IngressStats.Bytes += batch.GetDownloadedBytes();
IngressStats.Chunks++;
IngressStats.Resume();
auto& listedMetrics = batch.GetMetrics();

SOURCE_LOG_D("HandleMetricsBatch batch of size " << listedMetrics.size());
Expand Down Expand Up @@ -249,6 +252,10 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
return;
}

IngressStats.Bytes += batch.Response.DownloadedBytes;
IngressStats.Chunks++;
IngressStats.Resume();

auto& metric = batch.Metric;
auto& pointsCount = batch.Response.Result.PointsCount;
ParsePointsCount(metric, pointsCount);
Expand Down Expand Up @@ -348,7 +355,7 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct

for (size_t i = 0; i < timestamps.size(); ++i){
TInstant timestamp = TInstant::MilliSeconds(timestamps[i]);
if (timestamp < from || timestamp > to) {
if (timestamp < from || timestamp >= to) {
continue;
}

Expand Down Expand Up @@ -388,6 +395,10 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
}

finished = LastMetricProcessed();
if (MetricsData.empty()) {
IngressStats.TryPause();
}

MetricsData.clear();
return 0;
}
Expand Down Expand Up @@ -552,6 +563,10 @@ class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadAct
}
}

IngressStats.Bytes += batch.Response.DownloadedBytes;
IngressStats.Rows += batch.Response.Result.Timeseries.size();
IngressStats.Chunks++;
IngressStats.Resume();
PendingDataRequests_.erase(request);
CurrentInflight--;

Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/solomon/events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ struct TEvSolomonProvider {
public NActors::TEventPB<TEvMetricsBatch, NSo::MetricQueue::TEvMetricsBatch, EvMetricsBatch> {

TEvMetricsBatch() = default;
TEvMetricsBatch(std::vector<NSo::MetricQueue::TMetric> metrics, bool noMoreMetrics, const NDqProto::TMessageTransportMeta& transportMeta) {
TEvMetricsBatch(std::vector<NSo::MetricQueue::TMetric> metrics, bool noMoreMetrics, ui64 downloadedBytes, const NDqProto::TMessageTransportMeta& transportMeta) {
Record.MutableMetrics()->Assign(
metrics.begin(),
metrics.end());
Record.SetNoMoreMetrics(noMoreMetrics);
Record.SetDownloadedBytes(downloadedBytes);
*Record.MutableTransportMeta() = transportMeta;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ message TEvGetNextBatch {
// Batch of listed metrics that the metrics queue actor sends to a consumer.
message TEvMetricsBatch {
// Set by the sender from its HasNoMoreItems() check at the moment the batch is built.
bool NoMoreMetrics = 1;
// Metrics carried by this batch.
repeated TMetric Metrics = 2;
// Bytes the sender accumulated from its listing responses since the previous
// batch. The sender zeroes its counter right after sending, so each value is a
// delta that the receiver can add to its ingress byte statistics.
uint64 DownloadedBytes = 3;

// Transport metadata copied from the request this batch answers.
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,25 @@ class TSolomonDqIntegration: public TDqIntegrationBase {
if (!ExtractSettingValue(settingsRef.Child(i)->Tail(), settingsRef.Child(i)->Head().Content(), ctx, value)) {
return {};
}
if (!TInstant::TryParseIso8601(value, from)) {
ctx.AddError(TIssue(ctx.GetPosition(settingsRef.Child(i)->Head().Pos()), "couldn't parse `from`, use Iso8601 format, e.g. 2025-03-12T14:40:39Z"));
TInstant userFrom;
if (!TInstant::TryParseIso8601(value, userFrom)) {
ctx.AddError(TIssue(ctx.GetPosition(settingsRef.Child(i)->Head().Pos()), "couldn't parse `from`, use ISO8601 format, e.g. 2025-03-12T14:40:39Z"));
return {};
}
from = std::min(TInstant::Now(), userFrom);
continue;
}
if (settingsRef.Child(i)->Head().IsAtom("to"sv)) {
TStringBuf value;
if (!ExtractSettingValue(settingsRef.Child(i)->Tail(), settingsRef.Child(i)->Head().Content(), ctx, value)) {
return {};
}
if (!TInstant::TryParseIso8601(value, to)) {
ctx.AddError(TIssue(ctx.GetPosition(settingsRef.Child(i)->Head().Pos()), "couldn't parse `to`, use Iso8601 format, e.g. 2025-03-12T14:40:39Z"));
TInstant userTo;
if (!TInstant::TryParseIso8601(value, userTo)) {
ctx.AddError(TIssue(ctx.GetPosition(settingsRef.Child(i)->Head().Pos()), "couldn't parse `to`, use ISO8601 format, e.g. 2025-03-12T14:40:39Z"));
return {};
}
to = std::min(TInstant::Now(), userTo);
continue;
}
if (settingsRef.Child(i)->Head().IsAtom("program"sv)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,20 @@ class TSolomonLoadTableMetadataTransformer : public TGraphTransformerBase {

TInstant from;
if (auto time = ExtractSetting(settings, "from")) {
from = TInstant::ParseIso8601(*time);
if (!TInstant::TryParseIso8601(*time, from)) {
ctx.AddError(TIssue(ctx.GetPosition(n->Pos()), "couldn't parse `from`, use ISO8601 format, e.g. 2025-03-12T14:40:39Z"));
return TStatus::Error;
}
} else {
from = TInstant::Now() - TDuration::Days(7);
from = TInstant::Zero();
}

TInstant to;
if (auto time = ExtractSetting(settings, "to")) {
to = TInstant::ParseIso8601(*time);
if (!TInstant::TryParseIso8601(*time, to)) {
ctx.AddError(TIssue(ctx.GetPosition(n->Pos()), "couldn't parse `to`, use ISO8601 format, e.g. 2025-03-12T14:40:39Z"));
return TStatus::Error;
}
} else {
to = TInstant::Now();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,75 +74,78 @@ TGetLabelsResponse ProcessGetLabelsResponse(NYql::IHTTPGateway::TResult&& respon
TGetLabelsResult result;

if (response.CurlResponseCode != CURLE_OK) {
return TGetLabelsResponse(TStringBuilder{} << "Error while sending list metric names request to monitoring api: " << response.Issues.ToOneLineString());
return TGetLabelsResponse(TStringBuilder{} << "Monitoring api get labels response: " << response.Issues.ToOneLineString() <<
", internal code: " << static_cast<int>(response.CurlResponseCode));
}

if (response.Content.HttpResponseCode < 200 || response.Content.HttpResponseCode >= 300) {
return TGetLabelsResponse(TStringBuilder{} << "Error while sending list metric names request to monitoring api: " << response.Content.data());
return TGetLabelsResponse(TStringBuilder{} << "Monitoring api get labels response: " << response.Content.data() <<
", internal code: " << response.Content.HttpResponseCode);
}

NJson::TJsonValue json;
try {
NJson::ReadJsonTree(response.Content.data(), &json, /*throwOnError*/ true);
} catch (const std::exception& e) {
return TGetLabelsResponse(TStringBuilder{} << "Failed to parse response from monitoring api: " << e.what());
return TGetLabelsResponse("Monitoring api get labels response is not a valid json");
}

if (!json.IsMap() || !json.Has("names") || !json["names"].IsArray()) {
return TGetLabelsResponse("Invalid result from monitoring api");
return TGetLabelsResponse("Monitoring api get labels response doesn't contain requested info");
}

const auto names = json["names"].GetArray();

for (const auto& name : names) {
if (!name.IsString()) {
return TGetLabelsResponse("Invalid label names from monitoring api");
} else {
result.Labels.push_back(name.GetString());
return TGetLabelsResponse("Monitoring api get labels response contains invalid label names");
}
result.Labels.push_back(name.GetString());
}
for (const auto& [key, selector] : knownSelectors) {
result.Labels.push_back(key);
}

return TGetLabelsResponse(std::move(result));
return TGetLabelsResponse(std::move(result), response.Content.size() + response.Content.Headers.size());
}

TListMetricsResponse ProcessListMetricsResponse(NYql::IHTTPGateway::TResult&& response) {
TListMetricsResult result;

if (response.CurlResponseCode != CURLE_OK) {
return TListMetricsResponse(TStringBuilder{} << "Error while sending list metrics request to monitoring api: " << response.Issues.ToOneLineString());
return TListMetricsResponse(TStringBuilder{} << "Monitoring api list metrics response: " << response.Issues.ToOneLineString() <<
", internal code: " << static_cast<int>(response.CurlResponseCode));
}

if (response.Content.HttpResponseCode < 200 || response.Content.HttpResponseCode >= 300) {
return TListMetricsResponse(TStringBuilder{} << "Error while sending list metrics request to monitoring api: " << response.Content.data());
return TListMetricsResponse(TStringBuilder{} << "Monitoring api list metrics response: " << response.Content.data() <<
", internal code: " << response.Content.HttpResponseCode);
}

NJson::TJsonValue json;
try {
NJson::ReadJsonTree(response.Content.data(), &json, /*throwOnError*/ true);
} catch (const std::exception& e) {
return TListMetricsResponse(TStringBuilder{} << "Failed to parse response from monitoring api: " << e.what());
return TListMetricsResponse("Monitoring api list metrics response is not a valid json" );
}

if (!json.IsMap() || !json.Has("result") || !json.Has("page")) {
return TListMetricsResponse("Invalid list metrics result from monitoring api");
return TListMetricsResponse("Monitoring api list metrics response doesn't contain requested info");
}

const auto pagesInfo = json["page"];
if (!pagesInfo.IsMap() ||
!pagesInfo.Has("pagesCount") || !pagesInfo["pagesCount"].IsInteger() ||
!pagesInfo.Has("totalCount") || !pagesInfo["totalCount"].IsInteger()) {
return TListMetricsResponse("Invalid paging info from monitoring api");
return TListMetricsResponse("Monitoring api list metrics response doesn't contain paging info");
}

result.PagesCount = pagesInfo["pagesCount"].GetInteger();
result.TotalCount = pagesInfo["totalCount"].GetInteger();

for (const auto& metricObj : json["result"].GetArray()) {
if (!metricObj.IsMap() || !metricObj.Has("labels") || !metricObj["labels"].IsMap() || !metricObj.Has("type") || !metricObj["type"].IsString()) {
return TListMetricsResponse("Invalid list metrics result from monitoring api");
return TListMetricsResponse("Monitoring api list metrics response contains invalid metrics");
}

TSelectors selectors;
Expand All @@ -153,7 +156,7 @@ TListMetricsResponse ProcessListMetricsResponse(NYql::IHTTPGateway::TResult&& re
result.Metrics.emplace_back(std::move(selectors), metricObj["type"].GetString());
}

return TListMetricsResponse(std::move(result));
return TListMetricsResponse(std::move(result), response.Content.size() + response.Content.Headers.size());
}

TGetPointsCountResponse ProcessGetPointsCountResponse(NYql::IHTTPGateway::TResult&& response, ui64 downsampledPointsCount) {
Expand All @@ -169,46 +172,48 @@ TGetPointsCountResponse ProcessGetPointsCountResponse(NYql::IHTTPGateway::TResul
for (const auto& whitelistIssue : whitelistIssues) {
if (issues.find(whitelistIssue) != issues.npos) {
result.PointsCount = 0;
return TGetPointsCountResponse(std::move(result));
return TGetPointsCountResponse(std::move(result), 0);
}
}

return TGetPointsCountResponse(TStringBuilder() << "Error while sending points count request to monitoring api: " << issues);
return TGetPointsCountResponse(TStringBuilder() << "Monitoring api points count response: " << issues <<
", internal code: " << static_cast<int>(response.CurlResponseCode));
}

if (response.Content.HttpResponseCode < 200 || response.Content.HttpResponseCode >= 300) {
return TGetPointsCountResponse(TStringBuilder{} << "Error while sending points count request to monitoring api: " << response.Content.data());
return TGetPointsCountResponse(TStringBuilder{} << "Monitoring api points count response: " << response.Content.data() <<
", internal code: " << response.Content.HttpResponseCode);
}

NJson::TJsonValue json;
try {
NJson::ReadJsonTree(response.Content.data(), &json, /*throwOnError*/ true);
} catch (const std::exception& e) {
return TGetPointsCountResponse(TStringBuilder{} << "Failed to parse points count response from monitoring api: " << e.what());
return TGetPointsCountResponse("Monitoring api points count response is not a valid json");
}

if (!json.IsMap() || !json.Has("scalar") || !json["scalar"].IsInteger()) {
return TGetPointsCountResponse("Invalid points count result from monitoring api");
return TGetPointsCountResponse("Monitoring api points count response doesn't contain requested info");
}

result.PointsCount = json["scalar"].GetInteger() + downsampledPointsCount;

return TGetPointsCountResponse(std::move(result));
return TGetPointsCountResponse(std::move(result), response.Content.size() + response.Content.Headers.size());
}

TGetDataResponse ProcessGetDataResponse(NYdbGrpc::TGrpcStatus&& status, ReadResponse&& response) {
TGetDataResult result;

if (!status.Ok()) {
TString error = TStringBuilder{} << "Error while sending data request to monitoring api: " << status.Msg;
TString error = TStringBuilder{} << "Monitoring api get data response: " << status.Msg;
if (status.GRpcStatusCode == grpc::StatusCode::RESOURCE_EXHAUSTED || status.GRpcStatusCode == grpc::StatusCode::UNAVAILABLE) {
return TGetDataResponse(error, EStatus::STATUS_RETRIABLE_ERROR);
}
return TGetDataResponse(error);
}

if (response.response_per_query_size() != 1) {
return TGetDataResponse("Invalid get data repsonse size from monitoring api");
return TGetDataResponse("Monitoring api get data response is invalid");
}

const auto& responseValue = response.response_per_query()[0];
Expand All @@ -235,7 +240,7 @@ TGetDataResponse ProcessGetDataResponse(NYdbGrpc::TGrpcStatus&& status, ReadResp
result.Timeseries.emplace_back(std::move(metric), std::move(timestamps), std::move(values));
}

return TGetDataResponse(std::move(result));
return TGetDataResponse(std::move(result), response.ByteSize());
}

class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable_shared_from_this<TSolomonAccessorClient> {
Expand Down Expand Up @@ -329,7 +334,7 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable
TGetPointsCountResult result;
result.PointsCount = downsampledPointsCount;

resultPromise.SetValue(TGetPointsCountResponse(std::move(result)));
resultPromise.SetValue(TGetPointsCountResponse(std::move(result), 0));
}

return resultPromise.GetFuture();
Expand Down Expand Up @@ -579,7 +584,7 @@ class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable
fullSelectors[labelName] = {"=", "-"};
}
}
return selectors;
return fullSelectors;
}

TString BuildSelectorsProgram(const TSelectors& selectors, bool useNewFormat = false) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ TSolomonClientResponse<T>::TSolomonClientResponse(const TString& error, EStatus
, Error(error) {}

template <typename T>
TSolomonClientResponse<T>::TSolomonClientResponse(T&& result)
TSolomonClientResponse<T>::TSolomonClientResponse(T&& result, ui64 downloadedBytes)
: Status(STATUS_OK)
, Result(std::move(result)) {}
, Result(std::move(result))
, DownloadedBytes(downloadedBytes) {}

template class TSolomonClientResponse<TGetLabelsResult>;
template class TSolomonClientResponse<TListMetricsResult>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ class TSolomonClientResponse {
public:
TSolomonClientResponse();
explicit TSolomonClientResponse(const TString& error, EStatus status = STATUS_FATAL_ERROR);
explicit TSolomonClientResponse(T&& result);
explicit TSolomonClientResponse(T&& result, ui64 downloadedBytes);

public:
EStatus Status;
TString Error;
T Result;
ui64 DownloadedBytes = 0;
};

struct TGetLabelsResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@
<tmp_path>/program.sql:<main>: Error: Fatal Error


<tmp_path>/program.sql:<main>: Error: Error while sending data request to monitoring api: Project invalid does not exist (appeared 1 time at ISOTIME)
<tmp_path>/program.sql:<main>: Error: Monitoring api get data response: Project invalid does not exist (appeared 1 time at ISOTIME)
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Failed to execute query, reason:
Request finished with status: EXTERNAL_ERROR
Issues:
<main>: Error: Error while sending data request to monitoring api: Project invalid does not exist
<main>: Error: Monitoring api get data response: Project invalid does not exist



Loading
Loading