2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/index/Cargo.toml
@@ -34,6 +34,7 @@ roaring = "0.10"
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
store-api.workspace = true
tantivy = { version = "0.24", features = ["zstd-compression"] }
tantivy-jieba = "0.16"
tokio.workspace = true
9 changes: 9 additions & 0 deletions src/index/src/fulltext_index.rs
@@ -75,3 +75,12 @@ impl Config {
Ok(Self::default())
}
}

impl Analyzer {
pub fn to_str(&self) -> &'static str {
match self {
Analyzer::English => "English",
Analyzer::Chinese => "Chinese",
}
}
}
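
A minimal sketch of what the new accessor provides: a stable `'static` name per analyzer variant (the test wrapper and import path are illustrative, not part of this change):

use index::fulltext_index::Analyzer;

#[test]
fn analyzer_names_sketch() {
    // `to_str` returns the canonical name for each variant.
    assert_eq!(Analyzer::English.to_str(), "English");
    assert_eq!(Analyzer::Chinese.to_str(), "Chinese");
}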
1 change: 1 addition & 0 deletions src/index/src/lib.rs
@@ -21,6 +21,7 @@ pub mod error;
pub mod external_provider;
pub mod fulltext_index;
pub mod inverted_index;
pub mod target;

pub type Bytes = Vec<u8>;
pub type BytesRef<'a> = &'a [u8];
107 changes: 107 additions & 0 deletions src/index/src/target.rs
@@ -0,0 +1,107 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::fmt::{self, Display};

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use serde::{Deserialize, Serialize};
use snafu::{Snafu, ensure};
use store_api::storage::ColumnId;

/// Describes an index target. Column ids are the only supported variant for now.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum IndexTarget {
ColumnId(ColumnId),
}

impl Display for IndexTarget {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
IndexTarget::ColumnId(id) => write!(f, "{}", id),
}
}
}

impl IndexTarget {
/// Parses a target key string back into an [`IndexTarget`].
pub fn decode(key: &str) -> Result<Self, TargetKeyError> {
validate_column_key(key)?;
let id = key
.parse::<ColumnId>()
.map_err(|_| InvalidColumnIdSnafu { value: key }.build())?;
Ok(IndexTarget::ColumnId(id))
}
}

/// Errors that can occur when working with index target keys.
#[derive(Snafu, Clone, PartialEq, Eq)]
#[stack_trace_debug]
pub enum TargetKeyError {
#[snafu(display("target key cannot be empty"))]
Empty,

#[snafu(display("target key must contain digits only: {key}"))]
InvalidCharacters { key: String },

#[snafu(display("failed to parse column id from '{value}'"))]
InvalidColumnId { value: String },
}

impl ErrorExt for TargetKeyError {
fn status_code(&self) -> StatusCode {
StatusCode::InvalidArguments
}

fn as_any(&self) -> &dyn Any {
self
}
}

fn validate_column_key(key: &str) -> Result<(), TargetKeyError> {
ensure!(!key.is_empty(), EmptySnafu);
ensure!(
key.chars().all(|ch| ch.is_ascii_digit()),
InvalidCharactersSnafu { key }
);
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn encode_decode_column() {
let target = IndexTarget::ColumnId(42);
let key = format!("{}", target);
assert_eq!(key, "42");
let decoded = IndexTarget::decode(&key).unwrap();
assert_eq!(decoded, target);
}

#[test]
fn decode_rejects_empty() {
let err = IndexTarget::decode("").unwrap_err();
assert!(matches!(err, TargetKeyError::Empty));
}

#[test]
fn decode_rejects_invalid_digits() {
let err = IndexTarget::decode("1a2").unwrap_err();
assert!(matches!(err, TargetKeyError::InvalidCharacters { .. }));
}
}
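
A short sketch of how callers are expected to use the pair: `Display` encodes a target into a key, `decode` inverts it, and any failure maps to `StatusCode::InvalidArguments` through `ErrorExt` (import paths are assumed):

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use index::target::IndexTarget;

#[test]
fn target_key_round_trip_sketch() {
    // Encode via `Display`, decode back; the two are inverses.
    let key = IndexTarget::ColumnId(42).to_string();
    assert_eq!(IndexTarget::decode(&key).unwrap(), IndexTarget::ColumnId(42));

    // Every decode failure surfaces uniformly as `InvalidArguments`.
    let err = IndexTarget::decode("1a2").unwrap_err();
    assert_eq!(err.status_code(), StatusCode::InvalidArguments);
}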
1 change: 1 addition & 0 deletions src/mito2/Cargo.toml
@@ -49,6 +49,7 @@ futures.workspace = true
humantime-serde.workspace = true
index.workspace = true
itertools.workspace = true
greptime-proto.workspace = true
lazy_static = "1.4"
log-store = { workspace = true }
mito-codec.workspace = true
95 changes: 91 additions & 4 deletions src/mito2/src/engine.rs
@@ -67,6 +67,8 @@ mod sync_test;
#[cfg(test)]
mod truncate_test;

mod puffin_index;

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
@@ -78,7 +80,7 @@ use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{info, tracing};
use common_telemetry::{info, tracing, warn};
use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
use futures::future::{join_all, try_join_all};
use futures::stream::{self, Stream, StreamExt};
@@ -97,12 +99,14 @@ use store_api::region_engine::{
RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
use store_api::storage::{FileId, RegionId, ScanRequest, SequenceNumber};
use tokio::sync::{Semaphore, oneshot};

use crate::access_layer::RegionFilePathFactory;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::config::MitoConfig;
use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
use crate::error::{
InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
SerdeJsonSnafu, SerializeColumnMetadataSnafu,
@@ -117,7 +121,7 @@ use crate::read::stream::ScanBatchStream;
use crate::region::MitoRegionRef;
use crate::region::opener::PartitionExprFetcherRef;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::sst::file::FileMeta;
use crate::sst::file::{FileMeta, RegionFileId};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::wal::entry_distributor::{
DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
@@ -434,6 +438,89 @@ impl MitoEngine {
results
}

/// Lists metadata about all puffin index targets stored in the engine.
pub async fn all_index_metas(&self) -> Vec<PuffinIndexMetaEntry> {
let node_id = self.inner.workers.file_ref_manager().node_id();
let cache_manager = self.inner.workers.cache_manager();
let puffin_metadata_cache = cache_manager.puffin_metadata_cache().cloned();
let bloom_filter_cache = cache_manager.bloom_filter_index_cache().cloned();
let inverted_index_cache = cache_manager.inverted_index_cache().cloned();

let mut results = Vec::new();

for region in self.inner.workers.all_regions() {
let manifest_entries = region.manifest_sst_entries().await;
let access_layer = region.access_layer.clone();
let table_dir = access_layer.table_dir().to_string();
let path_type = access_layer.path_type();
let object_store = access_layer.object_store().clone();
let puffin_factory = access_layer.puffin_manager_factory().clone();
let path_factory = RegionFilePathFactory::new(table_dir, path_type);

let entry_futures = manifest_entries.into_iter().map(|entry| {
let object_store = object_store.clone();
let path_factory = path_factory.clone();
let puffin_factory = puffin_factory.clone();
let puffin_metadata_cache = puffin_metadata_cache.clone();
let bloom_filter_cache = bloom_filter_cache.clone();
let inverted_index_cache = inverted_index_cache.clone();

async move {
let Some(index_file_path) = entry.index_file_path.as_ref() else {
return Vec::new();
};

let file_id = match FileId::parse_str(&entry.file_id) {
Ok(file_id) => file_id,
Err(err) => {
warn!(
err;
"Failed to parse puffin index file id, table_dir: {}, file_id: {}",
entry.table_dir,
entry.file_id
);
return Vec::new();
}
};

let region_file_id = RegionFileId::new(entry.region_id, file_id);
let context = IndexEntryContext {
table_dir: &entry.table_dir,
index_file_path: index_file_path.as_str(),
region_id: entry.region_id,
table_id: entry.table_id,
region_number: entry.region_number,
region_group: entry.region_group,
region_sequence: entry.region_sequence,
file_id: &entry.file_id,
index_file_size: entry.index_file_size,
node_id,
};

let manager = puffin_factory
.build(object_store, path_factory)
.with_puffin_metadata_cache(puffin_metadata_cache);

collect_index_entries_from_puffin(
manager,
region_file_id,
context,
bloom_filter_cache,
inverted_index_cache,
)
.await
}
});

let mut meta_stream = stream::iter(entry_futures).buffer_unordered(8); // Read up to 8 index files concurrently per region.
while let Some(mut metas) = meta_stream.next().await {
results.append(&mut metas);
}
}

results
}

/// Lists all SSTs from the storage layer of all regions in the engine.
pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
let node_id = self.inner.workers.file_ref_manager().node_id();
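
Finally, a hypothetical call site for the new listing API; `engine` is an existing `MitoEngine`, and printing the returned `Vec<PuffinIndexMetaEntry>` assumes the entry type implements `Debug`:

// Hypothetical: enumerate index metadata across all regions on this node.
let entries = engine.all_index_metas().await;
for entry in &entries {
    // One entry per index target per puffin index file.
    println!("{entry:?}");
}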