Skip to content

Commit f136742

Browse files
authored
Merge pull request ceph#57536 from kchheda3/wip-fix-persistent-queue-regression
rgw/notification: Store the value of `persistent_queue` for existing topics and continue commiting events for all topics subscribed to given bucket Reviewed-by: Yuval Lifshitz <[email protected]>
2 parents 3b38160 + 9f4ea76 commit f136742

File tree

3 files changed

+23
-3
lines changed

3 files changed

+23
-3
lines changed

src/rgw/driver/rados/rgw_notify.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1111,7 +1111,7 @@ int publish_reserve(const DoutPrefixProvider* dpp,
11111111
// either the topic is deleted but the corresponding notification
11121112
// still exist or in v2 mode the notification could have synced first
11131113
// but topic is not synced yet.
1114-
return 0;
1114+
continue;
11151115
}
11161116
ldpp_dout(res.dpp, 1)
11171117
<< "WARN: Using the stored topic from bucket notification struct."

src/rgw/rgw_rest_pubsub.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,8 @@ void RGWPSCreateTopicOp::execute(optional_yield y) {
422422
<< op_ret << dendl;
423423
return;
424424
}
425+
} else if (already_persistent) { // redundant call to CreateTopic
426+
dest.persistent_queue = topic->dest.persistent_queue;
425427
}
426428
const RGWPubSub ps(driver, get_account_or_tenant(s->owner.id), *s->penv.site);
427429
op_ret = ps.create_topic(this, topic_name, dest, topic_arn.to_string(),
@@ -882,7 +884,7 @@ void RGWPSSetTopicAttributesOp::execute(optional_yield y) {
882884
<< op_ret << dendl;
883885
return;
884886
}
885-
} else if (already_persistent) {
887+
} else if (already_persistent && !topic_needs_queue(dest)) {
886888
// changing the persistent topic to non-persistent.
887889
op_ret = driver->remove_persistent_topic(this, y, result.dest.persistent_queue);
888890
if (op_ret != -ENOENT && op_ret < 0) {

src/test/rgw/bucket_notification/test_bn.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -759,7 +759,7 @@ def test_ps_s3_topic_admin_on_master():
759759
assert_equal(topic_arn2,
760760
'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_2')
761761
endpoint_address = 'http://127.0.0.1:9002'
762-
endpoint_args = 'push-endpoint='+endpoint_address
762+
endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
763763
topic_conf3 = PSTopicS3(conn, topic_name+'_3', zonegroup, endpoint_args=endpoint_args)
764764
topic_arn3 = topic_conf3.set_config()
765765
assert_equal(topic_arn3,
@@ -770,6 +770,24 @@ def test_ps_s3_topic_admin_on_master():
770770
assert_equal(parsed_result['arn'], topic_arn3)
771771
matches = [tenant, UID_PREFIX]
772772
assert_true( all([x in parsed_result['owner'] for x in matches]))
773+
assert_equal(parsed_result['dest']['persistent_queue'],
774+
tenant + ":" + topic_name + '_3')
775+
776+
# recall CreateTopic and verify the owner and persistent_queue remain same.
777+
topic_conf3 = PSTopicS3(conn, topic_name + '_3', zonegroup,
778+
endpoint_args=endpoint_args)
779+
topic_arn3 = topic_conf3.set_config()
780+
assert_equal(topic_arn3,
781+
'arn:aws:sns:' + zonegroup + ':' + tenant + ':' + topic_name + '_3')
782+
# get topic 3 via commandline
783+
result = admin(
784+
['topic', 'get', '--topic', topic_name + '_3', '--tenant', tenant],
785+
get_config_cluster())
786+
parsed_result = json.loads(result[0])
787+
assert_equal(parsed_result['arn'], topic_arn3)
788+
assert_true(all([x in parsed_result['owner'] for x in matches]))
789+
assert_equal(parsed_result['dest']['persistent_queue'],
790+
tenant + ":" + topic_name + '_3')
773791

774792
# delete topic 3
775793
remove_topic(topic_name + '_3', tenant)

0 commit comments

Comments
 (0)