11use crate :: cachestore:: { CacheItem , CacheStore , EvictionResult , QueueItem } ;
22use crate :: metastore:: { Column , ColumnType } ;
33
4+ use crate :: cluster:: rate_limiter:: { ProcessRateLimiter , TaskType , TraceIndex } ;
45use crate :: queryplanner:: { QueryPlan , QueryPlanner } ;
56use crate :: sql:: parser:: {
67 CacheCommand , CacheStoreCommand , CubeStoreParser , QueueCommand ,
@@ -13,23 +14,30 @@ use crate::util::metrics;
1314use crate :: { app_metrics, CubeError } ;
1415use async_trait:: async_trait;
1516use datafusion:: sql:: parser:: Statement as DFStatement ;
17+ use deepsize:: DeepSizeOf ;
1618use sqlparser:: ast:: Statement ;
1719use std:: path:: Path ;
1820use std:: sync:: Arc ;
19- use std:: time:: SystemTime ;
21+ use std:: time:: { Duration , SystemTime } ;
2022
2123pub struct CacheStoreSqlService {
2224 cachestore : Arc < dyn CacheStore > ,
2325 query_planner : Arc < dyn QueryPlanner > ,
26+ process_rate_limiter : Arc < dyn ProcessRateLimiter > ,
2427}
2528
2629crate :: di_service!( CacheStoreSqlService , [ SqlService ] ) ;
2730
2831impl CacheStoreSqlService {
29- pub fn new ( cachestore : Arc < dyn CacheStore > , query_planner : Arc < dyn QueryPlanner > ) -> Self {
32+ pub fn new (
33+ cachestore : Arc < dyn CacheStore > ,
34+ query_planner : Arc < dyn QueryPlanner > ,
35+ process_rate_limiter : Arc < dyn ProcessRateLimiter > ,
36+ ) -> Self {
3037 Self {
3138 cachestore,
3239 query_planner,
40+ process_rate_limiter,
3341 }
3442 }
3543
@@ -182,15 +190,23 @@ impl CacheStoreSqlService {
182190 command. as_tag_command( ) ,
183191 ) ] ) ,
184192 ) ;
193+
194+ let timeout = Some ( Duration :: from_secs ( 90 ) ) ;
195+ let wait_ms = self
196+ . process_rate_limiter
197+ . wait_for_allow ( TaskType :: Cache , timeout)
198+ . await ?;
199+
185200 let execution_time = SystemTime :: now ( ) ;
186201
187- let ( result, track_time) = match command {
202+ let ( result, additional_traffic , track_time) = match command {
188203 CacheCommand :: Set {
189204 key,
190205 value,
191206 ttl,
192207 nx,
193208 } => {
209+ let value_size = key. value . deep_size_of ( ) + value. deep_size_of ( ) ;
194210 let key = key. value ;
195211
196212 let success = self
@@ -203,6 +219,7 @@ impl CacheStoreSqlService {
203219 vec ! [ Column :: new( "success" . to_string( ) , ColumnType :: Boolean , 0 ) ] ,
204220 vec ! [ Row :: new( vec![ TableValue :: Boolean ( success) ] ) ] ,
205221 ) ) ,
222+ Some ( value_size) ,
206223 true ,
207224 )
208225 }
@@ -219,30 +236,33 @@ impl CacheStoreSqlService {
219236 vec ! [ Column :: new( "value" . to_string( ) , ColumnType :: String , 0 ) ] ,
220237 vec ! [ Row :: new( vec![ value] ) ] ,
221238 ) ) ,
239+ None ,
222240 true ,
223241 )
224242 }
225243 CacheCommand :: Keys { prefix } => {
226244 let rows = self . cachestore . cache_keys ( prefix. value ) . await ?;
245+
227246 (
228247 Arc :: new ( DataFrame :: new (
229248 vec ! [ Column :: new( "key" . to_string( ) , ColumnType :: String , 0 ) ] ,
230249 rows. iter ( )
231250 . map ( |i| Row :: new ( vec ! [ TableValue :: String ( i. get_row( ) . get_path( ) ) ] ) )
232251 . collect ( ) ,
233252 ) ) ,
253+ None ,
234254 true ,
235255 )
236256 }
237257 CacheCommand :: Remove { key } => {
238258 self . cachestore . cache_delete ( key. value ) . await ?;
239259
240- ( Arc :: new ( DataFrame :: new ( vec ! [ ] , vec ! [ ] ) ) , true )
260+ ( Arc :: new ( DataFrame :: new ( vec ! [ ] , vec ! [ ] ) ) , None , true )
241261 }
242262 CacheCommand :: Truncate { } => {
243263 self . cachestore . cache_truncate ( ) . await ?;
244264
245- ( Arc :: new ( DataFrame :: new ( vec ! [ ] , vec ! [ ] ) ) , false )
265+ ( Arc :: new ( DataFrame :: new ( vec ! [ ] , vec ! [ ] ) ) , None , false )
246266 }
247267 CacheCommand :: Incr { path } => {
248268 let row = self . cachestore . cache_incr ( path. value ) . await ?;
@@ -254,11 +274,26 @@ impl CacheStoreSqlService {
254274 row. get_row( ) . get_value( ) . clone( ) ,
255275 ) ] ) ] ,
256276 ) ) ,
277+ None ,
257278 true ,
258279 )
259280 }
260281 } ;
261282
283+ let trace_index = TraceIndex {
284+ // Important, it is used to aggregate all stats for cache by id
285+ table_id : Some ( 1 ) ,
286+ trace_obj : None ,
287+ } ;
288+ self . process_rate_limiter
289+ . commit_task_usage (
290+ TaskType :: Cache ,
291+ ( result. deep_size_of ( ) + additional_traffic. unwrap_or ( 0 ) ) as i64 ,
292+ wait_ms,
293+ trace_index,
294+ )
295+ . await ;
296+
262297 let execution_time = execution_time. elapsed ( ) ?;
263298
264299 if track_time {
@@ -282,15 +317,23 @@ impl CacheStoreSqlService {
282317 command. as_tag_command( ) ,
283318 ) ] ) ,
284319 ) ;
320+
321+ let timeout = Some ( Duration :: from_secs ( 90 ) ) ;
322+ let wait_ms = self
323+ . process_rate_limiter
324+ . wait_for_allow ( TaskType :: Queue , timeout)
325+ . await ?;
326+
285327 let execution_time = SystemTime :: now ( ) ;
286328
287- let ( result, track_time) = match command {
329+ let ( result, additional_traffic , track_time) = match command {
288330 QueueCommand :: Add {
289331 key,
290332 priority,
291333 orphaned,
292334 value,
293335 } => {
336+ let value_size = key. value . deep_size_of ( ) + value. deep_size_of ( ) ;
294337 let response = self
295338 . cachestore
296339 . queue_add ( QueueItem :: new (
@@ -315,13 +358,14 @@ impl CacheStoreSqlService {
315358 TableValue :: Int ( response. pending as i64 ) ,
316359 ] ) ] ,
317360 ) ) ,
361+ Some ( value_size) ,
318362 true ,
319363 )
320364 }
321365 QueueCommand :: Truncate { } => {
322366 self . cachestore . queue_truncate ( ) . await ?;
323367
324- ( Arc :: new ( DataFrame :: new ( vec ! [ ] , vec ! [ ] ) ) , false )
368+ ( Arc :: new ( DataFrame :: new ( vec ! [ ] , vec ! [ ] ) ) , None , false )
325369 }
326370 QueueCommand :: Cancel { key } => {
327371 let columns = vec ! [
@@ -336,17 +380,17 @@ impl CacheStoreSqlService {
336380 vec ! [ ]
337381 } ;
338382
339- ( Arc :: new ( DataFrame :: new ( columns, rows) ) , true )
383+ ( Arc :: new ( DataFrame :: new ( columns, rows) ) , None , true )
340384 }
341385 QueueCommand :: Heartbeat { key } => {
342386 self . cachestore . queue_heartbeat ( key) . await ?;
343387
344- ( Arc :: new ( DataFrame :: new ( vec ! [ ] , vec ! [ ] ) ) , true )
388+ ( Arc :: new ( DataFrame :: new ( vec ! [ ] , vec ! [ ] ) ) , None , true )
345389 }
346390 QueueCommand :: MergeExtra { key, payload } => {
347391 self . cachestore . queue_merge_extra ( key, payload) . await ?;
348392
349- ( Arc :: new ( DataFrame :: new ( vec ! [ ] , vec ! [ ] ) ) , true )
393+ ( Arc :: new ( DataFrame :: new ( vec ! [ ] , vec ! [ ] ) ) , None , true )
350394 }
351395 QueueCommand :: Ack { key, result } => {
352396 let success = self . cachestore . queue_ack ( key, result) . await ?;
@@ -356,6 +400,7 @@ impl CacheStoreSqlService {
356400 vec ! [ Column :: new( "success" . to_string( ) , ColumnType :: Boolean , 0 ) ] ,
357401 vec ! [ Row :: new( vec![ TableValue :: Boolean ( success) ] ) ] ,
358402 ) ) ,
403+ None ,
359404 true ,
360405 )
361406 }
@@ -375,6 +420,7 @@ impl CacheStoreSqlService {
375420 ] ,
376421 rows,
377422 ) ) ,
423+ None ,
378424 true ,
379425 )
380426 }
@@ -401,6 +447,7 @@ impl CacheStoreSqlService {
401447 . map ( |item| QueueItem :: queue_to_cancel_row ( item) )
402448 . collect ( ) ,
403449 ) ) ,
450+ None ,
404451 true ,
405452 )
406453 }
@@ -434,6 +481,7 @@ impl CacheStoreSqlService {
434481 . map ( |item| QueueItem :: queue_list_row ( item, with_payload) )
435482 . collect ( ) ,
436483 ) ) ,
484+ None ,
437485 true ,
438486 )
439487 }
@@ -458,6 +506,7 @@ impl CacheStoreSqlService {
458506 ] ,
459507 result. into_queue_retrieve_rows ( extended) ,
460508 ) ) ,
509+ None ,
461510 true ,
462511 )
463512 }
@@ -477,6 +526,7 @@ impl CacheStoreSqlService {
477526 ] ,
478527 rows,
479528 ) ) ,
529+ None ,
480530 true ,
481531 )
482532 }
@@ -497,11 +547,26 @@ impl CacheStoreSqlService {
497547 ] ,
498548 rows,
499549 ) ) ,
550+ None ,
500551 false ,
501552 )
502553 }
503554 } ;
504555
556+ let trace_index = TraceIndex {
557+ // Important, it is used to aggregate all stats for queue by id
558+ table_id : Some ( 1 ) ,
559+ trace_obj : None ,
560+ } ;
561+ self . process_rate_limiter
562+ . commit_task_usage (
563+ TaskType :: Queue ,
564+ ( result. deep_size_of ( ) + additional_traffic. unwrap_or ( 0 ) ) as i64 ,
565+ wait_ms,
566+ trace_index,
567+ )
568+ . await ;
569+
505570 let execution_time = execution_time. elapsed ( ) ?;
506571
507572 if track_time {
0 commit comments