Skip to content

Commit ebacbe9

Browse files
committed
cdc_test
1 parent d325a44 commit ebacbe9

File tree

1 file changed

+27
-10
lines changed

1 file changed

+27
-10
lines changed

ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,6 +1134,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
11341134
TTestBasicRuntime runtime;
11351135
TTestEnv env(runtime, TTestEnvOptions()
11361136
.EnableProtoSourceIdInfo(true)
1137+
.EnableChangefeedInitialScan(true)
11371138
.EnablePqBilling(serverless));
11381139
ui64 txId = 100;
11391140

@@ -1202,25 +1203,44 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
12021203
NLs::PathExist,
12031204
NLs::ExtractTenantSchemeshard(&schemeShard)
12041205
});
1205-
12061206
UNIT_ASSERT(schemeShard != 0 && schemeShard != TTestTxConfig::SchemeShard);
12071207

12081208
TestCreateTable(runtime, schemeShard, ++txId, dbName, R"(
12091209
Name: "Table"
12101210
Columns { Name: "key" Type: "Uint64" }
1211-
Columns { Name: "value" Type: "Uint64" }
1211+
Columns { Name: "value" Type: "Utf8" }
12121212
KeyColumnNames: ["key"]
12131213
UniformPartitionsCount: 2
12141214
)");
12151215
env.TestWaitNotification(runtime, txId, schemeShard);
12161216

12171217
runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_NOTICE);
1218+
{
1219+
// should write at least 16MiB to exceed the topics non-metered limit
1220+
const TString value = TString(500_KB, 'x');
1221+
for (size_t i = 0; i < 100; ++i) {
1222+
const unsigned key = 3000 + i;
1223+
const TString writeQuery = Sprintf(R"(
1224+
(
1225+
(let key '( '('key (Uint64 '%u ) ) ) )
1226+
(let row '( '('value (Utf8 '%s) ) ) )
1227+
(return (AsList (UpdateRow '__user__%s key row) ))
1228+
)
1229+
)", key, value.c_str(), "Table");
1230+
NKikimrMiniKQL::TResult result;
1231+
TString err;
1232+
NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, TTestTxConfig::FakeHiveTablets + 6, writeQuery, result, err);
1233+
UNIT_ASSERT_VALUES_EQUAL(err, "");
1234+
UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::EReplyStatus::OK);
1235+
}
1236+
}
1237+
12181238
TVector<TString> meteringRecords;
12191239
runtime.SetObserverFunc([&meteringRecords](TAutoPtr<IEventHandle>& ev) {
12201240
if (ev->GetTypeRewrite() != NMetering::TEvMetering::EvWriteMeteringJson) {
12211241
return TTestActorRuntime::EEventAction::PROCESS;
12221242
}
1223-
1243+
Cerr << "GOT METERING RECORD: " << ev->Get<NMetering::TEvMetering::TEvWriteMeteringJson>()->MeteringJson << Endl;
12241244
meteringRecords.push_back(ev->Get<NMetering::TEvMetering::TEvWriteMeteringJson>()->MeteringJson);
12251245
return TTestActorRuntime::EEventAction::PROCESS;
12261246
});
@@ -1229,8 +1249,9 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
12291249
TableName: "Table"
12301250
StreamDescription {
12311251
Name: "Stream"
1232-
Mode: ECdcStreamModeKeysOnly
1252+
Mode: ECdcStreamModeNewImage
12331253
Format: ECdcStreamFormatProto
1254+
State: ECdcStreamStateScan
12341255
}
12351256
)");
12361257
env.TestWaitNotification(runtime, txId, schemeShard);
@@ -1239,26 +1260,22 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
12391260
env.SimulateSleep(runtime, TDuration::Seconds(10));
12401261
}
12411262

1242-
for (const auto& rec : meteringRecords) {
1243-
Cerr << "GOT METERING: " << rec << "\n";
1244-
}
1245-
12461263
UNIT_ASSERT_VALUES_EQUAL(meteringRecords.size(), (serverless ? 3 : 0));
12471264

12481265
if (!meteringRecords) {
12491266
return;
12501267
}
12511268

12521269
NJson::TJsonValue json;
1253-
NJson::ReadJsonTree(meteringRecords[0], &json, true);
1270+
NJson::ReadJsonTree(meteringRecords.back(), &json, true);
12541271
auto& map = json.GetMap();
12551272
UNIT_ASSERT(map.contains("schema"));
12561273
UNIT_ASSERT(map.contains("resource_id"));
12571274
UNIT_ASSERT(map.contains("tags"));
12581275
UNIT_ASSERT(map.find("tags")->second.GetMap().contains("ydb_size"));
12591276
UNIT_ASSERT_VALUES_EQUAL(map.find("schema")->second.GetString(), "ydb.serverless.v1");
12601277
UNIT_ASSERT_VALUES_EQUAL(map.find("resource_id")->second.GetString(), Sprintf("%s/Table/Stream/streamImpl", dbName.c_str()));
1261-
UNIT_ASSERT_VALUES_EQUAL(map.find("tags")->second.GetMap().find("ydb_size")->second.GetInteger(), 0);
1278+
UNIT_ASSERT_GT(map.find("tags")->second.GetMap().find("ydb_size")->second.GetInteger(), 0);
12621279
}
12631280

12641281
Y_UNIT_TEST(MeteringServerless) {

0 commit comments

Comments
 (0)