Commit 47d339a

refresh cache usages every 16MB (#958)

1 parent f626dd9 commit 47d339a

File tree

5 files changed: +74 −25 lines changed


app-server/src/cache/keys.rs

Lines changed: 2 additions & 0 deletions

@@ -6,3 +6,5 @@ pub const PROJECT_API_KEY_CACHE_KEY: &str = "project_api_key";
 pub const PROJECT_CACHE_KEY: &str = "project";
 pub const WORKSPACE_LIMITS_CACHE_KEY: &str = "workspace_limits";
 pub const PROJECT_EVALUATORS_BY_PATH_CACHE_KEY: &str = "project_evaluators_by_path";
+
+pub const WORKSPACE_PARTIAL_USAGE_CACHE_KEY: &str = "workspace_partial_usage";
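The new constant is only a key prefix; limits.rs (further down) appends a workspace id to it. A minimal sketch of that key shape, using a made-up workspace id purely for illustration:

const WORKSPACE_PARTIAL_USAGE_CACHE_KEY: &str = "workspace_partial_usage";

// Mirrors the `format!("{KEY}:{workspace_id}")` pattern used in limits.rs.
fn partial_usage_key(workspace_id: &str) -> String {
    format!("{WORKSPACE_PARTIAL_USAGE_CACHE_KEY}:{workspace_id}")
}

fn main() {
    // Hypothetical workspace id, not taken from the commit.
    let key = partial_usage_key("11111111-2222-3333-4444-555555555555");
    assert_eq!(key, "workspace_partial_usage:11111111-2222-3333-4444-555555555555");
}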

app-server/src/ch/events.rs

Lines changed: 6 additions & 3 deletions

@@ -54,20 +54,23 @@ impl CHEvent {
     }
 }

-pub async fn insert_events(clickhouse: clickhouse::Client, events: Vec<CHEvent>) -> Result<()> {
+/// Insert events into ClickHouse and return the number of bytes inserted
+pub async fn insert_events(clickhouse: clickhouse::Client, events: Vec<CHEvent>) -> Result<usize> {
     if events.is_empty() {
-        return Ok(());
+        return Ok(0);
     }

     let ch_insert = clickhouse.insert("events");
     match ch_insert {
         Ok(mut ch_insert) => {
+            let mut total_size_bytes = 0;
             for event in events {
                 ch_insert.write(&event).await?;
+                total_size_bytes += event.size_bytes as usize;
             }
             let ch_insert_end_res = ch_insert.end().await;
             match ch_insert_end_res {
-                Ok(_) => Ok(()),
+                Ok(_) => Ok(total_size_bytes),
                 Err(e) => Err(anyhow::anyhow!(
                     "Clickhouse events insertion failed: {:?}",
                     e
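The change keeps the write loop as-is and only accumulates each event's `size_bytes` so the caller can account for ingested data. A self-contained sketch of the same accumulate-while-writing shape, where `FakeSink` and `Row` are stand-ins (not the real clickhouse API or CHEvent type):

// Sketch only: FakeSink stands in for the ClickHouse inserter; `size_bytes`
// mirrors the field read off each CHEvent in the diff above.
struct Row {
    size_bytes: u64,
}

struct FakeSink {
    rows_written: usize,
}

impl FakeSink {
    fn write(&mut self, _row: &Row) -> Result<(), String> {
        self.rows_written += 1;
        Ok(())
    }
    fn end(self) -> Result<usize, String> {
        Ok(self.rows_written)
    }
}

// Same contract as the new insert_events: 0 for an empty batch, otherwise the
// summed size of every row that was written before the insert is finalized.
fn insert_rows(mut sink: FakeSink, rows: Vec<Row>) -> Result<usize, String> {
    if rows.is_empty() {
        return Ok(0);
    }
    let mut total_size_bytes = 0usize;
    for row in &rows {
        sink.write(row)?;
        total_size_bytes += row.size_bytes as usize;
    }
    sink.end()?;
    Ok(total_size_bytes)
}

fn main() {
    let sink = FakeSink { rows_written: 0 };
    let rows = vec![Row { size_bytes: 128 }, Row { size_bytes: 256 }];
    assert_eq!(insert_rows(sink, rows).unwrap(), 384);
}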

app-server/src/traces/consumer.rs

Lines changed: 10 additions & 2 deletions

@@ -369,20 +369,28 @@ async fn process_batch(
             log::error!("Failed to ack MQ delivery (batch): {:?}", e);
         });

-    match record_events(clickhouse.clone(), &all_events).await {
-        Ok(_) => {}
+    let total_events_ingested_bytes = match record_events(clickhouse.clone(), &all_events).await {
+        Ok(bytes) => bytes,
         Err(e) => {
             log::error!("Failed to record events: {:?}", e);
+            0
         }
     };

+    let total_ingested_bytes = spans_ingested_bytes
+        .iter()
+        .map(|b| b.span_bytes)
+        .sum::<usize>()
+        + total_events_ingested_bytes;
+
     for project_id in project_ids {
         if is_feature_enabled(Feature::UsageLimit) {
            if let Err(e) = update_workspace_limit_exceeded_by_project_id(
                db.clone(),
                clickhouse.clone(),
                cache.clone(),
                project_id,
+                total_ingested_bytes,
            )
            .await
            {
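The consumer now folds the per-span byte counts and the event byte count into a single total before passing it to the limit updater. A small standalone sketch of that summation, assuming (as the diff's `.map(|b| b.span_bytes)` implies) that each entry carries a `span_bytes` field:

// Illustrative stand-in for whatever element type spans_ingested_bytes holds.
struct SpanIngested {
    span_bytes: usize,
}

fn total_ingested_bytes(spans: &[SpanIngested], event_bytes: usize) -> usize {
    spans.iter().map(|b| b.span_bytes).sum::<usize>() + event_bytes
}

fn main() {
    let spans = vec![SpanIngested { span_bytes: 1_000 }, SpanIngested { span_bytes: 2_000 }];
    assert_eq!(total_ingested_bytes(&spans, 500), 3_500);
}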

app-server/src/traces/events.rs

Lines changed: 2 additions & 1 deletion

@@ -5,10 +5,11 @@ use crate::{
     db::events::Event,
 };

+///
 pub async fn record_events(
     clickhouse: clickhouse::Client,
     event_payloads: &Vec<Event>,
-) -> Result<()> {
+) -> Result<usize> {
     let ch_events = event_payloads
         .iter()
         .map(|e| CHEvent::from_db_event(e))

app-server/src/traces/limits.rs

Lines changed: 54 additions & 19 deletions

@@ -9,12 +9,15 @@ use uuid::Uuid;
 use crate::{
     cache::{
         Cache, CacheTrait,
-        keys::{PROJECT_CACHE_KEY, WORKSPACE_LIMITS_CACHE_KEY},
+        keys::{PROJECT_CACHE_KEY, WORKSPACE_LIMITS_CACHE_KEY, WORKSPACE_PARTIAL_USAGE_CACHE_KEY},
     },
     ch,
     db::{self, DB, projects::ProjectWithWorkspaceBillingInfo, stats::WorkspaceLimitsExceeded},
 };

+// Threshold in bytes (16MB) - only recompute workspace limits after this much data is written
+const RECOMPUTE_THRESHOLD_BYTES: usize = 16 * 1024 * 1024; // 16MB
+
 pub async fn get_workspace_limit_exceeded_by_project_id(
     db: Arc<DB>,
     clickhouse: clickhouse::Client,
@@ -56,6 +59,7 @@ pub async fn update_workspace_limit_exceeded_by_project_id(
     clickhouse: clickhouse::Client,
     cache: Arc<Cache>,
     project_id: Uuid,
+    written_bytes: usize,
 ) -> Result<()> {
     tokio::spawn(async move {
         let project_info = get_workspace_info_for_project_id(db.clone(), cache.clone(), project_id)
@@ -74,26 +78,57 @@ pub async fn update_workspace_limit_exceeded_by_project_id(
             return;
         }

-        let cache_key = format!("{WORKSPACE_LIMITS_CACHE_KEY}:{workspace_id}");
-        let workspace_limits_exceeded = is_workspace_over_limit(
-            clickhouse,
-            project_info.workspace_project_ids,
-            project_info.bytes_limit,
-            project_info.reset_time,
-        )
-        .await
-        .map_err(|e| {
-            log::error!(
-                "Failed to update workspace limit exceeded for project [{}]: {:?}",
-                project_id,
-                e
-            );
-        })
-        .unwrap();
-        cache
-            .insert::<WorkspaceLimitsExceeded>(&cache_key, workspace_limits_exceeded.clone())
+        let partial_usage_cache_key = format!("{WORKSPACE_PARTIAL_USAGE_CACHE_KEY}:{workspace_id}");
+        let limits_cache_key = format!("{WORKSPACE_LIMITS_CACHE_KEY}:{workspace_id}");
+
+        // Get current partial usage from cache
+        let cache_result = cache.get::<usize>(&partial_usage_cache_key).await;
+
+        // If cache is missing or errored, we should recompute
+        let (current_partial_usage, cache_available) = match cache_result {
+            Ok(Some(value)) => (value, true),
+            Ok(None) | Err(_) => (0, false),
+        };
+
+        let new_partial_usage = current_partial_usage + written_bytes;
+
+        // Recompute if: cache was unavailable, or we've accumulated at least RECOMPUTE_THRESHOLD_BYTES
+        let should_recompute = !cache_available || new_partial_usage >= RECOMPUTE_THRESHOLD_BYTES;
+
+        if should_recompute {
+            // Perform the heavy computation
+            let workspace_limits_exceeded = is_workspace_over_limit(
+                clickhouse,
+                project_info.workspace_project_ids,
+                project_info.bytes_limit,
+                project_info.reset_time,
+            )
             .await
+            .map_err(|e| {
+                log::error!(
+                    "Failed to update workspace limit exceeded for project [{}]: {:?}",
+                    project_id,
+                    e
+                );
+            })
             .unwrap();
+
+            // Update the limits cache
+            let _ = cache
+                .insert::<WorkspaceLimitsExceeded>(
+                    &limits_cache_key,
+                    workspace_limits_exceeded.clone(),
+                )
+                .await;
+
+            // Reset the partial usage counter
+            let _ = cache.insert::<usize>(&partial_usage_cache_key, 0).await;
+        } else {
+            // Just update the partial usage counter
+            let _ = cache
+                .insert::<usize>(&partial_usage_cache_key, new_partial_usage)
+                .await;
+        }
     });

     Ok(())
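The core of the commit is this accumulate-then-recompute decision: the per-workspace partial usage is read from the cache, the freshly written bytes are added, and the expensive is_workspace_over_limit query only runs once the running total crosses 16 MB (or when the counter is missing), after which the counter is reset. A standalone sketch of that decision logic, with an in-memory map standing in for the cache and hypothetical helper names:

use std::collections::HashMap;

// 16 MB, matching RECOMPUTE_THRESHOLD_BYTES in the diff above.
const RECOMPUTE_THRESHOLD_BYTES: usize = 16 * 1024 * 1024;

// Returns the new partial-usage value and whether a full recompute is due.
// A missing counter is treated as "recompute now", mirroring the cache-miss branch.
fn plan_update(current: Option<usize>, written_bytes: usize) -> (usize, bool) {
    let (current_partial_usage, cache_available) = match current {
        Some(value) => (value, true),
        None => (0, false),
    };
    let new_partial_usage = current_partial_usage + written_bytes;
    let should_recompute = !cache_available || new_partial_usage >= RECOMPUTE_THRESHOLD_BYTES;
    (new_partial_usage, should_recompute)
}

fn main() {
    // In-memory stand-in for the partial-usage cache, keyed per workspace.
    let mut cache: HashMap<String, usize> = HashMap::new();
    let key = "workspace_partial_usage:some-workspace".to_string();

    // First write: counter missing, so a recompute is forced and the counter resets to 0.
    let (_, recompute) = plan_update(cache.get(&key).copied(), 5 * 1024 * 1024);
    assert!(recompute);
    cache.insert(key.clone(), 0);

    // Small writes accumulate without triggering the expensive limit check.
    let (usage, recompute) = plan_update(cache.get(&key).copied(), 5 * 1024 * 1024);
    assert!(!recompute);
    cache.insert(key.clone(), usage);

    // Crossing 16 MB in total triggers the recompute; the real code then resets the counter.
    let (_, recompute) = plan_update(cache.get(&key).copied(), 12 * 1024 * 1024);
    assert!(recompute);
}

This matches the trade-off named in the commit title: the limits cache is refreshed roughly once per 16 MB of ingested data instead of on every batch.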
