Skip to content

Commit c51a3bf

Browse files
authored
Merge pull request ceph#54459 from AliMasarweh/wip-alimasa-2pc-remove-issue
RGW: make new rados support old RGW 2pc remove Reviewed-by: yuvalif, cbodley
2 parents 8300186 + 855098f commit c51a3bf

File tree

9 files changed

+261
-20
lines changed

9 files changed

+261
-20
lines changed

src/cls/2pc_queue/cls_2pc_queue.cc

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,19 @@ static int cls_2pc_queue_list_entries(cls_method_context_t hctx, bufferlist *in,
578578
return 0;
579579
}
580580

581+
static int cls_2pc_queue_count_entries(cls_method_context_t hctx, cls_queue_list_op& op, cls_queue_head& head,
582+
uint32_t& entries_to_remove)
583+
{
584+
cls_queue_list_ret op_ret;
585+
auto ret = queue_list_entries(hctx, op, op_ret, head);
586+
if (ret < 0) {
587+
return ret;
588+
}
589+
590+
entries_to_remove = op_ret.entries.size();
591+
return 0;
592+
}
593+
581594
static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
582595
{
583596
auto in_iter = in->cbegin();
@@ -594,6 +607,21 @@ static int cls_2pc_queue_remove_entries(cls_method_context_t hctx, bufferlist *i
594607
if (ret < 0) {
595608
return ret;
596609
}
610+
611+
// Old RGW is running, and it sent cls_queue_remove_op instead of cls_2pc_queue_remove_op
612+
if (rem_2pc_op.entries_to_remove == 0) {
613+
CLS_LOG(10, "INFO: cls_2pc_queue_remove_entries: incompatible RGW with rados, counting entries to remove...");
614+
cls_queue_list_op list_op;
615+
list_op.max = std::numeric_limits<uint64_t>::max(); // max length because endmarker is the stopping condition.
616+
list_op.end_marker = rem_2pc_op.end_marker;
617+
ret = cls_2pc_queue_count_entries(hctx, list_op, head, rem_2pc_op.entries_to_remove);
618+
if (ret < 0) {
619+
CLS_LOG(1, "ERROR: cls_2pc_queue_count_entries: returned: %d", ret);
620+
return ret;
621+
}
622+
CLS_LOG(10, "INFO: cls_2pc_queue_count_entries: counted: %u", rem_2pc_op.entries_to_remove);
623+
}
624+
597625
cls_queue_remove_op rem_op;
598626
rem_op.end_marker = std::move(rem_2pc_op.end_marker);
599627
ret = queue_remove_entries(hctx, rem_op, head);

src/cls/2pc_queue/cls_2pc_queue_client.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,5 +87,8 @@ void cls_2pc_queue_expire_reservations(librados::ObjectWriteOperation& op,
8787
ceph::coarse_real_time stale_time);
8888

8989
// remove all entries up to the given marker
90-
void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker, uint64_t entries_to_remove);
90+
// if there is no race condition, providing the number of entries_to_remove is recommended, as it is more efficient.
91+
// if there is no guarantee against two clienst deleting entries at the same time, you can leave the entries_to_remove unprovided or input zero entries_to_remove
92+
// the function will count how many entries it needs to removed
93+
void cls_2pc_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker, uint64_t entries_to_remove=0);
9194

src/cls/2pc_queue/cls_2pc_queue_ops.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,21 +118,23 @@ WRITE_CLASS_ENCODER(cls_2pc_queue_reservations_ret)
118118

119119
struct cls_2pc_queue_remove_op {
120120
std::string end_marker;
121-
uint32_t entries_to_remove;
121+
uint32_t entries_to_remove = 0;
122122

123123
cls_2pc_queue_remove_op() {}
124124

125125
void encode(ceph::buffer::list& bl) const {
126-
ENCODE_START(1, 1, bl);
126+
ENCODE_START(2, 1, bl);
127127
encode(end_marker, bl);
128128
encode(entries_to_remove, bl);
129129
ENCODE_FINISH(bl);
130130
}
131131

132132
void decode(ceph::buffer::list::const_iterator& bl) {
133-
DECODE_START(1, bl);
133+
DECODE_START(2, bl);
134134
decode(end_marker, bl);
135-
decode(entries_to_remove, bl);
135+
if (struct_v > 1) {
136+
decode(entries_to_remove, bl);
137+
}
136138
DECODE_FINISH(bl);
137139
}
138140
};

src/cls/queue/cls_queue_client.cc

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,9 @@ void cls_queue_enqueue(ObjectWriteOperation& op, uint32_t expiration_secs, vecto
4848
op.exec(QUEUE_CLASS, QUEUE_ENQUEUE, in);
4949
}
5050

51-
int cls_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marker, uint32_t max,
52-
vector<cls_queue_entry>& entries,
53-
bool *truncated, string& next_marker)
51+
int cls_queue_list_entries_inner(IoCtx& io_ctx, const string& oid, vector<cls_queue_entry>& entries,
52+
bool *truncated, string& next_marker, bufferlist& in, bufferlist& out)
5453
{
55-
bufferlist in, out;
56-
cls_queue_list_op op;
57-
op.start_marker = marker;
58-
op.max = max;
59-
encode(op, in);
60-
6154
int r = io_ctx.exec(oid, QUEUE_CLASS, QUEUE_LIST_ENTRIES, in, out);
6255
if (r < 0)
6356
return r;
@@ -78,6 +71,33 @@ int cls_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marke
7871
return 0;
7972
}
8073

74+
int cls_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marker, uint32_t max,
75+
vector<cls_queue_entry>& entries,
76+
bool *truncated, string& next_marker)
77+
{
78+
bufferlist in, out;
79+
cls_queue_list_op op;
80+
op.start_marker = marker;
81+
op.max = max;
82+
encode(op, in);
83+
84+
return cls_queue_list_entries_inner(io_ctx, oid, entries, truncated, next_marker, in, out);
85+
}
86+
87+
int cls_queue_list_entries(IoCtx& io_ctx, const string& oid, const string& marker, const string& end_marker,
88+
vector<cls_queue_entry>& entries,
89+
bool *truncated, string& next_marker)
90+
{
91+
bufferlist in, out;
92+
cls_queue_list_op op;
93+
op.start_marker = marker;
94+
op.max = std::numeric_limits<uint64_t>::max();
95+
op.end_marker = end_marker;
96+
encode(op, in);
97+
98+
return cls_queue_list_entries_inner(io_ctx, oid, entries, truncated, next_marker, in, out);
99+
}
100+
81101
void cls_queue_remove_entries(ObjectWriteOperation& op, const string& end_marker)
82102
{
83103
bufferlist in, out;

src/cls/queue/cls_queue_client.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ int cls_queue_get_capacity(librados::IoCtx& io_ctx, const std::string& oid, uint
1111
void cls_queue_enqueue(librados::ObjectWriteOperation& op, uint32_t expiration_secs, std::vector<bufferlist> bl_data_vec);
1212
int cls_queue_list_entries(librados::IoCtx& io_ctx, const std::string& oid, const std::string& marker, uint32_t max,
1313
std::vector<cls_queue_entry>& entries, bool *truncated, std::string& next_marker);
14+
int cls_queue_list_entries(librados::IoCtx& io_ctx, const std::string& oid, const std::string& marker, const std::string& end_marker,
15+
std::vector<cls_queue_entry>& entries, bool *truncated, std::string& next_marker);
1416
void cls_queue_remove_entries(librados::ObjectWriteOperation& op, const std::string& end_marker);
1517

1618
#endif

src/cls/queue/cls_queue_ops.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,20 +54,25 @@ WRITE_CLASS_ENCODER(cls_queue_enqueue_op)
5454
struct cls_queue_list_op {
5555
uint64_t max;
5656
std::string start_marker;
57+
std::string end_marker;
5758

5859
cls_queue_list_op() {}
5960

6061
void encode(ceph::buffer::list& bl) const {
61-
ENCODE_START(1, 1, bl);
62+
ENCODE_START(2, 1, bl);
6263
encode(max, bl);
6364
encode(start_marker, bl);
65+
encode(end_marker, bl);
6466
ENCODE_FINISH(bl);
6567
}
6668

6769
void decode(ceph::buffer::list::const_iterator& bl) {
68-
DECODE_START(1, bl);
70+
DECODE_START(2, bl);
6971
decode(max, bl);
7072
decode(start_marker, bl);
73+
if (struct_v > 1) {
74+
decode(end_marker, bl);
75+
}
7176
DECODE_FINISH(bl);
7277
}
7378
};

src/cls/queue/cls_queue_src.cc

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,10 @@ int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, c
400400
CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!");
401401
break;
402402
}
403+
if (!op.end_marker.empty() && entry.marker == op.end_marker) {
404+
last_marker = entry.marker;
405+
break;
406+
}
403407
op_ret.entries.emplace_back(entry);
404408
// Resetting some values
405409
offset_populated = false;
@@ -414,11 +418,17 @@ int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, c
414418
}
415419
} while(index < bl_chunk.length());
416420

417-
CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max);
421+
CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu, last_marker: %s and op.end_marker is %s\n",
422+
num_ops, op.max, last_marker.c_str(), op.end_marker.c_str());
418423

419-
if (num_ops == op.max) {
420-
next_marker = cls_queue_marker{(entry_start_offset + index), gen};
421-
CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu", next_marker.offset);
424+
if (num_ops == op.max || (!op.end_marker.empty() && op.end_marker == last_marker)) {
425+
if (!op.end_marker.empty()) {
426+
next_marker.from_str(op.end_marker.c_str());
427+
} else {
428+
next_marker = cls_queue_marker{(entry_start_offset + index), gen};
429+
}
430+
CLS_LOG(10, "INFO: queue_list_entries(): either num_ops is same as op.max or last_marker is same as op.end_marker, "
431+
"hence breaking out from outer loop with next offset: %lu", next_marker.offset);
422432
break;
423433
}
424434

src/test/cls_2pc_queue/test_cls_2pc_queue.cc

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "gtest/gtest.h"
1212
#include "test/librados/test_cxx.h"
1313
#include "global/global_context.h"
14+
#include "cls/2pc_queue/cls_2pc_queue_const.h"
1415

1516
#include <string>
1617
#include <vector>
@@ -173,6 +174,131 @@ TEST_F(TestCls2PCQueue, Commit)
173174
ASSERT_EQ(reservations.size(), 0);
174175
}
175176

177+
TEST_F(TestCls2PCQueue, Stats)
178+
{
179+
const std::string queue_name = __PRETTY_FUNCTION__;
180+
const auto max_size = 1024*1024*128;
181+
const auto number_of_ops = 200U;
182+
const auto number_of_elements = 23U;
183+
auto total_committed_elements = 0U;
184+
librados::ObjectWriteOperation op;
185+
op.create(true);
186+
cls_2pc_queue_init(op, queue_name, max_size);
187+
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
188+
189+
for (auto i = 0U; i < number_of_ops; ++i) {
190+
const std::string element_prefix("op-" +to_string(i) + "-element-");
191+
auto total_size = 0UL;
192+
std::vector<bufferlist> data(number_of_elements);
193+
// create vector of buffer lists
194+
std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable {
195+
bufferlist bl;
196+
bl.append(element_prefix + to_string(j++));
197+
total_size += bl.length();
198+
return bl;
199+
});
200+
201+
cls_2pc_reservation::id_t res_id;
202+
ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
203+
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
204+
cls_2pc_queue_commit(op, data, res_id);
205+
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
206+
207+
total_committed_elements += number_of_elements;
208+
uint32_t committed_entries;
209+
uint64_t size;
210+
211+
ASSERT_EQ(cls_2pc_queue_get_topic_stats(ioctx, queue_name, committed_entries, size), 0);
212+
ASSERT_EQ(committed_entries, total_committed_elements);
213+
}
214+
cls_2pc_reservations reservations;
215+
ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
216+
ASSERT_EQ(reservations.size(), 0);
217+
}
218+
219+
TEST_F(TestCls2PCQueue, UpgradeFromReef)
220+
{
221+
const std::string queue_name = __PRETTY_FUNCTION__;
222+
const auto max_size = 1024*1024*128;
223+
const auto number_of_ops = 200U;
224+
const auto number_of_elements = 23U;
225+
auto total_committed_elements = 0U;
226+
librados::ObjectWriteOperation wop;
227+
wop.create(true);
228+
cls_2pc_queue_init(wop, queue_name, max_size);
229+
ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
230+
231+
for (auto i = 0U; i < number_of_ops; ++i) {
232+
const std::string element_prefix("wop-" +to_string(i) + "-element-");
233+
auto total_size = 0UL;
234+
std::vector<bufferlist> data(number_of_elements);
235+
// create vector of buffer lists
236+
std::generate(data.begin(), data.end(), [j = 0, &element_prefix, &total_size] () mutable {
237+
bufferlist bl;
238+
bl.append(element_prefix + to_string(j++));
239+
total_size += bl.length();
240+
return bl;
241+
});
242+
243+
cls_2pc_reservation::id_t res_id;
244+
ASSERT_EQ(cls_2pc_queue_reserve(ioctx, queue_name, total_size, number_of_elements, res_id), 0);
245+
ASSERT_NE(res_id, cls_2pc_reservation::NO_ID);
246+
cls_2pc_queue_commit(wop, data, res_id);
247+
ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
248+
249+
total_committed_elements += number_of_elements;
250+
uint32_t committed_entries;
251+
uint64_t size;
252+
253+
ASSERT_EQ(cls_2pc_queue_get_topic_stats(ioctx, queue_name, committed_entries, size), 0);
254+
ASSERT_EQ(committed_entries, total_committed_elements);
255+
}
256+
cls_2pc_reservations reservations;
257+
ASSERT_EQ(0, cls_2pc_queue_list_reservations(ioctx, queue_name, reservations));
258+
ASSERT_EQ(reservations.size(), 0);
259+
260+
constexpr auto max_elements = 42U;
261+
std::string marker;
262+
std::string end_marker;
263+
librados::ObjectReadOperation rop;
264+
auto consume_count = 0U;
265+
std::vector<cls_queue_entry> entries;
266+
bool truncated = true;
267+
268+
auto simulate_reef_cls_2pc_queue_remove_entries = [](librados::ObjectWriteOperation& wop, const std::string& end_marker) {
269+
bufferlist in;
270+
cls_queue_remove_op rem_op;
271+
rem_op.end_marker = end_marker;
272+
encode(rem_op, in);
273+
wop.exec(TPC_QUEUE_CLASS, TPC_QUEUE_REMOVE_ENTRIES, in);
274+
};
275+
276+
while (truncated) {
277+
bufferlist bl;
278+
int rc;
279+
cls_2pc_queue_list_entries(rop, marker, max_elements, &bl, &rc);
280+
ASSERT_EQ(0, ioctx.operate(queue_name, &rop, nullptr));
281+
ASSERT_EQ(rc, 0);
282+
ASSERT_EQ(cls_2pc_queue_list_entries_result(bl, entries, &truncated, end_marker), 0);
283+
284+
consume_count += entries.size();
285+
// simulating reef cls_2pc_queue_remove_entries with cls_queue_remove_op
286+
simulate_reef_cls_2pc_queue_remove_entries(wop, end_marker);
287+
marker = end_marker;
288+
total_committed_elements -= entries.size();
289+
}
290+
291+
// execute all delete operations in a batch
292+
ASSERT_EQ(0, ioctx.operate(queue_name, &wop));
293+
ASSERT_EQ(consume_count, number_of_ops*number_of_elements);
294+
295+
uint32_t entries_number;
296+
uint64_t size;
297+
ASSERT_EQ(cls_2pc_queue_get_topic_stats(ioctx, queue_name, entries_number, size), 0);
298+
ASSERT_EQ(total_committed_elements, 0);
299+
ASSERT_EQ(entries_number, 0);
300+
}
301+
176302
TEST_F(TestCls2PCQueue, Abort)
177303
{
178304
const std::string queue_name = __PRETTY_FUNCTION__;

src/test/cls_queue/test_cls_queue.cc

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,51 @@ TEST_F(TestClsQueue, List)
137137
ASSERT_EQ(total_elements, number_of_ops*number_of_elements);
138138
}
139139

140+
TEST_F(TestClsQueue, ListByEndMarker)
141+
{
142+
const std::string queue_name = "my-queue";
143+
const uint64_t queue_size = 1024*1024;
144+
librados::ObjectWriteOperation op;
145+
op.create(true);
146+
cls_queue_init(op, queue_name, queue_size);
147+
ASSERT_EQ(0, ioctx.operate(queue_name, &op));
148+
const auto number_of_ops = 10;
149+
const auto number_of_elements = 100;
150+
151+
// test multiple enqueues
152+
test_enqueue(queue_name, number_of_ops, number_of_elements, 0);
153+
154+
const auto max_elements = 42;
155+
std::string marker, end_marker;
156+
bool truncated = false;
157+
std::string max_op_next_marker;
158+
auto total_elements = 0;
159+
do {
160+
std::vector<cls_queue_entry> entries;
161+
auto ret = cls_queue_list_entries(ioctx, queue_name, marker, max_elements, entries, &truncated, max_op_next_marker);
162+
ASSERT_EQ(0, ret);
163+
end_marker = max_op_next_marker;
164+
165+
std::vector<cls_queue_entry> end_marker_entries;
166+
std::string end_marker_next_marker;
167+
bool end_marker_truncated = false;
168+
ret = cls_queue_list_entries(ioctx, queue_name, marker, end_marker, end_marker_entries,
169+
&end_marker_truncated, end_marker_next_marker);
170+
ASSERT_EQ(0, ret);
171+
172+
ASSERT_EQ(end_marker_next_marker, end_marker);
173+
ASSERT_EQ(end_marker_entries.size(), entries.size());
174+
for (auto i = 0U; i < end_marker_entries.size() && i < entries.size(); ++i) {
175+
ASSERT_EQ(end_marker_entries[i].marker, entries[i].marker);
176+
}
177+
178+
marker = max_op_next_marker;
179+
total_elements += entries.size();
180+
} while (truncated);
181+
182+
ASSERT_EQ(total_elements, number_of_ops*number_of_elements);
183+
}
184+
140185
TEST_F(TestClsQueue, Dequeue)
141186
{
142187
const std::string queue_name = "my-queue";

0 commit comments

Comments
 (0)