Skip to content

Commit 7b22a92

Browse files
committed
refine uts
1 parent 754296c commit 7b22a92

File tree

11 files changed

+184
-136
lines changed

11 files changed

+184
-136
lines changed

src/query/service/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ common-arrow = { path = "../../common/arrow" }
4646
common-ast = { path = "../ast" }
4747
common-base = { path = "../../common/base" }
4848
common-catalog = { path = "../catalog" }
49-
common-cache = {path = "../../common/cache"}
49+
common-cache = {path = "../../common/cache" }
5050
common-compress = { path = "../../common/compress" }
5151
common-config = { path = "../config" }
5252
common-exception = { path = "../../common/exception" }
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
// Copyright 2023 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_arrow::parquet::metadata::FileMetaData;
16+
use common_arrow::parquet::metadata::ThriftFileMetaData;
17+
use common_base::base::tokio;
18+
use common_cache::Cache;
19+
use common_expression::types::Int32Type;
20+
use common_expression::types::NumberDataType;
21+
use common_expression::DataBlock;
22+
use common_expression::FromData;
23+
use common_expression::TableDataType;
24+
use common_expression::TableField;
25+
use common_expression::TableSchemaRefExt;
26+
use common_storages_fuse::io::TableMetaLocationGenerator;
27+
use common_storages_fuse::statistics::gen_columns_statistics;
28+
use common_storages_fuse::FuseStorageFormat;
29+
use opendal::Operator;
30+
use storages_common_cache::InMemoryCacheBuilder;
31+
use storages_common_cache::InMemoryItemCacheHolder;
32+
use storages_common_index::BloomIndexMeta;
33+
use sysinfo::get_current_pid;
34+
use sysinfo::ProcessExt;
35+
use sysinfo::System;
36+
use sysinfo::SystemExt;
37+
use uuid::Uuid;
38+
39+
use crate::storages::fuse::block_writer::BlockWriter;
40+
41+
// NOTE:
42+
//
43+
// memory usage is observed at the *process* level, so please do not combine these cases into
44+
// one test function.
45+
//
46+
// by default, these cases are ignored (in CI).
47+
//
48+
// please run the following two cases individually (in different processes)
49+
50+
#[tokio::test(flavor = "multi_thread")]
51+
#[ignore]
52+
async fn test_index_meta_cache_size_file_meta_data() -> common_exception::Result<()> {
53+
let thrift_file_meta = setup().await?;
54+
55+
let cache_number = 300_000;
56+
57+
let meta: FileMetaData = FileMetaData::try_from_thrift(thrift_file_meta)?;
58+
59+
let sys = System::new_all();
60+
let pid = get_current_pid().unwrap();
61+
let process = sys.process(pid).unwrap();
62+
let base_memory_usage = process.memory();
63+
let scenario = "FileMetaData";
64+
65+
eprintln!(
66+
"scenario {}, pid {}, base memory {}",
67+
scenario, pid, base_memory_usage
68+
);
69+
70+
let cache = InMemoryCacheBuilder::new_item_cache::<FileMetaData>(cache_number as u64);
71+
72+
populate_cache(&cache, meta, cache_number);
73+
show_memory_usage(scenario, base_memory_usage, cache_number);
74+
75+
drop(cache);
76+
77+
Ok(())
78+
}
79+
80+
#[tokio::test(flavor = "multi_thread")]
81+
#[ignore]
82+
async fn test_index_meta_cache_size_bloom_meta() -> common_exception::Result<()> {
83+
let thrift_file_meta = setup().await?;
84+
85+
let cache_number = 300_000;
86+
87+
let meta: FileMetaData = FileMetaData::try_from_thrift(thrift_file_meta)?;
88+
let bloom_index_meta = BloomIndexMeta::try_from(meta)?;
89+
90+
let sys = System::new_all();
91+
let pid = get_current_pid().unwrap();
92+
let process = sys.process(pid).unwrap();
93+
let base_memory_usage = process.memory();
94+
95+
let scenario = "BloomIndexMeta(mini)";
96+
eprintln!(
97+
"scenario {}, pid {}, base memory {}",
98+
scenario, pid, base_memory_usage
99+
);
100+
101+
let cache = InMemoryCacheBuilder::new_item_cache::<BloomIndexMeta>(cache_number as u64);
102+
populate_cache(&cache, bloom_index_meta, cache_number);
103+
show_memory_usage("BloomIndexMeta(Mini)", base_memory_usage, cache_number);
104+
105+
drop(cache);
106+
107+
Ok(())
108+
}
109+
110+
fn populate_cache<T>(cache: &InMemoryItemCacheHolder<T>, item: T, num_cache: usize)
111+
where T: Clone {
112+
let mut c = cache.write();
113+
for _ in 0..num_cache {
114+
let uuid = Uuid::new_v4();
115+
(*c).put(
116+
format!("{}", uuid.simple()),
117+
std::sync::Arc::new(item.clone()),
118+
);
119+
}
120+
}
121+
122+
async fn setup() -> common_exception::Result<ThriftFileMetaData> {
123+
let fields = (0..23)
124+
.map(|_| TableField::new("id", TableDataType::Number(NumberDataType::Int32)))
125+
.collect::<Vec<_>>();
126+
127+
let schema = TableSchemaRefExt::create(fields);
128+
129+
let mut columns = vec![];
130+
for _ in 0..schema.fields().len() {
131+
// values do not matter
132+
let column = Int32Type::from_data(vec![1]);
133+
columns.push(column)
134+
}
135+
136+
let block = DataBlock::new_from_columns(columns);
137+
let operator = Operator::new(opendal::services::Memory::default())?.finish();
138+
let loc_generator = TableMetaLocationGenerator::with_prefix("/".to_owned());
139+
let col_stats = gen_columns_statistics(&block, None, &schema)?;
140+
let block_writer = BlockWriter::new(&operator, &loc_generator);
141+
let (_block_meta, thrift_file_meta) = block_writer
142+
.write(FuseStorageFormat::Parquet, &schema, block, col_stats, None)
143+
.await?;
144+
145+
Ok(thrift_file_meta.unwrap())
146+
}
147+
148+
fn show_memory_usage(case: &str, base_memory_usage: u64, num_cache_items: usize) {
149+
let sys = System::new_all();
150+
let pid = get_current_pid().unwrap();
151+
let process = sys.process(pid).unwrap();
152+
{
153+
let memory_after = process.memory();
154+
let delta = memory_after - base_memory_usage;
155+
let delta_gb = (delta as f64) / 1024.0 / 1024.0 / 1024.0;
156+
eprintln!(
157+
"
158+
cache item type : {},
159+
number of cached items {},
160+
mem usage(B):{:+},
161+
mem usage(GB){:+}
162+
",
163+
case, num_cache_items, delta, delta_gb
164+
);
165+
}
166+
}

src/query/service/tests/it/storages/fuse/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#![allow(clippy::too_many_arguments)]
1616
mod block_writer;
17+
mod bloom_index_meta_size;
1718
mod io;
1819
mod meta;
1920
mod operations;

src/query/service/tests/it/storages/fuse/statistics.rs

Lines changed: 0 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@
1515
use std::collections::HashMap;
1616
use std::sync::Arc;
1717

18-
use common_arrow::parquet::metadata::FileMetaData;
19-
use common_arrow::parquet::metadata::ThriftFileMetaData;
2018
use common_base::base::tokio;
21-
use common_cache::Cache;
2219
use common_expression::type_check::check;
2320
use common_expression::types::number::Int32Type;
2421
use common_expression::types::number::NumberScalar;
@@ -38,7 +35,6 @@ use common_expression::Scalar;
3835
use common_expression::TableDataType;
3936
use common_expression::TableField;
4037
use common_expression::TableSchema;
41-
use common_expression::TableSchemaRefExt;
4238
use common_functions::aggregates::eval_aggr;
4339
use common_functions::scalars::BUILTIN_FUNCTIONS;
4440
use common_sql::evaluator::BlockOperator;
@@ -54,19 +50,11 @@ use databend_query::storages::fuse::statistics::ClusterStatsGenerator;
5450
use databend_query::storages::fuse::statistics::StatisticsAccumulator;
5551
use opendal::Operator;
5652
use rand::Rng;
57-
use storages_common_cache::InMemoryCacheBuilder;
58-
use storages_common_cache::InMemoryItemCacheHolder;
59-
use storages_common_index::BloomIndexMetaMini;
6053
use storages_common_table_meta::meta::BlockMeta;
6154
use storages_common_table_meta::meta::ClusterStatistics;
6255
use storages_common_table_meta::meta::ColumnStatistics;
6356
use storages_common_table_meta::meta::Compression;
6457
use storages_common_table_meta::meta::Statistics;
65-
use sysinfo::get_current_pid;
66-
use sysinfo::ProcessExt;
67-
use sysinfo::System;
68-
use sysinfo::SystemExt;
69-
use uuid::Uuid;
7058

7159
use crate::storages::fuse::block_writer::BlockWriter;
7260
use crate::storages::fuse::table_test_fixture::TestFixture;
@@ -565,105 +553,3 @@ fn test_reduce_block_meta() -> common_exception::Result<()> {
565553

566554
Ok(())
567555
}
568-
569-
fn populate_cache<T>(cache: &InMemoryItemCacheHolder<T>, item: T, num_cache: usize)
570-
where T: Clone {
571-
let mut c = cache.write();
572-
for _ in 0..num_cache {
573-
let uuid = Uuid::new_v4();
574-
(*c).put(
575-
format!("{}", uuid.simple()),
576-
std::sync::Arc::new(item.clone()),
577-
);
578-
}
579-
}
580-
581-
async fn setup() -> common_exception::Result<ThriftFileMetaData> {
582-
let fields = (0..23)
583-
.into_iter()
584-
.map(|_| TableField::new("id", TableDataType::Number(NumberDataType::Int32)))
585-
.collect::<Vec<_>>();
586-
587-
let schema = TableSchemaRefExt::create(fields);
588-
589-
let mut columns = vec![];
590-
for _ in 0..schema.fields().len() {
591-
// values do not matter
592-
let column = Int32Type::from_data(vec![1]);
593-
columns.push(column)
594-
}
595-
596-
let block = DataBlock::new_from_columns(columns);
597-
let operator = Operator::new(opendal::services::Memory::default())?.finish();
598-
let loc_generator = TableMetaLocationGenerator::with_prefix("/".to_owned());
599-
let col_stats = gen_columns_statistics(&block, None, &schema)?;
600-
let block_writer = BlockWriter::new(&operator, &loc_generator);
601-
let (_block_meta, thrift_file_meta) = block_writer
602-
.write(FuseStorageFormat::Parquet, &schema, block, col_stats, None)
603-
.await?;
604-
605-
Ok(thrift_file_meta.unwrap())
606-
}
607-
608-
fn show_memory_usage(case: &str, base_memory_usage: u64, num_cache_items: usize) {
609-
let sys = System::new_all();
610-
let pid = get_current_pid().unwrap();
611-
let process = sys.process(pid).unwrap();
612-
{
613-
let memory_after = process.memory();
614-
let delta = memory_after - base_memory_usage;
615-
let delta_gb = (delta as f64) / 1024.0 / 1024.0 / 1024.0;
616-
eprintln!(
617-
" cache type: {}, number of cached items {}, mem usage(B):{:+}, mem usage(GB){:+}",
618-
case, num_cache_items, delta, delta_gb
619-
);
620-
}
621-
}
622-
623-
#[tokio::test(flavor = "multi_thread")]
624-
#[ignore]
625-
async fn test_index_meta_cache_size_file_meta_data() -> common_exception::Result<()> {
626-
let thrift_file_meta = setup().await?;
627-
628-
let cache_number = 300_000;
629-
630-
let meta: FileMetaData = FileMetaData::try_from_thrift(thrift_file_meta.clone())?;
631-
632-
let sys = System::new_all();
633-
let pid = get_current_pid().unwrap();
634-
let process = sys.process(pid).unwrap();
635-
let base_memory_usage = process.memory();
636-
637-
let cache = InMemoryCacheBuilder::new_item_cache::<FileMetaData>(cache_number as u64);
638-
639-
populate_cache(&cache, meta, cache_number);
640-
show_memory_usage("FileMetaData", base_memory_usage, cache_number);
641-
642-
drop(cache);
643-
644-
Ok(())
645-
}
646-
647-
#[tokio::test(flavor = "multi_thread")]
648-
#[ignore]
649-
async fn test_index_meta_cache_size_bloom_meta() -> common_exception::Result<()> {
650-
let thrift_file_meta = setup().await?;
651-
652-
let cache_number = 300_000;
653-
654-
let meta: FileMetaData = FileMetaData::try_from_thrift(thrift_file_meta.clone())?;
655-
let bloom_index_meta = BloomIndexMetaMini::try_from(meta.clone())?;
656-
657-
let sys = System::new_all();
658-
let pid = get_current_pid().unwrap();
659-
let process = sys.process(pid).unwrap();
660-
let base_memory_usage = process.memory();
661-
662-
let cache = InMemoryCacheBuilder::new_item_cache::<BloomIndexMetaMini>(cache_number as u64);
663-
populate_cache(&cache, bloom_index_meta, cache_number);
664-
show_memory_usage("BloomIndexMetaMini", base_memory_usage, cache_number);
665-
666-
drop(cache);
667-
668-
Ok(())
669-
}

src/query/storages/common/cache-manager/src/caches.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use storages_common_cache::CacheAccessor;
2424
use storages_common_cache::InMemoryItemCacheHolder;
2525
use storages_common_cache::NamedCache;
2626
use storages_common_index::filters::Xor8Filter;
27-
use storages_common_index::BloomIndexMetaMini;
27+
use storages_common_index::BloomIndexMeta;
2828
use storages_common_table_meta::meta::SegmentInfo;
2929
use storages_common_table_meta::meta::TableSnapshot;
3030
use storages_common_table_meta::meta::TableSnapshotStatistics;
@@ -41,7 +41,7 @@ pub type TableSnapshotStatisticCache = NamedCache<InMemoryItemCacheHolder<TableS
4141
/// For each indexed data block, the bloom xor8 filter of column is cached individually
4242
pub type BloomIndexFilterCache = NamedCache<InMemoryItemCacheHolder<Xor8Filter>>;
4343
/// In memory object cache of parquet FileMetaData of bloom index data
44-
pub type BloomIndexMetaCache = NamedCache<InMemoryItemCacheHolder<BloomIndexMetaMini>>;
44+
pub type BloomIndexMetaCache = NamedCache<InMemoryItemCacheHolder<BloomIndexMeta>>;
4545
/// In memory object cache of parquet FileMetaData of external parquet files
4646
pub type FileMetaDataCache = NamedCache<InMemoryItemCacheHolder<FileMetaData>>;
4747

@@ -87,7 +87,7 @@ impl CachedObject<TableSnapshotStatistics> for TableSnapshotStatistics {
8787
}
8888
}
8989

90-
impl CachedObject<BloomIndexMetaMini> for BloomIndexMetaMini {
90+
impl CachedObject<BloomIndexMeta> for BloomIndexMeta {
9191
type Cache = BloomIndexMetaCache;
9292
fn cache() -> Option<Self::Cache> {
9393
CacheManager::instance().get_bloom_index_meta_cache()

src/query/storages/common/index/src/bloom_index.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,11 @@ use crate::filters::Xor8Filter;
5656
use crate::Index;
5757

5858
#[derive(Clone)]
59-
pub struct BloomIndexMetaMini {
59+
pub struct BloomIndexMeta {
6060
pub columns: Vec<(String, SingleColumnMeta)>,
6161
}
6262

63-
impl TryFrom<FileMetaData> for BloomIndexMetaMini {
63+
impl TryFrom<FileMetaData> for BloomIndexMeta {
6464
type Error = common_exception::ErrorCode;
6565

6666
fn try_from(meta: FileMetaData) -> std::result::Result<Self, Self::Error> {

src/query/storages/common/index/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ mod page_index;
2222
mod range_index;
2323

2424
pub use bloom_index::BloomIndex;
25-
pub use bloom_index::BloomIndexMetaMini;
25+
pub use bloom_index::BloomIndexMeta;
2626
pub use bloom_index::FilterEvalResult;
2727
pub use index::Index;
2828
pub use page_index::PageIndex;

0 commit comments

Comments
 (0)