11#include " data_plane_helpers.h"
2+ #include < ydb-cpp-sdk/client/resources/ydb_resources.h>
3+ #include < ydb-cpp-sdk/client/topic/client.h>
24
35namespace NKikimr ::NPersQueueTests {
46
@@ -51,7 +53,8 @@ namespace NKikimr::NPersQueueTests {
5153 std::optional<ui32> partitionGroup,
5254 std::optional<TString> codec,
5355 std::optional<bool > reconnectOnFailure,
54- std::unordered_map<std::string, std::string> sessionMeta
56+ std::unordered_map<std::string, std::string> sessionMeta,
57+ const TString& userAgent
5558 ) {
5659 auto settings = TWriteSessionSettings ().Path (topic).MessageGroupId (sourceId);
5760 if (partitionGroup) settings.PartitionGroupId (*partitionGroup);
@@ -66,6 +69,9 @@ namespace NKikimr::NPersQueueTests {
6669 }
6770 settings.MaxMemoryUsage (1024 *1024 *1024 *1024ll );
6871 settings.Meta_ .Fields = sessionMeta;
72+ if (!userAgent.empty ()) {
73+ settings.Header ({{NYdb::YDB_APPLICATION_NAME, userAgent}});
74+ }
6975 return CreateSimpleWriter (driver, settings);
7076 }
7177
@@ -79,6 +85,21 @@ namespace NKikimr::NPersQueueTests {
7985 return TPersQueueClient (driver, clientSettings).CreateReadSession (TReadSessionSettings (settings).DisableClusterDiscovery (true ));
8086 }
8187
88+ std::shared_ptr<NYdb::NTopic::IReadSession> CreateReader (
89+ NYdb::TDriver& driver,
90+ const NYdb::NTopic::TReadSessionSettings& settings,
91+ std::shared_ptr<NYdb::ICredentialsProviderFactory> creds,
92+ const TString& userAgent
93+ ) {
94+ NYdb::NTopic::TTopicClientSettings clientSettings;
95+ if (creds) clientSettings.CredentialsProviderFactory (creds);
96+ auto readerSettings = settings;
97+ if (!userAgent.empty ()) {
98+ readerSettings.Header ({{NYdb::YDB_APPLICATION_NAME, userAgent}});
99+ }
100+ return NYdb::NTopic::TTopicClient (driver, clientSettings).CreateReadSession (readerSettings);
101+ }
102+
82103 TMaybe<TReadSessionEvent::TDataReceivedEvent> GetNextMessageSkipAssignment (std::shared_ptr<IReadSession>& reader, TDuration timeout) {
83104 while (true ) {
84105 auto future = reader->WaitEvent ();
@@ -99,4 +120,24 @@ namespace NKikimr::NPersQueueTests {
99120 }
100121 return {};
101122 }
123+
124+ TMaybe<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent> GetNextMessageSkipAssignment (std::shared_ptr<NYdb::NTopic::IReadSession>& reader, TDuration timeout) {
125+ while (true ) {
126+ auto future = reader->WaitEvent ();
127+ future.Wait (timeout);
128+ std::optional<NYdb::NTopic::TReadSessionEvent::TEvent> event = reader->GetEvent (false , 1 );
129+ if (!event)
130+ return {};
131+ if (auto e = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&*event)) {
132+ return *e;
133+ } else if (auto * e = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) {
134+ e->Confirm ();
135+ } else if (auto * e = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) {
136+ e->Confirm ();
137+ } else if (std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*event)) {
138+ return {};
139+ }
140+ }
141+ return {};
142+ }
102143}
0 commit comments