Skip to content

Commit bb2dca5

Browse files
committed
Optimize geo cache store memory reads
1 parent 359d7b9 commit bb2dca5

File tree

3 files changed

+172
-77
lines changed

3 files changed

+172
-77
lines changed

landscape-common/src/store/storev4.rs

Lines changed: 143 additions & 46 deletions
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@ enum SaveUnit<K, V> {
2727
}
2828

2929
/// 用来记录当前这条数据在文件中的位置
30-
#[derive(Debug)]
30+
#[derive(Debug, Clone, Copy)]
3131
pub struct UnitPosition {
3232
pub era: u64,
3333
pub start: u64,
@@ -205,7 +205,7 @@ where
205205
(current_era, BufWriter::new(writer_file), index, readers, junk_data_size)
206206
};
207207

208-
let mut store = StoreFileManager {
208+
StoreFileManager {
209209
path: data_floder,
210210
name,
211211
current_era,
@@ -214,11 +214,7 @@ where
214214
index,
215215
readers,
216216
_marker: std::marker::PhantomData,
217-
};
218-
if store.readers.len() > 5 {
219-
store.periodization();
220217
}
221-
store
222218
}
223219

224220
/// 进行文件精简
@@ -249,20 +245,20 @@ where
249245
.unwrap();
250246
let mut temp_collect_file_era_writer = BufWriter::new(periodization_file);
251247

252-
let mut new_index: HashMap<K, UnitPosition> = HashMap::new();
248+
let mut new_index: HashMap<K, UnitPosition> = HashMap::with_capacity(self.index.len());
253249
let mut periodization_file_pos = 0;
254-
for (key, UnitPosition { era, start, len }) in self.index.iter_mut() {
255-
if !self.readers.contains_key(&era) {
256-
let missing_reader = self.path.join(format!("{}.{}", &era, &self.name));
250+
for (key, pos) in &self.index {
251+
if !self.readers.contains_key(&pos.era) {
252+
let missing_reader = self.path.join(format!("{}.{}", pos.era, &self.name));
257253
// reader.seek(pos)
258254
let missing_reader =
259255
OpenOptions::new().read(true).truncate(false).open(missing_reader).unwrap();
260-
self.readers.insert(*era, BufReader::new(missing_reader));
256+
self.readers.insert(pos.era, BufReader::new(missing_reader));
261257
};
262258

263-
let reader = self.readers.get_mut(&era).unwrap();
264-
reader.seek(SeekFrom::Start(*start)).unwrap();
265-
let mut data = reader.take(*len);
259+
let reader = self.readers.get_mut(&pos.era).unwrap();
260+
reader.seek(SeekFrom::Start(pos.start)).unwrap();
261+
let mut data = reader.take(pos.len);
266262
let copy_length = std::io::copy(&mut data, &mut temp_collect_file_era_writer).unwrap();
267263
new_index.insert(
268264
key.clone(),
@@ -328,36 +324,36 @@ where
328324
}
329325
}
330326

331-
/// 根据 key 获取该结构
332-
pub fn get(&mut self, key: &K) -> Option<V> {
333-
let pos = self.index.get(key)?;
327+
fn read_data_at(&mut self, pos: UnitPosition) -> Option<V> {
334328
let reader = self.readers.get_mut(&pos.era)?;
335329
reader.seek(SeekFrom::Start(pos.start)).ok()?;
336330

337-
// 读取数据到内存中
338-
let mut buffer = vec![0u8; pos.len as usize];
339-
reader.read_exact(&mut buffer).ok()?;
340-
341-
// 使用 bincode 反序列化
342-
if let Ok((SaveUnit::<K, V>::Data(obj), _)) =
343-
bincode::serde::decode_from_slice(&buffer, bincode::config::standard())
344-
{
345-
Some(obj)
346-
} else {
347-
None
331+
let mut limited_reader = reader.take(pos.len);
332+
match bincode::serde::decode_from_std_read::<SaveUnit<K, V>, _, _>(
333+
&mut limited_reader,
334+
bincode::config::standard(),
335+
) {
336+
Ok(SaveUnit::Data(obj)) => Some(obj),
337+
Ok(SaveUnit::Del(_)) | Err(_) => None,
348338
}
349339
}
350340

341+
/// 根据 key 获取该结构
342+
pub fn get(&mut self, key: &K) -> Option<V> {
343+
let pos = *self.index.get(key)?;
344+
self.read_data_at(pos)
345+
}
346+
351347
pub fn keys(&self) -> Vec<K> {
352-
let mut result = Vec::new();
348+
let mut result = Vec::with_capacity(self.index.len());
353349
for (key, _) in &self.index {
354350
result.push(key.clone());
355351
}
356352
result
357353
}
358354

359355
pub fn keys_ref(&self) -> Vec<&K> {
360-
let mut result = Vec::new();
356+
let mut result = Vec::with_capacity(self.index.len());
361357
for (key, _) in &self.index {
362358
result.push(key);
363359
}
@@ -376,22 +372,11 @@ where
376372
}
377373

378374
pub fn list(&mut self) -> Vec<V> {
379-
let mut result = Vec::new();
380-
for (_, pos) in &self.index {
381-
// 依次读出
382-
if let Some(reader) = self.readers.get_mut(&pos.era) {
383-
reader.seek(SeekFrom::Start(pos.start)).unwrap();
384-
let mut buffer = vec![0u8; pos.len as usize];
385-
386-
// 使用 bincode 反序列化
387-
if reader.read_exact(&mut buffer).is_ok() {
388-
// 使用 bincode 反序列化
389-
if let Ok((SaveUnit::<K, V>::Data(obj), _)) =
390-
bincode::serde::decode_from_slice(&buffer, bincode::config::standard())
391-
{
392-
result.push(obj);
393-
}
394-
}
375+
let positions: Vec<_> = self.index.values().copied().collect();
376+
let mut result = Vec::with_capacity(positions.len());
377+
for pos in positions {
378+
if let Some(obj) = self.read_data_at(pos) {
379+
result.push(obj);
395380
}
396381
}
397382
result
@@ -470,3 +455,115 @@ where
470455
self.readers.insert(self.current_era, BufReader::new(reader_file));
471456
}
472457
}
458+
459+
#[cfg(test)]
460+
mod tests {
461+
use std::{fs, path::Path};
462+
463+
use tempfile::tempdir;
464+
465+
use super::*;
466+
467+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
468+
struct TestValue {
469+
key: String,
470+
payload: String,
471+
}
472+
473+
impl TestValue {
474+
fn new(key: &str, payload: impl Into<String>) -> Self {
475+
Self { key: key.to_string(), payload: payload.into() }
476+
}
477+
}
478+
479+
impl LandscapeStoreTrait for TestValue {
480+
type K = String;
481+
482+
fn get_store_key(&self) -> Self::K {
483+
self.key.clone()
484+
}
485+
}
486+
487+
fn open_store(path: &Path) -> StoreFileManager<String, TestValue> {
488+
StoreFileManager::new(path.to_path_buf(), "items".to_string())
489+
}
490+
491+
fn store_files(path: &Path) -> Vec<String> {
492+
let mut entries = fs::read_dir(path.join("items"))
493+
.unwrap()
494+
.map(|entry| entry.unwrap().file_name().into_string().unwrap())
495+
.collect::<Vec<_>>();
496+
entries.sort();
497+
entries
498+
}
499+
500+
#[test]
501+
fn get_and_list_rebuild_index_across_eras() {
502+
let dir = tempdir().unwrap();
503+
504+
{
505+
let mut store = open_store(dir.path());
506+
store.set(TestValue::new("alpha", "v1"));
507+
store.set(TestValue::new("beta", "v1"));
508+
}
509+
510+
{
511+
let mut store = open_store(dir.path());
512+
store.set(TestValue::new("alpha", "v2"));
513+
store.del(&"beta".to_string());
514+
store.set(TestValue::new("gamma", "v1"));
515+
}
516+
517+
let mut store = open_store(dir.path());
518+
assert_eq!(store.len(), 2);
519+
assert_eq!(store.get(&"alpha".to_string()).unwrap().payload, "v2");
520+
assert_eq!(store.get(&"gamma".to_string()).unwrap().payload, "v1");
521+
assert_eq!(store.get(&"beta".to_string()), None);
522+
523+
let mut values =
524+
store.list().into_iter().map(|value| (value.key, value.payload)).collect::<Vec<_>>();
525+
values.sort();
526+
527+
assert_eq!(
528+
values,
529+
vec![("alpha".to_string(), "v2".to_string()), ("gamma".to_string(), "v1".to_string())]
530+
);
531+
}
532+
533+
#[test]
534+
fn startup_does_not_periodize_existing_eras() {
535+
let dir = tempdir().unwrap();
536+
537+
for _ in 0..6 {
538+
let _store = open_store(dir.path());
539+
}
540+
541+
assert_eq!(
542+
store_files(dir.path()),
543+
vec![
544+
"1.items".to_string(),
545+
"2.items".to_string(),
546+
"3.items".to_string(),
547+
"4.items".to_string(),
548+
"5.items".to_string(),
549+
"6.items".to_string(),
550+
]
551+
);
552+
}
553+
554+
#[test]
555+
fn junk_threshold_still_triggers_periodization() {
556+
let dir = tempdir().unwrap();
557+
let mut store = open_store(dir.path());
558+
let large_payload = "x".repeat(3 * 1024 * 1024);
559+
560+
for version in 0..4 {
561+
store.set(TestValue::new("alpha", format!("{version}-{large_payload}")));
562+
}
563+
564+
let files = store_files(dir.path());
565+
assert_eq!(files.len(), 2);
566+
assert_eq!(store.len(), 1);
567+
assert!(store.get(&"alpha".to_string()).unwrap().payload.starts_with("3-"));
568+
}
569+
}

landscape/src/geo/ip_service.rs

Lines changed: 13 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -88,34 +88,29 @@ impl GeoIpService {
8888
// Deduplicate by cidr (ip + prefix) — keep the first occurrence (highest priority).
8989
// Configs are sorted by ascending index before calling, so the first seen = highest priority.
9090
let mut seen = std::collections::HashSet::new();
91-
let mut result = vec![];
91+
let mut result = Vec::with_capacity(configs.len());
9292
for config in configs.into_iter() {
93-
let mut source = vec![];
94-
for each in config.source.iter() {
93+
let priority = config.index as u16;
94+
let mark = config.mark;
95+
for each in config.source.into_iter() {
9596
match each {
9697
WanIPRuleSource::GeoKey(config_key) => {
9798
if let Some(ips) = lock.get(&config_key.get_file_cache_key()) {
98-
source.extend(ips.values.iter().cloned());
99+
result.reserve(ips.values.len());
100+
for cidr in ips.values {
101+
if seen.insert(cidr.clone()) {
102+
result.push(IpMarkInfo { mark, cidr, priority });
103+
}
104+
}
99105
}
100106
}
101107
WanIPRuleSource::Config(c) => {
102-
source.push(c.clone());
108+
if seen.insert(c.clone()) {
109+
result.push(IpMarkInfo { mark, cidr: c, priority });
110+
}
103111
}
104112
}
105113
}
106-
107-
let ip_marks = source.into_iter().filter_map(|cidr| {
108-
if seen.insert(cidr.clone()) {
109-
Some(IpMarkInfo {
110-
mark: config.mark,
111-
cidr,
112-
priority: config.index as u16,
113-
})
114-
} else {
115-
None
116-
}
117-
});
118-
result.extend(ip_marks);
119114
}
120115
result
121116
}

landscape/src/geo/site_service.rs

Lines changed: 16 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@ use landscape_common::{
99
rule::{DNSRuleConfig, DNSRuntimeRule, DomainConfig, RuleSource},
1010
ChainDnsServerInitInfo,
1111
},
12-
geo::{GeoDomainConfig, GeoFileCacheKey, GeoSiteFileConfig, GeoSiteSource},
12+
geo::{GeoDomainConfig, GeoFileCacheKey, GeoSiteSource},
1313
service::controller::ConfigController,
1414
store::storev4::LandscapeStoreTrait,
1515
utils::time::{get_f64_timestamp, MILL_A_DAY},
@@ -245,8 +245,8 @@ impl GeoSiteService {
245245
) -> ExpandedRuleSources {
246246
let mut lock = self.file_cache.lock().await;
247247

248-
let mut source = vec![];
249-
let mut used_geo_keys = HashSet::new();
248+
let mut source = Vec::with_capacity(rule_source.len());
249+
let mut used_geo_keys = HashSet::with_capacity(rule_source.len());
250250

251251
let mut inverse_keys: HashMap<String, HashSet<String>> = HashMap::new();
252252
for each in rule_source.into_iter() {
@@ -255,21 +255,23 @@ impl GeoSiteService {
255255
inverse_keys.entry(k.name).or_default().insert(k.key);
256256
}
257257
RuleSource::GeoKey(k) => {
258+
let attr_key = k.attribute_key.clone();
258259
let file_cache_key = k.get_file_cache_key();
259260
if applied_config.contains(&file_cache_key) {
260261
continue;
261262
}
262-
let predicate: Box<dyn Fn(&GeoSiteFileConfig) -> bool> =
263-
if let Some(ref attr) = k.attribute_key {
264-
let attr = attr.clone();
265-
Box::new(move |config: &GeoSiteFileConfig| {
266-
config.attributes.contains(&attr)
267-
})
268-
} else {
269-
Box::new(move |_: &GeoSiteFileConfig| true)
270-
};
271263
if let Some(domains) = lock.get(&file_cache_key) {
272-
source.extend(domains.values.into_iter().filter(predicate).map(Into::into));
264+
source.reserve(domains.values.len());
265+
match attr_key {
266+
Some(attr) => source.extend(
267+
domains
268+
.values
269+
.into_iter()
270+
.filter(|config| config.attributes.contains(&attr))
271+
.map(Into::into),
272+
),
273+
None => source.extend(domains.values.into_iter().map(Into::into)),
274+
}
273275
}
274276
applied_config.insert(file_cache_key);
275277
used_geo_keys.insert(k.get_file_cache_key());
@@ -290,6 +292,7 @@ impl GeoSiteService {
290292
if !excluded_names.contains(&key.key) {
291293
if !applied_config.contains(key) {
292294
if let Some(domains) = lock.get(key) {
295+
source.reserve(domains.values.len());
293296
applied_config.insert(key.clone());
294297
used_geo_keys.insert(key.clone());
295298
source.extend(domains.values.into_iter().map(Into::into));

0 commit comments

Comments
 (0)