33#include < ydb-cpp-sdk/client/topic/client.h>
44#include < ydb-cpp-sdk/client/table/table.h>
55#include < src/client/persqueue_public/ut/ut_utils/ut_utils.h>
6+ #include < ydb/core/cms/console/console.h>
67#include < ydb/core/keyvalue/keyvalue_events.h>
78#include < ydb/core/persqueue/key.h>
89#include < ydb/core/persqueue/blob.h>
@@ -42,8 +43,14 @@ class TFixture : public NUnitTest::TBaseFixture {
4243 void WaitForEvent ();
4344 };
4445
46+ struct TFeatureFlags {
47+ bool EnablePQConfigTransactionsAtSchemeShard = true ;
48+ };
49+
4550 void SetUp (NUnitTest::TTestContext&) override ;
4651
52+ void NotifySchemeShard (const TFeatureFlags& flags);
53+
4754 NTable::TSession CreateTableSession ();
4855 NTable::TTransaction BeginTx (NTable::TSession& session);
4956 void CommitTx (NTable::TTransaction& tx, EStatus status = EStatus::SUCCESS);
@@ -65,9 +72,10 @@ class TFixture : public NUnitTest::TBaseFixture {
6572 const TString& consumer = TEST_CONSUMER,
6673 size_t partitionCount = 1 ,
6774 std::optional<size_t > maxPartitionCount = std::nullopt );
68-
6975 void DescribeTopic (const TString& path);
7076
77+ void AddConsumer (const TString& topic, const TVector<TString>& consumers);
78+
7179 void WriteToTopicWithInvalidTxId (bool invalidTxId);
7280
7381 TTopicWriteSessionPtr CreateTopicWriteSession (const TString& topicPath,
@@ -102,6 +110,8 @@ class TFixture : public NUnitTest::TBaseFixture {
102110 NYdb::EStatus status);
103111 void CloseTopicWriteSession (const TString& topicPath,
104112 const TString& messageGroupId);
113+ void CloseTopicReadSession (const TString& topicPath,
114+ const TString& consumerName);
105115
106116 enum EEndOfTransaction {
107117 Commit,
@@ -182,6 +192,8 @@ class TFixture : public NUnitTest::TBaseFixture {
182192 ui64 tabletId,
183193 const NPQ::TWriteId& writeId);
184194
195+ ui64 GetSchemeShardTabletId (const TActorId& actorId);
196+
185197 std::unique_ptr<TTopicSdkTestSetup> Setup;
186198 std::unique_ptr<TDriver> Driver;
187199
@@ -199,11 +211,27 @@ void TFixture::SetUp(NUnitTest::TTestContext&)
199211{
200212 NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings ();
201213 settings.SetEnableTopicServiceTx (true );
214+
202215 Setup = std::make_unique<TTopicSdkTestSetup>(TEST_CASE_NAME, settings);
203216
204217 Driver = std::make_unique<TDriver>(Setup->MakeDriver ());
205218}
206219
220+ void TFixture::NotifySchemeShard (const TFeatureFlags& flags)
221+ {
222+ auto request = std::make_unique<NConsole::TEvConsole::TEvConfigNotificationRequest>();
223+ *request->Record .MutableConfig () = *Setup->GetServer ().ServerSettings .AppConfig ;
224+ request->Record .MutableConfig ()->MutableFeatureFlags ()->SetEnablePQConfigTransactionsAtSchemeShard (flags.EnablePQConfigTransactionsAtSchemeShard );
225+
226+ auto & runtime = Setup->GetRuntime ();
227+ auto actorId = runtime.AllocateEdgeActor ();
228+
229+ ui64 ssId = GetSchemeShardTabletId (actorId);
230+
231+ runtime.SendToPipe (ssId, actorId, request.release ());
232+ runtime.GrabEdgeEvent <NConsole::TEvConsole::TEvConfigNotificationResponse>();
233+ }
234+
207235NTable::TSession TFixture::CreateTableSession ()
208236{
209237 NTable::TTableClient client (GetDriver ());
@@ -330,6 +358,20 @@ void TFixture::CreateTopic(const TString& path,
330358 Setup->CreateTopic (path, consumer, partitionCount, maxPartitionCount);
331359}
332360
361+ void TFixture::AddConsumer (const TString& path,
362+ const TVector<TString>& consumers)
363+ {
364+ NTopic::TTopicClient client (GetDriver ());
365+ NTopic::TAlterTopicSettings settings;
366+
367+ for (const auto & consumer : consumers) {
368+ settings.BeginAddConsumer (consumer);
369+ }
370+
371+ auto result = client.AlterTopic (path, settings).GetValueSync ();
372+ UNIT_ASSERT_C (result.IsSuccess (), result.GetIssues ().ToString ());
373+ }
374+
333375void TFixture::DescribeTopic (const TString& path)
334376{
335377 Setup->DescribeTopic (path);
@@ -664,6 +706,13 @@ void TFixture::CloseTopicWriteSession(const TString& topicPath,
664706 TopicWriteSessions.erase (key);
665707}
666708
709+ void TFixture::CloseTopicReadSession (const TString& topicPath,
710+ const TString& consumerName)
711+ {
712+ Y_UNUSED (consumerName);
713+ TopicReadSessions.erase (topicPath);
714+ }
715+
667716void TFixture::WriteToTopic (const TString& topicPath,
668717 const TString& messageGroupId,
669718 const TString& message,
@@ -780,6 +829,37 @@ void TFixture::WaitForSessionClose(const TString& topicPath,
780829 UNIT_ASSERT (context.AckCount () <= context.WriteCount );
781830}
782831
832+ ui64 TFixture::GetSchemeShardTabletId (const TActorId& actorId)
833+ {
834+ auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
835+ navigate->DatabaseName = " /Root" ;
836+
837+ NSchemeCache::TSchemeCacheNavigate::TEntry entry;
838+ entry.Path = SplitPath (" /Root" );
839+ entry.SyncVersion = true ;
840+ entry.ShowPrivatePath = true ;
841+ entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
842+
843+ navigate->ResultSet .push_back (std::move (entry));
844+ // navigate->UserToken = "root@builtin";
845+ navigate->Cookie = 12345 ;
846+
847+ auto & runtime = Setup->GetRuntime ();
848+
849+ runtime.Send (MakeSchemeCacheID (), actorId,
850+ new TEvTxProxySchemeCache::TEvNavigateKeySet (navigate.release ()),
851+ 0 ,
852+ true );
853+ auto response = runtime.GrabEdgeEvent <TEvTxProxySchemeCache::TEvNavigateKeySetResult>();
854+
855+ UNIT_ASSERT_VALUES_EQUAL (response->Request ->Cookie , 12345 );
856+ UNIT_ASSERT_VALUES_EQUAL (response->Request ->ErrorCount , 0 );
857+
858+ auto & front = response->Request ->ResultSet .front ();
859+
860+ return front.Self ->Info .GetSchemeshardId ();
861+ }
862+
783863ui64 TFixture::GetTopicTabletId (const TActorId& actorId, const TString& topicPath, ui32 partition)
784864{
785865 auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
@@ -2017,6 +2097,41 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture)
20172097 WriteMessagesInTx (0 , 1 );
20182098}
20192099
2100+ Y_UNIT_TEST_F (ReadRuleGeneration, TFixture)
2101+ {
2102+ // There was a server
2103+ NotifySchemeShard ({.EnablePQConfigTransactionsAtSchemeShard = false });
2104+
2105+ // Users have created their own topic on it
2106+ CreateTopic (TEST_TOPIC);
2107+
2108+ // And they wrote their messages into it
2109+ WriteToTopic (TEST_TOPIC, TEST_MESSAGE_GROUP_ID, " message-1" );
2110+ WriteToTopic (TEST_TOPIC, TEST_MESSAGE_GROUP_ID, " message-2" );
2111+ WriteToTopic (TEST_TOPIC, TEST_MESSAGE_GROUP_ID, " message-3" );
2112+
2113+ // And he had a consumer
2114+ AddConsumer (TEST_TOPIC, {" consumer-1" });
2115+
2116+ // We read messages from the topic and committed offsets
2117+ auto messages = ReadFromTopic (TEST_TOPIC, " consumer-1" , TDuration::Seconds (2 ));
2118+ UNIT_ASSERT_VALUES_EQUAL (messages.size (), 3 );
2119+ CloseTopicReadSession (TEST_TOPIC, " consumer-1" );
2120+
2121+ // And then the Logbroker team turned on the feature flag
2122+ NotifySchemeShard ({.EnablePQConfigTransactionsAtSchemeShard = true });
2123+
2124+ // Users continued to write to the topic
2125+ WriteToTopic (TEST_TOPIC, TEST_MESSAGE_GROUP_ID, " message-4" );
2126+
2127+ // Users have added new consumers
2128+ AddConsumer (TEST_TOPIC, {" consumer-2" });
2129+
2130+ // And they wanted to continue reading their messages
2131+ messages = ReadFromTopic (TEST_TOPIC, " consumer-1" , TDuration::Seconds (2 ));
2132+ UNIT_ASSERT_VALUES_EQUAL (messages.size (), 1 );
2133+ }
2134+
20202135}
20212136
20222137}
0 commit comments