Skip to content

Commit 42bebf4

Browse files
authored
merge to stable-25-3: Skip metering report with ydb_size=0 and topic category (#26080)
LOGBROKER-9983
2 parents 1746d41 + 851e854 commit 42bebf4

File tree

4 files changed

+65
-19
lines changed

4 files changed

+65
-19
lines changed

ydb/core/persqueue/pqtablet/metering_sink.cpp

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -167,16 +167,19 @@ const TMeteringSink::FlushParameters TMeteringSink::GetFlushParameters(const EMe
167167
case EMeteringJson::UsedStorageV1: {
168168
ui64 duration = (now - lastFlush).MilliSeconds();
169169
ui64 avgUsage = duration > 0 ? CurrentUsedStorage_ * 1_MB * 1000 / duration : 0;
170+
ui64 quantity = (avgUsage > 0) ? 1 : 0;
170171

171172
CurrentUsedStorage_ = 0;
172173

173174
return TMeteringSink::FlushParameters(
174175
"used_storage",
175176
"ydb.serverless.v1",
176-
"byte*second"
177+
"byte*second",
178+
quantity
177179
).withTags({
178180
{"ydb_size", avgUsage}
179181
})
182+
.withImplicitZero()
180183
.withVersion("1.0.0");
181184
}
182185

@@ -195,7 +198,7 @@ void TMeteringSink::Flush(TInstant now, bool force) {
195198
continue;
196199
}
197200

198-
auto parameters = GetFlushParameters(whichOne, now, lastFlush);
201+
const auto parameters = GetFlushParameters(whichOne, now, lastFlush);
199202

200203
if (parameters.OneFlush) {
201204
const auto isTimeToFlushUnits = now.Hours() > lastFlush.Hours();
@@ -218,13 +221,15 @@ void TMeteringSink::Flush(TInstant now, bool force) {
218221
auto interval = TInstant::Hours(lastFlush.Hours()) + Parameters_.FlushLimit;
219222

220223
auto tryFlush = [&](TInstant start, TInstant finish) {
221-
const auto metricsJson = GetMeteringJson(
222-
parameters,
223-
parameters.Quantity * (finish.Seconds() - start.Seconds()),
224-
start,
225-
finish,
226-
now);
227-
FlushFunction_(metricsJson);
224+
if (parameters.Quantity > 0 || !parameters.ImplicitZero) {
225+
const auto metricsJson = GetMeteringJson(
226+
parameters,
227+
parameters.Quantity * (finish.Seconds() - start.Seconds()),
228+
start,
229+
finish,
230+
now);
231+
FlushFunction_(metricsJson);
232+
}
228233

229234
lastFlush = finish;
230235
};

ydb/core/persqueue/pqtablet/metering_sink.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,19 @@ class TMeteringSink {
7979
return *this;
8080
}
8181

82+
// skip flush, if quantity is zero
83+
FlushParameters& withImplicitZero() {
84+
ImplicitZero = true;
85+
return *this;
86+
}
87+
8288
TString Name;
8389
TString Schema;
8490
TString Units;
8591
ui64 Quantity;
8692
THashMap<TString, ui64> Tags;
8793
bool OneFlush = false;
94+
bool ImplicitZero = false;
8895
TString Version = "v1";
8996
};
9097

ydb/core/persqueue/ut/metering_sink_ut.cpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,34 @@ Y_UNIT_TEST(UsedStorageV1) {
202202
UNIT_ASSERT_VALUES_EQUAL(fullMetering, referenceStorageJson);
203203
}
204204

205+
Y_UNIT_TEST(UnusedStorageV1) {
206+
const ui64 creationTs = 1651752943168786;
207+
const ui64 flushTs = 1651754943168786;
208+
const ui32 partitions = 7;
209+
const ui64 reservedSpace = 42_GB;
210+
211+
TMeteringSink meteringSink;
212+
meteringSink.Create(TInstant::FromValue(creationTs), {
213+
.FlushInterval = TDuration::Seconds(10),
214+
.TabletId = "tabletId",
215+
.YcCloudId = "cloudId",
216+
.YcFolderId = "folderId",
217+
.YdbDatabaseId = "databaseId",
218+
.StreamName = "streamName",
219+
.ResourceId = "streamPath",
220+
.PartitionsSize = partitions,
221+
.ReservedSpace = reservedSpace,
222+
}, {EMeteringJson::UsedStorageV1}, [&](TString json) {
223+
UNIT_FAIL("Flush should not be called");
224+
Y_UNUSED(json);
225+
});
226+
227+
const ui32 quantity = 0;
228+
229+
meteringSink.IncreaseQuantity(EMeteringJson::UsedStorageV1, quantity);
230+
meteringSink.MayFlushForcibly(TInstant::FromValue(flushTs));
231+
}
232+
205233
} // Y_UNIT_TEST_SUITE(MeteringSink)
206234

207235
} // namespace NKikimr::NPQ

ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,6 +1134,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
11341134
TTestBasicRuntime runtime;
11351135
TTestEnv env(runtime, TTestEnvOptions()
11361136
.EnableProtoSourceIdInfo(true)
1137+
.EnableChangefeedInitialScan(true)
11371138
.EnablePqBilling(serverless));
11381139
ui64 txId = 100;
11391140

@@ -1207,20 +1208,28 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
12071208

12081209
TestCreateTable(runtime, schemeShard, ++txId, dbName, R"(
12091210
Name: "Table"
1210-
Columns { Name: "key" Type: "Uint64" }
1211-
Columns { Name: "value" Type: "Uint64" }
1211+
Columns { Name: "key" Type: "Uint32" }
1212+
Columns { Name: "value" Type: "Utf8" }
12121213
KeyColumnNames: ["key"]
12131214
UniformPartitionsCount: 2
12141215
)");
12151216
env.TestWaitNotification(runtime, txId, schemeShard);
12161217

12171218
runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_NOTICE);
1219+
{
1220+
// should write at least 16MiB to exceed the non-metered limit of topic's partition
1221+
const TString value = TString(500_KB, 'x');
1222+
for (size_t i = 0; i < 100; ++i) {
1223+
WriteRow(runtime, schemeShard, ++txId, dbName + "/Table", 0, i, value);
1224+
}
1225+
}
1226+
12181227
TVector<TString> meteringRecords;
12191228
runtime.SetObserverFunc([&meteringRecords](TAutoPtr<IEventHandle>& ev) {
12201229
if (ev->GetTypeRewrite() != NMetering::TEvMetering::EvWriteMeteringJson) {
12211230
return TTestActorRuntime::EEventAction::PROCESS;
12221231
}
1223-
1232+
Cerr << "GOT METERING RECORD: " << ev->Get<NMetering::TEvMetering::TEvWriteMeteringJson>()->MeteringJson << Endl;
12241233
meteringRecords.push_back(ev->Get<NMetering::TEvMetering::TEvWriteMeteringJson>()->MeteringJson);
12251234
return TTestActorRuntime::EEventAction::PROCESS;
12261235
});
@@ -1229,8 +1238,9 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
12291238
TableName: "Table"
12301239
StreamDescription {
12311240
Name: "Stream"
1232-
Mode: ECdcStreamModeKeysOnly
1241+
Mode: ECdcStreamModeNewImage
12331242
Format: ECdcStreamFormatProto
1243+
State: ECdcStreamStateScan
12341244
}
12351245
)");
12361246
env.TestWaitNotification(runtime, txId, schemeShard);
@@ -1239,26 +1249,22 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
12391249
env.SimulateSleep(runtime, TDuration::Seconds(10));
12401250
}
12411251

1242-
for (const auto& rec : meteringRecords) {
1243-
Cerr << "GOT METERING: " << rec << "\n";
1244-
}
1245-
12461252
UNIT_ASSERT_VALUES_EQUAL(meteringRecords.size(), (serverless ? 3 : 0));
12471253

12481254
if (!meteringRecords) {
12491255
return;
12501256
}
12511257

12521258
NJson::TJsonValue json;
1253-
NJson::ReadJsonTree(meteringRecords[0], &json, true);
1259+
NJson::ReadJsonTree(meteringRecords.back(), &json, true);
12541260
auto& map = json.GetMap();
12551261
UNIT_ASSERT(map.contains("schema"));
12561262
UNIT_ASSERT(map.contains("resource_id"));
12571263
UNIT_ASSERT(map.contains("tags"));
12581264
UNIT_ASSERT(map.find("tags")->second.GetMap().contains("ydb_size"));
12591265
UNIT_ASSERT_VALUES_EQUAL(map.find("schema")->second.GetString(), "ydb.serverless.v1");
12601266
UNIT_ASSERT_VALUES_EQUAL(map.find("resource_id")->second.GetString(), Sprintf("%s/Table/Stream/streamImpl", dbName.c_str()));
1261-
UNIT_ASSERT_VALUES_EQUAL(map.find("tags")->second.GetMap().find("ydb_size")->second.GetInteger(), 0);
1267+
UNIT_ASSERT_GT(map.find("tags")->second.GetMap().find("ydb_size")->second.GetInteger(), 0);
12621268
}
12631269

12641270
Y_UNIT_TEST(MeteringServerless) {

0 commit comments

Comments
 (0)