
Commit c94adbf

cache increment (#959)
* tmp
* fixes
* comment
* key name
* fixes
* comment
1 parent 47d339a commit c94adbf

File tree: 7 files changed, +138 -104 lines changed

app-server/src/cache/in_memory.rs

Lines changed: 16 additions & 0 deletions
@@ -52,4 +52,20 @@ impl CacheTrait for InMemoryCache {
         });
         Ok(())
     }
+
+    async fn increment(&self, key: &str, amount: i64) -> Result<Option<i64>, CacheError> {
+        // Note: This is not truly atomic for in-memory cache, but should be fine for dev/testing.
+        // Production should use Redis where increment is atomic.
+        // Like Redis INCRBY, this creates the key with value=0 if it doesn't exist
+        let current_value: i64 = match self.cache.get(key).await {
+            Some(bytes) => serde_json::from_slice(&bytes).map_err(|e| CacheError::SerDeError(e))?,
+            None => 0,
+        };
+
+        let new_value = current_value + amount;
+        let new_bytes = serde_json::to_vec(&new_value).map_err(|e| CacheError::SerDeError(e))?;
+
+        self.cache.insert(String::from(key), new_bytes).await;
+        Ok(Some(new_value))
+    }
 }

app-server/src/cache/keys.rs

Lines changed: 1 addition & 1 deletion
@@ -7,4 +7,4 @@ pub const PROJECT_CACHE_KEY: &str = "project";
 pub const WORKSPACE_LIMITS_CACHE_KEY: &str = "workspace_limits";
 pub const PROJECT_EVALUATORS_BY_PATH_CACHE_KEY: &str = "project_evaluators_by_path";
 
-pub const WORKSPACE_PARTIAL_USAGE_CACHE_KEY: &str = "workspace_partial_usage";
+pub const WORKSPACE_BYTES_USAGE_CACHE_KEY: &str = "workspace_bytes_usage";

app-server/src/cache/mod.rs

Lines changed: 6 additions & 0 deletions
@@ -32,4 +32,10 @@ pub trait CacheTrait {
         T: Serialize + Send;
     async fn remove(&self, key: &str) -> Result<(), CacheError>;
     async fn set_ttl(&self, key: &str, seconds: u64) -> Result<(), CacheError>;
+    /// Atomically increment a numeric value by the given amount.
+    /// If the key doesn't exist, it will be created with value 0 before incrementing.
+    /// Returns the new value after incrementing.
+    /// Callers should use get() first if they need to distinguish between missing keys
+    /// and existing keys (to trigger recomputation logic, for example).
+    async fn increment(&self, key: &str, amount: i64) -> Result<Option<i64>, CacheError>;
 }
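For context on the trait addition above, here is a minimal caller-side sketch of the get-then-increment pattern the doc comment recommends. The helper name `bump_usage` and its generic bound are illustrative assumptions, not code from this commit; it also assumes the generic `get::<T>()` that the call sites later in this diff use.

// Hypothetical helper (not in this commit) showing the suggested get-then-increment pattern.
async fn bump_usage<C: CacheTrait>(
    cache: &C,
    key: &str,
    written_bytes: i64,
) -> Result<Option<i64>, CacheError> {
    match cache.get::<i64>(key).await {
        // Counter already exists: increment atomically and return the new total.
        Ok(Some(_)) => cache.increment(key, written_bytes).await,
        // Missing key or cache error: return None so the caller can recompute from the source of truth.
        _ => Ok(None),
    }
}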

app-server/src/cache/redis.rs

Lines changed: 14 additions & 0 deletions
@@ -100,4 +100,18 @@ impl CacheTrait for RedisCache {
             Ok(())
         }
     }
+
+    async fn increment(&self, key: &str, amount: i64) -> Result<Option<i64>, CacheError> {
+        // Use atomic INCRBY command
+        // Note: Redis INCRBY will create the key if it doesn't exist, starting from 0
+        // The caller should check with get() first if they want to handle missing keys differently
+        let result: RedisResult<i64> = self.connection.clone().incr(key, amount).await;
+        match result {
+            Ok(new_value) => Ok(Some(new_value)),
+            Err(e) => {
+                log::error!("Redis increment error: {}", e);
+                Err(CacheError::InternalError(anyhow::Error::from(e)))
+            }
+        }
+    }
 }

app-server/src/ch/limits.rs

Lines changed: 5 additions & 18 deletions
@@ -3,8 +3,6 @@ use chrono::{DateTime, Months, Utc};
 use clickhouse::Client;
 use uuid::Uuid;
 
-use crate::db::stats::WorkspaceLimitsExceeded;
-
 /// Calculate how many complete months have elapsed from start_date to end_date
 /// This mimics Python's dateutil.relativedelta behavior
 fn complete_months_elapsed(start_date: DateTime<Utc>, end_date: DateTime<Utc>) -> u32 {
@@ -23,12 +21,11 @@ fn complete_months_elapsed(start_date: DateTime<Utc>, end_date: DateTime<Utc>) -> u32 {
     months_elapsed
 }
 
-pub async fn is_workspace_over_limit(
+pub async fn get_workspace_bytes_ingested_by_project_ids(
     clickhouse: Client,
     project_ids: Vec<Uuid>,
     reset_time: DateTime<Utc>,
-    bytes_limit: i64,
-) -> Result<WorkspaceLimitsExceeded> {
+) -> Result<usize> {
     let now = Utc::now();
     let months_elapsed = complete_months_elapsed(reset_time, now);
 
@@ -41,6 +38,7 @@ pub async fn get_workspace_bytes_ingested_by_project_ids(
     } else {
         reset_time
     };
+
     let query = "WITH spans_bytes_ingested AS (
         SELECT
             SUM(spans.size_bytes) as spans_bytes_ingested
@@ -67,25 +65,14 @@ pub async fn get_workspace_bytes_ingested_by_project_ids(
         FROM spans_bytes_ingested, browser_session_events_bytes_ingested, events_bytes_ingested
     ";
 
-    let total_bytes_ingested = clickhouse
+    let result = clickhouse
        .query(&query)
        .param("project_ids", project_ids)
        .param("latest_reset_time", latest_reset_time.naive_utc())
        .fetch_optional::<usize>()
        .await?;
 
-    let Some(bytes_ingested) = total_bytes_ingested else {
-        log::error!("No bytes ingested found for workspace in ClickHouse");
-        return Ok(WorkspaceLimitsExceeded {
-            steps: false,
-            bytes_ingested: false,
-        });
-    };
-
-    Ok(WorkspaceLimitsExceeded {
-        bytes_ingested: bytes_ingested > (bytes_limit.abs() as usize),
-        steps: false,
-    })
+    Ok(result.unwrap_or(0))
 }
 
 #[cfg(test)]
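To make the billing-period arithmetic concrete: the count returned by complete_months_elapsed is added back onto reset_time to form latest_reset_time, so the query above only counts bytes ingested in the current period. A minimal sketch, assuming chrono's checked_add_months; the helper itself is illustrative and not code from this commit:

// Illustrative only: roll reset_time forward by the number of complete months elapsed.
use chrono::{DateTime, Months, Utc};

fn latest_reset_time(reset_time: DateTime<Utc>, months_elapsed: u32) -> DateTime<Utc> {
    if months_elapsed > 0 {
        reset_time
            .checked_add_months(Months::new(months_elapsed))
            .unwrap_or(reset_time)
    } else {
        reset_time
    }
}
// Example: a reset_time of 2025-01-15 with 3 complete months elapsed yields 2025-04-15.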

app-server/src/traces/consumer.rs

Lines changed: 4 additions & 5 deletions
@@ -149,7 +149,6 @@ async fn process_spans_and_events_batch(
 ) {
     let mut all_spans = Vec::new();
     let mut all_events = Vec::new();
-    let mut project_ids = Vec::new();
     let mut spans_ingested_bytes = Vec::new();
 
     // Process all spans in parallel (heavy processing)
@@ -173,7 +172,6 @@ async fn process_spans_and_events_batch(
     // Collect results from parallel processing
     for (span, events, ingested_bytes) in processing_results {
         spans_ingested_bytes.push(ingested_bytes.clone());
-        project_ids.push(span.project_id);
         all_spans.push(span);
         all_events.extend(events.into_iter());
     }
@@ -206,7 +204,6 @@ async fn process_spans_and_events_batch(
         all_spans,
         spans_ingested_bytes,
         all_events,
-        project_ids,
         db,
         clickhouse,
         cache,
@@ -229,7 +226,6 @@ async fn process_batch(
     mut spans: Vec<Span>,
     spans_ingested_bytes: Vec<IngestedBytes>,
     events: Vec<Event>,
-    project_ids: Vec<Uuid>,
     db: Arc<DB>,
     clickhouse: clickhouse::Client,
     cache: Arc<Cache>,
@@ -383,7 +379,10 @@ async fn process_batch(
        .sum::<usize>()
        + total_events_ingested_bytes;
 
-    for project_id in project_ids {
+    // we get project id from the first span in the batch
+    // because all spans in the batch have the same project id
+    // batching is happening on the Otel SpanProcessor level
+    if let Some(project_id) = stripped_spans.first().map(|s| s.project_id) {
         if is_feature_enabled(Feature::UsageLimit) {
             if let Err(e) = update_workspace_limit_exceeded_by_project_id(
                 db.clone(),

app-server/src/traces/limits.rs

Lines changed: 92 additions & 80 deletions
@@ -3,21 +3,17 @@
 use std::sync::Arc;
 
 use anyhow::Result;
-use chrono::{DateTime, Utc};
 use uuid::Uuid;
 
 use crate::{
     cache::{
         Cache, CacheTrait,
-        keys::{PROJECT_CACHE_KEY, WORKSPACE_LIMITS_CACHE_KEY, WORKSPACE_PARTIAL_USAGE_CACHE_KEY},
+        keys::{PROJECT_CACHE_KEY, WORKSPACE_BYTES_USAGE_CACHE_KEY, WORKSPACE_LIMITS_CACHE_KEY},
     },
-    ch,
+    ch::limits::get_workspace_bytes_ingested_by_project_ids,
     db::{self, DB, projects::ProjectWithWorkspaceBillingInfo, stats::WorkspaceLimitsExceeded},
 };
 
-// Threshold in bytes (16MB) - only recompute workspace limits after this much data is written
-const RECOMPUTE_THRESHOLD_BYTES: usize = 16 * 1024 * 1024; // 16MB
-
 pub async fn get_workspace_limit_exceeded_by_project_id(
     db: Arc<DB>,
     clickhouse: clickhouse::Client,
@@ -39,13 +35,29 @@ pub async fn get_workspace_limit_exceeded_by_project_id(
     match cache_res {
         Ok(Some(workspace_limits_exceeded)) => Ok(workspace_limits_exceeded),
         Ok(None) | Err(_) => {
-            let workspace_limits_exceeded = is_workspace_over_limit(
-                clickhouse,
+            let bytes_ingested = match get_workspace_bytes_ingested_by_project_ids(
+                clickhouse.clone(),
                 project_info.workspace_project_ids,
-                project_info.bytes_limit,
                 project_info.reset_time,
             )
-            .await?;
+            .await
+            {
+                Ok(bytes_ingested) => bytes_ingested as i64,
+                Err(e) => {
+                    log::error!(
+                        "Failed to get workspace bytes ingested for project [{}]: {:?}",
+                        project_id,
+                        e
+                    );
+                    0 as i64
+                }
+            };
+
+            let workspace_limits_exceeded = WorkspaceLimitsExceeded {
+                steps: false,
+                bytes_ingested: bytes_ingested >= project_info.bytes_limit,
+            };
+
             let _ = cache
                 .insert::<WorkspaceLimitsExceeded>(&cache_key, workspace_limits_exceeded.clone())
                 .await;
@@ -62,72 +74,85 @@ pub async fn update_workspace_limit_exceeded_by_project_id(
     written_bytes: usize,
 ) -> Result<()> {
     tokio::spawn(async move {
-        let project_info = get_workspace_info_for_project_id(db.clone(), cache.clone(), project_id)
-            .await
-            .map_err(|e| {
-                log::error!(
-                    "Failed to get workspace info for project [{}]: {:?}",
-                    project_id,
-                    e
-                );
-            })
-            .unwrap();
+        let project_info =
+            match get_workspace_info_for_project_id(db.clone(), cache.clone(), project_id).await {
+                Ok(info) => info,
+                Err(e) => {
+                    log::error!(
+                        "Failed to get workspace info for project [{}]: {:?}",
+                        project_id,
+                        e
+                    );
+                    return;
+                }
+            };
         let workspace_id = project_info.workspace_id;
         if project_info.tier_name.trim().to_lowercase() != "free" {
             // We don't need to update the workspace limits cache for non-free tiers
             return;
         }
 
-        let partial_usage_cache_key = format!("{WORKSPACE_PARTIAL_USAGE_CACHE_KEY}:{workspace_id}");
+        let bytes_usage_cache_key = format!("{WORKSPACE_BYTES_USAGE_CACHE_KEY}:{workspace_id}");
         let limits_cache_key = format!("{WORKSPACE_LIMITS_CACHE_KEY}:{workspace_id}");
 
-        // Get current partial usage from cache
-        let cache_result = cache.get::<usize>(&partial_usage_cache_key).await;
-
-        // If cache is missing or errored, we should recompute
-        let (current_partial_usage, cache_available) = match cache_result {
-            Ok(Some(value)) => (value, true),
-            Ok(None) | Err(_) => (0, false),
-        };
-
-        let new_partial_usage = current_partial_usage + written_bytes;
-
-        // Recompute if: cache was unavailable, or we've accumulated at least RECOMPUTE_THRESHOLD_BYTES
-        let should_recompute = !cache_available || new_partial_usage >= RECOMPUTE_THRESHOLD_BYTES;
-
-        if should_recompute {
-            // Perform the heavy computation
-            let workspace_limits_exceeded = is_workspace_over_limit(
-                clickhouse,
-                project_info.workspace_project_ids,
-                project_info.bytes_limit,
-                project_info.reset_time,
-            )
-            .await
-            .map_err(|e| {
-                log::error!(
-                    "Failed to update workspace limit exceeded for project [{}]: {:?}",
-                    project_id,
-                    e
-                );
-            })
-            .unwrap();
-
-            // Update the limits cache
-            let _ = cache
-                .insert::<WorkspaceLimitsExceeded>(
-                    &limits_cache_key,
-                    workspace_limits_exceeded.clone(),
+        // First, try to read from cache to check if it exists
+        let cache_result = cache.get::<i64>(&bytes_usage_cache_key).await;
+
+        match cache_result {
+            Ok(Some(_)) => {
+                // Cache exists - atomically increment it
+                let increment_result = cache
+                    .increment(&bytes_usage_cache_key, written_bytes as i64)
+                    .await;
+
+                if let Ok(Some(new_partial_usage)) = increment_result {
+                    let workspace_limits_exceeded = WorkspaceLimitsExceeded {
+                        steps: false,
+                        bytes_ingested: new_partial_usage >= project_info.bytes_limit,
+                    };
+
+                    // Update the limits cache
+                    let _ = cache
+                        .insert::<WorkspaceLimitsExceeded>(
+                            &limits_cache_key,
+                            workspace_limits_exceeded,
+                        )
+                        .await;
+                }
+            }
+            Ok(None) | Err(_) => {
+                // Cache miss or error - perform full recomputation
+                let bytes_ingested = match get_workspace_bytes_ingested_by_project_ids(
+                    clickhouse.clone(),
+                    project_info.workspace_project_ids,
+                    project_info.reset_time,
                 )
-                .await;
-
-            // Reset the partial usage counter
-            let _ = cache.insert::<usize>(&partial_usage_cache_key, 0).await;
-        } else {
-            // Just update the partial usage counter
-            let _ = cache
-                .insert::<usize>(&partial_usage_cache_key, new_partial_usage)
-                .await;
+                .await
+                {
+                    Ok(bytes_ingested) => bytes_ingested as i64,
+                    Err(e) => {
+                        log::error!(
+                            "Failed to get workspace bytes ingested for project [{}]: {:?}",
+                            project_id,
+                            e
+                        );
+                        0 as i64
+                    }
+                };
+
+                let workspace_limits_exceeded = WorkspaceLimitsExceeded {
+                    steps: false,
+                    bytes_ingested: bytes_ingested >= project_info.bytes_limit,
+                };
+
+                let _ = cache
+                    .insert::<WorkspaceLimitsExceeded>(&limits_cache_key, workspace_limits_exceeded)
+                    .await;
+
+                let _ = cache
+                    .insert::<i64>(&bytes_usage_cache_key, bytes_ingested as i64)
+                    .await;
+            }
         }
     });
 
@@ -155,16 +180,3 @@ async fn get_workspace_info_for_project_id(
         }
     }
 }
-
-async fn is_workspace_over_limit(
-    clickhouse: clickhouse::Client,
-    project_ids: Vec<Uuid>,
-    bytes_limit: i64,
-    reset_time: DateTime<Utc>,
-) -> Result<WorkspaceLimitsExceeded> {
-    let workspace_limits_exceeded =
-        ch::limits::is_workspace_over_limit(clickhouse, project_ids, reset_time, bytes_limit)
-            .await?;
-
-    Ok(workspace_limits_exceeded)
-}
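A small worked example of the threshold check that both the increment path and the recompute path now share; the numbers and the 1 GiB limit are illustrative assumptions, not values from the commit:

// Hypothetical numbers: a free-tier limit of 1 GiB and a cached counter just below it.
let bytes_limit: i64 = 1024 * 1024 * 1024; // 1_073_741_824 bytes, illustrative
let cached_usage: i64 = 1_073_000_000;
let written_bytes: i64 = 800_000;

// After increment(), the new total crosses the limit, so the limits cache is
// updated with bytes_ingested = true without querying ClickHouse.
let new_partial_usage = cached_usage + written_bytes; // 1_073_800_000
assert!(new_partial_usage >= bytes_limit);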
