diff --git a/Cargo.lock b/Cargo.lock index 929bb77b2334..b494f49d348d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6119,6 +6119,7 @@ dependencies = [ "serde", "serde_json", "snafu 0.8.6", + "store-api", "tantivy", "tantivy-jieba", "tempfile", @@ -7624,6 +7625,7 @@ dependencies = [ "dotenv", "either", "futures", + "greptime-proto", "humantime-serde", "index", "itertools 0.14.0", diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml index 971118d8350f..bde6959b8912 100644 --- a/src/index/Cargo.toml +++ b/src/index/Cargo.toml @@ -34,6 +34,7 @@ roaring = "0.10" serde.workspace = true serde_json.workspace = true snafu.workspace = true +store-api.workspace = true tantivy = { version = "0.24", features = ["zstd-compression"] } tantivy-jieba = "0.16" tokio.workspace = true diff --git a/src/index/src/fulltext_index.rs b/src/index/src/fulltext_index.rs index 4cbbbdf477fd..8de28c049019 100644 --- a/src/index/src/fulltext_index.rs +++ b/src/index/src/fulltext_index.rs @@ -75,3 +75,12 @@ impl Config { Ok(Self::default()) } } + +impl Analyzer { + pub fn to_str(&self) -> &'static str { + match self { + Analyzer::English => "English", + Analyzer::Chinese => "Chinese", + } + } +} diff --git a/src/index/src/lib.rs b/src/index/src/lib.rs index 8c88a8d8007e..547f880bb45d 100644 --- a/src/index/src/lib.rs +++ b/src/index/src/lib.rs @@ -21,6 +21,7 @@ pub mod error; pub mod external_provider; pub mod fulltext_index; pub mod inverted_index; +pub mod target; pub type Bytes = Vec; pub type BytesRef<'a> = &'a [u8]; diff --git a/src/index/src/target.rs b/src/index/src/target.rs new file mode 100644 index 000000000000..cca0a2819232 --- /dev/null +++ b/src/index/src/target.rs @@ -0,0 +1,107 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::fmt::{self, Display}; + +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use serde::{Deserialize, Serialize}; +use snafu::{Snafu, ensure}; +use store_api::storage::ColumnId; + +/// Describes an index target. Column ids are the only supported variant for now. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum IndexTarget { + ColumnId(ColumnId), +} + +impl Display for IndexTarget { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + IndexTarget::ColumnId(id) => write!(f, "{}", id), + } + } +} + +impl IndexTarget { + /// Parse a target key string back into an index target description. + pub fn decode(key: &str) -> Result { + validate_column_key(key)?; + let id = key + .parse::() + .map_err(|_| InvalidColumnIdSnafu { value: key }.build())?; + Ok(IndexTarget::ColumnId(id)) + } +} + +/// Errors that can occur when working with index target keys. +#[derive(Snafu, Clone, PartialEq, Eq)] +#[stack_trace_debug] +pub enum TargetKeyError { + #[snafu(display("target key cannot be empty"))] + Empty, + + #[snafu(display("target key must contain digits only: {key}"))] + InvalidCharacters { key: String }, + + #[snafu(display("failed to parse column id from '{value}'"))] + InvalidColumnId { value: String }, +} + +impl ErrorExt for TargetKeyError { + fn status_code(&self) -> StatusCode { + StatusCode::InvalidArguments + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +fn validate_column_key(key: &str) -> Result<(), TargetKeyError> { + ensure!(!key.is_empty(), EmptySnafu); + ensure!( + key.chars().all(|ch| ch.is_ascii_digit()), + InvalidCharactersSnafu { key } + ); + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn encode_decode_column() { + let target = IndexTarget::ColumnId(42); + let key = format!("{}", target); + assert_eq!(key, "42"); + let decoded = IndexTarget::decode(&key).unwrap(); + assert_eq!(decoded, target); + } + + #[test] + fn decode_rejects_empty() { + let err = IndexTarget::decode("").unwrap_err(); + assert!(matches!(err, TargetKeyError::Empty)); + } + + #[test] + fn decode_rejects_invalid_digits() { + let err = IndexTarget::decode("1a2").unwrap_err(); + assert!(matches!(err, TargetKeyError::InvalidCharacters { .. })); + } +} diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 380913e25f07..0f138f241a0c 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -49,6 +49,7 @@ futures.workspace = true humantime-serde.workspace = true index.workspace = true itertools.workspace = true +greptime-proto.workspace = true lazy_static = "1.4" log-store = { workspace = true } mito-codec.workspace = true diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index c10c398681b6..d48f75fa91a8 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -67,6 +67,8 @@ mod sync_test; #[cfg(test)] mod truncate_test; +mod puffin_index; + use std::any::Any; use std::collections::HashMap; use std::sync::Arc; @@ -78,7 +80,7 @@ use common_base::Plugins; use common_error::ext::BoxedError; use common_meta::key::SchemaMetadataManagerRef; use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::{info, tracing}; +use common_telemetry::{info, tracing, warn}; use common_wal::options::{WAL_OPTIONS_KEY, WalOptions}; use futures::future::{join_all, try_join_all}; use futures::stream::{self, Stream, StreamExt}; @@ -97,12 +99,14 @@ use store_api::region_engine::{ RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse, }; use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest}; -use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry}; -use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; +use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry}; +use store_api::storage::{FileId, RegionId, ScanRequest, SequenceNumber}; use tokio::sync::{Semaphore, oneshot}; +use crate::access_layer::RegionFilePathFactory; use crate::cache::{CacheManagerRef, CacheStrategy}; use crate::config::MitoConfig; +use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin}; use crate::error::{ InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result, SerdeJsonSnafu, SerializeColumnMetadataSnafu, @@ -117,7 +121,7 @@ use crate::read::stream::ScanBatchStream; use crate::region::MitoRegionRef; use crate::region::opener::PartitionExprFetcherRef; use crate::request::{RegionEditRequest, WorkerRequest}; -use crate::sst::file::FileMeta; +use crate::sst::file::{FileMeta, RegionFileId}; use crate::sst::file_ref::FileReferenceManagerRef; use crate::wal::entry_distributor::{ DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers, @@ -434,6 +438,89 @@ impl MitoEngine { results } + /// Lists metadata about all puffin index targets stored in the engine. + pub async fn all_index_metas(&self) -> Vec { + let node_id = self.inner.workers.file_ref_manager().node_id(); + let cache_manager = self.inner.workers.cache_manager(); + let puffin_metadata_cache = cache_manager.puffin_metadata_cache().cloned(); + let bloom_filter_cache = cache_manager.bloom_filter_index_cache().cloned(); + let inverted_index_cache = cache_manager.inverted_index_cache().cloned(); + + let mut results = Vec::new(); + + for region in self.inner.workers.all_regions() { + let manifest_entries = region.manifest_sst_entries().await; + let access_layer = region.access_layer.clone(); + let table_dir = access_layer.table_dir().to_string(); + let path_type = access_layer.path_type(); + let object_store = access_layer.object_store().clone(); + let puffin_factory = access_layer.puffin_manager_factory().clone(); + let path_factory = RegionFilePathFactory::new(table_dir, path_type); + + let entry_futures = manifest_entries.into_iter().map(|entry| { + let object_store = object_store.clone(); + let path_factory = path_factory.clone(); + let puffin_factory = puffin_factory.clone(); + let puffin_metadata_cache = puffin_metadata_cache.clone(); + let bloom_filter_cache = bloom_filter_cache.clone(); + let inverted_index_cache = inverted_index_cache.clone(); + + async move { + let Some(index_file_path) = entry.index_file_path.as_ref() else { + return Vec::new(); + }; + + let file_id = match FileId::parse_str(&entry.file_id) { + Ok(file_id) => file_id, + Err(err) => { + warn!( + err; + "Failed to parse puffin index file id, table_dir: {}, file_id: {}", + entry.table_dir, + entry.file_id + ); + return Vec::new(); + } + }; + + let region_file_id = RegionFileId::new(entry.region_id, file_id); + let context = IndexEntryContext { + table_dir: &entry.table_dir, + index_file_path: index_file_path.as_str(), + region_id: entry.region_id, + table_id: entry.table_id, + region_number: entry.region_number, + region_group: entry.region_group, + region_sequence: entry.region_sequence, + file_id: &entry.file_id, + index_file_size: entry.index_file_size, + node_id, + }; + + let manager = puffin_factory + .build(object_store, path_factory) + .with_puffin_metadata_cache(puffin_metadata_cache); + + collect_index_entries_from_puffin( + manager, + region_file_id, + context, + bloom_filter_cache, + inverted_index_cache, + ) + .await + } + }); + + let mut meta_stream = stream::iter(entry_futures).buffer_unordered(8); // Parallelism is 8. + while let Some(mut metas) = meta_stream.next().await { + results.append(&mut metas); + } + } + + results + } + /// Lists all SSTs from the storage layer of all regions in the engine. pub fn all_ssts_from_storage(&self) -> impl Stream> { let node_id = self.inner.workers.file_ref_manager().node_id(); diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 2a5d0fbf87df..1993cc0d35db 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -819,3 +819,150 @@ StorageSstEntry { file_path: "test/22_0000000042/.parquet", file_size: StorageSstEntry { file_path: "test/22_0000000042/index/.puffin", file_size: None, last_modified_ms: None, node_id: None }"# ); } + +#[tokio::test] +async fn test_all_index_metas_list_all_types() { + use datatypes::schema::{ + FulltextAnalyzer, FulltextBackend, FulltextOptions, SkippingIndexOptions, SkippingIndexType, + }; + + let mut env = TestEnv::new().await; + let engine = env.create_engine(MitoConfig::default()).await; + + // One region with both fulltext backends and inverted index enabled, plus bloom skipping index + let region_id = RegionId::new(11, 1); + + let mut request = CreateRequestBuilder::new().tag_num(3).field_num(2).build(); + // inverted index on tag_0 + request.column_metadatas[0] + .column_schema + .set_inverted_index(true); + // fulltext bloom on tag_1 + let ft_bloom = FulltextOptions::new_unchecked( + true, + FulltextAnalyzer::English, + false, + FulltextBackend::Bloom, + 4, + 0.001, + ); + request.column_metadatas[1] + .column_schema + .set_fulltext_options(&ft_bloom) + .unwrap(); + // fulltext tantivy on tag_2 + let ft_tantivy = FulltextOptions::new_unchecked( + true, + FulltextAnalyzer::Chinese, + true, + FulltextBackend::Tantivy, + 2, + 0.01, + ); + request.column_metadatas[2] + .column_schema + .set_fulltext_options(&ft_tantivy) + .unwrap(); + // bloom filter skipping index on field_1 (which is at index 3) + let skipping = SkippingIndexOptions::new_unchecked(2, 0.01, SkippingIndexType::BloomFilter); + request.column_metadatas[3] + .column_schema + .set_skipping_options(&skipping) + .unwrap(); + + // inverted index on field_1 + request.column_metadatas[4] + .column_schema + .set_inverted_index(true); + + engine + .handle_request(region_id, RegionRequest::Create(request.clone())) + .await + .unwrap(); + + // write some rows (schema: tag_0, tag_1, tag_2, field_0, field_1, ts) + let column_schemas = rows_schema(&request); + let rows_vec: Vec = (0..20) + .map(|ts| api::v1::Row { + values: vec![ + api::v1::Value { + value_data: Some(api::v1::value::ValueData::StringValue("x".to_string())), + }, + api::v1::Value { + value_data: Some(api::v1::value::ValueData::StringValue("y".to_string())), + }, + api::v1::Value { + value_data: Some(api::v1::value::ValueData::StringValue("z".to_string())), + }, + api::v1::Value { + value_data: Some(api::v1::value::ValueData::F64Value(ts as f64)), + }, + api::v1::Value { + value_data: Some(api::v1::value::ValueData::F64Value((20 - ts) as f64)), + }, + api::v1::Value { + value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue( + ts as i64 * 1000, + )), + }, + ], + }) + .collect(); + let rows = api::v1::Rows { + schema: column_schemas.clone(), + rows: rows_vec, + }; + put_rows(&engine, region_id, rows).await; + + // flush to generate sst and indexes + engine + .handle_request( + region_id, + RegionRequest::Flush(RegionFlushRequest { + row_group_size: None, + }), + ) + .await + .unwrap(); + + fn bucket_size(size: u64) -> u64 { + if size < 512 { size } else { (size / 16) * 16 } + } + + let mut metas = engine.all_index_metas().await; + for entry in &mut metas { + entry.index_file_path = entry.index_file_path.replace(&entry.file_id, ""); + entry.file_id = "".to_string(); + entry.index_file_size = entry.index_file_size.map(bucket_size); + if entry.index_type == "fulltext_tantivy" { + entry.blob_size = bucket_size(entry.blob_size); + } + if let Some(meta_json) = entry.meta_json.as_mut() + && let Ok(mut value) = serde_json::from_str::(meta_json) + { + if let Some(inverted) = value.get_mut("inverted").and_then(|v| v.as_object_mut()) { + inverted.insert("base_offset".to_string(), serde_json::Value::from(0)); + } + *meta_json = value.to_string(); + } + } + metas.sort_by(|a, b| { + (a.index_type.as_str(), a.target_key.as_str()) + .cmp(&(b.index_type.as_str(), b.target_key.as_str())) + }); + + let debug_format = metas + .iter() + .map(|entry| format!("\n{:?}", entry)) + .collect::(); + + assert_eq!( + debug_format, + r#" +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "bloom_filter", target_type: "column", target_key: "3", target_json: "{\"column\":3}", blob_size: 751, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "fulltext_bloom", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 87, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "fulltext_tantivy", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 1104, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "inverted", target_type: "column", target_key: "0", target_json: "{\"column\":0}", blob_size: 70, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":44,\"inverted_index_size\":70,\"null_bitmap_size\":8,\"relative_fst_offset\":26,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "inverted", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"# + ); +} diff --git a/src/mito2/src/engine/puffin_index.rs b/src/mito2/src/engine/puffin_index.rs new file mode 100644 index 000000000000..05529db59ba4 --- /dev/null +++ b/src/mito2/src/engine/puffin_index.rs @@ -0,0 +1,502 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::convert::TryFrom; + +use common_base::range_read::RangeReader; +use common_telemetry::warn; +use greptime_proto::v1::index::{BloomFilterMeta, InvertedIndexMeta, InvertedIndexMetas}; +use index::bitmap::BitmapType; +use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl}; +use index::fulltext_index::Config as FulltextConfig; +use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader}; +use index::target::IndexTarget; +use puffin::blob_metadata::BlobMetadata; +use puffin::puffin_manager::{PuffinManager, PuffinReader}; +use serde_json::{Map, Value, json}; +use store_api::sst_entry::PuffinIndexMetaEntry; +use store_api::storage::{ColumnId, RegionGroup, RegionId, RegionNumber, RegionSeq, TableId}; + +use crate::cache::index::bloom_filter_index::{ + BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag, +}; +use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef}; +use crate::sst::file::RegionFileId; +use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE as BLOOM_BLOB_TYPE; +use crate::sst::index::fulltext_index::{ + INDEX_BLOB_TYPE_BLOOM as FULLTEXT_BLOOM_BLOB_TYPE, + INDEX_BLOB_TYPE_TANTIVY as FULLTEXT_TANTIVY_BLOB_TYPE, +}; +use crate::sst::index::inverted_index::INDEX_BLOB_TYPE as INVERTED_BLOB_TYPE; +use crate::sst::index::puffin_manager::{SstPuffinManager, SstPuffinReader}; + +const INDEX_TYPE_BLOOM: &str = "bloom_filter"; +const INDEX_TYPE_FULLTEXT_BLOOM: &str = "fulltext_bloom"; +const INDEX_TYPE_FULLTEXT_TANTIVY: &str = "fulltext_tantivy"; +const INDEX_TYPE_INVERTED: &str = "inverted"; + +const TARGET_TYPE_UNKNOWN: &str = "unknown"; + +const TARGET_TYPE_COLUMN: &str = "column"; + +pub(crate) struct IndexEntryContext<'a> { + pub(crate) table_dir: &'a str, + pub(crate) index_file_path: &'a str, + pub(crate) region_id: RegionId, + pub(crate) table_id: TableId, + pub(crate) region_number: RegionNumber, + pub(crate) region_group: RegionGroup, + pub(crate) region_sequence: RegionSeq, + pub(crate) file_id: &'a str, + pub(crate) index_file_size: Option, + pub(crate) node_id: Option, +} + +/// Collect index metadata entries present in the SST puffin file. +pub(crate) async fn collect_index_entries_from_puffin( + manager: SstPuffinManager, + region_file_id: RegionFileId, + context: IndexEntryContext<'_>, + bloom_filter_cache: Option, + inverted_index_cache: Option, +) -> Vec { + let mut entries = Vec::new(); + + let reader = match manager.reader(®ion_file_id).await { + Ok(reader) => reader, + Err(err) => { + warn!( + err; + "Failed to open puffin index file, table_dir: {}, file_id: {}", + context.table_dir, + context.file_id + ); + return entries; + } + }; + + let file_metadata = match reader.metadata().await { + Ok(metadata) => metadata, + Err(err) => { + warn!( + err; + "Failed to read puffin file metadata, table_dir: {}, file_id: {}", + context.table_dir, + context.file_id + ); + return entries; + } + }; + + for blob in &file_metadata.blobs { + match BlobIndexTypeTargetKey::from_blob_type(&blob.blob_type) { + Some(BlobIndexTypeTargetKey::BloomFilter(target_key)) => { + let bloom_meta = try_read_bloom_meta( + &reader, + region_file_id, + blob.blob_type.as_str(), + target_key, + bloom_filter_cache.as_ref(), + Tag::Skipping, + &context, + ) + .await; + + let bloom_value = bloom_meta.as_ref().map(bloom_meta_value); + let (target_type, target_json) = decode_target_info(target_key); + let meta_json = build_meta_json(bloom_value, None, None); + let entry = build_index_entry( + &context, + INDEX_TYPE_BLOOM, + target_type, + target_key.to_string(), + target_json, + blob.length as u64, + meta_json, + ); + entries.push(entry); + } + Some(BlobIndexTypeTargetKey::FulltextBloom(target_key)) => { + let bloom_meta = try_read_bloom_meta( + &reader, + region_file_id, + blob.blob_type.as_str(), + target_key, + bloom_filter_cache.as_ref(), + Tag::Fulltext, + &context, + ) + .await; + + let bloom_value = bloom_meta.as_ref().map(bloom_meta_value); + let fulltext_value = Some(fulltext_meta_value(blob)); + let (target_type, target_json) = decode_target_info(target_key); + let meta_json = build_meta_json(bloom_value, fulltext_value, None); + let entry = build_index_entry( + &context, + INDEX_TYPE_FULLTEXT_BLOOM, + target_type, + target_key.to_string(), + target_json, + blob.length as u64, + meta_json, + ); + entries.push(entry); + } + Some(BlobIndexTypeTargetKey::FulltextTantivy(target_key)) => { + let fulltext_value = Some(fulltext_meta_value(blob)); + let (target_type, target_json) = decode_target_info(target_key); + let meta_json = build_meta_json(None, fulltext_value, None); + let entry = build_index_entry( + &context, + INDEX_TYPE_FULLTEXT_TANTIVY, + target_type, + target_key.to_string(), + target_json, + blob.length as u64, + meta_json, + ); + entries.push(entry); + } + Some(BlobIndexTypeTargetKey::Inverted) => { + let mut inverted_entries = collect_inverted_entries( + &reader, + region_file_id, + inverted_index_cache.as_ref(), + &context, + ) + .await; + entries.append(&mut inverted_entries); + } + None => {} + } + } + + entries +} + +async fn collect_inverted_entries( + reader: &SstPuffinReader, + region_file_id: RegionFileId, + cache: Option<&InvertedIndexCacheRef>, + context: &IndexEntryContext<'_>, +) -> Vec { + // Read the inverted index blob and surface its per-column metadata entries. + let file_id = region_file_id.file_id(); + + let guard = match reader.blob(INVERTED_BLOB_TYPE).await { + Ok(guard) => guard, + Err(err) => { + warn!( + err; + "Failed to open inverted index blob, table_dir: {}, file_id: {}", + context.table_dir, + context.file_id + ); + return Vec::new(); + } + }; + + let blob_reader = match guard.reader().await { + Ok(reader) => reader, + Err(err) => { + warn!( + err; + "Failed to build inverted index blob reader, table_dir: {}, file_id: {}", + context.table_dir, + context.file_id + ); + return Vec::new(); + } + }; + + let blob_size = blob_reader + .metadata() + .await + .ok() + .map(|meta| meta.content_length); + let metas = if let (Some(cache), Some(blob_size)) = (cache, blob_size) { + let reader = CachedInvertedIndexBlobReader::new( + file_id, + blob_size, + InvertedIndexBlobReader::new(blob_reader), + cache.clone(), + ); + match reader.metadata().await { + Ok(metas) => metas, + Err(err) => { + warn!( + err; + "Failed to read inverted index metadata, table_dir: {}, file_id: {}", + context.table_dir, + context.file_id + ); + return Vec::new(); + } + } + } else { + let reader = InvertedIndexBlobReader::new(blob_reader); + match reader.metadata().await { + Ok(metas) => metas, + Err(err) => { + warn!( + err; + "Failed to read inverted index metadata, table_dir: {}, file_id: {}", + context.table_dir, + context.file_id + ); + return Vec::new(); + } + } + }; + + build_inverted_entries(context, metas.as_ref()) +} + +fn build_inverted_entries( + context: &IndexEntryContext<'_>, + metas: &InvertedIndexMetas, +) -> Vec { + let mut entries = Vec::new(); + for (name, meta) in &metas.metas { + let (target_type, target_json) = decode_target_info(name); + let inverted_value = inverted_meta_value(meta, metas); + let meta_json = build_meta_json(None, None, Some(inverted_value)); + let entry = build_index_entry( + context, + INDEX_TYPE_INVERTED, + target_type, + name.clone(), + target_json, + meta.inverted_index_size, + meta_json, + ); + entries.push(entry); + } + entries +} + +async fn try_read_bloom_meta( + reader: &SstPuffinReader, + region_file_id: RegionFileId, + blob_type: &str, + target_key: &str, + cache: Option<&BloomFilterIndexCacheRef>, + tag: Tag, + context: &IndexEntryContext<'_>, +) -> Option { + let column_id = decode_column_id(target_key); + + // Failures are logged but do not abort the overall metadata collection. + match reader.blob(blob_type).await { + Ok(guard) => match guard.reader().await { + Ok(blob_reader) => { + let blob_size = blob_reader + .metadata() + .await + .ok() + .map(|meta| meta.content_length); + let bloom_reader = BloomFilterReaderImpl::new(blob_reader); + let result = match (cache, column_id, blob_size) { + (Some(cache), Some(column_id), Some(blob_size)) => { + CachedBloomFilterIndexBlobReader::new( + region_file_id.file_id(), + column_id, + tag, + blob_size, + bloom_reader, + cache.clone(), + ) + .metadata() + .await + } + _ => bloom_reader.metadata().await, + }; + + match result { + Ok(meta) => Some(meta), + Err(err) => { + warn!( + err; + "Failed to read index metadata, table_dir: {}, file_id: {}, blob: {}", + context.table_dir, + context.file_id, + blob_type + ); + None + } + } + } + Err(err) => { + warn!( + err; + "Failed to open index blob reader, table_dir: {}, file_id: {}, blob: {}", + context.table_dir, + context.file_id, + blob_type + ); + None + } + }, + Err(err) => { + warn!( + err; + "Failed to open index blob, table_dir: {}, file_id: {}, blob: {}", + context.table_dir, + context.file_id, + blob_type + ); + None + } + } +} + +fn decode_target_info(target_key: &str) -> (String, String) { + match IndexTarget::decode(target_key) { + Ok(IndexTarget::ColumnId(id)) => ( + TARGET_TYPE_COLUMN.to_string(), + json!({ "column": id }).to_string(), + ), + _ => ( + TARGET_TYPE_UNKNOWN.to_string(), + json!({ "error": "failed_to_decode" }).to_string(), + ), + } +} + +fn decode_column_id(target_key: &str) -> Option { + match IndexTarget::decode(target_key) { + Ok(IndexTarget::ColumnId(id)) => Some(id), + _ => None, + } +} + +fn bloom_meta_value(meta: &BloomFilterMeta) -> Value { + json!({ + "rows_per_segment": meta.rows_per_segment, + "segment_count": meta.segment_count, + "row_count": meta.row_count, + "bloom_filter_size": meta.bloom_filter_size, + }) +} + +fn fulltext_meta_value(blob: &BlobMetadata) -> Value { + let config = FulltextConfig::from_blob_metadata(blob).unwrap_or_default(); + json!({ + "analyzer": config.analyzer.to_str(), + "case_sensitive": config.case_sensitive, + }) +} + +fn inverted_meta_value(meta: &InvertedIndexMeta, metas: &InvertedIndexMetas) -> Value { + let bitmap_type = BitmapType::try_from(meta.bitmap_type) + .map(|bt| format!("{:?}", bt)) + .unwrap_or_else(|_| meta.bitmap_type.to_string()); + json!({ + "bitmap_type": bitmap_type, + "base_offset": meta.base_offset, + "inverted_index_size": meta.inverted_index_size, + "relative_fst_offset": meta.relative_fst_offset, + "fst_size": meta.fst_size, + "relative_null_bitmap_offset": meta.relative_null_bitmap_offset, + "null_bitmap_size": meta.null_bitmap_size, + "segment_row_count": metas.segment_row_count, + "total_row_count": metas.total_row_count, + }) +} + +fn build_meta_json( + bloom: Option, + fulltext: Option, + inverted: Option, +) -> Option { + let mut map = Map::new(); + if let Some(value) = bloom { + map.insert("bloom".to_string(), value); + } + if let Some(value) = fulltext { + map.insert("fulltext".to_string(), value); + } + if let Some(value) = inverted { + map.insert("inverted".to_string(), value); + } + if map.is_empty() { + None + } else { + Some(Value::Object(map).to_string()) + } +} + +enum BlobIndexTypeTargetKey<'a> { + BloomFilter(&'a str), + FulltextBloom(&'a str), + FulltextTantivy(&'a str), + Inverted, +} + +impl<'a> BlobIndexTypeTargetKey<'a> { + fn from_blob_type(blob_type: &'a str) -> Option { + if let Some(target_key) = Self::target_key_from_blob(blob_type, BLOOM_BLOB_TYPE) { + Some(BlobIndexTypeTargetKey::BloomFilter(target_key)) + } else if let Some(target_key) = + Self::target_key_from_blob(blob_type, FULLTEXT_BLOOM_BLOB_TYPE) + { + Some(BlobIndexTypeTargetKey::FulltextBloom(target_key)) + } else if let Some(target_key) = + Self::target_key_from_blob(blob_type, FULLTEXT_TANTIVY_BLOB_TYPE) + { + Some(BlobIndexTypeTargetKey::FulltextTantivy(target_key)) + } else if blob_type == INVERTED_BLOB_TYPE { + Some(BlobIndexTypeTargetKey::Inverted) + } else { + None + } + } + + fn target_key_from_blob(blob_type: &'a str, prefix: &str) -> Option<&'a str> { + // Blob types encode their target as "-". + blob_type + .strip_prefix(prefix) + .and_then(|suffix| suffix.strip_prefix('-')) + } +} + +fn build_index_entry( + context: &IndexEntryContext<'_>, + index_type: &str, + target_type: String, + target_key: String, + target_json: String, + blob_size: u64, + meta_json: Option, +) -> PuffinIndexMetaEntry { + PuffinIndexMetaEntry { + table_dir: context.table_dir.to_string(), + index_file_path: context.index_file_path.to_string(), + region_id: context.region_id, + table_id: context.table_id, + region_number: context.region_number, + region_group: context.region_group, + region_sequence: context.region_sequence, + file_id: context.file_id.to_string(), + index_file_size: context.index_file_size, + index_type: index_type.to_string(), + target_type, + target_key, + target_json, + blob_size, + meta_json, + node_id: context.node_id, + } +} diff --git a/src/mito2/src/sst/index/bloom_filter.rs b/src/mito2/src/sst/index/bloom_filter.rs index 7f454937f0e8..0896c3028ff5 100644 --- a/src/mito2/src/sst/index/bloom_filter.rs +++ b/src/mito2/src/sst/index/bloom_filter.rs @@ -15,4 +15,4 @@ pub(crate) mod applier; pub(crate) mod creator; -const INDEX_BLOB_TYPE: &str = "greptime-bloom-filter-v1"; +pub(crate) const INDEX_BLOB_TYPE: &str = "greptime-bloom-filter-v1"; diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index 4562f01cdf70..3fa387c8dc55 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -22,6 +22,7 @@ use common_base::range_read::RangeReader; use common_telemetry::warn; use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate}; use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl}; +use index::target::IndexTarget; use object_store::ObjectStore; use puffin::puffin_manager::cache::PuffinMetadataCacheRef; use puffin::puffin_manager::{PuffinManager, PuffinReader}; @@ -263,12 +264,14 @@ impl BloomFilterIndexApplier { file_cache.local_store(), WriteCachePathProvider::new(file_cache.clone()), ); + let blob_name = Self::column_blob_name(column_id); + let reader = puffin_manager .reader(&file_id) .await .context(PuffinBuildReaderSnafu)? .with_file_size_hint(file_size_hint) - .blob(&Self::column_blob_name(column_id)) + .blob(&blob_name) .await .context(PuffinReadBlobSnafu)? .reader() @@ -279,7 +282,7 @@ impl BloomFilterIndexApplier { // TODO(ruihang): use the same util with the code in creator fn column_blob_name(column_id: ColumnId) -> String { - format!("{INDEX_BLOB_TYPE}-{column_id}") + format!("{INDEX_BLOB_TYPE}-{}", IndexTarget::ColumnId(column_id)) } /// Creates a blob reader from the remote index file @@ -297,12 +300,14 @@ impl BloomFilterIndexApplier { ) .with_puffin_metadata_cache(self.puffin_metadata_cache.clone()); + let blob_name = Self::column_blob_name(column_id); + puffin_manager .reader(&file_id) .await .context(PuffinBuildReaderSnafu)? .with_file_size_hint(file_size_hint) - .blob(&Self::column_blob_name(column_id)) + .blob(&blob_name) .await .context(PuffinReadBlobSnafu)? .reader() diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs index 387302aeb0b7..a48898902f1d 100644 --- a/src/mito2/src/sst/index/bloom_filter/creator.rs +++ b/src/mito2/src/sst/index/bloom_filter/creator.rs @@ -21,6 +21,7 @@ use datatypes::arrow::record_batch::RecordBatch; use datatypes::schema::SkippingIndexType; use datatypes::vectors::Helper; use index::bloom_filter::creator::BloomFilterCreator; +use index::target::IndexTarget; use mito_codec::index::{IndexValueCodec, IndexValuesCodec}; use mito_codec::row_converter::SortField; use puffin::puffin_manager::{PuffinWriter, PutOptions}; @@ -381,7 +382,8 @@ impl BloomFilterIndexer { ) -> Result { let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB); - let blob_name = format!("{}-{}", INDEX_BLOB_TYPE, col_id); + let target_key = IndexTarget::ColumnId(*col_id); + let blob_name = format!("{INDEX_BLOB_TYPE}-{target_key}"); let (index_finish, puffin_add_blob) = futures::join!( creator.finish(tx.compat_write()), puffin_writer.put_blob( diff --git a/src/mito2/src/sst/index/fulltext_index.rs b/src/mito2/src/sst/index/fulltext_index.rs index 86d8a35b9d29..542a372bb01c 100644 --- a/src/mito2/src/sst/index/fulltext_index.rs +++ b/src/mito2/src/sst/index/fulltext_index.rs @@ -15,5 +15,5 @@ pub(crate) mod applier; pub(crate) mod creator; -const INDEX_BLOB_TYPE_TANTIVY: &str = "greptime-fulltext-index-v1"; -const INDEX_BLOB_TYPE_BLOOM: &str = "greptime-fulltext-index-bloom"; +pub(crate) const INDEX_BLOB_TYPE_TANTIVY: &str = "greptime-fulltext-index-v1"; +pub(crate) const INDEX_BLOB_TYPE_BLOOM: &str = "greptime-fulltext-index-bloom"; diff --git a/src/mito2/src/sst/index/fulltext_index/applier.rs b/src/mito2/src/sst/index/fulltext_index/applier.rs index c88fc611dbbc..6b68fc348da8 100644 --- a/src/mito2/src/sst/index/fulltext_index/applier.rs +++ b/src/mito2/src/sst/index/fulltext_index/applier.rs @@ -24,6 +24,7 @@ use index::bloom_filter::reader::BloomFilterReaderImpl; use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher}; use index::fulltext_index::tokenizer::{ChineseTokenizer, EnglishTokenizer, Tokenizer}; use index::fulltext_index::{Analyzer, Config}; +use index::target::IndexTarget; use object_store::ObjectStore; use puffin::puffin_manager::cache::PuffinMetadataCacheRef; use puffin::puffin_manager::{GuardWithMetadata, PuffinManager, PuffinReader}; @@ -171,7 +172,10 @@ impl FulltextIndexApplier { column_id: ColumnId, request: &FulltextRequest, ) -> Result>> { - let blob_key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}"); + let blob_key = format!( + "{INDEX_BLOB_TYPE_TANTIVY}-{}", + IndexTarget::ColumnId(column_id) + ); let dir = self .index_source .dir(file_id, &blob_key, file_size_hint) @@ -283,7 +287,10 @@ impl FulltextIndexApplier { terms: &[FulltextTerm], output: &mut [(usize, Vec>)], ) -> Result { - let blob_key = format!("{INDEX_BLOB_TYPE_BLOOM}-{column_id}"); + let blob_key = format!( + "{INDEX_BLOB_TYPE_BLOOM}-{}", + IndexTarget::ColumnId(column_id) + ); let Some(reader) = self .index_source .blob(file_id, &blob_key, file_size_hint) diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs index 5a7c92dd46de..be8675a36f0c 100644 --- a/src/mito2/src/sst/index/fulltext_index/creator.rs +++ b/src/mito2/src/sst/index/fulltext_index/creator.rs @@ -25,6 +25,7 @@ use index::fulltext_index::create::{ BloomFilterFulltextIndexCreator, FulltextIndexCreator, TantivyFulltextIndexCreator, }; use index::fulltext_index::{Analyzer, Config}; +use index::target::IndexTarget; use puffin::blob_metadata::CompressionCodec; use puffin::puffin_manager::PutOptions; use snafu::{ResultExt, ensure}; @@ -385,16 +386,22 @@ impl AltFulltextCreator { ) -> Result { match self { Self::Tantivy(creator) => { - let key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{}", column_id); + let blob_key = format!( + "{INDEX_BLOB_TYPE_TANTIVY}-{}", + IndexTarget::ColumnId(*column_id) + ); creator - .finish(puffin_writer, &key, put_options) + .finish(puffin_writer, &blob_key, put_options) .await .context(FulltextFinishSnafu) } Self::Bloom(creator) => { - let key = format!("{INDEX_BLOB_TYPE_BLOOM}-{}", column_id); + let blob_key = format!( + "{INDEX_BLOB_TYPE_BLOOM}-{}", + IndexTarget::ColumnId(*column_id) + ); creator - .finish(puffin_writer, &key, put_options) + .finish(puffin_writer, &blob_key, put_options) .await .context(FulltextFinishSnafu) } diff --git a/src/mito2/src/sst/index/inverted_index.rs b/src/mito2/src/sst/index/inverted_index.rs index 73dca4ac47f2..f7919ae24f54 100644 --- a/src/mito2/src/sst/index/inverted_index.rs +++ b/src/mito2/src/sst/index/inverted_index.rs @@ -15,4 +15,4 @@ pub(crate) mod applier; pub(crate) mod creator; -const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1"; +pub(crate) const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1"; diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder.rs b/src/mito2/src/sst/index/inverted_index/applier/builder.rs index 60690df3a88c..8bc5e8b6d184 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder.rs @@ -27,6 +27,7 @@ use datatypes::data_type::ConcreteDataType; use datatypes::value::Value; use index::inverted_index::search::index_apply::PredicatesIndexApplier; use index::inverted_index::search::predicate::Predicate; +use index::target::IndexTarget; use mito_codec::index::IndexValueCodec; use mito_codec::row_converter::SortField; use object_store::ObjectStore; @@ -139,8 +140,13 @@ impl<'a> InvertedIndexApplierBuilder<'a> { let predicates = self .output .iter() - .map(|(column_id, predicates)| (column_id.to_string(), predicates.clone())) - .collect(); + .map(|(column_id, predicates)| { + ( + format!("{}", IndexTarget::ColumnId(*column_id)), + predicates.clone(), + ) + }) + .collect::>(); let applier = PredicatesIndexApplier::try_from(predicates); Ok(Some( diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 608b11b0759f..b7019422f83c 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -24,6 +24,7 @@ use index::inverted_index::create::InvertedIndexCreator; use index::inverted_index::create::sort::external_sort::ExternalSorter; use index::inverted_index::create::sort_create::SortIndexCreator; use index::inverted_index::format::writer::InvertedIndexBlobWriter; +use index::target::IndexTarget; use mito_codec::index::{IndexValueCodec, IndexValuesCodec}; use mito_codec::row_converter::SortField; use puffin::puffin_manager::{PuffinWriter, PutOptions}; @@ -72,7 +73,7 @@ pub struct InvertedIndexer { /// The memory usage of the index creator. memory_usage: Arc, - /// Ids of indexed columns and their names (`to_string` of the column id). + /// Ids of indexed columns and their encoded target keys. indexed_column_ids: Vec<(ColumnId, String)>, /// Region metadata for column lookups. @@ -115,8 +116,8 @@ impl InvertedIndexer { let indexed_column_ids = indexed_column_ids .into_iter() .map(|col_id| { - let col_id_str = col_id.to_string(); - (col_id, col_id_str) + let target_key = format!("{}", IndexTarget::ColumnId(col_id)); + (col_id, target_key) }) .collect(); Self { @@ -181,7 +182,7 @@ impl InvertedIndexer { let column_indices = self.column_index_cache.as_ref().unwrap(); - for ((col_id, col_id_str), &column_index) in + for ((col_id, target_key), &column_index) in self.indexed_column_ids.iter().zip(column_indices.iter()) { if let Some(index) = column_index { @@ -197,7 +198,7 @@ impl InvertedIndexer { if value_ref.is_null() { self.index_creator - .push_with_name(col_id_str, None) + .push_with_name(target_key, None) .await .context(PushIndexValueSnafu)?; } else { @@ -208,7 +209,7 @@ impl InvertedIndexer { ) .context(EncodeSnafu)?; self.index_creator - .push_with_name(col_id_str, Some(&self.value_buf)) + .push_with_name(target_key, Some(&self.value_buf)) .await .context(PushIndexValueSnafu)?; } @@ -286,7 +287,7 @@ impl InvertedIndexer { let n = batch.num_rows(); guard.inc_row_count(n); - for (col_id, col_id_str) in &self.indexed_column_ids { + for (col_id, target_key) in &self.indexed_column_ids { match self.codec.pk_col_info(*col_id) { // pk Some(col_info) => { @@ -308,7 +309,7 @@ impl InvertedIndexer { .transpose()?; self.index_creator - .push_with_name_n(col_id_str, value, n) + .push_with_name_n(target_key, value, n) .await .context(PushIndexValueSnafu)?; } @@ -327,7 +328,7 @@ impl InvertedIndexer { let value = values.data.get_ref(i); if value.is_null() { self.index_creator - .push_with_name(col_id_str, None) + .push_with_name(target_key, None) .await .context(PushIndexValueSnafu)?; } else { @@ -338,7 +339,7 @@ impl InvertedIndexer { ) .context(EncodeSnafu)?; self.index_creator - .push_with_name(col_id_str, Some(&self.value_buf)) + .push_with_name(target_key, Some(&self.value_buf)) .await .context(PushIndexValueSnafu)?; } diff --git a/src/store-api/src/sst_entry.rs b/src/store-api/src/sst_entry.rs index a680e438bc2e..34e37de7eac5 100644 --- a/src/store-api/src/sst_entry.rs +++ b/src/store-api/src/sst_entry.rs @@ -249,6 +249,115 @@ impl StorageSstEntry { } } +/// An entry describing puffin index metadata for inspection. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct PuffinIndexMetaEntry { + /// The table directory this index belongs to. + pub table_dir: String, + /// The full path of the index file in object store. + pub index_file_path: String, + /// The region id referencing the index file. + pub region_id: RegionId, + /// The table id referencing the index file. + pub table_id: TableId, + /// The region number referencing the index file. + pub region_number: RegionNumber, + /// The region group referencing the index file. + pub region_group: RegionGroup, + /// The region sequence referencing the index file. + pub region_sequence: RegionSeq, + /// Engine-specific file identifier (string form). + pub file_id: String, + /// Size of the index file in object store (if available). + pub index_file_size: Option, + /// Logical index type (`bloom_filter`, `fulltext_bloom`, `fulltext_tantivy`, `inverted`). + pub index_type: String, + /// Target type (`column`, ...). + pub target_type: String, + /// Encoded target key string. + pub target_key: String, + /// Structured JSON describing the target. + pub target_json: String, + /// Size of the blob storing this target. + pub blob_size: u64, + /// Structured JSON describing index-specific metadata (if available). + pub meta_json: Option, + /// Node id associated with the index file (if known). + pub node_id: Option, +} + +impl PuffinIndexMetaEntry { + /// Returns the schema describing puffin index metadata entries. + pub fn schema() -> SchemaRef { + use datatypes::prelude::ConcreteDataType as Ty; + Arc::new(Schema::new(vec![ + ColumnSchema::new("table_dir", Ty::string_datatype(), false), + ColumnSchema::new("index_file_path", Ty::string_datatype(), false), + ColumnSchema::new("region_id", Ty::uint64_datatype(), false), + ColumnSchema::new("table_id", Ty::uint32_datatype(), false), + ColumnSchema::new("region_number", Ty::uint32_datatype(), false), + ColumnSchema::new("region_group", Ty::uint8_datatype(), false), + ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false), + ColumnSchema::new("file_id", Ty::string_datatype(), false), + ColumnSchema::new("index_file_size", Ty::uint64_datatype(), true), + ColumnSchema::new("index_type", Ty::string_datatype(), false), + ColumnSchema::new("target_type", Ty::string_datatype(), false), + ColumnSchema::new("target_key", Ty::string_datatype(), false), + ColumnSchema::new("target_json", Ty::string_datatype(), false), + ColumnSchema::new("blob_size", Ty::uint64_datatype(), false), + ColumnSchema::new("meta_json", Ty::string_datatype(), true), + ColumnSchema::new("node_id", Ty::uint64_datatype(), true), + ])) + } + + /// Converts a list of puffin index metadata entries to a record batch. + pub fn to_record_batch(entries: &[Self]) -> std::result::Result { + let schema = Self::schema(); + let table_dirs = entries.iter().map(|e| e.table_dir.as_str()); + let index_file_paths = entries.iter().map(|e| e.index_file_path.as_str()); + let region_ids = entries.iter().map(|e| e.region_id.as_u64()); + let table_ids = entries.iter().map(|e| e.table_id); + let region_numbers = entries.iter().map(|e| e.region_number); + let region_groups = entries.iter().map(|e| e.region_group); + let region_sequences = entries.iter().map(|e| e.region_sequence); + let file_ids = entries.iter().map(|e| e.file_id.as_str()); + let index_file_sizes = entries.iter().map(|e| e.index_file_size); + let index_types = entries.iter().map(|e| e.index_type.as_str()); + let target_types = entries.iter().map(|e| e.target_type.as_str()); + let target_keys = entries.iter().map(|e| e.target_key.as_str()); + let target_jsons = entries.iter().map(|e| e.target_json.as_str()); + let blob_sizes = entries.iter().map(|e| e.blob_size); + let meta_jsons = entries.iter().map(|e| e.meta_json.as_deref()); + let node_ids = entries.iter().map(|e| e.node_id); + + let columns: Vec = vec![ + Arc::new(StringArray::from_iter_values(table_dirs)), + Arc::new(StringArray::from_iter_values(index_file_paths)), + Arc::new(UInt64Array::from_iter_values(region_ids)), + Arc::new(UInt32Array::from_iter_values(table_ids)), + Arc::new(UInt32Array::from_iter_values(region_numbers)), + Arc::new(UInt8Array::from_iter_values(region_groups)), + Arc::new(UInt32Array::from_iter_values(region_sequences)), + Arc::new(StringArray::from_iter_values(file_ids)), + Arc::new(UInt64Array::from_iter(index_file_sizes)), + Arc::new(StringArray::from_iter_values(index_types)), + Arc::new(StringArray::from_iter_values(target_types)), + Arc::new(StringArray::from_iter_values(target_keys)), + Arc::new(StringArray::from_iter_values(target_jsons)), + Arc::new(UInt64Array::from_iter_values(blob_sizes)), + Arc::new(StringArray::from_iter(meta_jsons)), + Arc::new(UInt64Array::from_iter(node_ids)), + ]; + + DfRecordBatch::try_new(schema.arrow_schema().clone(), columns) + } + + /// Reserved internal inspect table name for puffin index metadata. + pub fn reserved_table_name_for_inspection() -> &'static str { + "__inspect/__mito/__puffin_index_meta" + } +} + fn build_plan_helper( scan_request: ScanRequest, table_name: &str, @@ -577,6 +686,188 @@ mod tests { assert!(node_ids.is_null(1)); } + #[test] + fn test_puffin_index_meta_to_record_batch() { + let entries = vec![ + PuffinIndexMetaEntry { + table_dir: "table1".to_string(), + index_file_path: "index1".to_string(), + region_id: RegionId::with_group_and_seq(10, 0, 20), + table_id: 10, + region_number: 20, + region_group: 0, + region_sequence: 20, + file_id: "file1".to_string(), + index_file_size: Some(1024), + index_type: "bloom_filter".to_string(), + target_type: "column".to_string(), + target_key: "1".to_string(), + target_json: "{\"column\":1}".to_string(), + blob_size: 256, + meta_json: Some("{\"bloom\":{}}".to_string()), + node_id: Some(42), + }, + PuffinIndexMetaEntry { + table_dir: "table2".to_string(), + index_file_path: "index2".to_string(), + region_id: RegionId::with_group_and_seq(11, 0, 21), + table_id: 11, + region_number: 21, + region_group: 0, + region_sequence: 21, + file_id: "file2".to_string(), + index_file_size: None, + index_type: "inverted".to_string(), + target_type: "unknown".to_string(), + target_key: "legacy".to_string(), + target_json: "{}".to_string(), + blob_size: 0, + meta_json: None, + node_id: None, + }, + ]; + + let schema = PuffinIndexMetaEntry::schema(); + let batch = PuffinIndexMetaEntry::to_record_batch(&entries).unwrap(); + + assert_eq!(schema.arrow_schema().fields().len(), batch.num_columns()); + assert_eq!(2, batch.num_rows()); + + let table_dirs = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!("table1", table_dirs.value(0)); + assert_eq!("table2", table_dirs.value(1)); + + let index_file_paths = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!("index1", index_file_paths.value(0)); + assert_eq!("index2", index_file_paths.value(1)); + + let region_ids = batch + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!( + RegionId::with_group_and_seq(10, 0, 20).as_u64(), + region_ids.value(0) + ); + assert_eq!( + RegionId::with_group_and_seq(11, 0, 21).as_u64(), + region_ids.value(1) + ); + + let table_ids = batch + .column(3) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(10, table_ids.value(0)); + assert_eq!(11, table_ids.value(1)); + + let region_numbers = batch + .column(4) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(20, region_numbers.value(0)); + assert_eq!(21, region_numbers.value(1)); + + let region_groups = batch + .column(5) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(0, region_groups.value(0)); + assert_eq!(0, region_groups.value(1)); + + let region_sequences = batch + .column(6) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(20, region_sequences.value(0)); + assert_eq!(21, region_sequences.value(1)); + + let file_ids = batch + .column(7) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!("file1", file_ids.value(0)); + assert_eq!("file2", file_ids.value(1)); + + let index_file_sizes = batch + .column(8) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(1024, index_file_sizes.value(0)); + assert!(index_file_sizes.is_null(1)); + + let index_types = batch + .column(9) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!("bloom_filter", index_types.value(0)); + assert_eq!("inverted", index_types.value(1)); + + let target_types = batch + .column(10) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!("column", target_types.value(0)); + assert_eq!("unknown", target_types.value(1)); + + let target_keys = batch + .column(11) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!("1", target_keys.value(0)); + assert_eq!("legacy", target_keys.value(1)); + + let target_json = batch + .column(12) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!("{\"column\":1}", target_json.value(0)); + assert_eq!("{}", target_json.value(1)); + + let blob_sizes = batch + .column(13) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(256, blob_sizes.value(0)); + assert_eq!(0, blob_sizes.value(1)); + + let meta_jsons = batch + .column(14) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!("{\"bloom\":{}}", meta_jsons.value(0)); + assert!(meta_jsons.is_null(1)); + + let node_ids = batch + .column(15) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(42, node_ids.value(0)); + assert!(node_ids.is_null(1)); + } + #[test] fn test_manifest_build_plan() { // Note: filter must reference a column in the projected schema