Skip to content

Commit 6bcf4a6

Browse files
authored
YMQ tags (#12872)
1 parent e46360e commit 6bcf4a6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+1666
-185
lines changed

ydb/core/grpc_services/service_ymq.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,8 @@ void DoYmqSendMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacil
2626
void DoYmqDeleteMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2727
void DoYmqChangeMessageVisibilityBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2828
void DoYmqListDeadLetterSourceQueuesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
29+
void DoYmqListQueueTagsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
30+
void DoYmqTagQueueRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
31+
void DoYmqUntagQueueRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2932
}
3033
}

ydb/core/http_proxy/events.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ namespace NKikimr::NHttpProxy {
5151
struct TEvGrpcRequestResult : public TEventLocal<TEvGrpcRequestResult, EvGrpcRequestResult> {
5252
THolder<google::protobuf::Message> Message;
5353
THolder<NYdb::TStatus> Status;
54+
THolder<THashMap<TString, TString>> QueueTags;
5455
};
5556

5657
struct TEvDiscoverDatabaseEndpointRequest : public TEventLocal<TEvDiscoverDatabaseEndpointRequest, EvDiscoverDatabaseEndpointRequest> {

ydb/core/http_proxy/http_req.cpp

Lines changed: 18 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,14 @@ namespace NKikimr::NHttpProxy {
308308
NYdb::EStatus(response.operation().status()),
309309
std::move(issues)
310310
);
311+
Ydb::Ymq::V1::QueueTags queueTags;
312+
response.operation().metadata().UnpackTo(&queueTags);
313+
for (const auto& [k, v] : queueTags.GetTags()) {
314+
if (!result->QueueTags.Get()) {
315+
result->QueueTags = MakeHolder<THashMap<TString, TString>>();
316+
}
317+
result->QueueTags->emplace(k, v);
318+
}
311319
actorSystem->Send(actorId, result.Release());
312320
}
313321
);
@@ -374,6 +382,9 @@ namespace NKikimr::NHttpProxy {
374382
);
375383
HttpContext.ResponseData.IsYmq = true;
376384
HttpContext.ResponseData.YmqHttpCode = 200;
385+
if (ev->Get()->QueueTags) {
386+
HttpContext.ResponseData.QueueTags = std::move(*ev->Get()->QueueTags);
387+
}
377388
ReplyToHttpContext(ctx);
378389
} else {
379390
auto retryClass = NYdb::NTopic::GetRetryErrorClass(ev->Get()->Status->GetStatus());
@@ -510,40 +521,8 @@ namespace NKikimr::NHttpProxy {
510521
SendGrpcRequestNoDriver(ctx);
511522
} else {
512523
auto requestHolder = MakeHolder<NKikimrClient::TSqsRequest>();
513-
// TODO? action = NSQS::ActionFromString(Method);
514-
NSQS::EAction action = NSQS::EAction::Unknown;
515-
if (Method == "CreateQueue") {
516-
action = NSQS::EAction::CreateQueue;
517-
} else if (Method == "GetQueueUrl") {
518-
action = NSQS::EAction::GetQueueUrl;
519-
} else if (Method == "SendMessage") {
520-
action = NSQS::EAction::SendMessage;
521-
} else if (Method == "ReceiveMessage") {
522-
action = NSQS::EAction::ReceiveMessage;
523-
} else if (Method == "GetQueueAttributes") {
524-
action = NSQS::EAction::GetQueueAttributes;
525-
} else if (Method == "ListQueues") {
526-
action = NSQS::EAction::ListQueues;
527-
} else if (Method == "DeleteMessage") {
528-
action = NSQS::EAction::DeleteMessage;
529-
} else if (Method == "PurgeQueue") {
530-
action = NSQS::EAction::PurgeQueue;
531-
} else if (Method == "DeleteQueue") {
532-
action = NSQS::EAction::DeleteQueue;
533-
} else if (Method == "ChangeMessageVisibility") {
534-
action = NSQS::EAction::ChangeMessageVisibility;
535-
} else if (Method == "SetQueueAttributes") {
536-
action = NSQS::EAction::SetQueueAttributes;
537-
} else if (Method == "SendMessageBatch") {
538-
action = NSQS::EAction::SendMessageBatch;
539-
}else if (Method == "DeleteMessageBatch") {
540-
action = NSQS::EAction::DeleteMessageBatch;
541-
} else if (Method == "ChangeMessageVisibilityBatch") {
542-
action = NSQS::EAction::ChangeMessageVisibilityBatch;
543-
} else if (Method == "ListDeadLetterSourceQueues") {
544-
action = NSQS::EAction::ListDeadLetterSourceQueues;
545-
}
546524

525+
NSQS::EAction action = NSQS::ActionFromString(Method);
547526
requestHolder->SetRequestId(HttpContext.RequestId);
548527

549528
NSQS::TAuthActorData data {
@@ -1081,6 +1060,9 @@ namespace NKikimr::NHttpProxy {
10811060
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(DeleteMessageBatch);
10821061
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ChangeMessageVisibilityBatch);
10831062
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ListDeadLetterSourceQueues);
1063+
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ListQueueTags);
1064+
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(TagQueue);
1065+
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(UntagQueue);
10841066
#undef DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN
10851067
}
10861068

@@ -1264,6 +1246,9 @@ namespace NKikimr::NHttpProxy {
12641246
requestAttributes.SourceAddress = SourceAddress;
12651247
requestAttributes.ResourceId = ResourceId;
12661248
requestAttributes.Action = NSQS::ActionFromString(MethodName);
1249+
for (const auto& [k, v] : ResponseData.QueueTags) {
1250+
requestAttributes.QueueTags[k] = v;
1251+
}
12671252

12681253
LOG_SP_DEBUG_S(
12691254
ctx,

ydb/core/http_proxy/http_req.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ struct THttpResponseData {
5959
TString YmqStatusCode;
6060
ui32 YmqHttpCode = 500;
6161
bool YmqIsFifo = false;
62+
THashMap<TString, TString> QueueTags;
6263

6364
TString DumpBody(MimeTypes contentType);
6465
};

ydb/core/http_proxy/ut/datastreams_fixture.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,9 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
351351

352352
NJson::TJsonMap SendJsonRequest(TString method, NJson::TJsonMap request, ui32 expectedHttpCode = 200) {
353353
auto res = SendHttpRequest("/Root", TStringBuilder() << "AmazonSQS." << method, request, FormAuthorizationStr("ru-central1"));
354-
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, expectedHttpCode);
354+
if (expectedHttpCode != 0) {
355+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, expectedHttpCode);
356+
}
355357
NJson::TJsonMap json;
356358
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
357359
return json;
@@ -405,6 +407,18 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
405407
return SendJsonRequest("SetQueueAttributes", request, expectedHttpCode);
406408
}
407409

410+
NJson::TJsonMap ListQueueTags(NJson::TJsonMap request, ui32 expectedHttpCode = 200) {
411+
return SendJsonRequest("ListQueueTags", request, expectedHttpCode);
412+
}
413+
414+
NJson::TJsonMap TagQueue(NJson::TJsonMap request = {}, ui32 expectedHttpCode = 200) {
415+
return SendJsonRequest("TagQueue", request, expectedHttpCode);
416+
}
417+
418+
NJson::TJsonMap UntagQueue(NJson::TJsonMap request = {}, ui32 expectedHttpCode = 200) {
419+
return SendJsonRequest("UntagQueue", request, expectedHttpCode);
420+
}
421+
408422
void WaitQueueAttributes(TString queueUrl, size_t retries, NJson::TJsonMap attributes) {
409423
WaitQueueAttributes(queueUrl, retries, [&attributes](NJson::TJsonMap json) {
410424
for (const auto& [k, v] : attributes.GetMapSafe()) {
@@ -534,6 +548,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
534548
ActorRuntime->SetLogPriority(NKikimrServices::HTTP_PROXY, NLog::PRI_DEBUG);
535549
ActorRuntime->SetLogPriority(NActorsServices::EServiceCommon::HTTP, NLog::PRI_DEBUG);
536550
ActorRuntime->SetLogPriority(NKikimrServices::TICKET_PARSER, NLog::PRI_TRACE);
551+
ActorRuntime->SetLogPriority(NKikimrServices::SQS, NLog::PRI_TRACE);
537552

538553
if (enableMetering) {
539554
ActorRuntime->RegisterService(
@@ -572,6 +587,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
572587
"Columns { Name: \"Version\" Type: \"Uint64\"}"
573588
"Columns { Name: \"DlqName\" Type: \"Utf8\"}"
574589
"Columns { Name: \"TablesFormat\" Type: \"Uint32\"}"
590+
"Columns { Name: \"Tags\" Type: \"Utf8\"}"
575591
"KeyColumnNames: [\"Account\", \"QueueName\"]"
576592
);
577593

@@ -710,6 +726,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
710726
"Columns { Name: \"CustomQueueName\" Type: \"Utf8\"}"
711727
"Columns { Name: \"EventTimestamp\" Type: \"Uint64\"}"
712728
"Columns { Name: \"FolderId\" Type: \"Utf8\"}"
729+
"Columns { Name: \"Labels\" Type: \"Utf8\"}"
713730
"KeyColumnNames: [\"Account\", \"QueueName\", \"EventType\"]"
714731
);
715732

0 commit comments

Comments
 (0)