Skip to content

Commit 263f13f

Browse files
committed
rgw/logging: add error message when log_record fails
when log_record fails in journal mode due to issues in the target bucket, the result code that the client get will be confusing, since there is no indication that the issue is wit hte target bucket and not the source bucket on which the client was operating. the HTTP error message will be used to convey this information. Fixes: https://tracker.ceph.com/issues/72543 Signed-off-by: Yuval Lifshitz <[email protected]>
1 parent ecc2be5 commit 263f13f

File tree

3 files changed

+102
-56
lines changed

3 files changed

+102
-56
lines changed

src/rgw/rgw_bucket_logging.cc

Lines changed: 80 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -288,19 +288,19 @@ int new_logging_object(const configuration& conf,
288288
int ret = target_bucket->set_logging_object_name(obj_name, conf.target_prefix, y, dpp, (old_name == std::nullopt), objv_tracker);
289289
if (ret == -EEXIST || ret == -ECANCELED) {
290290
if (ret = target_bucket->get_logging_object_name(obj_name, conf.target_prefix, y, dpp, nullptr); ret < 0) {
291-
ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of bucket '" <<
291+
ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of logging bucket '" <<
292292
target_bucket_id << "' and prefix '" << conf.target_prefix << "', ret = " << ret << dendl;
293293
return ret;
294294
}
295-
ldpp_dout(dpp, 20) << "INFO: name already set. got name of logging object '" << obj_name << "' of bucket '" <<
295+
ldpp_dout(dpp, 20) << "INFO: name already set. got name of logging object '" << obj_name << "' of logging bucket '" <<
296296
target_bucket_id << "' and prefix '" << conf.target_prefix << "'" << dendl;
297297
return -ECANCELED;
298298
} else if (ret < 0) {
299-
ldpp_dout(dpp, 1) << "ERROR: failed to write name of logging object '" << obj_name << "' of bucket '" <<
299+
ldpp_dout(dpp, 1) << "ERROR: failed to write name of logging object '" << obj_name << "' of logging bucket '" <<
300300
target_bucket_id << "'. ret = " << ret << dendl;
301301
return ret;
302302
}
303-
ldpp_dout(dpp, 20) << "INFO: wrote name of logging object '" << obj_name << "' of bucket '" <<
303+
ldpp_dout(dpp, 20) << "INFO: wrote name of logging object '" << obj_name << "' of logging bucket '" <<
304304
target_bucket_id << "'" << dendl;
305305
return 0;
306306
}
@@ -315,7 +315,7 @@ int commit_logging_object(const configuration& conf,
315315
std::string target_tenant_name;
316316
int ret = rgw_parse_url_bucket(conf.target_bucket, tenant_name, target_tenant_name, target_bucket_name);
317317
if (ret < 0) {
318-
ldpp_dout(dpp, 1) << "ERROR: failed to parse target bucket '" << conf.target_bucket << "' when committing logging object, ret = "
318+
ldpp_dout(dpp, 1) << "ERROR: failed to parse logging bucket '" << conf.target_bucket << "' when committing logging object, ret = "
319319
<< ret << dendl;
320320
return ret;
321321
}
@@ -324,7 +324,7 @@ int commit_logging_object(const configuration& conf,
324324
ret = driver->load_bucket(dpp, target_bucket_id,
325325
&target_bucket, y);
326326
if (ret < 0) {
327-
ldpp_dout(dpp, 1) << "ERROR: failed to get target logging bucket '" << target_bucket_id << "' when committing logging object, ret = "
327+
ldpp_dout(dpp, 1) << "ERROR: failed to get logging bucket '" << target_bucket_id << "' when committing logging object, ret = "
328328
<< ret << dendl;
329329
return ret;
330330
}
@@ -338,12 +338,12 @@ int commit_logging_object(const configuration& conf,
338338
std::string* last_committed) {
339339
std::string obj_name;
340340
if (const int ret = target_bucket->get_logging_object_name(obj_name, conf.target_prefix, y, dpp, nullptr); ret < 0) {
341-
ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of bucket '" <<
341+
ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of logging bucket '" <<
342342
target_bucket->get_key() << "'. ret = " << ret << dendl;
343343
return ret;
344344
}
345345
if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp, conf.target_prefix, last_committed); ret < 0) {
346-
ldpp_dout(dpp, 1) << "ERROR: failed to commit logging object '" << obj_name << "' of bucket '" <<
346+
ldpp_dout(dpp, 1) << "ERROR: failed to commit logging object '" << obj_name << "' of logging bucket '" <<
347347
target_bucket->get_key() << "'. ret = " << ret << dendl;
348348
return ret;
349349
}
@@ -359,19 +359,20 @@ int rollover_logging_object(const configuration& conf,
359359
optional_yield y,
360360
bool must_commit,
361361
RGWObjVersionTracker* objv_tracker,
362-
std::string* last_committed) {
362+
std::string* last_committed,
363+
std::string* err_message) {
363364
std::string target_bucket_name;
364365
std::string target_tenant_name;
365366
std::ignore = rgw_parse_url_bucket(conf.target_bucket, target_bucket->get_tenant(), target_tenant_name, target_bucket_name);
366367
if (target_bucket_name != target_bucket->get_name() || target_tenant_name != target_bucket->get_tenant()) {
367368
ldpp_dout(dpp, 1) << "ERROR: logging bucket name mismatch. conf= '" << conf.target_bucket <<
368369
"', bucket= '" << target_bucket->get_key() << "'" << dendl;
369-
return -EINVAL;
370+
return -EINVAL; // this should never happen
370371
}
371372

372373
auto old_obj = obj_name.empty() ? std::nullopt : std::optional<std::string>(obj_name);
373374

374-
auto handle_error = [&dpp, &old_obj, &target_bucket](int ret) {
375+
auto handle_error = [&dpp, &old_obj, &target_bucket, err_message](int ret) {
375376
if (ret == -ECANCELED) {
376377
ldpp_dout(dpp, 20) << "INFO: rollover already performed for logging object '" << old_obj << "' to logging bucket '" <<
377378
target_bucket->get_key() << "'. ret = " << ret << dendl;
@@ -380,6 +381,9 @@ int rollover_logging_object(const configuration& conf,
380381
if (ret < 0) {
381382
ldpp_dout(dpp, 1) << "ERROR: failed to rollover logging object '" << old_obj << "' to logging bucket '" <<
382383
target_bucket->get_key() << "'. ret = " << ret << dendl;
384+
if (err_message) {
385+
*err_message = fmt::format("Failed to rollover logging object of logging bucket '{}'", target_bucket->get_name());
386+
}
383387
}
384388
return ret;
385389
};
@@ -396,6 +400,9 @@ int rollover_logging_object(const configuration& conf,
396400
}
397401
if (const int ret = target_bucket->commit_logging_object(*old_obj, y, dpp, conf.target_prefix, last_committed); ret < 0) {
398402
if (must_commit) {
403+
if (err_message) {
404+
*err_message = fmt::format("Failed to commit logging object of logging bucket '{}'", target_bucket->get_name());
405+
}
399406
return ret;
400407
}
401408
ldpp_dout(dpp, 5) << "WARNING: failed to commit object '" << old_obj << "' to logging bucket '" <<
@@ -464,32 +471,38 @@ int log_record(rgw::sal::Driver* driver,
464471
bool log_source_bucket) {
465472
if (!s->bucket) {
466473
ldpp_dout(dpp, 1) << "ERROR: only bucket operations are logged in bucket logging" << dendl;
467-
return -EINVAL;
474+
return -EINVAL; // this should never happen
468475
}
476+
auto set_journal_err = [&conf, s](const std::string& err_message) {
477+
if (conf.logging_type == LoggingType::Journal) s->err.message = err_message;
478+
};
469479
std::string target_bucket_name;
470480
std::string target_tenant_name;
471481
int ret = rgw_parse_url_bucket(conf.target_bucket, s->bucket_tenant, target_tenant_name, target_bucket_name);
472482
if (ret < 0) {
473-
ldpp_dout(dpp, 1) << "ERROR: failed to parse target logging bucket '" << conf.target_bucket << "', ret = " << ret << dendl;
483+
ldpp_dout(dpp, 1) << "ERROR: failed to parse logging bucket name '" << conf.target_bucket << "', ret = " << ret << dendl;
484+
set_journal_err(fmt::format("Faild to parse logging bucket name '{}'", conf.target_bucket));
474485
return ret;
475486
}
476487
const rgw_bucket target_bucket_id(target_tenant_name, target_bucket_name);
477488
std::unique_ptr<rgw::sal::Bucket> target_bucket;
478489
ret = driver->load_bucket(dpp, target_bucket_id,
479490
&target_bucket, y);
480491
if (ret < 0) {
481-
ldpp_dout(dpp, 1) << "ERROR: failed to get target logging bucket '" << target_bucket_id << "'. ret = " << ret << dendl;
492+
ldpp_dout(dpp, 1) << "ERROR: failed to load logging bucket '" << target_bucket_id << "'. ret = " << ret << dendl;
493+
set_journal_err(fmt::format("Faild to load logging bucket '{}'", target_bucket_id.bucket_id));
482494
return ret;
483495
}
484496

485497
rgw::ARN target_resource_arn(target_bucket_id, conf.target_prefix);
486-
if (ret = verify_target_bucket_policy(dpp, target_bucket.get(), target_resource_arn, s); ret < 0) {
487-
ldpp_dout(dpp, 1) << "ERROR: failed to verify target logging bucket policy for bucket '" << target_bucket->get_key() <<
488-
"'. ret = " << ret << dendl;
489-
return -EACCES;
498+
std::string err_message;
499+
if (ret = verify_target_bucket_policy(dpp, target_bucket.get(), target_resource_arn, s, &err_message); ret < 0) {
500+
set_journal_err(err_message);
501+
return ret;
490502
}
491503

492-
if (ret = verify_target_bucket_attributes(dpp, target_bucket.get()); ret < 0) {
504+
if (ret = verify_target_bucket_attributes(dpp, target_bucket.get(), &err_message); ret < 0) {
505+
set_journal_err(err_message);
493506
return ret;
494507
}
495508

@@ -500,9 +513,10 @@ int log_record(rgw::sal::Driver* driver,
500513
if (ret == 0) {
501514
const auto time_to_commit = time_from_name(obj_name, dpp) + std::chrono::seconds(conf.obj_roll_time);
502515
if (ceph::coarse_real_time::clock::now() > time_to_commit) {
503-
ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' exceeded its time, will be committed to bucket '" <<
516+
ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' exceeded its time, will be committed to logging bucket '" <<
504517
target_bucket_id << "'" << dendl;
505-
if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, false, &objv_tracker, nullptr); ret < 0) {
518+
if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, false, &objv_tracker, nullptr, &err_message); ret < 0) {
519+
set_journal_err(err_message);
506520
return ret;
507521
}
508522
} else {
@@ -512,19 +526,21 @@ int log_record(rgw::sal::Driver* driver,
512526
// try to create the temporary log object for the first time
513527
ret = new_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, std::nullopt, &objv_tracker);
514528
if (ret == 0) {
515-
ldpp_dout(dpp, 20) << "INFO: first time logging for bucket '" << target_bucket_id << "' and prefix '" <<
529+
ldpp_dout(dpp, 20) << "INFO: first time logging for logging bucket '" << target_bucket_id << "' and prefix '" <<
516530
conf.target_prefix << "'" << dendl;
517531
} else if (ret == -ECANCELED) {
518-
ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' already exists for bucket '" << target_bucket_id << "' and prefix" <<
532+
ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' already exists for logging bucket '" << target_bucket_id << "' and prefix" <<
519533
conf.target_prefix << "'" << dendl;
520534
} else {
521-
ldpp_dout(dpp, 1) << "ERROR: failed to create logging object of bucket '" <<
522-
target_bucket_id << "' and prefix '" << conf.target_prefix << "' for the first time. ret = " << ret << dendl;
535+
ldpp_dout(dpp, 1) << "ERROR: failed to create first time logging object of logging bucket '" <<
536+
target_bucket_id << "' and prefix '" << conf.target_prefix << "'. ret = " << ret << dendl;
537+
set_journal_err(fmt::format("Faild create first time logging object of logging bucket '{}'", target_bucket->get_name()));
523538
return ret;
524539
}
525540
} else {
526-
ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of bucket '" <<
541+
ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of logging bucket '" <<
527542
target_bucket_id << "'. ret = " << ret << dendl;
543+
set_journal_err(fmt::format("Faild to get name of logging object of logging bucket '{}'", target_bucket->get_name()));
528544
return ret;
529545
}
530546

@@ -559,7 +575,6 @@ int log_record(rgw::sal::Driver* driver,
559575
bucket_name = full_bucket_name(s->bucket);
560576
}
561577

562-
563578
switch (conf.logging_type) {
564579
case LoggingType::Standard:
565580
record = fmt::format("{} {} [{:%d/%b/%Y:%H:%M:%S %z}] {} {} {} {} {} \"{} {}{}{} HTTP/1.1\" {} {} {} {} {} {} {} \"{}\" {} {} {} {} {} {} {} {} {}",
@@ -607,14 +622,15 @@ int log_record(rgw::sal::Driver* driver,
607622
case LoggingType::Any:
608623
ldpp_dout(dpp, 1) << "ERROR: failed to format record when writing to logging object '" <<
609624
obj_name << "' due to unsupported logging type" << dendl;
610-
return -EINVAL;
625+
return -EINVAL; // this should never happen
611626
}
612627

613628
// get quota of the owner of the target bucket
614629
RGWQuota user_quota;
615630
if (ret = get_owner_quota_info(dpp, y, driver, target_bucket->get_owner(), user_quota); ret < 0) {
616-
ldpp_dout(dpp, 1) << "ERROR: failed to get quota of owner of target logging bucket '" <<
631+
ldpp_dout(dpp, 1) << "ERROR: failed to get quota of owner of logging bucket '" <<
617632
target_bucket_id << "' failed. ret = " << ret << dendl;
633+
set_journal_err(fmt::format("Faild to get quota of owner of logging bucket '{}'", target_bucket->get_name()));
618634
return ret;
619635
}
620636
// start with system default quota
@@ -631,8 +647,9 @@ int log_record(rgw::sal::Driver* driver,
631647
}
632648
// verify there is enough quota to write the record
633649
if (ret = target_bucket->check_quota(dpp, quota, record.length(), y); ret < 0) {
634-
ldpp_dout(dpp, 1) << "ERROR: quota check on target logging bucket '" <<
650+
ldpp_dout(dpp, 1) << "ERROR: quota check on logging bucket '" <<
635651
target_bucket_id << "' failed. ret = " << ret << dendl;
652+
set_journal_err(fmt::format("Quota check on logging bucket '{}' failed", target_bucket->get_name()));
636653
return ret;
637654
}
638655

@@ -643,12 +660,14 @@ int log_record(rgw::sal::Driver* driver,
643660
async_completion); ret < 0 && ret != -EFBIG) {
644661
ldpp_dout(dpp, 1) << "ERROR: failed to write record to logging object '" <<
645662
obj_name << "'. ret = " << ret << dendl;
663+
set_journal_err(fmt::format("Failed to write record to logging object of logging bucket {}", target_bucket->get_name()));
646664
return ret;
647665
}
648666
if (ret == -EFBIG) {
649-
ldpp_dout(dpp, 5) << "WARNING: logging object '" << obj_name << "' is full, will be committed to bucket '" <<
667+
ldpp_dout(dpp, 5) << "WARNING: logging object '" << obj_name << "' is full, will be committed to logging bucket '" <<
650668
target_bucket->get_key() << "'" << dendl;
651-
if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, true, &objv_tracker, nullptr); ret < 0 ) {
669+
if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, true, &objv_tracker, nullptr, &err_message); ret < 0 ) {
670+
set_journal_err(err_message);
652671
return ret;
653672
}
654673
if (ret = target_bucket->write_logging_object(obj_name,
@@ -658,6 +677,7 @@ int log_record(rgw::sal::Driver* driver,
658677
async_completion); ret < 0) {
659678
ldpp_dout(dpp, 1) << "ERROR: failed to write record to logging object '" <<
660679
obj_name << "'. ret = " << ret << dendl;
680+
set_journal_err(fmt::format("Failed to write record to logging object of logging bucket {}", target_bucket->get_name()));
661681
return ret;
662682
}
663683
}
@@ -735,7 +755,7 @@ int update_bucket_logging_sources(const DoutPrefixProvider* dpp, rgw::sal::Drive
735755
std::unique_ptr<rgw::sal::Bucket> target_bucket;
736756
const int ret = driver->load_bucket(dpp, target_bucket_id, &target_bucket, y);
737757
if (ret < 0) {
738-
ldpp_dout(dpp, 5) << "WARNING: failed to get target logging bucket '" << target_bucket_id <<
758+
ldpp_dout(dpp, 5) << "WARNING: failed to get logging bucket '" << target_bucket_id <<
739759
"' in order to update logging sources. ret = " << ret << dendl;
740760
return ret;
741761
}
@@ -887,7 +907,7 @@ int source_bucket_cleanup(const DoutPrefixProvider* dpp,
887907
}
888908
rgw_bucket target_bucket_id;
889909
if (const int ret = get_bucket_id(conf->target_bucket, info.bucket.tenant, target_bucket_id); ret < 0) {
890-
ldpp_dout(dpp, 5) << "WARNING: failed to parse target logging bucket '" <<
910+
ldpp_dout(dpp, 5) << "WARNING: failed to parse logging bucket '" <<
891911
conf->target_bucket << "' during cleanup. ret = " << ret << dendl;
892912
return 0;
893913
}
@@ -904,7 +924,8 @@ int source_bucket_cleanup(const DoutPrefixProvider* dpp,
904924
int verify_target_bucket_policy(const DoutPrefixProvider* dpp,
905925
rgw::sal::Bucket* target_bucket,
906926
const rgw::ARN& target_resource_arn,
907-
req_state* s) {
927+
req_state* s,
928+
std::string* err_message) {
908929
// verify target permissions for bucket logging
909930
// this is implementing the policy based permission granting from:
910931
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/enable-server-access-logging.html#grant-log-delivery-permissions-general
@@ -913,6 +934,9 @@ int verify_target_bucket_policy(const DoutPrefixProvider* dpp,
913934
const auto policy_it = target_attrs.find(RGW_ATTR_IAM_POLICY);
914935
if (policy_it == target_attrs.end()) {
915936
ldpp_dout(dpp, 1) << "ERROR: logging bucket '" << target_bucket_id << "' must have bucket policy to allow logging" << dendl;
937+
if (err_message) {
938+
*err_message = fmt::format("Logging bucket '{}' must have policy to allow logging", target_bucket->get_name());
939+
}
916940
return -EACCES;
917941
}
918942
try {
@@ -929,33 +953,48 @@ int verify_target_bucket_policy(const DoutPrefixProvider* dpp,
929953
"' must have a bucket policy that allows logging service principal to put objects in the following resource ARN: '" <<
930954
target_resource_arn.to_string() << "' from source bucket ARN: '" << source_bucket_arn <<
931955
"' and source account: '" << source_account << "'" << dendl;
956+
if (err_message) {
957+
*err_message = fmt::format("Logging bucket '{}' policy does not allow logging", target_bucket->get_name());
958+
}
932959
return -EACCES;
933960
}
934961
} catch (const rgw::IAM::PolicyParseException& err) {
935962
ldpp_dout(dpp, 1) << "ERROR: failed to parse logging bucket '" << target_bucket_id <<
936963
"' policy. error: " << err.what() << dendl;
964+
if (err_message) {
965+
*err_message = fmt::format("Logging bucket '{}' has invalid policy", target_bucket->get_name());
966+
}
937967
return -EACCES;
938968
}
939969
return 0;
940970
}
941971

942-
int verify_target_bucket_attributes(const DoutPrefixProvider* dpp, rgw::sal::Bucket* target_bucket) {
972+
int verify_target_bucket_attributes(const DoutPrefixProvider* dpp, rgw::sal::Bucket* target_bucket, std::string* err_message) {
943973
const auto& target_info = target_bucket->get_info();
944974
if (target_info.requester_pays) {
945975
// target bucket must not have requester pays set on it
946-
ldpp_dout(dpp, 1) << "ERROR: logging target bucket '" << target_bucket->get_key() << "', is configured with requester pays" << dendl;
976+
ldpp_dout(dpp, 1) << "ERROR: logging bucket '" << target_bucket->get_key() << "', is configured with requester pays" << dendl;
977+
if (err_message) {
978+
*err_message = fmt::format("Logging bucket '{}' is configured with requester pays", target_bucket->get_name());
979+
}
947980
return -EINVAL;
948981
}
949982

950983
const auto& target_attrs = target_bucket->get_attrs();
951984
if (target_attrs.find(RGW_ATTR_BUCKET_LOGGING) != target_attrs.end()) {
952985
// target bucket must not have logging set on it
953-
ldpp_dout(dpp, 1) << "ERROR: logging target bucket '" << target_bucket->get_key() << "', is configured with bucket logging" << dendl;
986+
ldpp_dout(dpp, 1) << "ERROR: logging bucket '" << target_bucket->get_key() << "', is configured with bucket logging" << dendl;
987+
if (err_message) {
988+
*err_message = fmt::format("Logging bucket '{}' is configured with bucket logging", target_bucket->get_name());
989+
}
954990
return -EINVAL;
955991
}
956992
if (target_attrs.find(RGW_ATTR_BUCKET_ENCRYPTION_POLICY) != target_attrs.end()) {
957993
// verify target bucket does not have encryption
958-
ldpp_dout(dpp, 1) << "ERROR: logging target bucket '" << target_bucket->get_key() << "', is configured with encryption" << dendl;
994+
ldpp_dout(dpp, 1) << "ERROR: logging bucket '" << target_bucket->get_key() << "', is configured with encryption" << dendl;
995+
if (err_message) {
996+
*err_message = fmt::format("Logging bucket '{}' is configured with encryption", target_bucket->get_name());
997+
}
959998
return -EINVAL;
960999
}
9611000
return 0;
@@ -988,13 +1027,13 @@ int get_target_and_conf_from_source(
9881027

9891028
rgw_bucket target_bucket_id;
9901029
if (const int ret = get_bucket_id(configuration.target_bucket, tenant, target_bucket_id); ret < 0) {
991-
ldpp_dout(dpp, 1) << "ERROR: failed to parse target bucket '" << configuration.target_bucket << "', ret = " << ret << dendl;
1030+
ldpp_dout(dpp, 1) << "ERROR: failed to parse logging bucket '" << configuration.target_bucket << "', ret = " << ret << dendl;
9921031
return ret;
9931032
}
9941033

9951034
if (const int ret = driver->load_bucket(dpp, target_bucket_id,
9961035
&target_bucket, y); ret < 0) {
997-
ldpp_dout(dpp, 1) << "ERROR: failed to get target bucket '" << target_bucket_id << "', ret = " << ret << dendl;
1036+
ldpp_dout(dpp, 1) << "ERROR: failed to get logging bucket '" << target_bucket_id << "', ret = " << ret << dendl;
9981037
return ret;
9991038
}
10001039
return 0;

0 commit comments

Comments
 (0)