Skip to content

Commit 68350d3

Browse files
committed
cache fakeip
1 parent fdbc5af commit 68350d3

File tree

2 files changed

+127
-21
lines changed

2 files changed

+127
-21
lines changed

ytflow/src/config/plugin/fakeip.rs

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use serde::Deserialize;
22

33
use crate::config::factory::*;
44
use crate::config::*;
5+
use crate::data::PluginId;
56

67
#[cfg_attr(not(feature = "plugins"), allow(dead_code))]
78
#[derive(Clone, Deserialize)]
@@ -10,12 +11,17 @@ pub struct FakeIpFactory<'a> {
1011
prefix_v6: [u8; 14],
1112
// Reserved for CNAME, TXT and SRV support
1213
fallback: &'a str,
14+
#[serde(skip)]
15+
plugin_id: Option<PluginId>,
1316
}
1417

1518
impl<'de> FakeIpFactory<'de> {
1619
pub(in super::super) fn parse(plugin: &'de Plugin) -> ConfigResult<ParsedPlugin<'de, Self>> {
17-
let Plugin { name, param, .. } = plugin;
18-
let config: Self = parse_param(name, param)?;
20+
let Plugin {
21+
name, param, id, ..
22+
} = plugin;
23+
let mut config: Self = parse_param(name, param)?;
24+
config.plugin_id = *id;
1925
Ok(ParsedPlugin {
2026
factory: config.clone(),
2127
requires: vec![Descriptor {
@@ -34,12 +40,27 @@ impl<'de> FakeIpFactory<'de> {
3440
impl<'de> Factory for FakeIpFactory<'de> {
3541
#[cfg(feature = "plugins")]
3642
fn load(&mut self, plugin_name: String, set: &mut PartialPluginSet) -> LoadResult<()> {
37-
use crate::plugin::fakeip;
43+
use crate::{data::PluginCache, plugin::fakeip};
3844

39-
let resolver = Arc::new(fakeip::FakeIp::new(self.prefix_v4, self.prefix_v6));
45+
let db = set
46+
.db
47+
.ok_or_else(|| LoadError::DatabaseRequired {
48+
plugin: plugin_name.clone(),
49+
})?
50+
.clone();
51+
let cache = PluginCache::new(
52+
self.plugin_id.ok_or_else(|| LoadError::DatabaseRequired {
53+
plugin: plugin_name.clone(),
54+
})?,
55+
Some(db.clone()),
56+
);
57+
let plugin = Arc::new(fakeip::FakeIp::new(self.prefix_v4, self.prefix_v6, cache));
58+
set.fully_constructed
59+
.long_running_tasks
60+
.push(tokio::spawn(fakeip::cache_writer(plugin.clone())));
4061
set.fully_constructed
4162
.resolver
42-
.insert(plugin_name + ".resolver", resolver);
63+
.insert(plugin_name + ".resolver", plugin);
4364
Ok(())
4465
}
4566
}

ytflow/src/plugin/fakeip.rs

Lines changed: 101 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,93 @@
1+
use std::collections::BTreeMap;
12
use std::num::NonZeroUsize;
2-
use std::sync::Mutex;
3+
use std::sync::{Arc, Mutex};
34

45
use async_trait::async_trait;
56
use lru::LruCache;
7+
use serde::{Deserialize, Serialize};
68
use smallvec::smallvec;
9+
use tokio::sync::Notify;
710

11+
use crate::data::PluginCache;
812
use crate::flow::*;
913

1014
const CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1000).unwrap();
15+
const PLUGIN_CACHE_KEY: &str = "map";
1116

1217
struct Inner {
1318
current: u16,
1419
cache: LruCache<String, u16>,
1520
}
1621

22+
#[derive(Debug, Clone, Serialize, Deserialize)]
23+
struct InnerCache {
24+
current: u16,
25+
cache: BTreeMap<String, u16>,
26+
}
27+
1728
pub struct FakeIp {
1829
prefix_v4: u16,
1930
prefix_v6: [u8; 14],
20-
inner: Mutex<Inner>,
31+
inner: Arc<Mutex<Inner>>,
32+
plugin_cache: PluginCache,
33+
new_notify: Arc<Notify>,
2134
}
2235

2336
impl FakeIp {
24-
pub fn new(prefix_v4: [u8; 2], prefix_v6: [u8; 14]) -> Self {
25-
// TODO: persist current cursor into cache
37+
pub fn new(prefix_v4: [u8; 2], prefix_v6: [u8; 14], plugin_cache: PluginCache) -> Self {
38+
let mut lru = LruCache::new(CACHE_SIZE);
39+
let inner = match plugin_cache
40+
.get::<InnerCache>(PLUGIN_CACHE_KEY)
41+
.ok()
42+
.flatten()
43+
{
44+
Some(cache) => {
45+
for (k, v) in cache.cache {
46+
lru.put(k, v);
47+
}
48+
Inner {
49+
current: cache.current,
50+
cache: lru,
51+
}
52+
}
53+
None => Inner {
54+
current: 1,
55+
cache: lru,
56+
},
57+
};
2658
Self {
2759
prefix_v4: u16::from_be_bytes(prefix_v4),
2860
prefix_v6,
29-
inner: Mutex::new(Inner {
30-
current: 1,
31-
cache: LruCache::new(CACHE_SIZE),
32-
}),
61+
inner: Arc::new(Mutex::new(inner)),
62+
plugin_cache,
63+
new_notify: Arc::new(Notify::new()),
3364
}
3465
}
3566
fn lookup_or_alloc(&self, domain: String) -> u16 {
36-
let mut inner = self.inner.lock().unwrap();
37-
let cached = inner.cache.get(&*domain).copied();
38-
if let Some(cached) = cached {
39-
return cached;
40-
}
41-
let ret = inner.current;
42-
inner.cache.put(domain, ret);
43-
inner.current += 1;
67+
let ret = {
68+
let mut inner = self.inner.lock().unwrap();
69+
let cached = inner.cache.get(&*domain).copied();
70+
if let Some(cached) = cached {
71+
return cached;
72+
}
73+
let ret = inner.current;
74+
inner.cache.put(domain, ret);
75+
inner.current = inner.current.wrapping_add(1);
76+
ret
77+
};
78+
self.new_notify.notify_one();
4479
ret
4580
}
81+
fn save_cache(&self) {
82+
let cache = {
83+
let inner = self.inner.lock().unwrap();
84+
InnerCache {
85+
current: inner.current,
86+
cache: inner.cache.iter().map(|(k, v)| (k.clone(), *v)).collect(),
87+
}
88+
};
89+
self.plugin_cache.set(PLUGIN_CACHE_KEY, &cache).ok();
90+
}
4691
}
4792

4893
#[async_trait]
@@ -62,3 +107,43 @@ impl Resolver for FakeIp {
62107
Ok(smallvec![bytes.into()])
63108
}
64109
}
110+
111+
impl Drop for FakeIp {
112+
fn drop(&mut self) {
113+
self.save_cache();
114+
}
115+
}
116+
117+
pub async fn cache_writer(plugin: Arc<FakeIp>) {
118+
let (plugin, notify) = {
119+
let notify = plugin.new_notify.clone();
120+
let weak = Arc::downgrade(&plugin);
121+
drop(plugin);
122+
(weak, notify)
123+
};
124+
if plugin.strong_count() == 0 {
125+
panic!("fakeip has no strong reference left for cache_writer");
126+
}
127+
128+
use tokio::select;
129+
use tokio::time::{sleep, Duration};
130+
loop {
131+
let mut notified_fut = notify.notified();
132+
let mut sleep_fut = sleep(Duration::from_secs(3600));
133+
'debounce: loop {
134+
select! {
135+
_ = notified_fut => {
136+
notified_fut = notify.notified();
137+
sleep_fut = sleep(Duration::from_secs(3));
138+
}
139+
_ = sleep_fut => {
140+
break 'debounce;
141+
}
142+
}
143+
}
144+
match plugin.upgrade() {
145+
Some(plugin) => plugin.save_cache(),
146+
None => break,
147+
}
148+
}
149+
}

0 commit comments

Comments
 (0)