Skip to content

Commit ed37233

Browse files
authored
Merge pull request ceph#62887 from clwluvw/copy-obj-remote
rgw: report copy obj progress from the frontend CR Reviewed-by: Adam C. Emerson <[email protected]>
2 parents 1aca146 + 393f871 commit ed37233

File tree

4 files changed

+82
-26
lines changed

4 files changed

+82
-26
lines changed

src/rgw/driver/rados/rgw_rados.cc

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4989,19 +4989,31 @@ int RGWRados::copy_obj(RGWObjectCtx& src_obj_ctx,
49894989
ldpp_dout(dpp, 5) << "Copy object " << src_obj.bucket << ":" << src_obj.get_oid() << " => " << dest_obj.bucket << ":" << dest_obj.get_oid() << dendl;
49904990

49914991
if (remote_src || !source_zone.empty()) {
4992-
// null_yield resolves a crash when calling progress_cb(), because the beast
4993-
// frontend tried to use this same yield context to write the progress
4994-
// response to the frontend socket. call fetch_remote_obj() synchronously so
4995-
// that only one thread tries to suspend that coroutine
4996-
const req_context rctx{dpp, null_yield, nullptr};
4997-
const rgw_owner remote_user_owner(remote_user);
4998-
return fetch_remote_obj(dest_obj_ctx, &remote_user_owner, &remote_user, info, source_zone,
4999-
dest_obj, src_obj, dest_bucket_info, &src_bucket_info,
5000-
dest_placement, src_mtime, mtime, mod_ptr,
5001-
unmod_ptr, high_precision_time,
5002-
if_match, if_nomatch, attrs_mod, copy_if_newer, attrs, category,
5003-
olh_epoch, delete_at, ptag, petag, progress_cb, progress_data, rctx,
5004-
nullptr /* filter */, stat_follow_olh, stat_dest_obj, std::nullopt);
4992+
RGWCopyObj *op = static_cast<RGWCopyObj *>(progress_data);
4993+
auto& progress_tracker = op->get_progress_tracker();
4994+
4995+
int ret = 0;
4996+
boost::asio::spawn(driver->get_io_context(),
4997+
[&](boost::asio::yield_context yield) {
4998+
const req_context rctx{dpp, yield, nullptr};
4999+
const rgw_owner remote_user_owner(remote_user);
5000+
ret = fetch_remote_obj(dest_obj_ctx, &remote_user_owner, &remote_user, info, source_zone,
5001+
dest_obj, src_obj, dest_bucket_info, &src_bucket_info,
5002+
dest_placement, src_mtime, mtime, mod_ptr,
5003+
unmod_ptr, high_precision_time,
5004+
if_match, if_nomatch, attrs_mod, copy_if_newer, attrs, category,
5005+
olh_epoch, delete_at, ptag, petag, progress_cb, progress_data, rctx,
5006+
nullptr /* filter */, stat_follow_olh, stat_dest_obj, std::nullopt);
5007+
5008+
progress_tracker.done = true;
5009+
progress_tracker.cv.notify_one();
5010+
}, [] (std::exception_ptr eptr) {
5011+
if (eptr) std::rethrow_exception(eptr);
5012+
});
5013+
5014+
op->progress_cb_handler();
5015+
5016+
return ret;
50055017
}
50065018

50075019
map<string, bufferlist> src_attrs;

src/rgw/rgw_op.cc

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5951,11 +5951,31 @@ void RGWCopyObj::progress_cb(off_t ofs)
59515951
return;
59525952
}
59535953

5954-
send_partial_response(ofs);
5954+
std::lock_guard<std::mutex> l(progress_tracker->mtx);
5955+
progress_tracker->ofs_queue.push(ofs);
5956+
progress_tracker->cv.notify_one();
59555957

59565958
last_ofs = ofs;
59575959
}
59585960

5961+
void RGWCopyObj::progress_cb_handler()
5962+
{
5963+
std::unique_lock<std::mutex> l(progress_tracker->mtx);
5964+
while(!progress_tracker->done || !progress_tracker->ofs_queue.empty()) {
5965+
progress_tracker->cv.wait(l, [&]() {
5966+
return progress_tracker->done || !progress_tracker->ofs_queue.empty();
5967+
});
5968+
5969+
while (!progress_tracker->ofs_queue.empty()) {
5970+
auto ofs = progress_tracker->ofs_queue.front();
5971+
progress_tracker->ofs_queue.pop();
5972+
l.unlock();
5973+
send_partial_response(ofs);
5974+
l.lock();
5975+
}
5976+
}
5977+
}
5978+
59595979
void RGWCopyObj::pre_exec()
59605980
{
59615981
rgw_bucket_object_pre_exec(s);

src/rgw/rgw_op.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1611,6 +1611,17 @@ class RGWCopyObj : public RGWOp {
16111611
RGWObjectRetention *obj_retention;
16121612
RGWObjectLegalHold *obj_legal_hold;
16131613

1614+
// remote copy progress helper
1615+
struct ProgressTracker {
1616+
std::mutex mtx;
1617+
std::condition_variable cv;
1618+
std::queue<off_t> ofs_queue;
1619+
std::atomic<bool> done{false};
1620+
1621+
ProgressTracker() {}
1622+
};
1623+
std::optional<ProgressTracker> progress_tracker;
1624+
16141625
int init_common();
16151626

16161627
public:
@@ -1651,6 +1662,13 @@ class RGWCopyObj : public RGWOp {
16511662
void pre_exec() override;
16521663
void execute(optional_yield y) override;
16531664
void progress_cb(off_t ofs);
1665+
void progress_cb_handler();
1666+
ProgressTracker& get_progress_tracker() {
1667+
if (!progress_tracker) {
1668+
progress_tracker.emplace();
1669+
}
1670+
return *progress_tracker;
1671+
}
16541672

16551673
virtual int check_storage_class(const rgw_placement_rule& src_placement) {
16561674
return 0;

src/test/rgw/rgw_multi/tests.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5243,8 +5243,6 @@ def test_copy_obj_between_zonegroups(zonegroup):
52435243
source_bucket = source_zone.create_bucket(gen_bucket_name())
52445244

52455245
objname = 'dummy'
5246-
k = new_key(source_zone, source_bucket.name, objname)
5247-
k.set_contents_from_string('foo')
52485246

52495247
for zg in realm.current_period.zonegroups:
52505248
if zg.name == zonegroup.name:
@@ -5254,16 +5252,24 @@ def test_copy_obj_between_zonegroups(zonegroup):
52545252
dest_bucket = dest_zone.create_bucket(gen_bucket_name())
52555253
realm_meta_checkpoint(realm)
52565254

5257-
# copy object
5258-
dest_zone.s3_client.copy_object(
5259-
Bucket=dest_bucket.name,
5260-
CopySource=f'{source_bucket.name}/{objname}',
5261-
Key=objname
5262-
)
5263-
5264-
# check that object exists in destination bucket
5265-
k = get_key(dest_zone, dest_bucket, objname)
5266-
assert_equal(k.get_contents_as_string().decode('utf-8'), 'foo')
5255+
# try object sizes of 4K and 8MiB
5256+
# 4K to test no progress case
5257+
# 8MiB to test progress case
5258+
obj_sizes = [4096, 8 * 1024 * 1024]
5259+
for size in obj_sizes:
5260+
k = new_key(source_zone, source_bucket.name, objname)
5261+
k.set_contents_from_string('x' * size)
5262+
5263+
# copy object
5264+
dest_zone.s3_client.copy_object(
5265+
Bucket=dest_bucket.name,
5266+
CopySource=f'{source_bucket.name}/{objname}',
5267+
Key=objname
5268+
)
5269+
5270+
# check that object exists in destination bucket
5271+
k = get_key(dest_zone, dest_bucket, objname)
5272+
assert_equal(k.get_contents_as_string().decode('utf-8'), 'x' * size)
52675273

52685274
@allow_bucket_replication
52695275
def test_bucket_replication_alt_user_delete_forbidden():

0 commit comments

Comments
 (0)