Skip to content

Commit a0c4019

Browse files
authored
Merge pull request #10692 from dantengsky/refactor-shrink-bloom-index-meta-cache-size
refactor: shrink bloom index meta cache size
2 parents 54059d3 + a4de17b commit a0c4019

File tree

17 files changed

+436
-54
lines changed

17 files changed

+436
-54
lines changed

Cargo.lock

Lines changed: 27 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/service/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ common-arrow = { path = "../../common/arrow" }
4646
common-ast = { path = "../ast" }
4747
common-auth = { path = "../../common/auth" }
4848
common-base = { path = "../../common/base" }
49+
common-cache = { path = "../../common/cache" }
4950
common-catalog = { path = "../catalog" }
5051
common-compress = { path = "../../common/compress" }
5152
common-config = { path = "../config" }
@@ -162,6 +163,7 @@ num = "0.4.0"
162163
p256 = "0.13"
163164
pretty_assertions = "1.3.0"
164165
reqwest = { workspace = true }
166+
sysinfo = "0.28.3"
165167
temp-env = "0.3.0"
166168
tempfile = "3.4.0"
167169
toml = { version = "0.7.3", default-features = false }

src/query/service/tests/it/storages/fuse/block_writer.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use common_arrow::parquet::metadata::ThriftFileMetaData;
1516
use common_exception::Result;
1617
use common_expression::DataBlock;
1718
use common_expression::FunctionContext;
@@ -56,13 +57,13 @@ impl<'a> BlockWriter<'a> {
5657
block: DataBlock,
5758
col_stats: StatisticsOfColumns,
5859
cluster_stats: Option<ClusterStatistics>,
59-
) -> Result<BlockMeta> {
60+
) -> Result<(BlockMeta, Option<ThriftFileMetaData>)> {
6061
let (location, block_id) = self.location_generator.gen_block_location();
6162

6263
let data_accessor = &self.data_accessor;
6364
let row_count = block.num_rows() as u64;
6465
let block_size = block.memory_size() as u64;
65-
let (bloom_filter_index_size, bloom_filter_index_location) = self
66+
let (bloom_filter_index_size, bloom_filter_index_location, meta) = self
6667
.build_block_index(data_accessor, schema.clone(), &block, block_id)
6768
.await?;
6869

@@ -88,7 +89,7 @@ impl<'a> BlockWriter<'a> {
8889
bloom_filter_index_size,
8990
Compression::Lz4Raw,
9091
);
91-
Ok(block_meta)
92+
Ok((block_meta, meta))
9293
}
9394

9495
pub async fn build_block_index(
@@ -97,7 +98,7 @@ impl<'a> BlockWriter<'a> {
9798
schema: TableSchemaRef,
9899
block: &DataBlock,
99100
block_id: Uuid,
100-
) -> Result<(u64, Option<Location>)> {
101+
) -> Result<(u64, Option<Location>, Option<ThriftFileMetaData>)> {
101102
let location = self
102103
.location_generator
103104
.block_bloom_index_location(&block_id);
@@ -109,18 +110,16 @@ impl<'a> BlockWriter<'a> {
109110
let filter_schema = bloom_index.filter_schema;
110111
let mut data = Vec::with_capacity(DEFAULT_BLOCK_INDEX_BUFFER_SIZE);
111112
let index_block_schema = &filter_schema;
112-
let (size, _) = blocks_to_parquet(
113+
let (size, meta) = blocks_to_parquet(
113114
index_block_schema,
114115
vec![index_block],
115116
&mut data,
116117
TableCompression::None,
117118
)?;
118-
119119
data_accessor.write(&location.0, data).await?;
120-
121-
Ok((size, Some(location)))
120+
Ok((size, Some(location), Some(meta)))
122121
} else {
123-
Ok((0u64, None))
122+
Ok((0u64, None, None))
124123
}
125124
}
126125
}
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
// Copyright 2023 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_arrow::parquet::metadata::FileMetaData;
16+
use common_arrow::parquet::metadata::ThriftFileMetaData;
17+
use common_base::base::tokio;
18+
use common_cache::Cache;
19+
use common_expression::types::Int32Type;
20+
use common_expression::types::NumberDataType;
21+
use common_expression::DataBlock;
22+
use common_expression::FromData;
23+
use common_expression::TableDataType;
24+
use common_expression::TableField;
25+
use common_expression::TableSchemaRefExt;
26+
use common_storages_fuse::io::TableMetaLocationGenerator;
27+
use common_storages_fuse::statistics::gen_columns_statistics;
28+
use common_storages_fuse::FuseStorageFormat;
29+
use opendal::Operator;
30+
use storages_common_cache::InMemoryCacheBuilder;
31+
use storages_common_cache::InMemoryItemCacheHolder;
32+
use storages_common_index::BloomIndexMeta;
33+
use sysinfo::get_current_pid;
34+
use sysinfo::ProcessExt;
35+
use sysinfo::System;
36+
use sysinfo::SystemExt;
37+
use uuid::Uuid;
38+
39+
use crate::storages::fuse::block_writer::BlockWriter;
40+
41+
// NOTE:
42+
//
43+
// usage of memory is observed at *process* level, please do not combine them into
44+
// one test function.
45+
//
46+
// by default, these cases are ignored (in CI).
47+
//
48+
// please run the following two cases individually (in different process)
49+
50+
#[tokio::test(flavor = "multi_thread")]
51+
#[ignore]
52+
async fn test_index_meta_cache_size_file_meta_data() -> common_exception::Result<()> {
53+
let thrift_file_meta = setup().await?;
54+
55+
let cache_number = 300_000;
56+
57+
let meta: FileMetaData = FileMetaData::try_from_thrift(thrift_file_meta)?;
58+
59+
let sys = System::new_all();
60+
let pid = get_current_pid().unwrap();
61+
let process = sys.process(pid).unwrap();
62+
let base_memory_usage = process.memory();
63+
let scenario = "FileMetaData";
64+
65+
eprintln!(
66+
"scenario {}, pid {}, base memory {}",
67+
scenario, pid, base_memory_usage
68+
);
69+
70+
let cache = InMemoryCacheBuilder::new_item_cache::<FileMetaData>(cache_number as u64);
71+
72+
populate_cache(&cache, meta, cache_number);
73+
show_memory_usage(scenario, base_memory_usage, cache_number);
74+
75+
drop(cache);
76+
77+
Ok(())
78+
}
79+
80+
#[tokio::test(flavor = "multi_thread")]
81+
#[ignore]
82+
async fn test_index_meta_cache_size_bloom_meta() -> common_exception::Result<()> {
83+
let thrift_file_meta = setup().await?;
84+
85+
let cache_number = 300_000;
86+
87+
let bloom_index_meta = BloomIndexMeta::try_from(thrift_file_meta)?;
88+
89+
let sys = System::new_all();
90+
let pid = get_current_pid().unwrap();
91+
let process = sys.process(pid).unwrap();
92+
let base_memory_usage = process.memory();
93+
94+
let scenario = "BloomIndexMeta(mini)";
95+
eprintln!(
96+
"scenario {}, pid {}, base memory {}",
97+
scenario, pid, base_memory_usage
98+
);
99+
100+
let cache = InMemoryCacheBuilder::new_item_cache::<BloomIndexMeta>(cache_number as u64);
101+
populate_cache(&cache, bloom_index_meta, cache_number);
102+
show_memory_usage("BloomIndexMeta(Mini)", base_memory_usage, cache_number);
103+
104+
drop(cache);
105+
106+
Ok(())
107+
}
108+
109+
fn populate_cache<T>(cache: &InMemoryItemCacheHolder<T>, item: T, num_cache: usize)
110+
where T: Clone {
111+
let mut c = cache.write();
112+
for _ in 0..num_cache {
113+
let uuid = Uuid::new_v4();
114+
(*c).put(
115+
format!("{}", uuid.simple()),
116+
std::sync::Arc::new(item.clone()),
117+
);
118+
}
119+
}
120+
121+
async fn setup() -> common_exception::Result<ThriftFileMetaData> {
122+
let fields = (0..23)
123+
.map(|_| TableField::new("id", TableDataType::Number(NumberDataType::Int32)))
124+
.collect::<Vec<_>>();
125+
126+
let schema = TableSchemaRefExt::create(fields);
127+
128+
let mut columns = vec![];
129+
for _ in 0..schema.fields().len() {
130+
// values do not matter
131+
let column = Int32Type::from_data(vec![1]);
132+
columns.push(column)
133+
}
134+
135+
let block = DataBlock::new_from_columns(columns);
136+
let operator = Operator::new(opendal::services::Memory::default())?.finish();
137+
let loc_generator = TableMetaLocationGenerator::with_prefix("/".to_owned());
138+
let col_stats = gen_columns_statistics(&block, None, &schema)?;
139+
let block_writer = BlockWriter::new(&operator, &loc_generator);
140+
let (_block_meta, thrift_file_meta) = block_writer
141+
.write(FuseStorageFormat::Parquet, &schema, block, col_stats, None)
142+
.await?;
143+
144+
Ok(thrift_file_meta.unwrap())
145+
}
146+
147+
/// Prints how far the memory of the current process has moved away from
/// `base_memory_usage`.
///
/// * `case` - label of the cached item type, echoed in the report.
/// * `base_memory_usage` - value of `process.memory()` sampled by the caller
///   before the cache was populated.
/// * `num_cache_items` - number of cached items, echoed in the report.
///
/// The report goes to stderr. Panics if the current pid or its process entry
/// cannot be obtained.
fn show_memory_usage(case: &str, base_memory_usage: u64, num_cache_items: usize) {
    let sys = System::new_all();
    let pid = get_current_pid().unwrap();
    let process = sys.process(pid).unwrap();
    let memory_after = process.memory();

    // Process memory may shrink between the two samples. Subtracting the raw
    // `u64` values would then overflow (a panic in debug/test builds), so the
    // delta is computed as a signed value, which is what `{:+}` expects.
    let delta = i128::from(memory_after) - i128::from(base_memory_usage);
    let delta_gb = (delta as f64) / 1024.0 / 1024.0 / 1024.0;
    eprintln!(
        "\n\
         cache item type : {},\n\
         number of cached items {},\n\
         mem usage(B):{:+},\n\
         mem usage(GB){:+}\n",
        case, num_cache_items, delta, delta_gb
    );
}

src/query/service/tests/it/storages/fuse/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#![allow(clippy::too_many_arguments)]
1616
mod block_writer;
17+
mod bloom_index_meta_size;
1718
mod io;
1819
mod meta;
1920
mod operations;

src/query/service/tests/it/storages/fuse/operations/gc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ mod utils {
309309
let blocks: std::vec::Vec<DataBlock> = stream.try_collect().await?;
310310
for block in blocks {
311311
let stats = gen_columns_statistics(&block, None, &schema)?;
312-
let block_meta = block_writer
312+
let (block_meta, _index_meta) = block_writer
313313
.write(FuseStorageFormat::Parquet, &schema, block, stats, None)
314314
.await?;
315315
block_metas.push(Arc::new(block_meta));

src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -698,7 +698,7 @@ impl CompactSegmentTestFixture {
698698
let block = block?;
699699
let col_stats = gen_columns_statistics(&block, None, &schema)?;
700700

701-
let block_meta = block_writer
701+
let (block_meta, _index_meta) = block_writer
702702
.write(FuseStorageFormat::Parquet, &schema, block, col_stats, None)
703703
.await?;
704704

src/query/service/tests/it/storages/fuse/statistics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ async fn test_accumulator() -> common_exception::Result<()> {
236236
let block = item?;
237237
let col_stats = gen_columns_statistics(&block, None, &schema)?;
238238
let block_writer = BlockWriter::new(&operator, &loc_generator);
239-
let block_meta = block_writer
239+
let (block_meta, _index_meta) = block_writer
240240
.write(FuseStorageFormat::Parquet, &schema, block, col_stats, None)
241241
.await?;
242242
stats_acc.add_with_block_meta(block_meta);

src/query/storages/common/cache-manager/src/caches.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use storages_common_cache::CacheAccessor;
2424
use storages_common_cache::InMemoryItemCacheHolder;
2525
use storages_common_cache::NamedCache;
2626
use storages_common_index::filters::Xor8Filter;
27+
use storages_common_index::BloomIndexMeta;
2728
use storages_common_table_meta::meta::SegmentInfo;
2829
use storages_common_table_meta::meta::TableSnapshot;
2930
use storages_common_table_meta::meta::TableSnapshotStatistics;
@@ -39,7 +40,6 @@ pub type TableSnapshotStatisticCache = NamedCache<InMemoryItemCacheHolder<TableS
3940
/// In memory object cache of bloom filter.
4041
/// For each indexed data block, the bloom xor8 filter of each column is cached individually
4142
pub type BloomIndexFilterCache = NamedCache<InMemoryItemCacheHolder<Xor8Filter>>;
42-
pub struct BloomIndexMeta(pub FileMetaData);
4343
/// In memory object cache of parquet FileMetaData of bloom index data
4444
pub type BloomIndexMetaCache = NamedCache<InMemoryItemCacheHolder<BloomIndexMeta>>;
4545
/// In memory object cache of parquet FileMetaData of external parquet files

src/query/storages/common/index/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ test = false
1515
ignored = ["xorfilter-rs", "match-template"]
1616

1717
[dependencies]
18+
common-arrow = { path = "../../../../common/arrow" }
1819
common-exception = { path = "../../../../common/exception" }
1920
common-expression = { path = "../../../expression" }
2021
common-functions = { path = "../../../functions" }

0 commit comments

Comments
 (0)