Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ roaring = "0.10"
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
store-api.workspace = true
tantivy = { version = "0.24", features = ["zstd-compression"] }
tantivy-jieba = "0.16"
tokio.workspace = true
Expand Down
1 change: 1 addition & 0 deletions src/index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod error;
pub mod external_provider;
pub mod fulltext_index;
pub mod inverted_index;
pub mod target;

pub type Bytes = Vec<u8>;
pub type BytesRef<'a> = &'a [u8];
109 changes: 109 additions & 0 deletions src/index/src/target.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use serde::{Deserialize, Serialize};
use snafu::{Snafu, ensure};
use store_api::storage::ColumnId;

/// Describes what an index is built on.
///
/// Column ids are the only supported variant for now. A target is converted to its string key
/// with [`IndexTarget::encode`] and parsed back with [`IndexTarget::decode`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum IndexTarget {
/// The index targets a single column, identified by its column id.
ColumnId(ColumnId),
}

impl IndexTarget {
    /// Returns the string key that identifies this target.
    ///
    /// A column target is rendered as the textual form of its column id.
    pub fn encode(&self) -> String {
        match self {
            Self::ColumnId(column_id) => column_id.to_string(),
        }
    }

    /// Rebuilds an index target from a target key string.
    ///
    /// # Errors
    ///
    /// Propagates the error reported by `validate_column_key` for a malformed key, and returns
    /// [`TargetKeyError::InvalidColumnId`] when the validated key still cannot be parsed as a
    /// [`ColumnId`].
    pub fn decode(key: &str) -> Result<Self, TargetKeyError> {
        validate_column_key(key)?;

        match key.parse::<ColumnId>() {
            Ok(column_id) => Ok(Self::ColumnId(column_id)),
            // The parse error itself is discarded; the offending key is reported instead.
            Err(_) => Err(TargetKeyError::InvalidColumnId {
                value: key.to_string(),
            }),
        }
    }
}

/// Errors that can occur when working with index target keys.
#[derive(Snafu, Clone, PartialEq, Eq)]
#[stack_trace_debug]
pub enum TargetKeyError {
/// The key is the empty string.
#[snafu(display("target key cannot be empty"))]
Empty,

/// The key contains at least one character that is not an ASCII digit.
#[snafu(display("target key must contain digits only: {key}"))]
InvalidCharacters { key: String },

/// The key could not be parsed as a column id.
#[snafu(display("failed to parse column id from '{value}'"))]
InvalidColumnId { value: String },
}

/// Implements the `common_error` [`ErrorExt`] trait for [`TargetKeyError`].
impl ErrorExt for TargetKeyError {
/// Every variant reports [`StatusCode::InvalidArguments`].
fn status_code(&self) -> StatusCode {
StatusCode::InvalidArguments
}

/// Returns `self` as a [`dyn Any`](Any) reference.
fn as_any(&self) -> &dyn Any {
self
}
}

/// Checks that `key` has the shape of an encoded column id.
///
/// # Errors
///
/// Returns [`TargetKeyError::Empty`] for an empty key and
/// [`TargetKeyError::InvalidCharacters`] when any character is not an ASCII digit.
fn validate_column_key(key: &str) -> Result<(), TargetKeyError> {
    ensure!(!key.is_empty(), EmptySnafu);

    // Every non-ASCII character is encoded with bytes >= 0x80, none of which is an ASCII digit,
    // so a byte-wise check accepts exactly the same keys as a char-wise one.
    let digits_only = key.bytes().all(|byte| byte.is_ascii_digit());
    ensure!(
        digits_only,
        InvalidCharactersSnafu {
            key: key.to_string()
        }
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A column target survives an encode/decode round trip and encodes to its decimal id.
    #[test]
    fn encode_decode_column() {
        let target = IndexTarget::ColumnId(42);
        let key = target.encode();
        assert_eq!(key, "42");
        let decoded = IndexTarget::decode(&key).unwrap();
        assert_eq!(decoded, target);
    }

    /// The empty key is rejected before any parsing is attempted.
    #[test]
    fn decode_rejects_empty() {
        let err = IndexTarget::decode("").unwrap_err();
        assert!(matches!(err, TargetKeyError::Empty));
    }

    /// Keys containing anything but ASCII digits are rejected, including signs and
    /// non-ASCII digit characters.
    #[test]
    fn decode_rejects_invalid_digits() {
        for key in ["1a2", "-1", "+1", " 1", "１２"] {
            let err = IndexTarget::decode(key).unwrap_err();
            assert!(
                matches!(err, TargetKeyError::InvalidCharacters { .. }),
                "unexpected error for key {key:?}"
            );
        }
    }

    /// A digits-only key that does not fit the column id type is reported as an invalid id.
    #[test]
    fn decode_rejects_out_of_range_id() {
        // Forty digits pass validation but exceed the range of every built-in integer type.
        let key = "9".repeat(40);
        let err = IndexTarget::decode(&key).unwrap_err();
        assert!(matches!(err, TargetKeyError::InvalidColumnId { .. }));
    }
}
1 change: 1 addition & 0 deletions src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ futures.workspace = true
humantime-serde.workspace = true
index.workspace = true
itertools.workspace = true
greptime-proto.workspace = true
lazy_static = "1.4"
log-store = { workspace = true }
mito-codec.workspace = true
Expand Down
78 changes: 74 additions & 4 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ mod sync_test;
#[cfg(test)]
mod truncate_test;

mod puffin_index;

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
Expand All @@ -78,7 +80,7 @@ use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{info, tracing};
use common_telemetry::{info, tracing, warn};
use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
use futures::future::{join_all, try_join_all};
use futures::stream::{self, Stream, StreamExt};
Expand All @@ -97,12 +99,14 @@ use store_api::region_engine::{
RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
use store_api::storage::{FileId, RegionId, ScanRequest, SequenceNumber};
use tokio::sync::{Semaphore, oneshot};

use crate::access_layer::RegionFilePathFactory;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::config::MitoConfig;
use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
use crate::error::{
InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
SerdeJsonSnafu, SerializeColumnMetadataSnafu,
Expand All @@ -117,7 +121,7 @@ use crate::read::stream::ScanBatchStream;
use crate::region::MitoRegionRef;
use crate::region::opener::PartitionExprFetcherRef;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::sst::file::FileMeta;
use crate::sst::file::{FileMeta, RegionFileId};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::wal::entry_distributor::{
DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
Expand Down Expand Up @@ -434,6 +438,72 @@ impl MitoEngine {
results
}

/// Lists metadata about all puffin index targets stored in the engine.
///
/// Walks every region known to the workers and takes the SST entries from its manifest. For
/// each entry that has an index file, the index metadata is collected through
/// `collect_index_entries_from_puffin`. Entries without an index file are skipped. Entries
/// whose file id cannot be parsed are logged with a warning and skipped.
///
/// Returns the collected entries of all regions in a single vector.
pub async fn all_index_metas(&self) -> Vec<PuffinIndexMetaEntry> {
    let node_id = self.inner.workers.file_ref_manager().node_id();
    let puffin_metadata_cache = self
        .inner
        .workers
        .cache_manager()
        .puffin_metadata_cache()
        .cloned();

    let mut results = Vec::new();

    for region in self.inner.workers.all_regions() {
        let manifest_entries = region.manifest_sst_entries().await;

        // These handles are shared by all entries of the region, so set them up once here.
        let access_layer = &region.access_layer;
        let object_store = access_layer.object_store().clone();
        let puffin_factory = access_layer.puffin_manager_factory().clone();
        let path_factory = RegionFilePathFactory::new(
            access_layer.table_dir().to_string(),
            access_layer.path_type(),
        );

        for entry in manifest_entries {
            // Entries without an index file have nothing to report.
            let Some(index_file_path) = entry.index_file_path.as_ref() else {
                continue;
            };

            let file_id = match FileId::parse_str(&entry.file_id) {
                Ok(file_id) => file_id,
                Err(err) => {
                    warn!(
                        err;
                        "Failed to parse puffin index file id, table_dir: {}, file_id: {}",
                        entry.table_dir,
                        entry.file_id
                    );
                    continue;
                }
            };

            let region_file_id = RegionFileId::new(entry.region_id, file_id);
            let context = IndexEntryContext {
                table_dir: &entry.table_dir,
                index_file_path,
                region_id: entry.region_id,
                table_id: entry.table_id,
                region_number: entry.region_number,
                region_group: entry.region_group,
                region_sequence: entry.region_sequence,
                file_id: &entry.file_id,
                index_file_size: entry.index_file_size,
                node_id,
            };

            // The manager is passed by value below, so a fresh one is built per entry.
            let manager = puffin_factory
                .build(object_store.clone(), path_factory.clone())
                .with_puffin_metadata_cache(puffin_metadata_cache.clone());

            let metas =
                collect_index_entries_from_puffin(manager, region_file_id, context).await;
            results.extend(metas);
        }
    }

    results
}

/// Lists all SSTs from the storage layer of all regions in the engine.
pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
let node_id = self.inner.workers.file_ref_manager().node_id();
Expand Down
Loading
Loading