Skip to content

Commit 631c8f4

Browse files
authored
Add option for keeping local cache of recent data. (#585)
This PR adds local cache / hot tier for Parseable. This option can be enabled by setting following env vars P_CACHE_DIR - Local Path for file cache P_CACHE_SIZE - Size for cache in human readable size ( mb/mib/gib/gb ) When these flags are set, sync flow will move the parquet from staging into the specified cache directory instead of deleting it. Any LRU cached entry is deleted to satisfy the cache constraint upon insertion. LocalCacheManager is responsible for updating and persisting this data structure.
1 parent 83f3cc9 commit 631c8f4

File tree

18 files changed

+757
-283
lines changed

18 files changed

+757
-283
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
target
22
data
33
staging
4+
limitcache
45
examples
56
cert.pem
67
key.pem

Cargo.lock

Lines changed: 27 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,12 @@ xxhash-rust = { version = "0.8", features = ["xxh3"] }
9696
xz2 = { version = "*", features = ["static"] }
9797
nom = "7.1.3"
9898
humantime = "2.1.0"
99+
human-size = "0.4"
99100
openid = { version = "0.12.0", default-features = false, features = ["rustls"] }
100101
url = "2.4.0"
101102
http-auth-basic = "0.3.3"
102103
serde_repr = "0.1.17"
104+
hashlru = { version = "0.11.0", features = ["serde"] }
103105

104106
[build-dependencies]
105107
cargo_toml = "0.15"

server/src/banner.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919

2020
use crossterm::style::Stylize;
21+
use human_size::SpecificSize;
2122

2223
use crate::about;
2324
use crate::utils::uid::Uid;
@@ -100,5 +101,21 @@ async fn storage_info(config: &Config) {
100101
config.staging_dir().to_string_lossy(),
101102
storage.get_endpoint(),
102103
latency
103-
)
104+
);
105+
106+
if let Some(path) = &config.parseable.local_cache_path {
107+
let size: SpecificSize<human_size::Gigabyte> =
108+
SpecificSize::new(config.parseable.local_cache_size as f64, human_size::Byte)
109+
.unwrap()
110+
.into();
111+
112+
eprintln!(
113+
"\
114+
{:8}Cache: \"{}\"
115+
Cache Size: \"{}\"",
116+
"",
117+
path.display(),
118+
size
119+
);
120+
}
104121
}

server/src/handlers/http.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,21 @@ pub fn configure_routes(
200200
.to(logstream::get_retention)
201201
.authorize_for_stream(Action::GetRetention),
202202
),
203+
)
204+
.service(
205+
web::resource("/cache")
206+
// PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream
207+
.route(
208+
web::put()
209+
.to(logstream::put_enable_cache)
210+
.authorize_for_stream(Action::PutCacheEnabled),
211+
)
212+
// GET "/logstream/{logstream}/cache" ==> Get retention for given logstream
213+
.route(
214+
web::get()
215+
.to(logstream::get_cache_enabled)
216+
.authorize_for_stream(Action::GetCacheEnabled),
217+
),
203218
);
204219

205220
// User API

server/src/handlers/http/logstream.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,33 @@ pub async fn put_retention(
227227
))
228228
}
229229

230+
pub async fn get_cache_enabled(req: HttpRequest) -> Result<impl Responder, StreamError> {
231+
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
232+
let cache_enabled = STREAM_INFO.cache_enabled(&stream_name)?;
233+
Ok((web::Json(cache_enabled), StatusCode::OK))
234+
}
235+
236+
pub async fn put_enable_cache(
237+
req: HttpRequest,
238+
body: web::Json<bool>,
239+
) -> Result<impl Responder, StreamError> {
240+
let enable_cache = body.into_inner();
241+
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
242+
let storage = CONFIG.storage().get_object_store();
243+
244+
let mut stream_metadata = storage.get_stream_metadata(&stream_name).await?;
245+
stream_metadata.cache_enabled = enable_cache;
246+
storage
247+
.put_stream_manifest(&stream_name, &stream_metadata)
248+
.await?;
249+
250+
STREAM_INFO.set_stream_cache(&stream_name, enable_cache)?;
251+
Ok((
252+
format!("Cache setting updated for log stream {stream_name}"),
253+
StatusCode::OK,
254+
))
255+
}
256+
230257
pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> {
231258
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
232259

0 commit comments

Comments
 (0)