Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ CONF_Strings(recycle_whitelist, ""); // Comma separated list
CONF_Strings(recycle_blacklist, ""); // Comma separated list
// IO worker thread pool concurrency: object list, delete
CONF_mInt32(instance_recycler_worker_pool_size, "32");
// Max number of delete tasks per batch when recycling objects.
// Each task deletes up to 1000 files. Controls memory usage during large-scale deletion.
// The recursive delete path reads this value and falls back to 1000 (with a warning)
// when the configured value is not positive.
// NOTE(review): the per-task file count is bounded by the caller-supplied batch_size,
// so the "1000 files" figure presumably reflects the usual batch size -- confirm.
CONF_Int32(recycler_max_tasks_per_batch, "1000");
// The worker pool size for http api `statistics_recycle` worker pool
CONF_mInt32(instance_recycler_statistics_recycle_worker_pool_size, "5");
CONF_Bool(enable_checker, "false");
Expand Down
142 changes: 103 additions & 39 deletions cloud/src/recycler/obj_storage_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <chrono>

#include "common/config.h"
#include "cpp/sync_point.h"
#include "recycler/sync_executor.h"

Expand All @@ -31,58 +32,121 @@ ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStorag
int64_t expired_time,
size_t batch_size) {
TEST_SYNC_POINT_CALLBACK("ObjStorageClient::delete_objects_recursively_", &batch_size);
size_t num_deleted_objects = 0;
auto list_iter = list_objects(path);
ObjectStorageResponse ret;
size_t num_deleted = 0;
int error_count = 0;
size_t batch_count = 0;
auto start_time = steady_clock::now();

auto list_iter = list_objects(path);
// Read max tasks per batch from config, validate to prevent overflow
int32_t config_val = config::recycler_max_tasks_per_batch;
size_t max_tasks_per_batch = 1000; // default value
if (config_val > 0) {
max_tasks_per_batch = static_cast<size_t>(config_val);
} else {
LOG(WARNING) << "recycler_max_tasks_per_batch=" << config_val
<< " is not positive, using default 1000";
}

ObjectStorageResponse ret;
std::vector<std::string> keys;
SyncExecutor<int> concurrent_delete_executor(
option.executor,
fmt::format("delete objects under bucket {}, path {}", path.bucket, path.key),
[](const int& ret) { return ret != 0; });

for (auto obj = list_iter->next(); obj.has_value(); obj = list_iter->next()) {
if (expired_time > 0 && obj->mtime_s > expired_time) {
continue;
while (true) {
// Create a new SyncExecutor for each batch
// Note: cancel lambda only takes effect within the current batch
SyncExecutor<int> batch_executor(
option.executor, fmt::format("delete batch under {}/{}", path.bucket, path.key),
[](const int& r) { return r != 0; });

std::vector<std::string> keys;
size_t tasks_in_batch = 0;
bool has_more = true;

// Collect tasks until reaching batch limit or no more files
while (tasks_in_batch < max_tasks_per_batch && has_more) {
auto obj = list_iter->next();
if (!obj.has_value()) {
has_more = false;
break;
}
if (expired_time > 0 && obj->mtime_s > expired_time) {
continue;
}

num_deleted++;
keys.emplace_back(std::move(obj->key));

// Submit a delete task when we have batch_size keys
if (keys.size() >= batch_size) {
batch_executor.add([this, &path, k = std::move(keys), option]() mutable {
return delete_objects(path.bucket, std::move(k), option).ret;
});
keys.clear();
tasks_in_batch++;
}
}

num_deleted_objects++;
keys.emplace_back(std::move(obj->key));
if (keys.size() < batch_size) {
continue;
// Handle remaining keys (less than batch_size)
if (!keys.empty()) {
batch_executor.add([this, &path, k = std::move(keys), option]() mutable {
return delete_objects(path.bucket, std::move(k), option).ret;
});
tasks_in_batch++;
}
concurrent_delete_executor.add([this, &path, k = std::move(keys), option]() mutable {
return delete_objects(path.bucket, std::move(k), option).ret;
});
}

if (!list_iter->is_valid()) {
bool finished;
concurrent_delete_executor.when_all(&finished);
return {-1};
}
// Before exiting on empty batch, check if listing is valid
// Avoid silently treating listing failure as success
if (tasks_in_batch == 0) {
if (!list_iter->is_valid()) {
LOG(WARNING) << "list_iter invalid with no tasks collected";
ret = {-1};
}
break;
}

if (!keys.empty()) {
concurrent_delete_executor.add([this, &path, k = std::move(keys), option]() mutable {
return delete_objects(path.bucket, std::move(k), option).ret;
});
}
bool finished = true;
std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
for (int r : rets) {
if (r != 0) {
ret = -1;
// Wait for current batch to complete
bool finished = true;
std::vector<int> rets = batch_executor.when_all(&finished);
batch_count++;

for (int r : rets) {
if (r != 0) {
error_count++;
}
}

// Log batch progress for monitoring long-running delete tasks
auto batch_elapsed = duration_cast<milliseconds>(steady_clock::now() - start_time).count();
LOG(INFO) << "delete objects under " << path.bucket << "/" << path.key << " batch "
<< batch_count << " completed"
<< ", tasks_in_batch=" << tasks_in_batch << ", total_deleted=" << num_deleted
<< ", elapsed=" << batch_elapsed << " ms";

// Check finished status: false means stop_token triggered, task timeout, or task invalid
if (!finished) {
LOG(WARNING) << "batch execution did not finish normally, stopping";
ret = {-1};
break;
}

// Check if list_iter is still valid (network errors, etc.)
if (!list_iter->is_valid()) {
LOG(WARNING) << "list_iter became invalid during iteration";
ret = {-1};
break;
}

// batch_executor goes out of scope, resources are automatically released
}

if (error_count > 0) {
LOG(WARNING) << "delete_objects_recursively completed with " << error_count << " errors";
ret = {-1};
}

auto elapsed = duration_cast<milliseconds>(steady_clock::now() - start_time).count();
LOG(INFO) << "delete objects under " << path.bucket << "/" << path.key
<< " finished, ret=" << ret.ret << ", finished=" << finished
<< ", num_deleted_objects=" << num_deleted_objects << ", cost=" << elapsed << " ms";

ret = finished ? ret : -1;
<< " finished, ret=" << ret.ret << ", total_batches=" << batch_count
<< ", num_deleted=" << num_deleted << ", error_count=" << error_count
<< ", cost=" << elapsed << " ms";

return ret;
}
Expand Down
3 changes: 2 additions & 1 deletion cloud/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ add_executable(recycler_test
recycler_test.cpp
recycler_operation_log_test.cpp
snapshot_data_size_calculator_test.cpp
recycle_versioned_keys_test.cpp)
recycle_versioned_keys_test.cpp
recycler_batch_delete_test.cpp)

add_executable(mem_txn_kv_test mem_txn_kv_test.cpp)

Expand Down
Loading