@@ -32,28 +32,6 @@ const TString& TopicPrefix(const TActorContext& ctx) {
32
32
return topicPrefix;
33
33
}
34
34
35
- TProcessingResult ProcessMetaCacheAllTopicsResponse (TEvPqNewMetaCache::TEvDescribeAllTopicsResponse::TPtr& ev) {
36
- auto & res = ev->Get ()->Result ;
37
- const TString& path = ev->Get ()->Path ;
38
- TProcessingResult result;
39
- if (!ev->Get ()->Success ) {
40
- return TProcessingResult{
41
- MSTATUS_ERROR,
42
- NPersQueue::NErrorCode::UNKNOWN_TOPIC,
43
- Sprintf (" path '%s' has invalid/unknown root prefix, Marker# PQ14" , path.c_str ()),
44
- true
45
- };
46
- }
47
- if (!res) {
48
- return TProcessingResult{
49
- MSTATUS_ERROR,
50
- NPersQueue::NErrorCode::ERROR,
51
- Sprintf (" path '%s' describe error, Status# no status, reason: no reason, Marker# PQ1" , path.c_str ()),
52
- true
53
- };
54
- }
55
- return {};
56
- }
57
35
58
36
TProcessingResult ProcessMetaCacheSingleTopicsResponse (
59
37
const TSchemeCacheNavigate::TEntry& entry
@@ -165,7 +143,9 @@ struct TTabletInfo {
165
143
TVector<NKikimrPQ::TStatusResponse::TPartResult> StatusResponses;
166
144
};
167
145
168
- TPersQueueBaseRequestProcessor::TPersQueueBaseRequestProcessor (const NKikimrClient::TPersQueueRequest& request, const TActorId& pqMetaCacheId, bool listNodes)
146
+ TPersQueueBaseRequestProcessor::TPersQueueBaseRequestProcessor (
147
+ const NKikimrClient::TPersQueueRequest& request, const TActorId& pqMetaCacheId, bool listNodes
148
+ )
169
149
: RequestProto(new NKikimrClient::TPersQueueRequest(request))
170
150
, RequestId(RequestProto->HasRequestId () ? RequestProto->GetRequestId() : "<none>")
171
151
, PqMetaCache(pqMetaCacheId)
@@ -192,9 +172,17 @@ void TPersQueueBaseRequestProcessor::AnswerAndDie(const TActorContext& ctx) {
192
172
void TPersQueueBaseRequestProcessor::Bootstrap (const TActorContext& ctx) {
193
173
StartTimestamp = ctx.Now ();
194
174
195
- LOG_TRACE_S (ctx, NKikimrServices::PERSQUEUE, " Send to PqMetaCache TEvDescribeAllTopicsRequest" );
196
- bool ret = ctx.Send (PqMetaCache, new NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeAllTopicsRequest ());
197
- LOG_TRACE_S (ctx, NKikimrServices::PERSQUEUE, " Send to PqMetaCache TEvDescribeAllTopicsRequest Result:" << ret);
175
+ if (TopicsToRequest.empty ()) {
176
+ throw std::runtime_error (" No topics in request" );
177
+ }
178
+ LOG_TRACE_S (ctx, NKikimrServices::PERSQUEUE, " Send to PqMetaCache TEvDescribeTopicsRequest" );
179
+ TVector<TString> topicsToRequest;
180
+ topicsToRequest.reserve (TopicsToRequest.size ());
181
+ for (const auto & topic : TopicsToRequest) {
182
+ topicsToRequest.push_back (topic);
183
+ }
184
+ bool ret = ctx.Send (PqMetaCache, new NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsByNameRequest (topicsToRequest));
185
+ LOG_TRACE_S (ctx, NKikimrServices::PERSQUEUE, " Send to PqMetaCache TEvDescribeTopicsRequest Result:" << ret);
198
186
199
187
if (ListNodes) {
200
188
const TActorId nameserviceId = GetNameserviceActorId ();
@@ -216,7 +204,6 @@ STFUNC(TPersQueueBaseRequestProcessor::StateFunc) {
216
204
switch (ev->GetTypeRewrite ()) {
217
205
HFunc (TEvInterconnect::TEvNodesInfo, Handle);
218
206
HFunc (NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsResponse, Handle);
219
- HFunc (NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeAllTopicsResponse, Handle);
220
207
HFunc (NPqMetaCacheV2::TEvPqNewMetaCache::TEvGetNodesMappingResponse, Handle);
221
208
HFunc (TEvPersQueue::TEvResponse, Handle);
222
209
CFunc (TEvents::TSystem::Wakeup, HandleTimeout);
@@ -274,56 +261,54 @@ void TPersQueueBaseRequestProcessor::HandleTimeout(const TActorContext& ctx) {
274
261
}
275
262
276
263
277
- void TPersQueueBaseRequestProcessor:: GetTopicsListOrThrow (
264
+ THashSet<TString> GetTopicsListOrThrow (
278
265
const ::google::protobuf::RepeatedPtrField<::NKikimrClient::TPersQueueMetaRequest::TTopicRequest>& requests,
279
- THashMap<TString, std::shared_ptr<THashSet<ui64>>>& partitionsToRequest
266
+ THashMap<TString, std::shared_ptr<THashSet<ui64>>>* partitionsToRequest
280
267
) {
268
+ THashSet<TString> ret;
281
269
for (const auto & topicRequest : requests) {
282
270
if (topicRequest.GetTopic ().empty ()) {
283
271
throw std::runtime_error (" TopicRequest must have Topic field." );
284
272
}
285
- std::shared_ptr<THashSet<ui64>> partitionsToRequestOnTopic (new THashSet<ui64>()); // nonconst
286
- partitionsToRequest[topicRequest.GetTopic ()] = partitionsToRequestOnTopic;
287
- for (ui32 partition : topicRequest.GetPartition ()) {
288
- const bool inserted = partitionsToRequestOnTopic->insert (partition).second ;
289
- if (!inserted) {
290
- TStringBuilder desc;
291
- desc << " multiple partition " << partition
292
- << " in TopicRequest for topic '" << topicRequest.GetTopic () << " '" ;
293
- throw std::runtime_error (desc);
273
+ if (partitionsToRequest != nullptr ) {
274
+ std::shared_ptr<THashSet<ui64>> partitionsToRequestOnTopic (new THashSet<ui64>()); // nonconst
275
+ (*partitionsToRequest)[topicRequest.GetTopic ()] = partitionsToRequestOnTopic;
276
+ for (ui32 partition : topicRequest.GetPartition ()) {
277
+ const bool inserted = partitionsToRequestOnTopic->insert (partition).second ;
278
+ if (!inserted) {
279
+ TStringBuilder desc;
280
+ desc << " multiple partition " << partition
281
+ << " in TopicRequest for topic '" << topicRequest.GetTopic () << " '" ;
282
+ throw std::runtime_error (desc);
283
+ }
294
284
}
295
285
}
296
-
297
- const bool res = TopicsToRequest.insert (topicRequest.GetTopic ()).second ;
286
+ const bool res = ret.insert (topicRequest.GetTopic ()).second ;
298
287
if (!res) {
299
288
TStringBuilder desc;
300
289
desc << " multiple TopicRequest for topic '" << topicRequest.GetTopic () << " '" ;
301
290
throw std::runtime_error (desc);
302
291
}
303
292
}
304
-
293
+ return ret;
305
294
}
306
295
307
296
void TPersQueueBaseRequestProcessor::Handle (
308
- NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsResponse::TPtr&, const TActorContext&
297
+ NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsResponse::TPtr& ev , const TActorContext& ctx
309
298
) {
310
- Y_ABORT ();
311
- }
312
-
313
- void TPersQueueBaseRequestProcessor::Handle (
314
- NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeAllTopicsResponse::TPtr& ev, const TActorContext& ctx
315
- ) {
316
- LOG_TRACE_S (ctx, NKikimrServices::PERSQUEUE, " TPersQueueBaseRequestProcessor::Handle" );
317
-
318
- auto & path = ev->Get ()->Path ;
319
- if (!ev->Get ()->Success ) {
320
- return SendErrorReplyAndDie (ctx, MSTATUS_ERROR, NPersQueue::NErrorCode::UNKNOWN_TOPIC,
321
- TStringBuilder () << " no path '" << path << " ', Marker# PQ17" );
299
+ TopicsConverters.reserve (ev->Get ()->TopicsRequested .size ());
300
+ Y_ABORT_UNLESS (ev->Get ()->Result ->ResultSet .size () == ev->Get ()->TopicsRequested .size ());
301
+ for (ui32 i = 0 ; i < ev->Get ()->TopicsRequested .size (); ++i) {
302
+ if (ev->Get ()->Result ->ResultSet [i].PQGroupInfo ) {
303
+ const auto & pqTabletConfig = ev->Get ()->Result ->ResultSet [i].PQGroupInfo ->Description .GetPQTabletConfig ();
304
+ TopicsConverters.push_back (ev->Get ()->TopicsRequested [i]->UpgradeToFullConverter (
305
+ pqTabletConfig,
306
+ AppData (ctx)->PQConfig .GetTestDatabaseRoot ()));
307
+ } else {
308
+ TopicsConverters.push_back (nullptr );
309
+ }
322
310
}
323
-
324
311
TopicsDescription = std::move (ev->Get ()->Result );
325
- TopicsConverters = std::move (ev->Get ()->Topics );
326
- Y_ABORT_UNLESS (TopicsDescription->ResultSet .size () == TopicsConverters.size ());
327
312
if (ReadyToCreateChildren ()) {
328
313
if (CreateChildren (ctx)) {
329
314
return ;
@@ -501,8 +486,6 @@ STFUNC(TTopicInfoBasedActor::StateFunc) {
501
486
502
487
503
488
class TMessageBusServerPersQueueImpl : public TActorBootstrapped <TMessageBusServerPersQueueImpl> {
504
- using TEvDescribeAllTopicsRequest = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeAllTopicsRequest;
505
- using TEvDescribeAllTopicsResponse = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeAllTopicsResponse;
506
489
507
490
protected:
508
491
NKikimrClient::TPersQueueRequest RequestProto;
@@ -589,6 +572,11 @@ class TMessageBusServerPersQueueImpl : public TActorBootstrapped<TMessageBusServ
589
572
}
590
573
TopicInfo[d.GetTopic (i)];
591
574
}
575
+ if (TopicInfo.empty ()) {
576
+ ErrorReason = " no topics in GetTopicMetadata request" ;
577
+ return ;
578
+ }
579
+
592
580
} else if (meta.HasCmdGetPartitionStatus ()) {
593
581
if (!GetTopicsList (meta.GetCmdGetPartitionStatus ().topicrequest ()))
594
582
return ;
@@ -601,6 +589,10 @@ class TMessageBusServerPersQueueImpl : public TActorBootstrapped<TMessageBusServ
601
589
}
602
590
TopicInfo[d.GetTopic (i)];
603
591
}
592
+ if (TopicInfo.empty ()) {
593
+ ErrorReason = " no topics in GetReadSessionsInfo request" ;
594
+ return ;
595
+ }
604
596
}
605
597
else
606
598
ErrorReason = " Not implemented yet" ;
@@ -941,31 +933,22 @@ class TMessageBusServerPersQueueImpl : public TActorBootstrapped<TMessageBusServ
941
933
}
942
934
943
935
944
- void Handle (TEvDescribeAllTopicsResponse ::TPtr& ev, const TActorContext& ctx) {
936
+ void Handle (TEvPqNewMetaCache::TEvDescribeTopicsResponse ::TPtr& ev, const TActorContext& ctx) {
945
937
--DescribeRequests;
946
- auto & res = ev->Get ()->Result ->ResultSet ;
947
- auto & topics = ev->Get ()->Topics ;
948
- auto processResult = ProcessMetaCacheAllTopicsResponse (ev);
949
- if (processResult.IsFatal ) {
950
- ErrorReason = processResult.Reason ;
951
- return SendReplyAndDie (CreateErrorReply (processResult.Status , processResult.ErrorCode , ctx), ctx);
952
- }
953
-
954
- NoTopicsAtStart = TopicInfo.empty ();
955
- bool hasTopics = !NoTopicsAtStart;
938
+ auto & resultSet = ev->Get ()->Result ->ResultSet ;
956
939
957
- Y_ABORT_UNLESS (topics.size () == res.size ());
958
- auto factory = NPersQueue::TTopicNamesConverterFactory (AppData (ctx)->PQConfig , {});
959
- for (auto i = 0u ; i != res.size (); i++) {
960
- auto & entry = res[i];
961
- auto & converter = ev->Get ()->Topics [i];
940
+ Y_ABORT_UNLESS (TopicInfo.size () == resultSet.size ());
941
+ for (auto i = 0u ; i != resultSet.size (); i++) {
942
+ auto & entry = resultSet[i];
943
+ auto & converter = ev->Get ()->TopicsRequested [i];
962
944
if (entry.Kind == TSchemeCacheNavigate::EKind::KindTopic && entry.PQGroupInfo && converter) {
963
945
auto & description = entry.PQGroupInfo ->Description ;
964
- if (!hasTopics || TopicInfo.find (converter->GetClientsideName ()) != TopicInfo.end ()) {
965
- auto & topicInfo = TopicInfo[converter->GetClientsideName ()];
966
- topicInfo.BalancerTabletId = description.GetBalancerTabletID ();
967
- topicInfo.PQInfo = entry.PQGroupInfo ;
968
- }
946
+ auto converter = ev->Get ()->TopicsRequested [i]->UpgradeToFullConverter (description.GetPQTabletConfig (),
947
+ AppData (ctx)->PQConfig .GetTestDatabaseRoot ());
948
+ Y_ABORT_UNLESS (TopicInfo.contains (converter->GetClientsideName ()));
949
+ auto & topicInfo = TopicInfo[converter->GetClientsideName ()];
950
+ topicInfo.BalancerTabletId = description.GetBalancerTabletID ();
951
+ topicInfo.PQInfo = entry.PQGroupInfo ;
969
952
}
970
953
}
971
954
@@ -1389,8 +1372,14 @@ class TMessageBusServerPersQueueImpl : public TActorBootstrapped<TMessageBusServ
1389
1372
LOG_DEBUG_S (ctx, NKikimrServices::PERSQUEUE, " scheduling HasDataInfoResponse in " << RequestProto.GetFetchRequest ().GetWaitMs ());
1390
1373
ctx.Schedule (TDuration::MilliSeconds (Min<ui32>(RequestProto.GetFetchRequest ().GetWaitMs (), 30000 )), new TEvPersQueue::TEvHasDataInfoResponse);
1391
1374
}
1375
+ Y_ABORT_UNLESS (!TopicInfo.empty ());
1392
1376
1393
- auto * request = new TEvDescribeAllTopicsRequest ();
1377
+ TVector<TString> topics;
1378
+ topics.reserve (TopicInfo.size ());
1379
+ for (const auto & [topic, _] : TopicInfo) {
1380
+ topics.push_back (topic);
1381
+ }
1382
+ auto * request = new TEvPqNewMetaCache::TEvDescribeTopicsByNameRequest (topics);
1394
1383
ctx.Send (SchemeCache, request);
1395
1384
++DescribeRequests;
1396
1385
@@ -1406,7 +1395,7 @@ class TMessageBusServerPersQueueImpl : public TActorBootstrapped<TMessageBusServ
1406
1395
1407
1396
STRICT_STFUNC (StateFunc,
1408
1397
HFunc (TEvInterconnect::TEvNodesInfo, Handle);
1409
- HFunc (TEvDescribeAllTopicsResponse , Handle);
1398
+ HFunc (TEvPqNewMetaCache::TEvDescribeTopicsResponse , Handle);
1410
1399
HFunc (TEvTabletPipe::TEvClientDestroyed, Handle);
1411
1400
HFunc (TEvTabletPipe::TEvClientConnected, Handle);
1412
1401
HFunc (TEvPersQueue::TEvResponse, Handle);
@@ -1443,6 +1432,10 @@ class TMessageBusServerPersQueueImpl : public TActorBootstrapped<TMessageBusServ
1443
1432
TopicInfo[topic] = std::move (topicInfo);
1444
1433
}
1445
1434
}
1435
+ if (TopicInfo.empty ()) {
1436
+ ErrorReason = " No topics in request" ;
1437
+ return false ;
1438
+ }
1446
1439
return true ;
1447
1440
}
1448
1441
};
0 commit comments