Skip to content

Commit 754296c

Browse files
committed
refactor: shrink bloom index meta cache size
1 parent f9e9b00 commit 754296c

File tree

16 files changed

+293
-55
lines changed

16 files changed

+293
-55
lines changed

Cargo.lock

Lines changed: 26 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/service/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ common-arrow = { path = "../../common/arrow" }
4646
common-ast = { path = "../ast" }
4747
common-base = { path = "../../common/base" }
4848
common-catalog = { path = "../catalog" }
49+
common-cache = {path = "../../common/cache"}
4950
common-compress = { path = "../../common/compress" }
5051
common-config = { path = "../config" }
5152
common-exception = { path = "../../common/exception" }
@@ -166,6 +167,7 @@ toml = { version = "0.7.3", default-features = false }
166167
url = "2.3.1"
167168
walkdir = "2.3.2"
168169
wiremock = "0.5.14"
170+
sysinfo = "0.28.3"
169171

170172
[build-dependencies]
171173
common-building = { path = "../../common/building" }

src/query/service/src/interpreters/interpreter_insert.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,8 @@ impl Interpreter for InsertInterpreter {
485485

486486
let ctx = self.ctx.clone();
487487
let overwrite = self.plan.overwrite;
488+
eprintln!(">>> main piepline {:?}", build_res.main_pipeline);
489+
eprintln!(">>> source piepline {:?}", build_res.sources_pipelines);
488490
build_res.main_pipeline.set_on_finished(move |may_error| {
489491
// capture out variable
490492
let overwrite = overwrite;

src/query/service/src/schedulers/fragments/fragmenter.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,9 @@ impl Fragmenter {
9494
}
9595

9696
pub fn build_fragment(mut self, plan: &PhysicalPlan) -> Result<PlanFragment> {
97+
eprintln!("origin plan {:?}", &plan);
9798
let root = self.replace(plan)?;
99+
eprintln!("after replace {:?}", &root);
98100
let mut root_fragment = PlanFragment {
99101
plan: root,
100102
fragment_type: FragmentType::Root,

src/query/service/tests/it/storages/fuse/block_writer.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use common_arrow::parquet::metadata::ThriftFileMetaData;
1516
use common_exception::Result;
1617
use common_expression::DataBlock;
1718
use common_expression::FunctionContext;
@@ -56,13 +57,13 @@ impl<'a> BlockWriter<'a> {
5657
block: DataBlock,
5758
col_stats: StatisticsOfColumns,
5859
cluster_stats: Option<ClusterStatistics>,
59-
) -> Result<BlockMeta> {
60+
) -> Result<(BlockMeta, Option<ThriftFileMetaData>)> {
6061
let (location, block_id) = self.location_generator.gen_block_location();
6162

6263
let data_accessor = &self.data_accessor;
6364
let row_count = block.num_rows() as u64;
6465
let block_size = block.memory_size() as u64;
65-
let (bloom_filter_index_size, bloom_filter_index_location) = self
66+
let (bloom_filter_index_size, bloom_filter_index_location, meta) = self
6667
.build_block_index(data_accessor, schema.clone(), &block, block_id)
6768
.await?;
6869

@@ -88,7 +89,7 @@ impl<'a> BlockWriter<'a> {
8889
bloom_filter_index_size,
8990
Compression::Lz4Raw,
9091
);
91-
Ok(block_meta)
92+
Ok((block_meta, meta))
9293
}
9394

9495
pub async fn build_block_index(
@@ -97,7 +98,7 @@ impl<'a> BlockWriter<'a> {
9798
schema: TableSchemaRef,
9899
block: &DataBlock,
99100
block_id: Uuid,
100-
) -> Result<(u64, Option<Location>)> {
101+
) -> Result<(u64, Option<Location>, Option<ThriftFileMetaData>)> {
101102
let location = self
102103
.location_generator
103104
.block_bloom_index_location(&block_id);
@@ -109,18 +110,16 @@ impl<'a> BlockWriter<'a> {
109110
let filter_schema = bloom_index.filter_schema;
110111
let mut data = Vec::with_capacity(DEFAULT_BLOCK_INDEX_BUFFER_SIZE);
111112
let index_block_schema = &filter_schema;
112-
let (size, _) = blocks_to_parquet(
113+
let (size, meta) = blocks_to_parquet(
113114
index_block_schema,
114115
vec![index_block],
115116
&mut data,
116117
TableCompression::None,
117118
)?;
118-
119119
data_accessor.write(&location.0, data).await?;
120-
121-
Ok((size, Some(location)))
120+
Ok((size, Some(location), Some(meta)))
122121
} else {
123-
Ok((0u64, None))
122+
Ok((0u64, None, None))
124123
}
125124
}
126125
}

src/query/service/tests/it/storages/fuse/operations/gc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ mod utils {
309309
let blocks: std::vec::Vec<DataBlock> = stream.try_collect().await?;
310310
for block in blocks {
311311
let stats = gen_columns_statistics(&block, None, &schema)?;
312-
let block_meta = block_writer
312+
let (block_meta, _index_meta) = block_writer
313313
.write(FuseStorageFormat::Parquet, &schema, block, stats, None)
314314
.await?;
315315
block_metas.push(Arc::new(block_meta));

src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -698,7 +698,7 @@ impl CompactSegmentTestFixture {
698698
let block = block?;
699699
let col_stats = gen_columns_statistics(&block, None, &schema)?;
700700

701-
let block_meta = block_writer
701+
let (block_meta, _index_meta) = block_writer
702702
.write(FuseStorageFormat::Parquet, &schema, block, col_stats, None)
703703
.await?;
704704

src/query/service/tests/it/storages/fuse/statistics.rs

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
use std::collections::HashMap;
1616
use std::sync::Arc;
1717

18+
use common_arrow::parquet::metadata::FileMetaData;
19+
use common_arrow::parquet::metadata::ThriftFileMetaData;
1820
use common_base::base::tokio;
21+
use common_cache::Cache;
1922
use common_expression::type_check::check;
2023
use common_expression::types::number::Int32Type;
2124
use common_expression::types::number::NumberScalar;
@@ -35,6 +38,7 @@ use common_expression::Scalar;
3538
use common_expression::TableDataType;
3639
use common_expression::TableField;
3740
use common_expression::TableSchema;
41+
use common_expression::TableSchemaRefExt;
3842
use common_functions::aggregates::eval_aggr;
3943
use common_functions::scalars::BUILTIN_FUNCTIONS;
4044
use common_sql::evaluator::BlockOperator;
@@ -50,11 +54,19 @@ use databend_query::storages::fuse::statistics::ClusterStatsGenerator;
5054
use databend_query::storages::fuse::statistics::StatisticsAccumulator;
5155
use opendal::Operator;
5256
use rand::Rng;
57+
use storages_common_cache::InMemoryCacheBuilder;
58+
use storages_common_cache::InMemoryItemCacheHolder;
59+
use storages_common_index::BloomIndexMetaMini;
5360
use storages_common_table_meta::meta::BlockMeta;
5461
use storages_common_table_meta::meta::ClusterStatistics;
5562
use storages_common_table_meta::meta::ColumnStatistics;
5663
use storages_common_table_meta::meta::Compression;
5764
use storages_common_table_meta::meta::Statistics;
65+
use sysinfo::get_current_pid;
66+
use sysinfo::ProcessExt;
67+
use sysinfo::System;
68+
use sysinfo::SystemExt;
69+
use uuid::Uuid;
5870

5971
use crate::storages::fuse::block_writer::BlockWriter;
6072
use crate::storages::fuse::table_test_fixture::TestFixture;
@@ -236,7 +248,7 @@ async fn test_accumulator() -> common_exception::Result<()> {
236248
let block = item?;
237249
let col_stats = gen_columns_statistics(&block, None, &schema)?;
238250
let block_writer = BlockWriter::new(&operator, &loc_generator);
239-
let block_meta = block_writer
251+
let (block_meta, _index_meta) = block_writer
240252
.write(FuseStorageFormat::Parquet, &schema, block, col_stats, None)
241253
.await?;
242254
stats_acc.add_with_block_meta(block_meta);
@@ -553,3 +565,105 @@ fn test_reduce_block_meta() -> common_exception::Result<()> {
553565

554566
Ok(())
555567
}
568+
569+
fn populate_cache<T>(cache: &InMemoryItemCacheHolder<T>, item: T, num_cache: usize)
570+
where T: Clone {
571+
let mut c = cache.write();
572+
for _ in 0..num_cache {
573+
let uuid = Uuid::new_v4();
574+
(*c).put(
575+
format!("{}", uuid.simple()),
576+
std::sync::Arc::new(item.clone()),
577+
);
578+
}
579+
}
580+
581+
async fn setup() -> common_exception::Result<ThriftFileMetaData> {
582+
let fields = (0..23)
583+
.into_iter()
584+
.map(|_| TableField::new("id", TableDataType::Number(NumberDataType::Int32)))
585+
.collect::<Vec<_>>();
586+
587+
let schema = TableSchemaRefExt::create(fields);
588+
589+
let mut columns = vec![];
590+
for _ in 0..schema.fields().len() {
591+
// values do not matter
592+
let column = Int32Type::from_data(vec![1]);
593+
columns.push(column)
594+
}
595+
596+
let block = DataBlock::new_from_columns(columns);
597+
let operator = Operator::new(opendal::services::Memory::default())?.finish();
598+
let loc_generator = TableMetaLocationGenerator::with_prefix("/".to_owned());
599+
let col_stats = gen_columns_statistics(&block, None, &schema)?;
600+
let block_writer = BlockWriter::new(&operator, &loc_generator);
601+
let (_block_meta, thrift_file_meta) = block_writer
602+
.write(FuseStorageFormat::Parquet, &schema, block, col_stats, None)
603+
.await?;
604+
605+
Ok(thrift_file_meta.unwrap())
606+
}
607+
608+
/// Prints to stderr how much the current process's memory usage differs from a baseline.
///
/// * `case` - label of the cache type being measured, echoed in the output.
/// * `base_memory_usage` - the `process.memory()` reading taken before the cache was filled.
/// * `num_cache_items` - number of items put into the cache, echoed in the output.
///
/// The difference is computed as a signed value: the current reading may be lower than
/// the baseline, and an unsigned subtraction would then overflow (panic in debug builds,
/// wrap around in release builds).
///
/// # Panics
///
/// Panics if the current pid cannot be determined or the process is not found in the
/// system snapshot.
fn show_memory_usage(case: &str, base_memory_usage: u64, num_cache_items: usize) {
    let sys = System::new_all();
    let pid = get_current_pid().unwrap();
    let process = sys.process(pid).unwrap();

    let memory_after = process.memory();
    // Widen to i128 so the subtraction can never overflow, whichever reading is larger.
    let delta = i128::from(memory_after) - i128::from(base_memory_usage);
    let delta_gb = (delta as f64) / 1024.0 / 1024.0 / 1024.0;
    eprintln!(
        " cache type: {}, number of cached items {}, mem usage(B):{:+}, mem usage(GB){:+}",
        case, num_cache_items, delta, delta_gb
    );
}
622+
623+
#[tokio::test(flavor = "multi_thread")]
624+
#[ignore]
625+
async fn test_index_meta_cache_size_file_meta_data() -> common_exception::Result<()> {
626+
let thrift_file_meta = setup().await?;
627+
628+
let cache_number = 300_000;
629+
630+
let meta: FileMetaData = FileMetaData::try_from_thrift(thrift_file_meta.clone())?;
631+
632+
let sys = System::new_all();
633+
let pid = get_current_pid().unwrap();
634+
let process = sys.process(pid).unwrap();
635+
let base_memory_usage = process.memory();
636+
637+
let cache = InMemoryCacheBuilder::new_item_cache::<FileMetaData>(cache_number as u64);
638+
639+
populate_cache(&cache, meta, cache_number);
640+
show_memory_usage("FileMetaData", base_memory_usage, cache_number);
641+
642+
drop(cache);
643+
644+
Ok(())
645+
}
646+
647+
#[tokio::test(flavor = "multi_thread")]
648+
#[ignore]
649+
async fn test_index_meta_cache_size_bloom_meta() -> common_exception::Result<()> {
650+
let thrift_file_meta = setup().await?;
651+
652+
let cache_number = 300_000;
653+
654+
let meta: FileMetaData = FileMetaData::try_from_thrift(thrift_file_meta.clone())?;
655+
let bloom_index_meta = BloomIndexMetaMini::try_from(meta.clone())?;
656+
657+
let sys = System::new_all();
658+
let pid = get_current_pid().unwrap();
659+
let process = sys.process(pid).unwrap();
660+
let base_memory_usage = process.memory();
661+
662+
let cache = InMemoryCacheBuilder::new_item_cache::<BloomIndexMetaMini>(cache_number as u64);
663+
populate_cache(&cache, bloom_index_meta, cache_number);
664+
show_memory_usage("BloomIndexMetaMini", base_memory_usage, cache_number);
665+
666+
drop(cache);
667+
668+
Ok(())
669+
}

src/query/storages/common/cache-manager/src/caches.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use storages_common_cache::CacheAccessor;
2424
use storages_common_cache::InMemoryItemCacheHolder;
2525
use storages_common_cache::NamedCache;
2626
use storages_common_index::filters::Xor8Filter;
27+
use storages_common_index::BloomIndexMetaMini;
2728
use storages_common_table_meta::meta::SegmentInfo;
2829
use storages_common_table_meta::meta::TableSnapshot;
2930
use storages_common_table_meta::meta::TableSnapshotStatistics;
@@ -39,9 +40,8 @@ pub type TableSnapshotStatisticCache = NamedCache<InMemoryItemCacheHolder<TableS
3940
/// In memory object cache of bloom filter.
4041
/// For each indexed data block, the bloom xor8 filter of each column is cached individually
4142
pub type BloomIndexFilterCache = NamedCache<InMemoryItemCacheHolder<Xor8Filter>>;
42-
pub struct BloomIndexMeta(pub FileMetaData);
4343
/// In memory object cache of the bloom index meta (`BloomIndexMetaMini`), which is built from the parquet FileMetaData of bloom index data
44-
pub type BloomIndexMetaCache = NamedCache<InMemoryItemCacheHolder<BloomIndexMeta>>;
44+
pub type BloomIndexMetaCache = NamedCache<InMemoryItemCacheHolder<BloomIndexMetaMini>>;
4545
/// In memory object cache of parquet FileMetaData of external parquet files
4646
pub type FileMetaDataCache = NamedCache<InMemoryItemCacheHolder<FileMetaData>>;
4747

@@ -87,7 +87,7 @@ impl CachedObject<TableSnapshotStatistics> for TableSnapshotStatistics {
8787
}
8888
}
8989

90-
impl CachedObject<BloomIndexMeta> for BloomIndexMeta {
90+
impl CachedObject<BloomIndexMetaMini> for BloomIndexMetaMini {
9191
type Cache = BloomIndexMetaCache;
9292
fn cache() -> Option<Self::Cache> {
9393
CacheManager::instance().get_bloom_index_meta_cache()

src/query/storages/common/index/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ test = false
1515
ignored = ["xorfilter-rs", "match-template"]
1616

1717
[dependencies]
18+
common-arrow = { path = "../../../../common/arrow" }
1819
common-exception = { path = "../../../../common/exception" }
1920
common-expression = { path = "../../../expression" }
2021
common-functions = { path = "../../../functions" }

0 commit comments

Comments
 (0)