From 3afdf1f0da61fae650e647cbe918370a1755d15a Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 29 Aug 2025 15:19:28 +0800 Subject: [PATCH] file ref mgr&gc worker Signed-off-by: discord9 --- src/common/meta/src/instruction.rs | 22 + src/common/meta/src/key/table_route.rs | 41 +- src/datanode/src/datanode.rs | 15 +- src/datanode/src/error.rs | 10 +- src/datanode/src/heartbeat/handler.rs | 17 + .../src/heartbeat/handler/gc_regions.rs | 194 ++++++ src/meta-srv/src/gc_trigger.rs | 283 ++++++++ src/meta-srv/src/lib.rs | 1 + src/meta-srv/src/metasrv.rs | 5 + src/meta-srv/src/metasrv/builder.rs | 12 + src/meta-srv/src/metrics.rs | 3 + src/mito2/src/access_layer.rs | 12 +- src/mito2/src/compaction/compactor.rs | 3 +- src/mito2/src/engine.rs | 25 +- src/mito2/src/gc.rs | 631 ++++++++++++++++++ src/mito2/src/lib.rs | 1 + src/mito2/src/metrics.rs | 16 + src/mito2/src/region.rs | 4 + src/mito2/src/region/opener.rs | 7 +- src/mito2/src/sst/file.rs | 10 +- src/mito2/src/sst/file_purger.rs | 452 ++++++++++++- src/mito2/src/sst/location.rs | 37 +- src/mito2/src/test_util.rs | 13 +- src/mito2/src/worker.rs | 18 + src/mito2/src/worker/handle_catchup.rs | 1 + src/mito2/src/worker/handle_create.rs | 1 + src/mito2/src/worker/handle_open.rs | 1 + src/object-store/src/lib.rs | 2 +- 28 files changed, 1807 insertions(+), 30 deletions(-) create mode 100644 src/datanode/src/heartbeat/handler/gc_regions.rs create mode 100644 src/meta-srv/src/gc_trigger.rs create mode 100644 src/mito2/src/gc.rs diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index b74b8c25b49e..8da0b4760c77 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -225,6 +225,18 @@ pub struct FlushRegions { pub region_ids: Vec, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct GcRegions { + pub region_ids: Vec, + pub ts_millis: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct CollectFileRefs { + pub region_id: RegionId, + pub ts_millis: i64, +} + #[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)] pub enum Instruction { /// Opens a region. @@ -245,6 +257,10 @@ pub enum Instruction { FlushRegions(FlushRegions), /// Flushes a single region. FlushRegion(RegionId), + /// Triggers garbage collection for a table. + GcRegions(GcRegions), + /// Trigger datanode to collect and upload table reference to object storage. + CollectFileRefs(CollectFileRefs), } /// The reply of [UpgradeRegion]. 
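A minimal sketch of how the new instruction payloads added above are expected to round-trip through serde when carried in a mailbox message. This is illustrative only: it assumes `serde`/`serde_json` and uses a plain `u64` as a stand-in for `store_api::storage::RegionId`; the mirrored types are not the crate's actual definitions.

```rust
use serde::{Deserialize, Serialize};

// Assumption: RegionId is shown as a plain u64 for brevity.
type RegionId = u64;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct GcRegions {
    region_ids: Vec<RegionId>,
    ts_millis: i64,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct CollectFileRefs {
    region_id: RegionId,
    ts_millis: i64,
}

fn main() -> serde_json::Result<()> {
    // Metasrv serializes the instruction into a mailbox message; the datanode
    // heartbeat handler deserializes it before dispatching to a handler.
    let gc = GcRegions {
        region_ids: vec![1, 2, 3],
        ts_millis: 1_725_000_000_000,
    };
    let json = serde_json::to_string(&gc)?;
    let decoded: GcRegions = serde_json::from_str(&json)?;
    assert_eq!(gc, decoded);
    println!("{json}");

    let collect = CollectFileRefs {
        region_id: 42,
        ts_millis: 1_725_000_000_000,
    };
    println!("{}", serde_json::to_string(&collect)?);
    Ok(())
}
```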
@@ -276,6 +292,8 @@ pub enum InstructionReply { UpgradeRegion(UpgradeRegionReply), DowngradeRegion(DowngradeRegionReply), FlushRegion(SimpleReply), + GcRegions(SimpleReply), + CollectFileRefs(SimpleReply), } impl Display for InstructionReply { @@ -288,6 +306,10 @@ impl Display for InstructionReply { write!(f, "InstructionReply::DowngradeRegion({})", reply) } Self::FlushRegion(reply) => write!(f, "InstructionReply::FlushRegion({})", reply), + Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegions({})", reply), + Self::CollectFileRefs(reply) => { + write!(f, "InstructionReply::CollectFileRefs({})", reply) + } } } } diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index a17e8f2079e1..5dfc11ab461f 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -16,6 +16,8 @@ use std::collections::{HashMap, HashSet}; use std::fmt::Display; use std::sync::Arc; +use futures::stream::BoxStream; +use futures_util::TryStreamExt; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{RegionId, RegionNumber}; @@ -33,8 +35,9 @@ use crate::key::{ }; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; +use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; use crate::rpc::router::{region_distribution, RegionRoute}; -use crate::rpc::store::BatchGetRequest; +use crate::rpc::store::{BatchGetRequest, RangeRequest}; /// The key stores table routes /// @@ -554,9 +557,41 @@ impl TableRouteManager { pub fn table_route_storage(&self) -> &TableRouteStorage { &self.storage } + + /// Returns a stream of all physical table ids. + pub fn physical_table_values( + &self, + ) -> BoxStream<'static, Result<(TableId, PhysicalTableRouteValue)>> { + let key = TableRouteKey::range_prefix(); + let req = RangeRequest::new().with_prefix(key); + + let stream = PaginationStream::new( + self.storage.kv_backend().clone(), + req, + DEFAULT_PAGE_SIZE, + |kv| { + let key = TableRouteKey::from_bytes(&kv.key)?; + Ok(key.table_id) + }, + ) + .into_stream(); + let storage = self.storage.clone(); + + Box::pin(stream.try_filter_map(move |table_id| { + let storage = storage.clone(); + async move { + let table_route = storage.get_inner(table_id).await?; + match table_route { + Some(TableRouteValue::Physical(val)) => Ok(Some((table_id, val))), + _ => Ok(None), + } + } + })) + } } /// Low-level operations of [TableRouteValue]. +#[derive(Clone)] pub struct TableRouteStorage { kv_backend: KvBackendRef, } @@ -568,6 +603,10 @@ impl TableRouteStorage { Self { kv_backend } } + pub fn kv_backend(&self) -> &KvBackendRef { + &self.kv_backend + } + /// Builds a create table route transaction, /// it expected the `__table_route/{table_id}` wasn't occupied. 
pub fn build_create_txn( diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index bcd5effd86da..3bc45fb6a7d2 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -39,6 +39,7 @@ use meta_client::MetaClientRef; use metric_engine::engine::MetricEngine; use mito2::config::MitoConfig; use mito2::engine::{MitoEngine, MitoEngineBuilder}; +use mito2::sst::file_purger::{FileReferenceManager, FileReferenceManagerRef}; use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; use object_store::util::normalize_dir; use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef}; @@ -238,8 +239,13 @@ impl DatanodeBuilder { table_id_schema_cache, schema_cache, )); + let file_ref_manager = Arc::new(FileReferenceManager::new(node_id)); let region_server = self - .new_region_server(schema_metadata_manager, region_event_listener) + .new_region_server( + schema_metadata_manager, + region_event_listener, + file_ref_manager, + ) .await?; // TODO(weny): Considering introducing a readonly kv_backend trait. @@ -361,6 +367,7 @@ impl DatanodeBuilder { &mut self, schema_metadata_manager: SchemaMetadataManagerRef, event_listener: RegionServerEventListenerRef, + file_ref_manager: FileReferenceManagerRef, ) -> Result { let opts: &DatanodeOptions = &self.opts; @@ -399,6 +406,7 @@ impl DatanodeBuilder { .build_store_engines( object_store_manager, schema_metadata_manager, + file_ref_manager, self.plugins.clone(), ) .await?; @@ -419,6 +427,7 @@ impl DatanodeBuilder { &mut self, object_store_manager: ObjectStoreManagerRef, schema_metadata_manager: SchemaMetadataManagerRef, + file_ref_manager: FileReferenceManagerRef, plugins: Plugins, ) -> Result> { let mut metric_engine_config = metric_engine::config::EngineConfig::default(); @@ -444,6 +453,7 @@ impl DatanodeBuilder { object_store_manager.clone(), mito_engine_config, schema_metadata_manager.clone(), + file_ref_manager.clone(), plugins.clone(), ) .await?; @@ -469,6 +479,7 @@ impl DatanodeBuilder { object_store_manager: ObjectStoreManagerRef, mut config: MitoConfig, schema_metadata_manager: SchemaMetadataManagerRef, + file_ref_manager: FileReferenceManagerRef, plugins: Plugins, ) -> Result { let opts = &self.opts; @@ -490,6 +501,7 @@ impl DatanodeBuilder { log_store, object_store_manager, schema_metadata_manager, + file_ref_manager, plugins, ); @@ -530,6 +542,7 @@ impl DatanodeBuilder { log_store, object_store_manager, schema_metadata_manager, + file_ref_manager, plugins, ); diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index a6d7600b31be..f40d6a5380b0 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -308,6 +308,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to run gc for mito engine"))] + GcMitoEngine { + region_id: RegionId, + source: mito2::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to build metric engine"))] BuildMetricEngine { source: metric_engine::error::Error, @@ -451,7 +459,7 @@ impl ErrorExt for Error { StopRegionEngine { source, .. } => source.status_code(), FindLogicalRegions { source, .. } => source.status_code(), - BuildMitoEngine { source, .. } => source.status_code(), + BuildMitoEngine { source, .. } | GcMitoEngine { source, .. } => source.status_code(), BuildMetricEngine { source, .. } => source.status_code(), ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. 
} => { StatusCode::RegionBusy diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 9095545bcdad..7ec876edbd01 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -27,6 +27,7 @@ use store_api::storage::RegionId; mod close_region; mod downgrade_region; mod flush_region; +mod gc_regions; mod open_region; mod upgrade_region; @@ -40,6 +41,7 @@ pub struct RegionHeartbeatResponseHandler { catchup_tasks: TaskTracker<()>, downgrade_tasks: TaskTracker<()>, flush_tasks: TaskTracker<()>, + gc_tasks: TaskTracker<()>, } /// Handler of the instruction. @@ -52,6 +54,7 @@ pub struct HandlerContext { catchup_tasks: TaskTracker<()>, downgrade_tasks: TaskTracker<()>, flush_tasks: TaskTracker<()>, + gc_tasks: TaskTracker<()>, } impl HandlerContext { @@ -66,6 +69,7 @@ impl HandlerContext { catchup_tasks: TaskTracker::new(), downgrade_tasks: TaskTracker::new(), flush_tasks: TaskTracker::new(), + gc_tasks: TaskTracker::new(), } } } @@ -78,6 +82,7 @@ impl RegionHeartbeatResponseHandler { catchup_tasks: TaskTracker::new(), downgrade_tasks: TaskTracker::new(), flush_tasks: TaskTracker::new(), + gc_tasks: TaskTracker::new(), } } @@ -105,6 +110,14 @@ impl RegionHeartbeatResponseHandler { Instruction::FlushRegion(flush_region) => Ok(Box::new(move |handler_context| { handler_context.handle_flush_region_instruction(flush_region) })), + Instruction::GcRegions(gc_regions) => Ok(Box::new(move |handler_context| { + handler_context.handle_gc_regions_instruction(gc_regions) + })), + Instruction::CollectFileRefs(collect_file_refs) => { + Ok(Box::new(move |handler_context| { + handler_context.handle_collect_file_refs_instruction(collect_file_refs) + })) + } } } } @@ -120,6 +133,8 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { | Some((_, Instruction::UpgradeRegion { .. })) | Some((_, Instruction::FlushRegion { .. })) | Some((_, Instruction::FlushRegions { .. })) + | Some((_, Instruction::GcRegions { .. })) + | Some((_, Instruction::CollectFileRefs { .. })) ) } @@ -134,6 +149,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { let catchup_tasks = self.catchup_tasks.clone(); let downgrade_tasks = self.downgrade_tasks.clone(); let flush_tasks = self.flush_tasks.clone(); + let gc_tasks = self.gc_tasks.clone(); let handler = Self::build_handler(instruction)?; let _handle = common_runtime::spawn_global(async move { let reply = handler(HandlerContext { @@ -141,6 +157,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { catchup_tasks, downgrade_tasks, flush_tasks, + gc_tasks, }) .await; diff --git a/src/datanode/src/heartbeat/handler/gc_regions.rs b/src/datanode/src/heartbeat/handler/gc_regions.rs new file mode 100644 index 000000000000..60f0d4915212 --- /dev/null +++ b/src/datanode/src/heartbeat/handler/gc_regions.rs @@ -0,0 +1,194 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
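The heartbeat handler wiring above maps each new instruction variant to a boxed closure over the handler context. The following sketch shows that dispatch shape in isolation, under simplifying assumptions: `RegionId` is a `u64` stand-in, `SimpleReply`/`HandlerContext` are stripped-down mirrors, and the handler bodies are placeholders rather than the real GC/ref-collection logic.

```rust
use futures::future::BoxFuture;

// Assumption: simplified stand-ins for the real types.
type RegionId = u64;

enum Instruction {
    GcRegions(Vec<RegionId>),
    CollectFileRefs(RegionId),
}

struct SimpleReply {
    result: bool,
    error: Option<String>,
}

struct HandlerContext;

impl HandlerContext {
    // Stand-in for `handle_gc_regions_instruction`: the real handler registers
    // a task on the `gc_tasks` TaskTracker and waits for it to finish.
    fn handle_gc_regions(self, regions: Vec<RegionId>) -> BoxFuture<'static, SimpleReply> {
        Box::pin(async move {
            SimpleReply {
                result: !regions.is_empty(),
                error: None,
            }
        })
    }

    // Stand-in for `handle_collect_file_refs_instruction`.
    fn handle_collect_file_refs(self, _region_id: RegionId) -> BoxFuture<'static, SimpleReply> {
        Box::pin(async move {
            SimpleReply {
                result: true,
                error: None,
            }
        })
    }
}

// Each instruction variant becomes a boxed closure taking the handler context,
// mirroring the `build_handler` routing shown in the diff above.
type Handler = Box<dyn FnOnce(HandlerContext) -> BoxFuture<'static, SimpleReply> + Send>;

fn build_handler(instruction: Instruction) -> Handler {
    match instruction {
        Instruction::GcRegions(regions) => {
            Box::new(move |ctx: HandlerContext| ctx.handle_gc_regions(regions))
        }
        Instruction::CollectFileRefs(region_id) => {
            Box::new(move |ctx: HandlerContext| ctx.handle_collect_file_refs(region_id))
        }
    }
}

fn main() {
    let handler = build_handler(Instruction::GcRegions(vec![1, 2, 3]));
    let reply = futures::executor::block_on(handler(HandlerContext));
    assert!(reply.result && reply.error.is_none());
}
```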
+ +use common_meta::instruction::{CollectFileRefs, GcRegions, InstructionReply, SimpleReply}; +use common_telemetry::{info, warn}; +use futures::future::BoxFuture; +use mito2::engine::MitoEngine; +use mito2::gc::LocalGcWorker; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::RegionId; + +use crate::error::{GcMitoEngineSnafu, RegionNotFoundSnafu, Result, UnexpectedSnafu}; +use crate::heartbeat::handler::HandlerContext; + +impl HandlerContext { + pub(crate) fn handle_gc_regions_instruction( + self, + gc_regions: GcRegions, + ) -> BoxFuture<'static, Option> { + Box::pin(async move { + let region_ids = gc_regions.region_ids.clone(); + info!("Received gc regions instruction: {:?}", region_ids); + let region_ids = gc_regions.region_ids; + let mut table_id = None; + let is_same_table = region_ids.windows(2).all(|w| { + let t1 = w[0].table_id(); + let t2 = w[1].table_id(); + if table_id.is_none() { + table_id = Some(t1); + } + t1 == t2 + }); + if !is_same_table { + return Some(InstructionReply::GcRegions(SimpleReply { + result: false, + error: Some(format!( + "Regions to GC should belong to the same table, found: {:?}", + region_ids + )), + })); + } + + let (region_id, gc_worker) = match self + .create_gc_worker(region_ids, gc_regions.ts_millis) + .await + { + Ok(worker) => worker, + Err(e) => { + return Some(InstructionReply::GcRegions(SimpleReply { + result: false, + error: Some(format!("Failed to create GC worker: {}", e)), + })); + } + }; + + let register_result = self + .gc_tasks + .try_register( + region_id, + Box::pin(async move { + info!("Starting gc worker for region {}", region_id); + gc_worker + .run() + .await + .context(GcMitoEngineSnafu { region_id })?; + info!("Gc worker for region {} finished", region_id); + Ok(()) + }), + ) + .await; + if register_result.is_busy() { + warn!("Another gc task is running for the region: {region_id}"); + } + let mut watcher = register_result.into_watcher(); + let result = self.gc_tasks.wait_until_finish(&mut watcher).await; + match result { + Ok(()) => Some(InstructionReply::GcRegions(SimpleReply { + result: true, + error: None, + })), + Err(err) => Some(InstructionReply::GcRegions(SimpleReply { + result: false, + error: Some(format!("{err:?}")), + })), + } + }) + } + + async fn create_gc_worker( + &self, + mut region_ids: Vec, + ref_ts_millis: i64, + ) -> Result<(RegionId, LocalGcWorker)> { + // always use the smallest region id on datanode as the target region id + region_ids.sort_by_key(|r| r.region_number()); + let (region_id, mito_engine) = self.find_engine_for_regions(®ion_ids)?; + + let mito_config = mito_engine.mito_config(); + let region = mito_engine + .find_region(region_id) + .context(RegionNotFoundSnafu { region_id })?; + let access_layer = region.access_layer(); + + let cache_manager = mito_engine.cache_manager(); + + let gc_worker = LocalGcWorker::try_new( + access_layer.clone(), + Some(cache_manager), + region_ids, + Default::default(), + mito_config.clone(), + ref_ts_millis, + ) + .await + .context(GcMitoEngineSnafu { region_id })?; + + Ok((region_id, gc_worker)) + } + + fn find_engine_for_regions(&self, region_ids: &[RegionId]) -> Result<(RegionId, MitoEngine)> { + for region_id in region_ids { + let engine = self.region_server.find_engine(*region_id)?; + let Some(engine) = engine else { + continue; + }; + let engine = engine + .as_any() + .downcast_ref::() + .context(UnexpectedSnafu { + violated: format!( + "Expected MitoEngine, found: {:?}", + std::any::type_name_of_val(engine.as_ref()) + ), + })? 
+ .clone(); + return Ok((*region_id, engine)); + } + UnexpectedSnafu { + violated: format!( + "No MitoEngine found for regions on current datanode: {:?}", + region_ids + ), + } + .fail() + } + + pub fn handle_collect_file_refs_instruction( + self, + collect_file_refs: CollectFileRefs, + ) -> BoxFuture<'static, Option> { + Box::pin(async move { + match self + .trigger_file_refs_upload(collect_file_refs.region_id, collect_file_refs.ts_millis) + .await + { + Ok(()) => Some(InstructionReply::CollectFileRefs(SimpleReply { + result: true, + error: None, + })), + Err(e) => Some(InstructionReply::CollectFileRefs(SimpleReply { + result: false, + error: Some(format!( + "Failed to collect file refs for region {}: {}", + collect_file_refs.region_id, e + )), + })), + } + }) + } + + async fn trigger_file_refs_upload(&self, region_id: RegionId, now: i64) -> Result<()> { + let (region_id, mito_engine) = self.find_engine_for_regions(&[region_id])?; + + let file_ref_mgr = mito_engine.file_ref_manager(); + + file_ref_mgr + .upload_ref_file_for_table(region_id.table_id(), now) + .await + .with_context(|_| GcMitoEngineSnafu { region_id })?; + + Ok(()) + } +} diff --git a/src/meta-srv/src/gc_trigger.rs b/src/meta-srv/src/gc_trigger.rs new file mode 100644 index 000000000000..683864baee21 --- /dev/null +++ b/src/meta-srv/src/gc_trigger.rs @@ -0,0 +1,283 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// TODO(discord9): use it +#![allow(unused)] +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use api::v1::meta::MailboxMessage; +use common_meta::instruction::{CollectFileRefs, GcRegions, Instruction}; +use common_meta::key::table_route::PhysicalTableRouteValue; +use common_meta::key::TableMetadataManagerRef; +use common_meta::peer::Peer; +use common_meta::rpc::router::RegionRoute; +use common_telemetry::{debug, error, info}; +use futures::stream::BoxStream; +use futures::TryStreamExt; +use snafu::{OptionExt as _, ResultExt}; +use store_api::storage::RegionId; +use table::metadata::TableId; +use tokio::sync::mpsc::{Receiver, Sender}; + +use crate::error::{ + self, RegionRouteNotFoundSnafu, Result, TableMetadataManagerSnafu, TableRouteNotFoundSnafu, + UnexpectedSnafu, +}; +use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef}; +use crate::{define_ticker, metrics}; + +/// The interval of the gc ticker. +const TICKER_INTERVAL: Duration = Duration::from_secs(60 * 5); + +/// [`Event`] represents various types of events that can be processed by the gc ticker. +/// +/// Variants: +/// - `Tick`: This event is used to trigger gc periodically. +pub(crate) enum Event { + Tick, +} + +pub(crate) type GcTickerRef = Arc; + +define_ticker!( + /// [GcTicker] is used to trigger gc periodically. + GcTicker, + event_type = Event, + event_value = Event::Tick +); + +/// [`GcTrigger`] is used to periodically trigger garbage collection on datanodes. +pub struct GcTrigger { + /// The metadata manager. 
+ table_metadata_manager: TableMetadataManagerRef, + /// The mailbox to send messages. + mailbox: MailboxRef, + /// The server address. + server_addr: String, + /// The receiver of events. + receiver: Receiver, +} + +impl GcTrigger { + /// Creates a new [`GcTrigger`]. + pub(crate) fn new( + table_metadata_manager: TableMetadataManagerRef, + mailbox: MailboxRef, + server_addr: String, + ) -> (Self, GcTicker) { + let (tx, rx) = Self::channel(); + let gc_ticker = GcTicker::new(TICKER_INTERVAL, tx); + let gc_trigger = Self { + table_metadata_manager, + mailbox, + server_addr, + receiver: rx, + }; + (gc_trigger, gc_ticker) + } + + fn channel() -> (Sender, Receiver) { + tokio::sync::mpsc::channel(8) + } + + /// Starts the gc trigger. + pub fn try_start(mut self) -> Result<()> { + common_runtime::spawn_global(async move { self.run().await }); + info!("GC trigger started"); + Ok(()) + } + + async fn run(&mut self) { + // trigger a gc immediately after started + let _ = self.trigger_gc().await; + while let Some(event) = self.receiver.recv().await { + match event { + Event::Tick => { + info!("Received gc tick"); + self.handle_tick().await + } + } + } + } + + async fn handle_tick(&self) { + info!("Start to trigger gc"); + if let Err(e) = self.trigger_gc().await { + error!(e; "Failed to trigger gc"); + } + } + + /// Iterate through all physical tables and trigger gc for each table. + /// TODO(discord9): Poc impl, will parallelize later. + async fn trigger_gc(&self) -> Result<()> { + info!("Triggering gc"); + // TODO: trigger gc based on statistics, e.g. number of deleted files per table. + let mut tables: BoxStream< + 'static, + common_meta::error::Result<(TableId, PhysicalTableRouteValue)>, + > = self + .table_metadata_manager + .table_route_manager() + .physical_table_values(); + + while let Some((table_id, phy_table_val)) = tables + .as_mut() + .try_next() + .await + .context(TableMetadataManagerSnafu)? + { + info!("Triggering gc for table {}", table_id); + let phy_table_val: PhysicalTableRouteValue = phy_table_val; + let mut region_ids: Vec = phy_table_val + .region_routes + .iter() + .map(|r: &RegionRoute| r.region.id) + .collect::>(); + + region_ids.sort_by_key(|f| f.region_number()); + + // send instruction to first region id's datanode + let (first_region_id, first_region_peer) = phy_table_val + .region_routes + .first() + .and_then(|r| r.leader_peer.as_ref().map(|p| (r.region.id, p.clone()))) + .context({ TableRouteNotFoundSnafu { table_id } })?; + + let all_peers = phy_table_val + .region_routes + .iter() + .filter_map(|r| r.leader_peer.clone()) + .collect::>(); + + // only need to trigger gc for one region per datanode + let peers_to_region_ids: HashMap = phy_table_val + .region_routes + .iter() + .filter_map(|p| { + p.leader_peer + .as_ref() + .map(|peer| (peer.clone(), p.region.id)) + }) + .collect::>(); + + let now_millis = common_time::util::current_time_millis(); + + self.send_upload_ref_instructions(&peers_to_region_ids, now_millis) + .await?; + + self.send_gc_instruction(first_region_peer.clone(), region_ids.clone(), now_millis) + .await?; + info!( + "Sent gc instruction to datanode {} for table {} with regions {:?}", + first_region_peer, table_id, region_ids + ); + } + + Ok(()) + } + + /// Ask all the datanode that have at least one region of the table to upload table reference. + /// + /// If any datanode fails to reply the instruction within a timeout, then the entire gc operation + /// is considered failed. 
+ async fn send_upload_ref_instructions( + &self, + peers_to_region_ids: &HashMap, + now_millis: i64, + ) -> Result<()> { + let mut wait_for_replies = Vec::with_capacity(peers_to_region_ids.len()); + for (peer, region_id) in peers_to_region_ids { + info!( + "Sending upload reference instruction to datanode {} for region {}", + peer, region_id + ); + let instruction = Instruction::CollectFileRefs(CollectFileRefs { + region_id: *region_id, + ts_millis: now_millis, + }); + let msg = MailboxMessage::json_message( + &format!("Upload table reference: {}", instruction), + &format!("Metasrv@{}", self.server_addr), + &format!("Datanode-{}@{}", peer.id, peer.addr), + common_time::util::current_time_millis(), + &instruction, + ) + .with_context(|_| error::SerializeToJsonSnafu { + input: instruction.to_string(), + })?; + + let mailbox_rx: MailboxReceiver = self + .mailbox + .send(&Channel::Datanode(peer.id), msg, Duration::from_secs(60)) + .await?; + + wait_for_replies.push((peer, mailbox_rx)); + } + + // wait for all replies + for (peer, mailbox_rx) in wait_for_replies { + match mailbox_rx.await { + Ok(msg) => continue, + Err(e) => { + error!(e; "Failed to receive upload reference reply from datanode {}", peer); + return Err(e); + } + } + } + + Ok(()) + } + + async fn send_gc_instruction( + &self, + peer: Peer, + region_ids: Vec, + now_millis: i64, + ) -> Result<()> { + info!( + "Sending gc instruction to datanode {} with regions {:?}", + peer, region_ids + ); + let instruction = Instruction::GcRegions(GcRegions { + region_ids, + ts_millis: now_millis, + }); + let msg = MailboxMessage::json_message( + &format!("GC regions: {}", instruction), + &format!("Metasrv@{}", self.server_addr), + &format!("Datanode-{}@{}", peer.id, peer.addr), + common_time::util::current_time_millis(), + &instruction, + ) + .with_context(|_| error::SerializeToJsonSnafu { + input: instruction.to_string(), + })?; + + if let Err(e) = self + .mailbox + .send_oneway(&Channel::Datanode(peer.id), msg) + .await + { + error!(e; "Failed to send gc instruction to datanode {}", peer); + } else { + info!("Successfully sent gc instruction to datanode {}", peer); + } + + metrics::METRIC_META_TRIGGERED_GC_TOTAL.inc(); + + Ok(()) + } +} diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index aef561e1dc3a..18030b773dea 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -26,6 +26,7 @@ pub mod error; pub mod events; mod failure_detector; pub mod flow_meta_alloc; +pub mod gc_trigger; pub mod handler; pub mod key; pub mod lease; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 3f44904d70f0..70f9f607c735 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -66,6 +66,7 @@ use crate::error::{ StartTelemetryTaskSnafu, StopProcedureManagerSnafu, }; use crate::failure_detector::PhiAccrualFailureDetectorOptions; +use crate::gc_trigger::GcTickerRef; use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef}; use crate::lease::lookup_datanode_peer; use crate::procedure::region_migration::manager::RegionMigrationManagerRef; @@ -496,6 +497,7 @@ pub struct Metasrv { topic_stats_registry: TopicStatsRegistryRef, wal_prune_ticker: Option, region_flush_ticker: Option, + gc_ticker: Option, table_id_sequence: SequenceRef, reconciliation_manager: ReconciliationManagerRef, @@ -558,6 +560,9 @@ impl Metasrv { if let Some(region_flush_trigger) = &self.region_flush_ticker { leadership_change_notifier.add_listener(region_flush_trigger.clone() as _); } + if let 
Some(gc_ticker) = &self.gc_ticker { + leadership_change_notifier.add_listener(gc_ticker.clone() as _); + } if let Some(customizer) = self.plugins.get::() { customizer.customize(&mut leadership_change_notifier); } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 449e86212079..507cfe9922ed 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -54,6 +54,7 @@ use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result}; use crate::events::EventHandlerImpl; use crate::flow_meta_alloc::FlowPeerAllocator; +use crate::gc_trigger::GcTrigger; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::handler::failure_handler::RegionFailureHandler; use crate::handler::flow_state_handler::FlowStateHandler; @@ -431,6 +432,16 @@ impl MetasrvBuilder { None }; + let gc_ticker = { + let (gc_trigger, gc_ticker) = GcTrigger::new( + table_metadata_manager.clone(), + mailbox.clone(), + options.grpc.server_addr.clone(), + ); + gc_trigger.try_start()?; + Some(Arc::new(gc_ticker)) + }; + // remote WAL prune ticker and manager let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() { let (tx, rx) = WalPruneManager::channel(); @@ -558,6 +569,7 @@ impl MetasrvBuilder { leader_region_registry, wal_prune_ticker, region_flush_ticker, + gc_ticker, table_id_sequence, reconciliation_manager, topic_stats_registry, diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs index 28be968a9fb9..cc45a1d63fe2 100644 --- a/src/meta-srv/src/metrics.rs +++ b/src/meta-srv/src/metrics.rs @@ -90,4 +90,7 @@ lazy_static! { /// The topic estimated replay size. pub static ref METRIC_META_TOPIC_ESTIMATED_REPLAY_SIZE: IntGaugeVec = register_int_gauge_vec!("meta_topic_estimated_replay_size", "meta topic estimated replay size", &["topic_name"]).unwrap(); + /// The triggered gc total counter. + pub static ref METRIC_META_TRIGGERED_GC_TOTAL: IntCounter = + register_int_counter!("meta_triggered_gc_total", "meta triggered gc total").unwrap(); } diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 941b0268ca18..445a738fc387 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -36,7 +36,7 @@ use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED}; use crate::read::Source; use crate::region::options::IndexOptions; -use crate::sst::file::{FileHandle, FileId, FileMeta, RegionFileId}; +use crate::sst::file::{FileHandle, FileId, RegionFileId}; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::index::IndexerBuilderImpl; @@ -190,21 +190,21 @@ impl AccessLayer { } /// Deletes a SST file (and its index file if it has one) with given file id. 
- pub(crate) async fn delete_sst(&self, file_meta: &FileMeta) -> Result<()> { - let path = location::sst_file_path(&self.table_dir, file_meta.file_id(), self.path_type); + pub(crate) async fn delete_sst(&self, region_file_id: &RegionFileId) -> Result<()> { + let path = location::sst_file_path(&self.table_dir, *region_file_id, self.path_type); self.object_store .delete(&path) .await .context(DeleteSstSnafu { - file_id: file_meta.file_id, + file_id: region_file_id.file_id(), })?; - let path = location::index_file_path(&self.table_dir, file_meta.file_id(), self.path_type); + let path = location::index_file_path(&self.table_dir, *region_file_id, self.path_type); self.object_store .delete(&path) .await .context(DeleteIndexSnafu { - file_id: file_meta.file_id, + file_id: region_file_id.file_id(), })?; Ok(()) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index ec36e0a93d8a..dce75c8d1ed8 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -46,7 +46,7 @@ use crate::region::version::VersionRef; use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState}; use crate::schedule::scheduler::LocalScheduler; use crate::sst::file::FileMeta; -use crate::sst::file_purger::LocalFilePurger; +use crate::sst::file_purger::{FileReferenceManager, LocalFilePurger}; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::location::region_dir_from_table_dir; @@ -196,6 +196,7 @@ pub async fn open_compaction_region( purge_scheduler.clone(), access_layer.clone(), None, + Arc::new(FileReferenceManager::new(0)), )) }; diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index febd2be45ea4..aac84db0e629 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -97,7 +97,7 @@ use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; use store_api::ManifestVersion; use tokio::sync::{oneshot, Semaphore}; -use crate::cache::CacheStrategy; +use crate::cache::{CacheManagerRef, CacheStrategy}; use crate::config::MitoConfig; use crate::error::{ InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result, @@ -113,6 +113,7 @@ use crate::read::stream::ScanBatchStream; use crate::region::MitoRegionRef; use crate::request::{RegionEditRequest, WorkerRequest}; use crate::sst::file::FileMeta; +use crate::sst::file_purger::FileReferenceManagerRef; use crate::wal::entry_distributor::{ build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, }; @@ -127,6 +128,7 @@ pub struct MitoEngineBuilder<'a, S: LogStore> { log_store: Arc, object_store_manager: ObjectStoreManagerRef, schema_metadata_manager: SchemaMetadataManagerRef, + file_ref_manager: FileReferenceManagerRef, plugins: Plugins, #[cfg(feature = "enterprise")] extension_range_provider_factory: Option, @@ -139,6 +141,7 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> { log_store: Arc, object_store_manager: ObjectStoreManagerRef, schema_metadata_manager: SchemaMetadataManagerRef, + file_ref_manager: FileReferenceManagerRef, plugins: Plugins, ) -> Self { Self { @@ -147,6 +150,7 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> { log_store, object_store_manager, schema_metadata_manager, + file_ref_manager, plugins, #[cfg(feature = "enterprise")] extension_range_provider_factory: None, @@ -174,6 +178,7 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> { self.log_store.clone(), self.object_store_manager, 
self.schema_metadata_manager, + self.file_ref_manager, self.plugins, ) .await?; @@ -210,6 +215,7 @@ impl MitoEngine { log_store: Arc, object_store_manager: ObjectStoreManagerRef, schema_metadata_manager: SchemaMetadataManagerRef, + file_ref_manager: FileReferenceManagerRef, plugins: Plugins, ) -> Result { let builder = MitoEngineBuilder::new( @@ -218,11 +224,24 @@ impl MitoEngine { log_store, object_store_manager, schema_metadata_manager, + file_ref_manager, plugins, ); builder.try_build().await } + pub fn mito_config(&self) -> &MitoConfig { + &self.inner.config + } + + pub fn cache_manager(&self) -> CacheManagerRef { + self.inner.workers.cache_manager() + } + + pub fn file_ref_manager(&self) -> FileReferenceManagerRef { + self.inner.workers.file_ref_manager() + } + /// Returns true if the specific region exists. pub fn is_region_exists(&self, region_id: RegionId) -> bool { self.inner.workers.is_region_exists(region_id) @@ -319,7 +338,7 @@ impl MitoEngine { self.find_region(id) } - fn find_region(&self, region_id: RegionId) -> Option { + pub fn find_region(&self, region_id: RegionId) -> Option { self.inner.workers.get_region(region_id) } @@ -924,6 +943,7 @@ impl MitoEngine { listener: Option, time_provider: crate::time_provider::TimeProviderRef, schema_metadata_manager: SchemaMetadataManagerRef, + file_ref_manager: FileReferenceManagerRef, ) -> Result { config.sanitize(data_home)?; @@ -938,6 +958,7 @@ impl MitoEngine { write_buffer_manager, listener, schema_metadata_manager, + file_ref_manager, time_provider, ) .await?, diff --git a/src/mito2/src/gc.rs b/src/mito2/src/gc.rs new file mode 100644 index 000000000000..eaeca751aabc --- /dev/null +++ b/src/mito2/src/gc.rs @@ -0,0 +1,631 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! GC worker which periodically checks and removes unused/obsolete SST files. +//! +//! `expel time`: the time when the file is considered as removed, as in removed from the manifest. +//! `lingering time`: the time duration before deleting files after they are removed from manifest. +//! `delta manifest`: the manifest files after the last checkpoint that contains the changes to the manifest. +//! `delete time`: the time when the file is actually deleted from the object store. +//! `unknown files`: files that are not recorded in the manifest, usually due to saved checkpoint which remove actions before the checkpoint. +//! 
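The module doc above defines expel time, lingering time, and unknown files. The sketch below condenses that deletion rule into one pure function, assuming millisecond timestamps; the function name, parameters, and thresholds are illustrative and are not the actual `gc.rs` API.

```rust
use std::time::Duration;

/// Simplified decision: should an SST file be deleted now?
/// `expel_time_millis` is when the file was removed from the manifest
/// (None means the file is "unknown": not recorded in the delta manifests).
fn should_delete(
    now_millis: i64,
    in_use: bool,
    expel_time_millis: Option<i64>,
    last_modified_millis: i64,
    lingering_time: Duration,
    unknown_file_lingering_time: Duration,
) -> bool {
    if in_use {
        // Referenced by the current manifest or a tmp ref file: always keep.
        return false;
    }
    match expel_time_millis {
        // Known expel time: delete once the lingering window has passed.
        Some(expelled_at) => now_millis - expelled_at >= lingering_time.as_millis() as i64,
        // Unknown expel time: fall back to last-modified time and a longer window.
        None => {
            now_millis - last_modified_millis >= unknown_file_lingering_time.as_millis() as i64
        }
    }
}

fn main() {
    let now = 10_000_000;
    // Removed 20 minutes ago with a 15 minute lingering window: delete.
    assert!(should_delete(
        now,
        false,
        Some(now - 20 * 60 * 1000),
        0,
        Duration::from_secs(15 * 60),
        Duration::from_secs(6 * 60 * 60),
    ));
    // Still referenced (e.g. by a long-running query's tmp ref file): keep.
    assert!(!should_delete(
        now,
        true,
        Some(0),
        0,
        Duration::from_secs(0),
        Duration::from_secs(0),
    ));
}
```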
+ +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::time::Duration; + +use common_telemetry::{error, info, warn}; +use common_time::Timestamp; +use object_store::Entry; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, OptionExt, ResultExt as _}; +use store_api::storage::{RegionId, TableId}; +use tokio_stream::StreamExt; + +use crate::access_layer::AccessLayerRef; +use crate::cache::file_cache::{FileType, IndexKey}; +use crate::cache::CacheManagerRef; +use crate::config::MitoConfig; +use crate::error::{ + DurationOutOfRangeSnafu, EmptyRegionDirSnafu, JoinSnafu, OpenDalSnafu, RegionNotFoundSnafu, + Result, UnexpectedSnafu, +}; +use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions}; +use crate::manifest::storage::manifest_compress_type; +use crate::metrics::{GC_FILE_CNT_PER_TABLE, GC_TMP_REF_FILE_CNT_PER_TABLE}; +use crate::region::opener::new_manifest_dir; +use crate::sst::file::{FileId, RegionFileId}; +use crate::sst::file_purger::read_all_ref_files_for_table; +use crate::sst::location::{self, region_dir_from_table_dir}; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct FileGcOption { + /// Lingering time before deleting files. + /// Should be long enough to allow long running queries to finish. + /// + /// TODO(discord9): long running queries should actively write tmp manifest files + /// to prevent deletion of files they are using. + #[serde(with = "humantime_serde")] + pub lingering_time: Duration, + /// Lingering time before deleting unknown files(files with undetermine expel time). + /// expel time is the time when the file is considered as removed, as in removed from the manifest. + /// This should only occur rarely, as manifest keep tracks in `removed_files` field + /// unless something goes wrong. + #[serde(with = "humantime_serde")] + pub unknown_file_lingering_time: Duration, + /// Maximum concurrent list operations per GC job. + /// This is used to limit the number of concurrent listing operations and speed up listing. + pub max_concurrent_per_gc_job: usize, +} + +impl Default for FileGcOption { + fn default() -> Self { + Self { + // expect long running queries to be finished within a reasonable time + lingering_time: Duration::from_secs(60 * 15), + // 6 hours, for unknown expel time, which is when this file get removed from manifest, it should rarely happen, can keep it longer + unknown_file_lingering_time: Duration::from_secs(60 * 60 * 6), + max_concurrent_per_gc_job: 32, + } + } +} + +pub struct LocalGcWorker { + pub(crate) table_id: TableId, + pub(crate) access_layer: AccessLayerRef, + pub(crate) cache_manager: Option, + pub(crate) manifest_mgrs: HashMap, + /// Lingering time before deleting files. + pub(crate) opt: FileGcOption, + pub(crate) mito_config: MitoConfig, + /// Expected file ref manifest upload time, in milliseconds since UNIX_EPOCH. + /// Also accept file ref manifests that are uploaded later than this time. + pub(crate) ref_ts_millis: i64, +} + +impl LocalGcWorker { + /// Create a new LocalGcWorker, with `regions_to_gc` regions to GC. + /// The regions are specified by their `RegionId` and should all belong to the same table. + /// + pub async fn try_new( + access_layer: AccessLayerRef, + cache_manager: Option, + regions_to_gc: Vec, + opt: FileGcOption, + mito_config: MitoConfig, + ref_ts_millis: i64, + ) -> Result { + let table_id = regions_to_gc + .first() + .context(UnexpectedSnafu { + reason: "Expect at least one region, found none", + })? 
+ .table_id(); + let mut zelf = Self { + table_id, + access_layer, + cache_manager, + manifest_mgrs: HashMap::new(), + opt, + mito_config, + ref_ts_millis, + }; + + for region_id in regions_to_gc { + ensure!( + region_id.table_id() == table_id, + UnexpectedSnafu { + reason: format!( + "All regions should belong to the same table, found region {} and table {}", + region_id, table_id + ), + } + ); + let mgr = zelf.open_mgr_for(region_id).await?; + zelf.manifest_mgrs.insert(region_id, mgr); + } + + Ok(zelf) + } + + pub async fn read_tmp_ref_files(&self) -> Result> { + let table_ref_manifest = read_all_ref_files_for_table(&self.access_layer).await?; + let latest_manifest = table_ref_manifest + .into_iter() + .filter(|(node_id, refs)| { + if refs.ts >= self.ref_ts_millis { + true + } else { + warn!( + "Skip ref manifest from node {} with ts {}, expect ts >= {}", + node_id, refs.ts, self.ref_ts_millis + ); + + false + } + }) + .collect::>(); + + let all_ref_files = latest_manifest + .into_iter() + .flat_map(|(_, refs)| refs.file_refs.into_iter()) + .map(|r| r.file_id) + .collect::>(); + + Ok(all_ref_files) + } + + /// Run the GC worker in serial mode, + /// considering list files could be slow and run multiple regions in parallel + /// may cause too many concurrent listing operations. + /// + /// TODO(discord9): consider instead running in parallel mode + pub async fn run(self) -> Result<()> { + info!("LocalGcWorker started"); + + let tmp_ref_files = self.read_tmp_ref_files().await?; + for region_id in self.manifest_mgrs.keys() { + info!("Doing gc for region {}", region_id); + self.do_region_gc(*region_id, &tmp_ref_files).await?; + info!("Gc for region {} finished", region_id); + } + info!("LocalGcWorker finished"); + Ok(()) + } +} + +impl LocalGcWorker { + /// concurrency of listing files per region. + /// This is used to limit the number of concurrent listing operations and speed up listing + pub const CONCURRENCY_LIST_PER_FILES: usize = 512; + + /// Perform GC for the region. + /// 1. Get all the removed files in delta manifest files and their expel times + /// 2. List all files in the region dir concurrently + /// 3. Filter out the files that are still in use or may still be kept for a while + /// 4. Delete the unused files + /// + /// Note that the files that are still in use or may still be kept for a while are not deleted + /// to avoid deleting files that are still needed. + pub async fn do_region_gc( + &self, + region_id: RegionId, + tmp_ref_files: &HashSet, + ) -> Result<()> { + info!("Doing gc for region {}", region_id); + // TODO(discord9): impl gc worker + let manifest = self + .manifest_mgrs + .get(®ion_id) + .context(RegionNotFoundSnafu { region_id })? 
+ .manifest(); + let region_id = manifest.metadata.region_id; + let current_files = &manifest.files; + + let recently_removed_files = self.get_removed_files_expel_times(region_id).await?; + + if recently_removed_files.is_empty() { + // no files to remove, skip + info!("No recently removed files to gc for region {}", region_id); + return Ok(()); + } + + info!( + "Found {} recently removed files sets for region {}", + recently_removed_files.len(), + region_id + ); + + let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES) + .max(1) + .min(self.opt.max_concurrent_per_gc_job); + + let in_used = current_files + .keys() + .cloned() + .chain(tmp_ref_files.clone().into_iter()) + .collect(); + + let true_tmp_ref_files = tmp_ref_files + .iter() + .filter(|f| !current_files.contains_key(f)) + .collect::>(); + + info!("True tmp ref files: {:?}", true_tmp_ref_files); + + GC_TMP_REF_FILE_CNT_PER_TABLE + .with_label_values(&[&self.table_id.to_string()]) + .set(true_tmp_ref_files.len() as i64); + + let unused_files = self + .list_to_be_deleted_files(region_id, in_used, recently_removed_files, concurrency) + .await?; + + let unused_len = unused_files.len(); + + info!( + "Found {} unused files to delete for region {}", + unused_len, region_id + ); + + self.delete_files(region_id, &unused_files).await?; + + info!( + "Successfully deleted {} unused files for region {}", + unused_len, region_id + ); + + Ok(()) + } + + async fn delete_files(&self, region_id: RegionId, file_ids: &[FileId]) -> Result<()> { + // Remove meta of the file from cache. + if let Some(cache) = &self.cache_manager { + for file_id in file_ids { + cache.remove_parquet_meta_data(RegionFileId::new(region_id, *file_id)); + } + } + let mut deleted_files = Vec::with_capacity(file_ids.len()); + + for file_id in file_ids { + let region_file_id = RegionFileId::new(region_id, *file_id); + match self.access_layer.delete_sst(®ion_file_id).await { + Ok(_) => { + deleted_files.push(*file_id); + } + Err(e) => { + error!(e; "Failed to delete sst and index file for {}", region_file_id); + } + } + } + + info!( + "Deleted {} files for region {}: {:?}", + deleted_files.len(), + region_id, + deleted_files + ); + + for file_id in file_ids { + let region_file_id = RegionFileId::new(region_id, *file_id); + + if let Some(write_cache) = self + .cache_manager + .as_ref() + .and_then(|cache| cache.write_cache()) + { + // Removes index file from the cache. + + write_cache + .remove(IndexKey::new(region_id, *file_id, FileType::Puffin)) + .await; + + // Remove the SST file from the cache. + write_cache + .remove(IndexKey::new(region_id, *file_id, FileType::Parquet)) + .await; + } + + // Purges index content in the stager. + if let Err(e) = self + .access_layer + .puffin_manager_factory() + .purge_stager(region_file_id) + .await + { + error!(e; "Failed to purge stager with index file, file_id: {}, region: {}", + file_id, region_id); + } + } + + GC_FILE_CNT_PER_TABLE + .with_label_values(&[&self.table_id.to_string()]) + .add(file_ids.len() as i64); + + Ok(()) + } + + /// Get the manifest manager for the region. 
+ async fn open_mgr_for(&self, region_id: RegionId) -> Result { + let table_dir = self.access_layer.table_dir(); + let path_type = self.access_layer.path_type(); + let mito_config = &self.mito_config; + + let region_manifest_options = RegionManifestOptions { + manifest_dir: new_manifest_dir(®ion_dir_from_table_dir( + table_dir, region_id, path_type, + )), + object_store: self.access_layer.object_store().clone(), + compress_type: manifest_compress_type(mito_config.compress_manifest), + checkpoint_distance: mito_config.manifest_checkpoint_distance, + remove_file_options: RemoveFileOptions { + keep_count: mito_config.experimental_manifest_keep_removed_file_count, + keep_ttl: mito_config.experimental_manifest_keep_removed_file_ttl, + }, + }; + + RegionManifestManager::open( + region_manifest_options, + Default::default(), + Default::default(), + ) + .await? + .context(EmptyRegionDirSnafu { + region_id, + region_dir: ®ion_dir_from_table_dir(table_dir, region_id, path_type), + }) + } + + /// Get all the removed files in delta manifest files and their expel times. + /// The expel time is the time when the file is considered as removed. + /// Which is the last modified time of delta manifest which contains the remove action. + /// + pub async fn get_removed_files_expel_times( + &self, + region_id: RegionId, + ) -> Result>> { + let region_manifest = self + .manifest_mgrs + .get(®ion_id) + .context(RegionNotFoundSnafu { region_id })? + .manifest(); + + let mut ret = BTreeMap::new(); + for files in ®ion_manifest.removed_files.removed_files { + let expel_time = Timestamp::new_millisecond(files.removed_at); + let set = ret.entry(expel_time).or_insert_with(HashSet::new); + set.extend(files.file_ids.iter().cloned()); + } + + Ok(ret) + } + + /// Concurrently list unused files in the region dir + /// because there may be a lot of files in the region dir + /// and listing them may take a long time. 
+ pub async fn list_to_be_deleted_files( + &self, + region_id: RegionId, + in_used: HashSet, + recently_removed_files: BTreeMap>, + concurrency: usize, + ) -> Result> { + let now = chrono::Utc::now(); + let may_linger_until = now + - chrono::Duration::from_std(self.opt.lingering_time).with_context(|_| { + DurationOutOfRangeSnafu { + input: self.opt.lingering_time, + } + })?; + + let unknown_file_may_linger_until = now + - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context( + |_| DurationOutOfRangeSnafu { + input: self.opt.unknown_file_lingering_time, + }, + )?; + + // files that may linger, which means they are not in use but may still be kept for a while + let may_linger_filenames = recently_removed_files + .iter() + .filter_map(|(ts, files)| { + if *ts < Timestamp::new_millisecond(may_linger_until.timestamp_millis()) { + // if the expel time is before the may linger time, we can delete it + return None; + } + + Some(files) + }) + .flatten() + .collect::>(); + + let all_files_appear_in_delta_manifests = recently_removed_files + .values() + .flatten() + .collect::>(); + + // in use filenames, include sst and index files + let in_use_filenames = in_used.iter().collect::>(); + + let region_dir = self.access_layer.build_region_dir(region_id); + + let partitions = gen_partition_from_concurrency(concurrency); + let bounds = vec![None] + .into_iter() + .chain(partitions.iter().map(|p| Some(p.clone()))) + .chain(vec![None]) + .collect::>(); + let mut listers = vec![]; + for part in bounds.windows(2) { + let start = part[0].clone(); + let end = part[1].clone(); + let mut lister = self.access_layer.object_store().lister_with(®ion_dir); + if let Some(s) = start { + lister = lister.start_after(&s); + } + + let lister = lister.await.context(OpenDalSnafu)?; + + listers.push((lister, end)); + } + + let (tx, mut rx) = tokio::sync::mpsc::channel(1024); + let mut handles = vec![]; + for (lister, end) in listers { + let tx = tx.clone(); + let handle = tokio::spawn(async move { + let stream = lister.take_while(|e: &std::result::Result| match e { + Ok(e) => { + if let Some(end) = &end { + // reach end, stop listing + e.name() < end.as_str() + } else { + // no end, take all entries + true + } + } + // entry went wrong, log and skip it + Err(err) => { + warn!("Failed to list entry: {}", err); + true + } + }); + let stream = stream + .filter(|e| { + if let Ok(e) = &e { + // notice that we only care about files, skip dirs + e.metadata().is_file() + } else { + // error entry, take for further logging + true + } + }) + .collect::>() + .await; + // ordering of files doesn't matter here, so we can send them directly + tx.send(stream).await.expect("Failed to send entries"); + }); + + handles.push(handle); + } + // Wait for all listers to finish + for handle in handles { + handle.await.context(JoinSnafu)?; + } + + drop(tx); // Close the channel to stop receiving + + // Collect all entries from the channel + let mut all_unused_files_ready_for_delete = vec![]; + let mut all_in_exist_linger_files = HashSet::new(); + while let Some(stream) = rx.recv().await { + all_unused_files_ready_for_delete.extend( + stream.into_iter().filter_map(Result::ok).filter_map(|e| { + let file_id = match location::parse_file_id_from_path(e.name()) { + Ok(file_id) => file_id, + Err(err) => { + error!(err; "Failed to parse file id from path: {}", e.name()); + // if we can't parse the file id, it means it's not a sst or index file + // shouldn't delete it because we don't know what it is + return None; + } + }; + if 
may_linger_filenames.contains(&file_id) { + all_in_exist_linger_files.insert(file_id); + } + + let should_delete = !in_use_filenames.contains(&file_id) + && !may_linger_filenames.contains(&file_id) + && { + if !all_files_appear_in_delta_manifests.contains(&file_id) { + // if the file's expel time is unknown(because not appear in delta manifest), we keep it for a while + // using it's last modified time + // notice unknown files use a different lingering time + e.metadata() + .last_modified() + .map(|t| t < unknown_file_may_linger_until) + .unwrap_or(false) + } else { + // if the file did appear in manifest delta(and passes previous predicate), we can delete it immediately + true + } + }; + if should_delete { + Some(file_id) + } else { + None + } + }), + ); + } + + info!("All in exist linger files: {:?}", all_in_exist_linger_files); + + Ok(all_unused_files_ready_for_delete) + } +} + +/// Generate partition prefixes based on concurrency and +/// assume file names are evenly-distributed uuid string, +/// to evenly distribute files across partitions. +/// For example, if concurrency is 2, partition prefixes will be: +/// ["8"] so it divide uuids into two partitions based on the first character. +/// If concurrency is 32, partition prefixes will be: +/// ["08", "10", "18", "20", "28", "30", "38" ..., "f0", "f8"] +/// if concurrency is 1, it returns an empty vector. +/// +fn gen_partition_from_concurrency(concurrency: usize) -> Vec { + let n = concurrency.next_power_of_two(); + if n <= 1 { + return vec![]; + } + + // `d` is the number of hex characters required to build the partition key. + // `p` is the total number of possible values for a key of length `d`. + // We need to find the smallest `d` such that 16^d >= n. + let mut d = 0; + let mut p: u128 = 1; + while p < n as u128 { + p *= 16; + d += 1; + } + + let total_space = p; + let step = total_space / n as u128; + + (1..n) + .map(|i| { + let boundary = i as u128 * step; + format!("{:0width$x}", boundary, width = d) + }) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_gen_partition_from_concurrency() { + let partitions = gen_partition_from_concurrency(1); + assert!(partitions.is_empty()); + + let partitions = gen_partition_from_concurrency(2); + assert_eq!(partitions, vec!["8"]); + + let partitions = gen_partition_from_concurrency(3); + assert_eq!(partitions, vec!["4", "8", "c"]); + + let partitions = gen_partition_from_concurrency(4); + assert_eq!(partitions, vec!["4", "8", "c"]); + + let partitions = gen_partition_from_concurrency(8); + assert_eq!(partitions, vec!["2", "4", "6", "8", "a", "c", "e"]); + + let partitions = gen_partition_from_concurrency(16); + assert_eq!( + partitions, + vec!["1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"] + ); + + let partitions = gen_partition_from_concurrency(32); + assert_eq!( + partitions, + [ + "08", "10", "18", "20", "28", "30", "38", "40", "48", "50", "58", "60", "68", "70", + "78", "80", "88", "90", "98", "a0", "a8", "b0", "b8", "c0", "c8", "d0", "d8", "e0", + "e8", "f0", "f8", + ] + ); + } +} diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index ad4045c86e5d..69b0dc9996d0 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -36,6 +36,7 @@ pub mod error; #[cfg(feature = "enterprise")] pub mod extension; pub mod flush; +pub mod gc; pub mod manifest; pub mod memtable; mod metrics; diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 39ae53f5abd1..db903748c6d4 100644 --- a/src/mito2/src/metrics.rs +++ 
b/src/mito2/src/metrics.rs @@ -451,6 +451,22 @@ lazy_static! { exponential_buckets(0.001, 10.0, 8).unwrap(), ) .unwrap(); + + /// Counter for the number of files that's tmp ref by at least one datanode. + pub static ref GC_TMP_REF_FILE_CNT_PER_TABLE: IntGaugeVec = + register_int_gauge_vec!( + "greptime_mito_gc_tmp_ref_file_count_per_table", + "mito gc tmp ref file count per table", + &["table_id"], + ).unwrap(); + + /// Counter for the number of files deleted by the GC worker. + pub static ref GC_FILE_CNT_PER_TABLE: IntGaugeVec = + register_int_gauge_vec!( + "greptime_mito_gc_file_count_per_table", + "mito gc deleted file count per table", + &["table_id"], + ).unwrap(); } /// Stager notifier to collect metrics. diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index e6dd142f0b97..8fc5f482e704 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -204,6 +204,10 @@ impl MitoRegion { self.access_layer.table_dir() } + pub fn access_layer(&self) -> &AccessLayerRef { + &self.access_layer + } + /// Returns whether the region is writable. pub(crate) fn is_writable(&self) -> bool { matches!( diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index e0c343994fe8..daa0fd37b392 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -60,7 +60,7 @@ use crate::region::{ use crate::region_write_ctx::RegionWriteCtx; use crate::request::OptionOutputTx; use crate::schedule::scheduler::SchedulerRef; -use crate::sst::file_purger::LocalFilePurger; +use crate::sst::file_purger::{FileReferenceManagerRef, LocalFilePurger}; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::location::region_dir_from_table_dir; @@ -86,6 +86,7 @@ pub(crate) struct RegionOpener { stats: ManifestStats, wal_entry_reader: Option>, replay_checkpoint: Option, + file_ref_manager: FileReferenceManagerRef, } impl RegionOpener { @@ -102,6 +103,7 @@ impl RegionOpener { puffin_manager_factory: PuffinManagerFactory, intermediate_manager: IntermediateManager, time_provider: TimeProviderRef, + file_ref_manager: FileReferenceManagerRef, ) -> RegionOpener { RegionOpener { region_id, @@ -120,6 +122,7 @@ impl RegionOpener { stats: Default::default(), wal_entry_reader: None, replay_checkpoint: None, + file_ref_manager, } } @@ -288,6 +291,7 @@ impl RegionOpener { self.purge_scheduler, access_layer, self.cache_manager, + self.file_ref_manager, )), provider, last_flush_millis: AtomicI64::new(now), @@ -410,6 +414,7 @@ impl RegionOpener { self.purge_scheduler.clone(), access_layer.clone(), self.cache_manager.clone(), + self.file_ref_manager.clone(), )); let memtable_builder = self.memtable_builder_provider.builder_for_options( region_options.memtable.as_ref(), diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 83881b6fa4a6..c9a3462392e5 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -333,16 +333,16 @@ struct FileHandleInner { impl Drop for FileHandleInner { fn drop(&mut self) { - if self.deleted.load(Ordering::Relaxed) { - self.file_purger.send_request(PurgeRequest { - file_meta: self.meta.clone(), - }); - } + self.file_purger.send_request(PurgeRequest { + file_meta: self.meta.clone(), + deleted: self.deleted.load(Ordering::Relaxed), + }); } } impl FileHandleInner { fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner { + file_purger.add_new_file(&meta); FileHandleInner { meta, compacting: AtomicBool::new(false), diff --git 
a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 130264f68a17..dbdd10735130 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -12,28 +12,40 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::{HashMap, HashSet}; use std::fmt; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use common_telemetry::{error, info}; +use object_store::EntryMode; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use store_api::region_request::PathType; +use store_api::storage::{RegionId, TableId}; use crate::access_layer::AccessLayerRef; use crate::cache::file_cache::{FileType, IndexKey}; use crate::cache::CacheManagerRef; +use crate::error::{OpenDalSnafu, Result, SerdeJsonSnafu}; use crate::schedule::scheduler::SchedulerRef; -use crate::sst::file::FileMeta; +use crate::sst::file::{FileId, FileMeta}; /// Request to remove a file. #[derive(Debug)] pub struct PurgeRequest { /// File meta. pub file_meta: FileMeta, + pub deleted: bool, } /// A worker to delete files in background. pub trait FilePurger: Send + Sync + fmt::Debug { /// Send a purge request to the background worker. fn send_request(&self, request: PurgeRequest); + + fn add_new_file(&self, _: &FileMeta) { + // noop + } } pub type FilePurgerRef = Arc; @@ -53,6 +65,11 @@ pub struct LocalFilePurger { scheduler: SchedulerRef, sst_layer: AccessLayerRef, cache_manager: Option, + file_ref_manager: FileReferenceManagerRef, + /// Whether the underlying object store is local filesystem. + /// if it is, we can delete the file directly. + /// Otherwise, we should inform the global file ref manager to delete the file. + is_local_fs: bool, } impl fmt::Debug for LocalFilePurger { @@ -63,24 +80,30 @@ impl fmt::Debug for LocalFilePurger { } } +pub fn is_local_fs(sst_layer: &AccessLayerRef) -> bool { + sst_layer.object_store().info().scheme() == object_store::Scheme::Fs +} + impl LocalFilePurger { /// Creates a new purger. pub fn new( scheduler: SchedulerRef, sst_layer: AccessLayerRef, cache_manager: Option, + file_ref_manager: FileReferenceManagerRef, ) -> Self { + let is_local_fs = is_local_fs(&sst_layer); Self { scheduler, sst_layer, cache_manager, + file_ref_manager, + is_local_fs, } } -} -impl FilePurger for LocalFilePurger { - fn send_request(&self, request: PurgeRequest) { - let file_meta = request.file_meta; + /// Deletes the file(and it's index, if any) from cache and storage. + fn delete_file(&self, file_meta: FileMeta) { let sst_layer = self.sst_layer.clone(); // Remove meta of the file from cache. @@ -90,7 +113,7 @@ impl FilePurger for LocalFilePurger { let cache_manager = self.cache_manager.clone(); if let Err(e) = self.scheduler.schedule(Box::pin(async move { - if let Err(e) = sst_layer.delete_sst(&file_meta).await { + if let Err(e) = sst_layer.delete_sst(&file_meta.file_id()).await { error!(e; "Failed to delete SST file, file_id: {}, region: {}", file_meta.file_id, file_meta.region_id); } else { @@ -137,6 +160,285 @@ impl FilePurger for LocalFilePurger { } } +impl FilePurger for LocalFilePurger { + fn send_request(&self, request: PurgeRequest) { + if self.is_local_fs { + if request.deleted { + self.delete_file(request.file_meta); + } + } else { + // if not on local file system, instead inform the global file purger to remove the file reference. + // notice that no matter whether the file is deleted or not, we need to remove the reference + // because the file is no longer in use nonetheless. 
+ self.file_ref_manager.remove_file(&request.file_meta); + } + } + + fn add_new_file(&self, file_meta: &FileMeta) { + if self.is_local_fs { + // If the access layer is local file system, we don't need to track the file reference. + return; + } + self.file_ref_manager + .add_file(file_meta, self.sst_layer.clone()); + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct FileRef { + pub region_id: RegionId, + pub file_id: FileId, +} + +impl FileRef { + pub fn new(region_id: RegionId, file_id: FileId) -> Self { + Self { region_id, file_id } + } +} + +/// File references for a table. +/// It contains all files referenced by the table. +#[derive(Debug, Clone)] +pub struct TableFileRefs { + /// (FileRef, Ref Count) meaning how many FileHandleInner is opened for this file. + pub files: HashMap, + /// Access layer per table. Will be used to upload tmp file for recording references + /// to the object storage. + pub access_layer: AccessLayerRef, +} + +pub const PURGER_REFS_PATH: &str = ".purger_refs"; + +/// Returns the path of the tmp ref file for given table id and datanode id. +pub fn ref_file_path(table_dir: &str, node_id: u64, path_type: PathType) -> String { + let path_type_postfix = match path_type { + PathType::Bare => "", + PathType::Data => ".data", + PathType::Metadata => ".metadata", + }; + format!( + "{}/{}/{:020}{}.refs", + table_dir, PURGER_REFS_PATH, node_id, path_type_postfix + ) +} + +pub fn ref_path_to_node_id_path_type(path: &str) -> Option<(u64, PathType)> { + let parts: Vec<&str> = path.rsplitn(2, '/').collect(); + if parts.len() != 2 { + return None; + } + let file_name = parts[0]; + let segments: Vec<&str> = file_name.split('.').collect(); + if segments.len() < 2 { + return None; + } + let node_id_str = segments[0]; + let path_type = if segments.len() == 2 { + PathType::Bare + } else { + match segments[1] { + "data" => PathType::Data, + "metadata" => PathType::Metadata, + _ => return None, + } + }; + let node_id = node_id_str.parse::().ok()?; + Some((node_id, path_type)) +} + +/// Returns the directory path to store all purger ref files. +pub fn ref_dir(table_dir: &str) -> String { + object_store::util::join_dir(table_dir, PURGER_REFS_PATH) +} + +/// Manages all file references in one datanode. +/// It keeps track of which files are referenced and group by table ids. +/// And periodically update the references to tmp file in object storage. +/// This is useful for ensuring that files are not deleted while they are still in use by any +/// query. +#[derive(Debug, Default)] +pub struct FileReferenceManager { + /// Datanode id. used to determine tmp ref file name. + node_id: u64, + /// TODO(discord9): use no hash hasher since table id is sequential. + files_per_table: Mutex>, +} + +pub type FileReferenceManagerRef = Arc; + +/// The tmp file uploaded to object storage to record one table's file references. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct TableFileRefsManifest { + pub file_refs: HashSet, + /// Unix timestamp in milliseconds sent by metasrv to indicate when the manifest is created. Used for gc worker to make sure it gets the latest version of the manifest. + pub ts: i64, +} + +/// Reads all ref files for a table from the given access layer. +/// Returns a list of `TableFileRefsManifest`. +pub async fn read_all_ref_files_for_table( + access_layer: &AccessLayerRef, +) -> Result> { + let ref_dir = ref_dir(access_layer.table_dir()); + // list everything under the ref dir. 
And filter out by path type in `..refs` postfix. + let entries = access_layer + .object_store() + .list(&ref_dir) + .await + .context(OpenDalSnafu)?; + let mut manifests = Vec::new(); + + for entry in entries { + if entry.metadata().mode() != EntryMode::FILE { + continue; + } + let Some((node_id, path_type)) = ref_path_to_node_id_path_type(entry.path()) else { + continue; + }; + if path_type != access_layer.path_type() { + continue; + } + + // read file and parse as `TableFileRefsManifest`. + let buf = access_layer + .object_store() + .read(entry.path()) + .await + .context(OpenDalSnafu)? + .to_bytes(); + + let manifest: TableFileRefsManifest = + serde_json::from_slice(&buf).context(SerdeJsonSnafu)?; + manifests.push((node_id, manifest)); + } + Ok(manifests) +} + +impl FileReferenceManager { + pub fn new(node_id: u64) -> Self { + Self { + node_id, + files_per_table: Default::default(), + } + } + + fn ref_file_manifest( + &self, + table_id: TableId, + now: i64, + ) -> Option<(TableFileRefsManifest, AccessLayerRef)> { + let file_refs = self + .files_per_table + .lock() + .unwrap() + .get(&table_id) + .cloned()?; + + if file_refs.files.is_empty() { + return None; + } + + let ref_manifest = TableFileRefsManifest { + file_refs: file_refs.files.keys().cloned().collect(), + ts: now, + }; + let access_layer = &file_refs.access_layer; + + info!( + "Preparing to upload ref file for table {}, node {}, path_type {:?}, {} files: {:?}", + table_id, + self.node_id, + access_layer.path_type(), + ref_manifest.file_refs.len(), + ref_manifest.file_refs, + ); + + Some((ref_manifest, access_layer.clone())) + } + + pub async fn upload_ref_file_for_table(&self, table_id: TableId, now: i64) -> Result<()> { + let Some((ref_manifest, access_layer)) = self.ref_file_manifest(table_id, now) else { + return Ok(()); + }; + + let path = ref_file_path( + access_layer.table_dir(), + self.node_id, + access_layer.path_type(), + ); + + let content = serde_json::to_string(&ref_manifest).context(SerdeJsonSnafu)?; + + if let Err(e) = access_layer.object_store().write(&path, content).await { + error!(e; "Failed to upload ref file to {}, table {}", path, table_id); + return Err(e).context(OpenDalSnafu); + } else { + info!( + "Successfully uploaded ref files with {} refs to {}, table {}", + ref_manifest.file_refs.len(), + path, + table_id + ); + } + + Ok(()) + } + + /// Adds a new file reference. + /// Also records the access layer for the table if not exists. + /// The access layer will be used to upload ref file to object storage. 
+ pub fn add_file(&self, file_meta: &FileMeta, table_access_layer: AccessLayerRef) { + let table_id = file_meta.region_id.table_id(); + { + let mut guard = self.files_per_table.lock().unwrap(); + let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id); + guard + .entry(table_id) + .and_modify(|refs| { + refs.files + .entry(file_ref.clone()) + .and_modify(|count| *count += 1) + .or_insert(1); + }) + .or_insert_with(|| TableFileRefs { + files: HashMap::from_iter([(file_ref, 1)]), + access_layer: table_access_layer.clone(), + }); + } + } + + pub fn remove_file(&self, file_meta: &FileMeta) { + let table_id = file_meta.region_id.table_id(); + let file_ref = FileRef::new(file_meta.region_id, file_meta.file_id); + let mut guard = self.files_per_table.lock().unwrap(); + guard.entry(table_id).and_modify(|refs| { + refs.files.entry(file_ref.clone()).and_modify(|count| { + if *count > 0 { + *count -= 1; + } + }); + }); + + // if the ref count is 0, meaning no existing FileHandleInner, remove the file ref from table. + if guard + .get(&table_id) + .map(|table_refs| table_refs.files.get(&file_ref)) + == Some(Some(&0)) + { + if let Some(table_refs) = guard.get_mut(&table_id) { + table_refs.files.remove(&file_ref); + } + } + + // if no more files for the table, remove the table entry. + if let Some(refs) = guard.get(&table_id) { + if refs.files.is_empty() { + guard.remove(&table_id); + } + } + } +} + #[cfg(test)] mod tests { use std::num::NonZeroU64; @@ -188,7 +490,12 @@ mod tests { let scheduler = Arc::new(LocalScheduler::new(3)); - let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None)); + let file_purger = Arc::new(LocalFilePurger::new( + scheduler.clone(), + layer, + None, + Arc::new(Default::default()), + )); { let handle = FileHandle::new( @@ -253,7 +560,12 @@ mod tests { let scheduler = Arc::new(LocalScheduler::new(3)); - let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None)); + let file_purger = Arc::new(LocalFilePurger::new( + scheduler.clone(), + layer, + None, + Arc::new(Default::default()), + )); { let handle = FileHandle::new( @@ -280,4 +592,126 @@ mod tests { assert!(!object_store.exists(&path).await.unwrap()); assert!(!object_store.exists(&index_path).await.unwrap()); } + + #[tokio::test] + async fn test_file_ref_mgr() { + common_telemetry::init_default_ut_logging(); + + let dir = create_temp_dir("file_ref_mgr"); + let dir_path = dir.path().display().to_string(); + let builder = Fs::default().root(&dir_path); + let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random()); + let sst_dir = "table1"; + + let index_aux_path = dir.path().join("index_aux"); + let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None) + .await + .unwrap(); + let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap()) + .await + .unwrap(); + + let object_store = ObjectStore::new(builder).unwrap().finish(); + + let layer = Arc::new(AccessLayer::new( + sst_dir, + PathType::Bare, + object_store.clone(), + puffin_mgr, + intm_mgr, + )); + + let file_ref_mgr = FileReferenceManager::new(0); + + let file_meta = FileMeta { + region_id: sst_file_id.region_id(), + file_id: sst_file_id.file_id(), + time_range: FileTimeRange::default(), + level: 0, + file_size: 4096, + available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]), + index_file_size: 4096, + num_rows: 1024, + num_row_groups: 1, + sequence: NonZeroU64::new(4096), + }; + + file_ref_mgr.add_file(&file_meta, layer.clone()); + + assert_eq!( + 
file_ref_mgr + .files_per_table + .lock() + .unwrap() + .get(&0) + .unwrap() + .files, + HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)]) + ); + + file_ref_mgr.add_file(&file_meta, layer.clone()); + + let expected_table_ref_manifest = TableFileRefsManifest { + file_refs: HashSet::from_iter([FileRef::new(file_meta.region_id, file_meta.file_id)]), + ts: 0, + }; + + assert_eq!( + file_ref_mgr.ref_file_manifest(0, 0).unwrap().0, + expected_table_ref_manifest + ); + + assert_eq!( + file_ref_mgr + .files_per_table + .lock() + .unwrap() + .get(&0) + .unwrap() + .files, + HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 2)]) + ); + + assert_eq!( + file_ref_mgr.ref_file_manifest(0, 0).unwrap().0, + expected_table_ref_manifest + ); + + file_ref_mgr.remove_file(&file_meta); + + assert_eq!( + file_ref_mgr + .files_per_table + .lock() + .unwrap() + .get(&0) + .unwrap() + .files, + HashMap::from_iter([(FileRef::new(file_meta.region_id, file_meta.file_id), 1)]) + ); + + assert_eq!( + file_ref_mgr.ref_file_manifest(0, 0).unwrap().0, + expected_table_ref_manifest + ); + + file_ref_mgr.remove_file(&file_meta); + + assert!( + file_ref_mgr + .files_per_table + .lock() + .unwrap() + .get(&0) + .is_none(), + "{:?}", + file_ref_mgr.files_per_table.lock().unwrap() + ); + + assert!( + file_ref_mgr.ref_file_manifest(0, 0).is_none(), + "{:?}", + file_ref_mgr.files_per_table.lock().unwrap() + ); + } } diff --git a/src/mito2/src/sst/location.rs b/src/mito2/src/sst/location.rs index f1b4e052a623..54f8a943c15e 100644 --- a/src/mito2/src/sst/location.rs +++ b/src/mito2/src/sst/location.rs @@ -13,12 +13,14 @@ // limitations under the License. use object_store::util; +use snafu::OptionExt; use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR}; use store_api::path_utils::region_name; use store_api::region_request::PathType; use store_api::storage::RegionId; -use crate::sst::file::RegionFileId; +use crate::error::UnexpectedSnafu; +use crate::sst::file::{FileId, RegionFileId}; /// Generate region dir from table_dir, region_id and path_type pub fn region_dir_from_table_dir( @@ -54,6 +56,33 @@ pub fn index_file_path( util::join_path(&index_dir, &format!("{}.puffin", region_file_id.file_id())) } +/// Get RegionFileId from sst or index filename +pub fn parse_file_id_from_path(filepath: &str) -> crate::error::Result { + let filename = filepath.rsplit('/').next().context(UnexpectedSnafu { + reason: format!("invalid file path: {}", filepath), + })?; + let parts: Vec<&str> = filename.split('.').collect(); + if parts.len() != 2 { + return UnexpectedSnafu { + reason: format!("invalid file name: {}", filename), + } + .fail(); + } + if parts[1] != "parquet" && parts[1] != "puffin" { + return UnexpectedSnafu { + reason: format!("invalid file extension: {}", parts[1]), + } + .fail(); + } + let file_id = parts[0]; + FileId::parse_str(file_id).map_err(|e| { + UnexpectedSnafu { + reason: format!("invalid file id: {}, err: {}", file_id, e), + } + .build() + }) +} + #[cfg(test)] mod tests { use store_api::storage::RegionId; @@ -77,6 +106,12 @@ mod tests { sst_file_path("table_dir", region_file_id, PathType::Metadata), format!("table_dir/1_0000000002/metadata/{}.parquet", file_id) ); + + for path_type in &[PathType::Bare, PathType::Data, PathType::Metadata] { + let path = sst_file_path("table_dir", region_file_id, *path_type); + let parsed_file_id = parse_file_id_from_path(&path).unwrap(); + assert_eq!(parsed_file_id, file_id); + } } #[test] diff --git 
a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index e6325b513e1a..70aec82dd24d 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -73,7 +73,7 @@ use crate::error::Result; use crate::flush::{WriteBufferManager, WriteBufferManagerRef}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::read::{Batch, BatchBuilder, BatchReader}; -use crate::sst::file_purger::{FilePurgerRef, NoopFilePurger}; +use crate::sst::file_purger::{FilePurgerRef, FileReferenceManagerRef, NoopFilePurger}; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::time_provider::{StdTimeProvider, TimeProviderRef}; @@ -209,6 +209,7 @@ pub struct TestEnv { log_store_factory: LogStoreFactory, object_store_manager: Option, schema_metadata_manager: SchemaMetadataManagerRef, + file_ref_manager: FileReferenceManagerRef, kv_backend: KvBackendRef, } @@ -243,6 +244,7 @@ impl TestEnv { log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory), object_store_manager: None, schema_metadata_manager, + file_ref_manager: Arc::new(Default::default()), kv_backend, } } @@ -280,6 +282,7 @@ impl TestEnv { log_store, zelf.object_store_manager.as_ref().unwrap().clone(), zelf.schema_metadata_manager.clone(), + zelf.file_ref_manager.clone(), Plugins::new(), ) .await @@ -333,6 +336,7 @@ impl TestEnv { listener, Arc::new(StdTimeProvider), self.schema_metadata_manager.clone(), + self.file_ref_manager.clone(), ) .await .unwrap(), @@ -345,6 +349,7 @@ impl TestEnv { listener, Arc::new(StdTimeProvider), self.schema_metadata_manager.clone(), + self.file_ref_manager.clone(), ) .await .unwrap(), @@ -388,6 +393,7 @@ impl TestEnv { listener, Arc::new(StdTimeProvider), self.schema_metadata_manager.clone(), + self.file_ref_manager.clone(), ) .await .unwrap(), @@ -400,6 +406,7 @@ impl TestEnv { listener, Arc::new(StdTimeProvider), self.schema_metadata_manager.clone(), + self.file_ref_manager.clone(), ) .await .unwrap(), @@ -432,6 +439,7 @@ impl TestEnv { listener, time_provider.clone(), self.schema_metadata_manager.clone(), + self.file_ref_manager.clone(), ) .await .unwrap(), @@ -444,6 +452,7 @@ impl TestEnv { listener, time_provider.clone(), self.schema_metadata_manager.clone(), + self.file_ref_manager.clone(), ) .await .unwrap(), @@ -480,6 +489,7 @@ impl TestEnv { log_store, Arc::new(object_store_manager), self.schema_metadata_manager.clone(), + self.file_ref_manager.clone(), Plugins::new(), ) .await @@ -489,6 +499,7 @@ impl TestEnv { log_store, Arc::new(object_store_manager), self.schema_metadata_manager.clone(), + self.file_ref_manager.clone(), Plugins::new(), ) .await diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 835fabac976e..09be4389259c 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -66,6 +66,7 @@ use crate::request::{ }; use crate::schedule::scheduler::{LocalScheduler, SchedulerRef}; use crate::sst::file::FileId; +use crate::sst::file_purger::FileReferenceManagerRef; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::time_provider::{StdTimeProvider, TimeProviderRef}; @@ -130,6 +131,8 @@ pub(crate) struct WorkerGroup { purge_scheduler: SchedulerRef, /// Cache. cache_manager: CacheManagerRef, + /// File reference manager. 
+ file_ref_manager: FileReferenceManagerRef, } impl WorkerGroup { @@ -141,6 +144,7 @@ impl WorkerGroup { log_store: Arc, object_store_manager: ObjectStoreManagerRef, schema_metadata_manager: SchemaMetadataManagerRef, + file_ref_manager: FileReferenceManagerRef, plugins: Plugins, ) -> Result { let (flush_sender, flush_receiver) = watch::channel(()); @@ -204,6 +208,7 @@ impl WorkerGroup { flush_receiver: flush_receiver.clone(), plugins: plugins.clone(), schema_metadata_manager: schema_metadata_manager.clone(), + file_ref_manager: file_ref_manager.clone(), } .start() }) @@ -215,6 +220,7 @@ impl WorkerGroup { compact_job_pool, purge_scheduler, cache_manager, + file_ref_manager, }) } @@ -266,6 +272,10 @@ impl WorkerGroup { self.cache_manager.clone() } + pub(crate) fn file_ref_manager(&self) -> FileReferenceManagerRef { + self.file_ref_manager.clone() + } + /// Get worker for specific `region_id`. pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker { let index = region_id_to_index(region_id, self.workers.len()); @@ -286,6 +296,7 @@ impl WorkerGroup { /// Starts a worker group with `write_buffer_manager` and `listener` for tests. /// /// The number of workers should be power of two. + #[allow(clippy::too_many_arguments)] pub(crate) async fn start_for_test( config: Arc, log_store: Arc, @@ -293,6 +304,7 @@ impl WorkerGroup { write_buffer_manager: Option, listener: Option, schema_metadata_manager: SchemaMetadataManagerRef, + file_ref_manager: FileReferenceManagerRef, time_provider: TimeProviderRef, ) -> Result { let (flush_sender, flush_receiver) = watch::channel(()); @@ -350,6 +362,7 @@ impl WorkerGroup { flush_receiver: flush_receiver.clone(), plugins: Plugins::new(), schema_metadata_manager: schema_metadata_manager.clone(), + file_ref_manager: file_ref_manager.clone(), } .start() }) @@ -361,6 +374,7 @@ impl WorkerGroup { compact_job_pool, purge_scheduler, cache_manager, + file_ref_manager, }) } @@ -428,6 +442,7 @@ struct WorkerStarter { flush_receiver: watch::Receiver<()>, plugins: Plugins, schema_metadata_manager: SchemaMetadataManagerRef, + file_ref_manager: FileReferenceManagerRef, } impl WorkerStarter { @@ -480,6 +495,7 @@ impl WorkerStarter { request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]), region_edit_queues: RegionEditQueues::default(), schema_metadata_manager: self.schema_metadata_manager, + file_ref_manager: self.file_ref_manager.clone(), }; let handle = common_runtime::spawn_global(async move { worker_thread.run().await; @@ -729,6 +745,8 @@ struct RegionWorkerLoop { region_edit_queues: RegionEditQueues, /// Database level metadata manager. schema_metadata_manager: SchemaMetadataManagerRef, + /// Datanode level file references manager. + file_ref_manager: FileReferenceManagerRef, } impl RegionWorkerLoop { diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index 44c5f811eff7..646ce7897f17 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -151,6 +151,7 @@ impl RegionWorkerLoop { self.puffin_manager_factory.clone(), self.intermediate_manager.clone(), self.time_provider.clone(), + self.file_ref_manager.clone(), ) .cache(Some(self.cache_manager.clone())) .options(region.version().options.clone())? 
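
The worker wiring above shares one FileReferenceManagerRef per datanode: every RegionOpener, and through it every LocalFilePurger and FileHandleInner, points at the same manager, so on non-local object stores the counts reflect every open file handle on the node (the local-filesystem path short-circuits and deletes directly). Below is a minimal standalone sketch of that counting behavior, not part of the patch, with simplified placeholder types; the real FileReferenceManager additionally keeps the table's AccessLayerRef so it can upload ref manifests.

// Sketch only: add on handle creation, remove on drop, drop the table entry
// once its last file reference reaches zero (mirrors add_file/remove_file above).
use std::collections::HashMap;
use std::sync::Mutex;

type TableId = u32;
type FileKey = (u64, u128); // (region id, file id) stand-ins

#[derive(Default)]
struct RefRegistry {
    files_per_table: Mutex<HashMap<TableId, HashMap<FileKey, usize>>>,
}

impl RefRegistry {
    fn add(&self, table: TableId, file: FileKey) {
        let mut guard = self.files_per_table.lock().unwrap();
        *guard.entry(table).or_default().entry(file).or_insert(0) += 1;
    }

    fn remove(&self, table: TableId, file: FileKey) {
        let mut guard = self.files_per_table.lock().unwrap();
        if let Some(files) = guard.get_mut(&table) {
            if let Some(count) = files.get_mut(&file) {
                *count = count.saturating_sub(1);
                if *count == 0 {
                    files.remove(&file);
                }
            }
            if files.is_empty() {
                guard.remove(&table);
            }
        }
    }
}

fn main() {
    let registry = RefRegistry::default();
    registry.add(1, (42, 7)); // first handle to the file
    registry.add(1, (42, 7)); // second handle to the same file
    registry.remove(1, (42, 7));
    registry.remove(1, (42, 7)); // last handle dropped: table entry is gone
    assert!(registry.files_per_table.lock().unwrap().is_empty());
}
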
diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index b7c945bed3ff..166a005e9396 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -66,6 +66,7 @@ impl RegionWorkerLoop { self.puffin_manager_factory.clone(), self.intermediate_manager.clone(), self.time_provider.clone(), + self.file_ref_manager.clone(), ) .metadata_builder(builder) .parse_options(request.options)? diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 23148c36b101..9c98a29317e9 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -103,6 +103,7 @@ impl RegionWorkerLoop { self.puffin_manager_factory.clone(), self.intermediate_manager.clone(), self.time_provider.clone(), + self.file_ref_manager.clone(), ) .skip_wal_replay(request.skip_wal_replay) .cache(Some(self.cache_manager.clone())) diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index 1e15d1437b74..dd37e2116ffd 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -16,7 +16,7 @@ pub use opendal::raw::{Access, HttpClient}; pub use opendal::{ services, Buffer, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, FuturesAsyncReader, FuturesAsyncWriter, Lister, Operator as ObjectStore, Reader, Result, - Writer, + Scheme, Writer, }; pub mod config;
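
For reference, the ref manifests introduced in file_purger.rs live under "{table_dir}/.purger_refs/" and encode the datanode id and path type in the file name, which the GC side recovers with ref_path_to_node_id_path_type. The following is a standalone sketch of that naming scheme and its inverse, not part of the patch, using a simplified stand-in for store_api's PathType.

// Sketch only: "{table_dir}/.purger_refs/{node_id:020}{path_type_postfix}.refs"
// and the inverse parse the GC worker relies on.
#[derive(Debug, Clone, Copy, PartialEq)]
enum PathType {
    Bare,
    Data,
    Metadata,
}

fn ref_file_path(table_dir: &str, node_id: u64, path_type: PathType) -> String {
    let postfix = match path_type {
        PathType::Bare => "",
        PathType::Data => ".data",
        PathType::Metadata => ".metadata",
    };
    format!("{table_dir}/.purger_refs/{node_id:020}{postfix}.refs")
}

fn parse_node_id_and_path_type(path: &str) -> Option<(u64, PathType)> {
    let (_, file_name) = path.rsplit_once('/')?;
    let segments: Vec<&str> = file_name.split('.').collect();
    // "{node_id}.refs", "{node_id}.data.refs" or "{node_id}.metadata.refs"
    let path_type = match segments.as_slice() {
        [_, "refs"] => PathType::Bare,
        [_, "data", "refs"] => PathType::Data,
        [_, "metadata", "refs"] => PathType::Metadata,
        _ => return None,
    };
    let node_id = segments[0].parse().ok()?;
    Some((node_id, path_type))
}

fn main() {
    let path = ref_file_path("table1", 3, PathType::Data);
    assert_eq!(path, "table1/.purger_refs/00000000000000000003.data.refs");
    assert_eq!(
        parse_node_id_and_path_type(&path),
        Some((3, PathType::Data))
    );
}

Zero-padding the node id to a fixed width of 20 digits means a lexicographic listing of the ref directory is also ordered by numeric datanode id.
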