Skip to content

Commit 650b523

Browse files
authored
expire workspace and project caches so that free tier resets after period reset (#956)
* Expire workspace and project caches so that the free tier resets after the billing period resets.
* Add TTLs to the new caches.
* Remove the tokio task.
1 parent 46e4a74 commit 650b523

File tree

7 files changed

+175
-109
lines changed

7 files changed

+175
-109
lines changed

app-server/src/api/v1/browser_sessions.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,12 @@ async fn create_session_event(
9393
cache.into_inner(),
9494
project_api_key.project_id,
9595
)
96-
.await?;
96+
.await
97+
.map_err(|e| {
98+
log::error!("Failed to get workspace limits: {:?}", e);
99+
});
97100

98-
if limits_exceeded.bytes_ingested {
101+
if limits_exceeded.is_ok_and(|limits_exceeded| limits_exceeded.bytes_ingested) {
99102
return Ok(HttpResponse::Forbidden().json("Workspace data limit exceeded"));
100103
}
101104
}

app-server/src/api/v1/traces.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,12 @@ pub async fn process_traces(
4444
cache.clone(),
4545
project_api_key.project_id,
4646
)
47-
.await?;
47+
.await
48+
.map_err(|e| {
49+
log::error!("Failed to get workspace limits: {:?}", e);
50+
});
4851

49-
if limits_exceeded.bytes_ingested {
52+
if limits_exceeded.is_ok_and(|limits_exceeded| limits_exceeded.bytes_ingested) {
5053
return Ok(HttpResponse::Forbidden().json("Workspace data limit exceeded"));
5154
}
5255
}

app-server/src/cache/in_memory.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,16 @@ impl CacheTrait for InMemoryCache {
5353
Ok(())
5454
}
5555

56-
async fn increment(&self, key: &str, amount: i64) -> Result<Option<i64>, CacheError> {
56+
async fn insert_with_ttl<T>(&self, key: &str, value: T, seconds: u64) -> Result<(), CacheError>
57+
where
58+
T: Serialize + Send,
59+
{
60+
self.insert(key, value).await?;
61+
self.set_ttl(key, seconds).await?;
62+
Ok(())
63+
}
64+
65+
async fn increment(&self, key: &str, amount: i64) -> Result<i64, CacheError> {
5766
// Note: This is not truly atomic for in-memory cache, but should be fine for dev/testing.
5867
// Production should use Redis where increment is atomic.
5968
// Like Redis INCRBY, this creates the key with value=0 if it doesn't exist
@@ -66,6 +75,6 @@ impl CacheTrait for InMemoryCache {
6675
let new_bytes = serde_json::to_vec(&new_value).map_err(|e| CacheError::SerDeError(e))?;
6776

6877
self.cache.insert(String::from(key), new_bytes).await;
69-
Ok(Some(new_value))
78+
Ok(new_value)
7079
}
7180
}

app-server/src/cache/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,13 @@ pub trait CacheTrait {
3232
T: Serialize + Send;
3333
async fn remove(&self, key: &str) -> Result<(), CacheError>;
3434
async fn set_ttl(&self, key: &str, seconds: u64) -> Result<(), CacheError>;
35+
async fn insert_with_ttl<T>(&self, key: &str, value: T, seconds: u64) -> Result<(), CacheError>
36+
where
37+
T: Serialize + Send;
3538
/// Atomically increment a numeric value by the given amount.
3639
/// If the key doesn't exist, it will be created with value 0 before incrementing.
3740
/// Returns the new value after incrementing.
3841
/// Callers should use get() first if they need to distinguish between missing keys
3942
/// and existing keys (to trigger recomputation logic, for example).
40-
async fn increment(&self, key: &str, amount: i64) -> Result<Option<i64>, CacheError>;
43+
async fn increment(&self, key: &str, amount: i64) -> Result<i64, CacheError>;
4144
}

app-server/src/cache/redis.rs

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -60,54 +60,72 @@ impl CacheTrait for RedisCache {
6060
}
6161
};
6262

63-
if let Err(e) = self
64-
.connection
63+
self.connection
6564
.clone()
6665
.set::<_, Vec<u8>, ()>(String::from(key), bytes)
6766
.await
68-
{
69-
log::error!("Redis set error: {}", e);
70-
Err(CacheError::InternalError(anyhow::Error::from(e)))
71-
} else {
72-
Ok(())
73-
}
67+
.map_err(|e| {
68+
log::error!("Redis set error: {}", e);
69+
CacheError::InternalError(anyhow::Error::from(e))
70+
})?;
71+
Ok(())
7472
}
7573

7674
async fn remove(&self, key: &str) -> Result<(), CacheError> {
77-
if let Err(e) = self
78-
.connection
75+
self.connection
7976
.clone()
8077
.del::<_, ()>(String::from(key))
8178
.await
82-
{
83-
log::error!("Redis delete error: {}", e);
84-
Err(CacheError::InternalError(anyhow::Error::from(e)))
85-
} else {
86-
Ok(())
87-
}
79+
.map_err(|e| {
80+
log::error!("Redis delete error: {}", e);
81+
CacheError::InternalError(anyhow::Error::from(e))
82+
})?;
83+
Ok(())
8884
}
8985

9086
async fn set_ttl(&self, key: &str, seconds: u64) -> Result<(), CacheError> {
91-
if let Err(e) = self
92-
.connection
87+
self.connection
9388
.clone()
9489
.expire::<_, ()>(String::from(key), seconds as i64)
9590
.await
96-
{
97-
log::error!("Redis set ttl error: {}", e);
98-
Err(CacheError::InternalError(anyhow::Error::from(e)))
99-
} else {
100-
Ok(())
101-
}
91+
.map_err(|e| {
92+
log::error!("Redis set ttl error: {}", e);
93+
CacheError::InternalError(anyhow::Error::from(e))
94+
})?;
95+
Ok(())
96+
}
97+
98+
async fn insert_with_ttl<T>(&self, key: &str, value: T, seconds: u64) -> Result<(), CacheError>
99+
where
100+
T: Serialize + Send,
101+
{
102+
let bytes = match serde_json::to_vec(&value) {
103+
Ok(bytes) => bytes,
104+
Err(e) => {
105+
log::error!("Serialization error: {}", e);
106+
return Err(CacheError::SerDeError(e));
107+
}
108+
};
109+
110+
self.connection
111+
.clone()
112+
.set_ex::<_, Vec<u8>, ()>(String::from(key), bytes, seconds)
113+
.await
114+
.map_err(|e| {
115+
log::error!("Redis set error: {}", e);
116+
CacheError::InternalError(anyhow::Error::from(e))
117+
})?;
118+
119+
Ok(())
102120
}
103121

104-
async fn increment(&self, key: &str, amount: i64) -> Result<Option<i64>, CacheError> {
122+
async fn increment(&self, key: &str, amount: i64) -> Result<i64, CacheError> {
105123
// Use atomic INCRBY command
106124
// Note: Redis INCRBY will create the key if it doesn't exist, starting from 0
107125
// The caller should check with get() first if they want to handle missing keys differently
108126
let result: RedisResult<i64> = self.connection.clone().incr(key, amount).await;
109127
match result {
110-
Ok(new_value) => Ok(Some(new_value)),
128+
Ok(new_value) => Ok(new_value),
111129
Err(e) => {
112130
log::error!("Redis increment error: {}", e);
113131
Err(CacheError::InternalError(anyhow::Error::from(e)))

app-server/src/traces/limits.rs

Lines changed: 93 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ use crate::{
1313
ch::limits::get_workspace_bytes_ingested_by_project_ids,
1414
db::{self, DB, projects::ProjectWithWorkspaceBillingInfo, stats::WorkspaceLimitsExceeded},
1515
};
16+
// For workspaces over the limit, expire the cache after 24 hours,
17+
// so that it resets in the next billing period (+/- 1 day).
18+
const WORKSPACE_USAGE_EXCEEDED_TTL_SECONDS: u64 = 60 * 60 * 24; // 24 hours
1619

1720
pub async fn get_workspace_limit_exceeded_by_project_id(
1821
db: Arc<DB>,
@@ -49,7 +52,7 @@ pub async fn get_workspace_limit_exceeded_by_project_id(
4952
project_id,
5053
e
5154
);
52-
0 as i64
55+
0
5356
}
5457
};
5558

@@ -59,7 +62,11 @@ pub async fn get_workspace_limit_exceeded_by_project_id(
5962
};
6063

6164
let _ = cache
62-
.insert::<WorkspaceLimitsExceeded>(&cache_key, workspace_limits_exceeded.clone())
65+
.insert_with_ttl::<WorkspaceLimitsExceeded>(
66+
&cache_key,
67+
workspace_limits_exceeded.clone(),
68+
WORKSPACE_USAGE_EXCEEDED_TTL_SECONDS,
69+
)
6370
.await;
6471
Ok(workspace_limits_exceeded)
6572
}
@@ -73,88 +80,99 @@ pub async fn update_workspace_limit_exceeded_by_project_id(
7380
project_id: Uuid,
7481
written_bytes: usize,
7582
) -> Result<()> {
76-
tokio::spawn(async move {
77-
let project_info =
78-
match get_workspace_info_for_project_id(db.clone(), cache.clone(), project_id).await {
79-
Ok(info) => info,
83+
let project_info =
84+
match get_workspace_info_for_project_id(db.clone(), cache.clone(), project_id).await {
85+
Ok(info) => info,
86+
Err(e) => {
87+
log::error!(
88+
"Failed to get workspace info for project [{}]: {:?}",
89+
project_id,
90+
e
91+
);
92+
return Err(anyhow::anyhow!(
93+
"Failed to get workspace info for project [{}]: {:?}",
94+
project_id,
95+
e
96+
));
97+
}
98+
};
99+
let workspace_id = project_info.workspace_id;
100+
if project_info.tier_name.trim().to_lowercase() != "free" {
101+
// We don't need to update the workspace limits cache for non-free tiers
102+
return Ok(());
103+
}
104+
105+
let bytes_usage_cache_key = format!("{WORKSPACE_BYTES_USAGE_CACHE_KEY}:{workspace_id}");
106+
let limits_cache_key = format!("{WORKSPACE_LIMITS_CACHE_KEY}:{workspace_id}");
107+
108+
// First, try to read from cache to check if it exists
109+
let cache_result = cache.get::<i64>(&bytes_usage_cache_key).await;
110+
111+
match cache_result {
112+
Ok(Some(_)) => {
113+
// Cache exists - atomically increment it
114+
let increment_result = cache
115+
.increment(&bytes_usage_cache_key, written_bytes as i64)
116+
.await;
117+
118+
if let Ok(new_partial_usage) = increment_result {
119+
let workspace_limits_exceeded = WorkspaceLimitsExceeded {
120+
steps: false,
121+
bytes_ingested: new_partial_usage >= project_info.bytes_limit,
122+
};
123+
124+
// Update the limits cache
125+
cache
126+
.insert_with_ttl::<WorkspaceLimitsExceeded>(
127+
&limits_cache_key,
128+
workspace_limits_exceeded,
129+
WORKSPACE_USAGE_EXCEEDED_TTL_SECONDS,
130+
)
131+
.await?;
132+
}
133+
}
134+
Ok(None) | Err(_) => {
135+
// Cache miss or error - perform full recomputation
136+
let bytes_ingested = match get_workspace_bytes_ingested_by_project_ids(
137+
clickhouse.clone(),
138+
project_info.workspace_project_ids,
139+
project_info.reset_time,
140+
)
141+
.await
142+
{
143+
Ok(bytes_ingested) => bytes_ingested as i64,
80144
Err(e) => {
81145
log::error!(
82-
"Failed to get workspace info for project [{}]: {:?}",
146+
"Failed to get workspace bytes ingested for project [{}]: {:?}",
83147
project_id,
84148
e
85149
);
86-
return;
150+
0 as i64
87151
}
88152
};
89-
let workspace_id = project_info.workspace_id;
90-
if project_info.tier_name.trim().to_lowercase() != "free" {
91-
// We don't need to update the workspace limits cache for non-free tiers
92-
return;
93-
}
94153

95-
let bytes_usage_cache_key = format!("{WORKSPACE_BYTES_USAGE_CACHE_KEY}:{workspace_id}");
96-
let limits_cache_key = format!("{WORKSPACE_LIMITS_CACHE_KEY}:{workspace_id}");
97-
98-
// First, try to read from cache to check if it exists
99-
let cache_result = cache.get::<i64>(&bytes_usage_cache_key).await;
100-
101-
match cache_result {
102-
Ok(Some(_)) => {
103-
// Cache exists - atomically increment it
104-
let increment_result = cache
105-
.increment(&bytes_usage_cache_key, written_bytes as i64)
106-
.await;
107-
108-
if let Ok(Some(new_partial_usage)) = increment_result {
109-
let workspace_limits_exceeded = WorkspaceLimitsExceeded {
110-
steps: false,
111-
bytes_ingested: new_partial_usage >= project_info.bytes_limit,
112-
};
113-
114-
// Update the limits cache
115-
let _ = cache
116-
.insert::<WorkspaceLimitsExceeded>(
117-
&limits_cache_key,
118-
workspace_limits_exceeded,
119-
)
120-
.await;
121-
}
122-
}
123-
Ok(None) | Err(_) => {
124-
// Cache miss or error - perform full recomputation
125-
let bytes_ingested = match get_workspace_bytes_ingested_by_project_ids(
126-
clickhouse.clone(),
127-
project_info.workspace_project_ids,
128-
project_info.reset_time,
129-
)
130-
.await
131-
{
132-
Ok(bytes_ingested) => bytes_ingested as i64,
133-
Err(e) => {
134-
log::error!(
135-
"Failed to get workspace bytes ingested for project [{}]: {:?}",
136-
project_id,
137-
e
138-
);
139-
0 as i64
140-
}
141-
};
142-
143-
let workspace_limits_exceeded = WorkspaceLimitsExceeded {
144-
steps: false,
145-
bytes_ingested: bytes_ingested >= project_info.bytes_limit,
146-
};
154+
let workspace_limits_exceeded = WorkspaceLimitsExceeded {
155+
steps: false,
156+
bytes_ingested: bytes_ingested >= project_info.bytes_limit,
157+
};
147158

148-
let _ = cache
149-
.insert::<WorkspaceLimitsExceeded>(&limits_cache_key, workspace_limits_exceeded)
150-
.await;
159+
cache
160+
.insert_with_ttl::<WorkspaceLimitsExceeded>(
161+
&limits_cache_key,
162+
workspace_limits_exceeded,
163+
WORKSPACE_USAGE_EXCEEDED_TTL_SECONDS,
164+
)
165+
.await?;
151166

152-
let _ = cache
153-
.insert::<i64>(&bytes_usage_cache_key, bytes_ingested as i64)
154-
.await;
155-
}
167+
cache
168+
.insert_with_ttl::<i64>(
169+
&bytes_usage_cache_key,
170+
bytes_ingested as i64,
171+
WORKSPACE_USAGE_EXCEEDED_TTL_SECONDS,
172+
)
173+
.await?;
156174
}
157-
});
175+
}
158176

159177
Ok(())
160178
}
@@ -173,9 +191,9 @@ async fn get_workspace_info_for_project_id(
173191
Ok(None) | Err(_) => {
174192
let info =
175193
db::projects::get_project_and_workspace_billing_info(&db.pool, &project_id).await?;
176-
let _ = cache
194+
cache
177195
.insert::<ProjectWithWorkspaceBillingInfo>(&cache_key, info.clone())
178-
.await;
196+
.await?;
179197
Ok(info)
180198
}
181199
}

0 commit comments

Comments (0)