|
1 | 1 | use crate::cache::actor_model::{ |
2 | | - CacheManagerLocalReq, CacheManagerRaftReq, CacheManagerRaftResult, CacheSetParam, |
| 2 | + CacheManagerLocalReq, CacheManagerRaftReq, CacheManagerRaftResult, CacheManagerResult, |
| 3 | + CacheSetParam, |
3 | 4 | }; |
4 | 5 | use crate::cache::model::{CacheKey, CacheValue}; |
5 | 6 | use crate::common::constant::DIRECT_CACHE_TABLE_NAME; |
| 7 | +use crate::common::datetime_utils::now_millis_i64; |
6 | 8 | use crate::common::datetime_utils::now_second_i32; |
| 9 | +use crate::common::limiter_utils::LimiterData; |
7 | 10 | use crate::common::pb::data_object::DirectCacheItemDo; |
| 11 | +use crate::raft::cache::CacheLimiterReq; |
8 | 12 | use crate::raft::filestore::model::SnapshotRecordDto; |
9 | 13 | use crate::raft::filestore::raftapply::{RaftApplyDataRequest, RaftApplyDataResponse}; |
10 | 14 | use crate::raft::filestore::raftsnapshot::{SnapshotWriterActor, SnapshotWriterRequest}; |
11 | 15 | use actix::prelude::*; |
12 | 16 | use bean_factory::{bean, Inject}; |
13 | | -use inner_mem_cache::{MemCache, TimeoutSet}; |
| 17 | +use inner_mem_cache::TimeoutSet; |
14 | 18 | use quick_protobuf::{BytesReader, Writer}; |
| 19 | +use ratelimiter_rs::RateLimiter; |
15 | 20 | use std::collections::HashMap; |
| 21 | +use std::convert::TryInto; |
16 | 22 | use std::sync::Arc; |
17 | 23 |
|
18 | 24 | pub struct CacheItem { |
@@ -211,6 +217,43 @@ impl DirectCacheManager { |
211 | 217 | } |
212 | 218 | } |
213 | 219 |
|
| 220 | + fn handle_limit(&mut self, limit_req: CacheLimiterReq) -> anyhow::Result<CacheManagerResult> { |
| 221 | + let (rate_to_ms_conversion, key, limit) = match limit_req { |
| 222 | + CacheLimiterReq::Second { key, limit } => (1000, key, limit), |
| 223 | + CacheLimiterReq::Minutes { key, limit } => (60 * 1000, key, limit), |
| 224 | + CacheLimiterReq::Hour { key, limit } => (60 * 60 * 1000, key, limit), |
| 225 | + CacheLimiterReq::Day { key, limit } => (24 * 60 * 60 * 1000, key, limit), |
| 226 | + CacheLimiterReq::OtherMills { |
| 227 | + key, |
| 228 | + limit, |
| 229 | + rate_to_ms_conversion, |
| 230 | + } => (rate_to_ms_conversion, key, limit), |
| 231 | + }; |
| 232 | + let key = CacheKey::new(crate::cache::model::CacheType::String, key); |
| 233 | + let now = now_millis_i64(); |
| 234 | + let mut limiter = if let Some(v) = self.get_valid_value(&key) { |
| 235 | + if let Some(s) = v.try_to_string() { |
| 236 | + let limiter_data: LimiterData = s.as_str().try_into()?; |
| 237 | + limiter_data.to_rate_limiter() |
| 238 | + } else { |
| 239 | + RateLimiter::load(rate_to_ms_conversion, 0, now) |
| 240 | + } |
| 241 | + } else { |
| 242 | + RateLimiter::load(rate_to_ms_conversion, 0, now) |
| 243 | + }; |
| 244 | + let r = limiter.acquire(limit, limit as i64); |
| 245 | + if r { |
| 246 | + let limiter_data: LimiterData = limiter.into(); |
| 247 | + let cache_value = CacheValue::String(Arc::new(limiter_data.to_string())); |
| 248 | + self.do_set( |
| 249 | + key.clone(), |
| 250 | + cache_value, |
| 251 | + ((now + rate_to_ms_conversion as i64) / 1000) as i32, |
| 252 | + ); |
| 253 | + } |
| 254 | + Ok(CacheManagerRaftResult::Limiter(r)) |
| 255 | + } |
| 256 | + |
214 | 257 | fn build_snapshot(&self, writer: Addr<SnapshotWriterActor>) -> anyhow::Result<()> { |
215 | 258 | let now = now_second_i32(); |
216 | 259 | for (key, v) in self.cache.iter() { |
@@ -295,6 +338,7 @@ impl Handler<CacheManagerRaftReq> for DirectCacheManager { |
295 | 338 | CacheManagerRaftReq::Ttl(key) => Ok(self.get_ttl(&key)), |
296 | 339 | CacheManagerRaftReq::Incr(key, expire) => Ok(self.incr(key, expire)), |
297 | 340 | CacheManagerRaftReq::Decr(key, expire) => Ok(self.decr(key, expire)), |
| 341 | + CacheManagerRaftReq::Limit(limit_req) => self.handle_limit(limit_req), |
298 | 342 | } |
299 | 343 | } |
300 | 344 | } |
|
0 commit comments