Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions dbms/src/Common/TiFlashMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ TiFlashMetrics::TiFlashMetrics()
.Name("tiflash_storage_ru_read_bytes")
.Help("Read bytes for storage RU calculation")
.Register(*registry);

registered_s3_store_summary_bytes_family = &prometheus::BuildGauge()
.Name("tiflash_storage_s3_store_summary_bytes")
.Help("S3 storage summary bytes by store and file type")
.Register(*registry);
}

void TiFlashMetrics::addReplicaSyncRU(UInt32 keyspace_id, UInt64 ru)
Expand Down Expand Up @@ -249,4 +254,37 @@ prometheus::Counter & TiFlashMetrics::getStorageRUReadBytesCounter(
return counter;
}
}

void TiFlashMetrics::setS3StoreSummaryBytes(UInt64 store_id, UInt64 data_file_bytes, UInt64 dt_file_bytes)
{
// Fast path.
{
std::shared_lock lock(s3_store_summary_bytes_mtx);
auto it = registered_s3_store_summary_bytes_metrics.find(store_id);
if (it != registered_s3_store_summary_bytes_metrics.end())
{
it->second.data_file_bytes->Set(data_file_bytes);
it->second.dt_file_bytes->Set(dt_file_bytes);
return;
}
}

std::unique_lock lock(s3_store_summary_bytes_mtx);
auto [it, inserted] = registered_s3_store_summary_bytes_metrics.try_emplace(store_id);
if (inserted)
{
auto store_id_str = std::to_string(store_id);
auto & data_file_bytes_metric
= registered_s3_store_summary_bytes_family->Add({{"store_id", store_id_str}, {"type", "data_file_bytes"}});
auto & dt_file_bytes_metric
= registered_s3_store_summary_bytes_family->Add({{"store_id", store_id_str}, {"type", "dt_file_bytes"}});
it->second = S3StoreSummaryBytesMetrics{
.data_file_bytes = &data_file_bytes_metric,
.dt_file_bytes = &dt_file_bytes_metric,
};
}

it->second.data_file_bytes->Set(data_file_bytes);
it->second.dt_file_bytes->Set(dt_file_bytes);
}
} // namespace DB
11 changes: 11 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,8 @@ class TiFlashMetrics
const String & resource_group,
const DM::ReadRUType type);

void setS3StoreSummaryBytes(UInt64 store_id, UInt64 data_file_bytes, UInt64 dt_file_bytes);

private:
TiFlashMetrics();

Expand Down Expand Up @@ -1366,6 +1368,15 @@ class TiFlashMetrics
// {keyspace}_{resource_group}_{type} -> Counter
std::unordered_map<std::string, prometheus::Counter *> registered_storage_ru_read_bytes_metrics;

struct S3StoreSummaryBytesMetrics
{
prometheus::Gauge * data_file_bytes;
prometheus::Gauge * dt_file_bytes;
};
prometheus::Family<prometheus::Gauge> * registered_s3_store_summary_bytes_family;
std::shared_mutex s3_store_summary_bytes_mtx;
std::unordered_map<UInt64, S3StoreSummaryBytesMetrics> registered_s3_store_summary_bytes_metrics;

public:
#define MAKE_METRIC_MEMBER_M(family_name, help, type, ...) \
MetricFamily<prometheus::type> family_name \
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ struct Settings
M(SettingBool, remote_checkpoint_only_upload_manifest, true, "Only upload manifest data when uploading checkpoint") \
M(SettingInt64, remote_gc_method, 1, "The method of running GC task on the remote store. 1 - lifecycle, 2 - scan.") \
M(SettingInt64, remote_gc_interval_seconds, 3600, "The interval of running GC task on the remote store. Unit is second.") \
M(SettingInt64, remote_summary_interval_seconds, 0, "The interval of collecting remote S3 storage summary. Unit is second. <=0 disables periodic summary task.") \
M(SettingInt64, remote_gc_verify_consistency, 0, "[testing] Verify the consistenct of valid locks when doing GC") \
M(SettingInt64, remote_gc_min_age_seconds, 3600, "The file will NOT be compacted when the time difference between the last modification is less than this threshold") \
M(SettingDouble, remote_gc_ratio, 0.5, "The files with valid rate less than this threshold will be compacted") \
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/TMTContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ void TMTContext::initS3GCManager(const TiFlashRaftProxyHelper * proxy_helper)
}
// TODO: make it reloadable
remote_gc_config.interval_seconds = context.getSettingsRef().remote_gc_interval_seconds;
remote_gc_config.summary_interval_seconds = context.getSettingsRef().remote_summary_interval_seconds;
remote_gc_config.verify_locks = context.getSettingsRef().remote_gc_verify_consistency > 0;
// set the gc_method so that S3LockService can set tagging when create delmark
S3::ClientFactory::instance().gc_method = remote_gc_config.method;
Expand Down
71 changes: 69 additions & 2 deletions dbms/src/Storages/S3/S3GCManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ bool S3GCManager::runOnAllStores()
return false;
}

bool S3GCManager::isOwner() const
{
return gc_owner_manager->isOwner();
}

void S3GCManager::runForStore(UInt64 gc_store_id)
{
// get a timepoint at the begin, only remove objects that expired compare
Expand Down Expand Up @@ -824,6 +829,17 @@ S3StoreStorageSummary S3GCManager::getStoreStorageSummary(StoreID store_id)
String last_dtfile_key;
size_t num_dtfile_keys_for_last_dtfile = 0;
S3::listPrefix(*client, prefix, [&](const Aws::S3::Model::Object & object) {
if (shutdown_called)
{
LOG_INFO(
log,
"getS3StorageSummary shutting down, break, store_id={} processed_keys={}",
store_id,
num_processed_keys);
// .more=false to break the listing early
return PageResult{.num_keys = 1, .more = false};
}

const auto & key = object.GetKey();
const auto view = S3FilenameView::fromKey(key);
if (watch.elapsedSeconds() - last_elapsed > log_interval_seconds)
Expand Down Expand Up @@ -897,6 +913,7 @@ S3StoreStorageSummary S3GCManager::getStoreStorageSummary(StoreID store_id)
return PageResult{.num_keys = 1, .more = true};
});
summary.num_keys = num_processed_keys;
TiFlashMetrics::instance().setS3StoreSummaryBytes(store_id, summary.data_file.bytes, summary.dt_file.bytes);
LOG_INFO(log, "getS3StorageSummary finish, elapsed={:.3f}s summary={}", watch.elapsedSeconds(), summary);
return summary;
}
Expand All @@ -922,6 +939,49 @@ S3GCManagerService::S3GCManagerService(
[this]() { return manager->runOnAllStores(); },
false,
/*interval_ms*/ config.interval_seconds * 1000);

if (config.summary_interval_seconds <= 0)
{
LOG_INFO(
Logger::get("S3GCManagerService"),
"The periodic S3 storage summary will be disabled, summary_interval_seconds={}",
config.summary_interval_seconds);
}
else
{
if (config.summary_interval_seconds < 12 * 3600)
{
LOG_WARNING(
Logger::get("S3GCManagerService"),
"The summary_interval_seconds is too small, it may cause high overhead on S3. "
"It is recommended to set it to a value larger than 12 hours (43200 seconds), "
"summary_interval_seconds={}",
config.summary_interval_seconds);
}

summary_timer = global_ctx.getBackgroundPool().addTask(
[this]() {
// Only run summary in the owner instance
if (!manager || !manager->isOwner())
return false;

try
{
auto summary = manager->getS3StorageSummary({});
LOG_INFO(
Logger::get("S3GCManagerService"),
"Periodic S3 storage summary finished, num_stores={}",
summary.stores.size());
}
catch (...)
{
tryLogCurrentException(Logger::get("S3GCManagerService"), "periodic getS3StorageSummary failed");
}
return false;
},
false,
config.summary_interval_seconds * 1000);
}
}

S3GCManagerService::~S3GCManagerService()
Expand All @@ -942,9 +1002,16 @@ void S3GCManagerService::shutdown()
// Remove the task handler. It will block until the task break
global_ctx.getBackgroundPool().removeTask(timer);
timer = nullptr;
// then we can reset the manager
manager = nullptr;
}

if (summary_timer)
{
global_ctx.getBackgroundPool().removeTask(summary_timer);
summary_timer = nullptr;
}

// then we can reset the manager
manager = nullptr;
}

void S3GCManagerService::wake() const
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/S3/S3GCManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ struct S3GCConfig
// The interval of the S3 GC routine runs
Int64 interval_seconds = 600;

// The interval of periodic S3 storage summary task.
Int64 summary_interval_seconds = 24 * 60 * 60;

// The maximum number of manifest files preserve
// for each store
size_t manifest_preserve_count = 10;
Expand Down Expand Up @@ -143,6 +146,8 @@ class S3GCManager

bool runOnAllStores();

bool isOwner() const;

void shutdown() { shutdown_called = true; }

S3StoreStorageSummary getStoreStorageSummary(StoreID store_id);
Expand Down Expand Up @@ -222,6 +227,7 @@ class S3GCManagerService
Context & global_ctx;
std::unique_ptr<S3GCManager> manager;
BackgroundProcessingPool::TaskHandle timer;
BackgroundProcessingPool::TaskHandle summary_timer;
};

} // namespace DB::S3
Expand Down
Loading