Skip to content

Commit bca8613

Browse files
committed
graph: Use ObjectStore for IPFS cache when configured
1 parent 69d5e2d commit bca8613

File tree

1 file changed

+62
-8
lines changed

1 file changed

+62
-8
lines changed

graph/src/components/link_resolver/ipfs.rs

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::path::PathBuf;
12
use std::sync::Arc;
23
use std::sync::Mutex;
34
use std::time::Duration;
@@ -10,6 +11,9 @@ use futures03::compat::Stream01CompatExt;
1011
use futures03::stream::StreamExt;
1112
use futures03::stream::TryStreamExt;
1213
use lru_time_cache::LruCache;
14+
use object_store::local::LocalFileSystem;
15+
use object_store::path::Path;
16+
use object_store::ObjectStore;
1317
use serde_json::Value;
1418

1519
use crate::derive::CheapClone;
@@ -29,22 +33,60 @@ enum Cache {
2933
Memory {
3034
cache: Arc<Mutex<LruCache<ContentPath, Vec<u8>>>>,
3135
},
36+
Disk {
37+
store: Arc<dyn ObjectStore>,
38+
},
39+
}
40+
41+
fn log_err(logger: &Logger, e: &object_store::Error, log_not_found: bool) {
42+
if log_not_found || !matches!(e, object_store::Error::NotFound { .. }) {
43+
warn!(
44+
logger,
45+
"Failed to get IPFS object from disk cache; fetching from IPFS";
46+
"error" => e.to_string(),
47+
);
48+
}
3249
}
3350

3451
impl Cache {
35-
fn new(capacity: usize) -> Self {
36-
Self::Memory {
37-
cache: Arc::new(Mutex::new(LruCache::with_capacity(capacity))),
52+
fn new(capacity: usize, path: Option<PathBuf>) -> Self {
53+
match path {
54+
Some(path) => {
55+
let fs = match LocalFileSystem::new_with_prefix(&path) {
56+
Err(e) => {
57+
panic!(
58+
"Failed to create IPFS file based cache at {}: {}",
59+
path.display(),
60+
e
61+
);
62+
}
63+
Ok(fs) => fs,
64+
};
65+
Cache::Disk {
66+
store: Arc::new(fs),
67+
}
68+
}
69+
None => Self::Memory {
70+
cache: Arc::new(Mutex::new(LruCache::with_capacity(capacity))),
71+
},
3872
}
3973
}
4074

41-
async fn find(&self, path: &ContentPath) -> Option<Vec<u8>> {
75+
async fn find(&self, logger: &Logger, path: &ContentPath) -> Option<Vec<u8>> {
4276
match self {
4377
Cache::Memory { cache } => cache.lock().unwrap().get(path).cloned(),
78+
Cache::Disk { store } => {
79+
let log_err = |e: &object_store::Error| log_err(logger, e, false);
80+
81+
let path = Path::from(path.cid().to_string());
82+
let object = store.get(&path).await.inspect_err(log_err).ok()?;
83+
let data = object.bytes().await.inspect_err(log_err).ok()?;
84+
Some(data.to_vec())
85+
}
4486
}
4587
}
4688

47-
async fn insert(&self, path: ContentPath, data: Vec<u8>) {
89+
async fn insert(&self, logger: &Logger, path: ContentPath, data: Vec<u8>) {
4890
match self {
4991
Cache::Memory { cache } => {
5092
let mut cache = cache.lock().unwrap();
@@ -53,6 +95,15 @@ impl Cache {
5395
cache.insert(path.clone(), data.clone());
5496
}
5597
}
98+
Cache::Disk { store } => {
99+
let log_err = |e: &object_store::Error| log_err(logger, e, true);
100+
let path = Path::from(path.cid().to_string());
101+
store
102+
.put(&path, data.into())
103+
.await
104+
.inspect_err(log_err)
105+
.ok();
106+
}
56107
}
57108
}
58109
}
@@ -81,7 +132,10 @@ impl IpfsResolver {
81132

82133
Self {
83134
client,
84-
cache: Cache::new(env.max_ipfs_cache_size as usize),
135+
cache: Cache::new(
136+
env.max_ipfs_cache_size as usize,
137+
env.ipfs_cache_location.clone(),
138+
),
85139
timeout: env.ipfs_timeout,
86140
max_file_size: env.max_ipfs_file_bytes,
87141
max_map_file_size: env.max_ipfs_map_file_size,
@@ -111,7 +165,7 @@ impl LinkResolverTrait for IpfsResolver {
111165
let max_file_size = self.max_file_size;
112166
let max_cache_file_size = self.max_cache_file_size;
113167

114-
if let Some(data) = self.cache.find(&path).await {
168+
if let Some(data) = self.cache.find(&logger, &path).await {
115169
trace!(logger, "IPFS cat cache hit"; "hash" => path.to_string());
116170
return Ok(data.to_owned());
117171
}
@@ -132,7 +186,7 @@ impl LinkResolverTrait for IpfsResolver {
132186
.to_vec();
133187

134188
if data.len() <= max_cache_file_size {
135-
self.cache.insert(path.clone(), data.clone()).await;
189+
self.cache.insert(&logger, path.clone(), data.clone()).await;
136190
} else {
137191
debug!(
138192
logger,

0 commit comments

Comments
 (0)