Skip to content

Commit e63e663

Browse files
committed
rgw/admin/notification: add command to dump notifications
command will dump pending notifications from a persistent queue in JSON format: radosgw-admin topic dump --topic <name> Fixes: https://tracker.ceph.com/issues/66404 Signed-off-by: Yuval Lifshitz <[email protected]>
1 parent 8c4f910 commit e63e663

File tree

6 files changed

+220
-46
lines changed

6 files changed

+220
-46
lines changed

src/rgw/driver/rados/rgw_notify.cc

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -26,50 +26,6 @@
2626

2727
namespace rgw::notify {
2828

29-
struct event_entry_t {
30-
rgw_pubsub_s3_event event;
31-
std::string push_endpoint;
32-
std::string push_endpoint_args;
33-
std::string arn_topic;
34-
ceph::coarse_real_time creation_time;
35-
uint32_t time_to_live = DEFAULT_GLOBAL_VALUE;
36-
uint32_t max_retries = DEFAULT_GLOBAL_VALUE;
37-
uint32_t retry_sleep_duration = DEFAULT_GLOBAL_VALUE;
38-
39-
void encode(bufferlist& bl) const {
40-
ENCODE_START(3, 1, bl);
41-
encode(event, bl);
42-
encode(push_endpoint, bl);
43-
encode(push_endpoint_args, bl);
44-
encode(arn_topic, bl);
45-
encode(creation_time, bl);
46-
encode(time_to_live, bl);
47-
encode(max_retries, bl);
48-
encode(retry_sleep_duration, bl);
49-
ENCODE_FINISH(bl);
50-
}
51-
52-
void decode(bufferlist::const_iterator& bl) {
53-
DECODE_START(3, bl);
54-
decode(event, bl);
55-
decode(push_endpoint, bl);
56-
decode(push_endpoint_args, bl);
57-
decode(arn_topic, bl);
58-
if (struct_v > 1) {
59-
decode(creation_time, bl);
60-
} else {
61-
creation_time = ceph::coarse_real_clock::zero();
62-
}
63-
if (struct_v > 2) {
64-
decode(time_to_live, bl);
65-
decode(max_retries, bl);
66-
decode(retry_sleep_duration, bl);
67-
}
68-
DECODE_FINISH(bl);
69-
}
70-
};
71-
WRITE_CLASS_ENCODER(event_entry_t)
72-
7329
static inline std::ostream& operator<<(std::ostream& out,
7430
const event_entry_t& e) {
7531
std::string host;

src/rgw/rgw_admin.cc

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ extern "C" {
3232

3333
#include "cls/rgw/cls_rgw_types.h"
3434
#include "cls/rgw/cls_rgw_client.h"
35+
#include "cls/2pc_queue/cls_2pc_queue_types.h"
36+
#include "cls/2pc_queue/cls_2pc_queue_client.h"
3537

3638
#include "include/utime.h"
3739
#include "include/str_list.h"
@@ -327,6 +329,7 @@ void usage()
327329
cout << " topic get get a bucket notifications topic\n";
328330
cout << " topic rm remove a bucket notifications topic\n";
329331
cout << " topic stats get a bucket notifications persistent topic stats (i.e. reservations, entries & size)\n";
332+
cout << " topic dump dump (in JSON format) all pending bucket notifications of a persistent topic\n";
330333
cout << " script put upload a Lua script to a context\n";
331334
cout << " script get get the Lua script of a context\n";
332335
cout << " script rm remove the Lua scripts of a context\n";
@@ -867,6 +870,7 @@ enum class OPT {
867870
PUBSUB_NOTIFICATION_GET,
868871
PUBSUB_NOTIFICATION_RM,
869872
PUBSUB_TOPIC_STATS,
873+
PUBSUB_TOPIC_DUMP,
870874
SCRIPT_PUT,
871875
SCRIPT_GET,
872876
SCRIPT_RM,
@@ -1115,6 +1119,7 @@ static SimpleCmd::Commands all_cmds = {
11151119
{ "notification get", OPT::PUBSUB_NOTIFICATION_GET },
11161120
{ "notification rm", OPT::PUBSUB_NOTIFICATION_RM },
11171121
{ "topic stats", OPT::PUBSUB_TOPIC_STATS },
1122+
{ "topic dump", OPT::PUBSUB_TOPIC_DUMP },
11181123
{ "script put", OPT::SCRIPT_PUT },
11191124
{ "script get", OPT::SCRIPT_GET },
11201125
{ "script rm", OPT::SCRIPT_RM },
@@ -4326,6 +4331,7 @@ int main(int argc, const char **argv)
43264331
OPT::PUBSUB_TOPIC_GET,
43274332
OPT::PUBSUB_NOTIFICATION_GET,
43284333
OPT::PUBSUB_TOPIC_STATS ,
4334+
OPT::PUBSUB_TOPIC_DUMP ,
43294335
OPT::SCRIPT_GET,
43304336
};
43314337

@@ -4426,6 +4432,7 @@ int main(int argc, const char **argv)
44264432
&& opt_cmd != OPT::PUBSUB_TOPIC_RM
44274433
&& opt_cmd != OPT::PUBSUB_NOTIFICATION_RM
44284434
&& opt_cmd != OPT::PUBSUB_TOPIC_STATS
4435+
&& opt_cmd != OPT::PUBSUB_TOPIC_DUMP
44294436
&& opt_cmd != OPT::SCRIPT_PUT
44304437
&& opt_cmd != OPT::SCRIPT_GET
44314438
&& opt_cmd != OPT::SCRIPT_RM
@@ -11270,9 +11277,10 @@ int main(int argc, const char **argv)
1127011277
return ENOENT;
1127111278
}
1127211279

11280+
auto ioctx = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_notif_pool_ctx();
1127311281
rgw::notify::rgw_topic_stats stats;
1127411282
ret = rgw::notify::get_persistent_queue_stats(
11275-
dpp(), static_cast<rgw::sal::RadosStore *>(driver)->getRados()->get_notif_pool_ctx(),
11283+
dpp(), ioctx,
1127611284
topic.dest.persistent_queue, stats, null_yield);
1127711285
if (ret < 0) {
1127811286
cerr << "ERROR: could not get persistent queue: " << cpp_strerror(-ret) << std::endl;
@@ -11281,6 +11289,67 @@ int main(int argc, const char **argv)
1128111289
encode_json("", stats, formatter.get());
1128211290
formatter->flush(cout);
1128311291
}
11292+
11293+
if (opt_cmd == OPT::PUBSUB_TOPIC_DUMP) {
11294+
if (topic_name.empty()) {
11295+
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
11296+
return EINVAL;
11297+
}
11298+
const std::string& account = !account_id.empty() ? account_id : tenant;
11299+
RGWPubSub ps(driver, account, *site);
11300+
11301+
rgw_pubsub_topic topic;
11302+
ret = ps.get_topic(dpp(), topic_name, topic, null_yield, nullptr);
11303+
if (ret < 0) {
11304+
cerr << "ERROR: could not get topic. error: " << cpp_strerror(-ret) << std::endl;
11305+
return -ret;
11306+
}
11307+
11308+
if (topic.dest.persistent_queue.empty()) {
11309+
cerr << "ERROR: topic does not have a persistent queue" << std::endl;
11310+
return ENOENT;
11311+
}
11312+
11313+
auto ioctx = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_notif_pool_ctx();
11314+
std::string marker;
11315+
std::string end_marker;
11316+
librados::ObjectReadOperation rop;
11317+
std::vector<cls_queue_entry> queue_entries;
11318+
bool truncated = true;
11319+
formatter->open_array_section("eventEntries");
11320+
while (truncated) {
11321+
bufferlist bl;
11322+
int rc;
11323+
cls_2pc_queue_list_entries(rop, marker, max_entries, &bl, &rc);
11324+
ioctx.operate(topic.dest.persistent_queue, &rop, nullptr);
11325+
if (rc < 0 ) {
11326+
cerr << "ERROR: could not list entries from queue. error: " << cpp_strerror(-ret) << std::endl;
11327+
return -rc;
11328+
}
11329+
rc = cls_2pc_queue_list_entries_result(bl, queue_entries, &truncated, end_marker);
11330+
if (rc < 0) {
11331+
cerr << "ERROR: failed to parse list entries from queue (skipping). error: " << cpp_strerror(-ret) << std::endl;
11332+
return -rc;
11333+
}
11334+
11335+
std::for_each(queue_entries.cbegin(),
11336+
queue_entries.cend(),
11337+
[&formatter](const auto& queue_entry) {
11338+
rgw::notify::event_entry_t event_entry;
11339+
bufferlist::const_iterator iter{&queue_entry.data};
11340+
try {
11341+
event_entry.decode(iter);
11342+
encode_json("", event_entry, formatter.get());
11343+
} catch (const buffer::error& e) {
11344+
cerr << "ERROR: failed to decode queue entry. error: " << e.what() << std::endl;
11345+
}
11346+
});
11347+
formatter->flush(cout);
11348+
marker = end_marker;
11349+
}
11350+
formatter->close_section();
11351+
formatter->flush(cout);
11352+
}
1128411353

1128511354
if (opt_cmd == OPT::SCRIPT_PUT) {
1128611355
if (!str_script_ctx) {

src/rgw/rgw_pubsub.cc

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,23 @@ void rgw_pubsub_s3_event::dump(Formatter *f) const {
378378
encode_json("opaqueData", opaque_data, f);
379379
}
380380

381+
namespace rgw::notify {
382+
void event_entry_t::dump(Formatter *f) const {
383+
Formatter::ObjectSection s(*f, "entry");
384+
{
385+
Formatter::ObjectSection sub_s(*f, "event");
386+
event.dump(f);
387+
}
388+
encode_json("pushEndpoint", push_endpoint, f);
389+
encode_json("pushEndpointArgs", push_endpoint_args, f);
390+
encode_json("topic", arn_topic, f);
391+
encode_json("creationTime", creation_time, f);
392+
encode_json("TTL", time_to_live, f);
393+
encode_json("maxRetries", max_retries, f);
394+
encode_json("retrySleepDuration", retry_sleep_duration, f);
395+
}
396+
}
397+
381398
void rgw_pubsub_topic::dump(Formatter *f) const
382399
{
383400
encode_json("owner", owner, f);

src/rgw/rgw_pubsub.h

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -672,12 +672,56 @@ class RGWPubSub
672672
};
673673

674674
namespace rgw::notify {
675-
676675
// Denotes that the topic has not overridden the global configurations for (time_to_live / max_retries / retry_sleep_duration)
677676
// defaults: (rgw_topic_persistency_time_to_live / rgw_topic_persistency_max_retries / rgw_topic_persistency_sleep_duration)
678677
constexpr uint32_t DEFAULT_GLOBAL_VALUE = UINT32_MAX;
679678
// Used in case the topic is using the default global value for dumping in a formatter
680679
constexpr static const std::string_view DEFAULT_CONFIG{"None"};
680+
struct event_entry_t {
681+
rgw_pubsub_s3_event event;
682+
std::string push_endpoint;
683+
std::string push_endpoint_args;
684+
std::string arn_topic;
685+
ceph::coarse_real_time creation_time;
686+
uint32_t time_to_live = DEFAULT_GLOBAL_VALUE;
687+
uint32_t max_retries = DEFAULT_GLOBAL_VALUE;
688+
uint32_t retry_sleep_duration = DEFAULT_GLOBAL_VALUE;
689+
690+
void encode(bufferlist& bl) const {
691+
ENCODE_START(3, 1, bl);
692+
encode(event, bl);
693+
encode(push_endpoint, bl);
694+
encode(push_endpoint_args, bl);
695+
encode(arn_topic, bl);
696+
encode(creation_time, bl);
697+
encode(time_to_live, bl);
698+
encode(max_retries, bl);
699+
encode(retry_sleep_duration, bl);
700+
ENCODE_FINISH(bl);
701+
}
702+
703+
void decode(bufferlist::const_iterator& bl) {
704+
DECODE_START(3, bl);
705+
decode(event, bl);
706+
decode(push_endpoint, bl);
707+
decode(push_endpoint_args, bl);
708+
decode(arn_topic, bl);
709+
if (struct_v > 1) {
710+
decode(creation_time, bl);
711+
} else {
712+
creation_time = ceph::coarse_real_clock::zero();
713+
}
714+
if (struct_v > 2) {
715+
decode(time_to_live, bl);
716+
decode(max_retries, bl);
717+
decode(retry_sleep_duration, bl);
718+
}
719+
DECODE_FINISH(bl);
720+
}
721+
722+
void dump(Formatter *f) const;
723+
};
724+
WRITE_CLASS_ENCODER(event_entry_t)
681725
}
682726

683727
std::string topic_to_unique(const std::string& topic,

src/test/cli/radosgw-admin/help.t

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@
194194
topic get get a bucket notifications topic
195195
topic rm remove a bucket notifications topic
196196
topic stats get a bucket notifications persistent topic stats (i.e. reservations, entries & size)
197+
topic dump dump (in JSON format) all pending bucket notifications of a persistent topic
197198
script put upload a Lua script to a context
198199
script get get the Lua script of a context
199200
script rm remove the Lua scripts of a context

src/test/rgw/bucket_notification/test_bn.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3016,6 +3016,93 @@ def test_ps_s3_persistent_topic_stats():
30163016
conn.delete_bucket(bucket_name)
30173017
http_server.close()
30183018

3019+
@attr('basic_test')
3020+
def test_persistent_topic_dump():
3021+
""" test persistent topic dump """
3022+
conn = connection()
3023+
zonegroup = get_config_zonegroup()
3024+
3025+
# create random port for the http server
3026+
host = get_ip()
3027+
port = random.randint(10000, 20000)
3028+
3029+
# create bucket
3030+
bucket_name = gen_bucket_name()
3031+
bucket = conn.create_bucket(bucket_name)
3032+
topic_name = bucket_name + TOPIC_SUFFIX
3033+
3034+
# create s3 topic
3035+
endpoint_address = 'http://'+host+':'+str(port)
3036+
endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'+ \
3037+
'&retry_sleep_duration=1'
3038+
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
3039+
topic_arn = topic_conf.set_config()
3040+
# create s3 notification
3041+
notification_name = bucket_name + NOTIFICATION_SUFFIX
3042+
topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
3043+
'Events': []
3044+
}]
3045+
3046+
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
3047+
response, status = s3_notification_conf.set_config()
3048+
assert_equal(status/100, 2)
3049+
3050+
# create objects in the bucket (async)
3051+
number_of_objects = 20
3052+
client_threads = []
3053+
start_time = time.time()
3054+
for i in range(number_of_objects):
3055+
key = bucket.new_key('key-'+str(i))
3056+
content = str(os.urandom(1024*1024))
3057+
thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
3058+
thr.start()
3059+
client_threads.append(thr)
3060+
[thr.join() for thr in client_threads]
3061+
time_diff = time.time() - start_time
3062+
print('average time for creation + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
3063+
3064+
# topic dump
3065+
result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
3066+
assert_equal(result[1], 0)
3067+
parsed_result = json.loads(result[0])
3068+
assert_equal(len(parsed_result), number_of_objects)
3069+
3070+
# delete objects from the bucket
3071+
client_threads = []
3072+
start_time = time.time()
3073+
for key in bucket.list():
3074+
thr = threading.Thread(target = key.delete, args=())
3075+
thr.start()
3076+
client_threads.append(thr)
3077+
[thr.join() for thr in client_threads]
3078+
time_diff = time.time() - start_time
3079+
print('average time for deletion + async http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
3080+
3081+
# topic stats
3082+
result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
3083+
assert_equal(result[1], 0)
3084+
print(result[0])
3085+
parsed_result = json.loads(result[0])
3086+
assert_equal(len(parsed_result), 2*number_of_objects)
3087+
3088+
# start an http server in a separate thread
3089+
http_server = HTTPServerWithEvents((host, port))
3090+
3091+
wait_for_queue_to_drain(topic_name, http_port=port)
3092+
3093+
result = admin(['topic', 'dump', '--topic', topic_name], get_config_cluster())
3094+
assert_equal(result[1], 0)
3095+
parsed_result = json.loads(result[0])
3096+
assert_equal(len(parsed_result), 0)
3097+
3098+
# cleanup
3099+
s3_notification_conf.del_config()
3100+
topic_conf.del_config()
3101+
# delete the bucket
3102+
conn.delete_bucket(bucket_name)
3103+
http_server.close()
3104+
3105+
30193106
def ps_s3_persistent_topic_configs(persistency_time, config_dict):
30203107
conn = connection()
30213108
zonegroup = get_config_zonegroup()

0 commit comments

Comments
 (0)