@@ -8,8 +8,12 @@ use crate::{
88 api:: v1:: browser_sessions:: {
99 BROWSER_SESSIONS_EXCHANGE , BROWSER_SESSIONS_QUEUE , BROWSER_SESSIONS_ROUTING_KEY , EventBatch ,
1010 } ,
11+ cache:: Cache ,
1112 ch:: browser_events:: insert_browser_events,
13+ db:: DB ,
14+ features:: { Feature , is_feature_enabled} ,
1215 mq:: { MessageQueue , MessageQueueDeliveryTrait , MessageQueueReceiverTrait , MessageQueueTrait } ,
16+ traces:: limits:: update_workspace_limit_exceeded_by_project_id,
1317} ;
1418
1519#[ derive( Serialize , Deserialize , Clone ) ]
@@ -19,16 +23,28 @@ pub struct QueueBrowserEventMessage {
1923}
2024
2125pub async fn process_browser_events (
26+ db : Arc < DB > ,
2227 clickhouse : clickhouse:: Client ,
28+ cache : Arc < Cache > ,
2329 browser_events_message_queue : Arc < MessageQueue > ,
2430) {
2531 loop {
26- inner_process_browser_events ( clickhouse. clone ( ) , browser_events_message_queue. clone ( ) )
27- . await ;
32+ inner_process_browser_events (
33+ db. clone ( ) ,
34+ clickhouse. clone ( ) ,
35+ cache. clone ( ) ,
36+ browser_events_message_queue. clone ( ) ,
37+ )
38+ . await ;
2839 }
2940}
3041
31- async fn inner_process_browser_events ( clickhouse : clickhouse:: Client , queue : Arc < MessageQueue > ) {
42+ async fn inner_process_browser_events (
43+ db : Arc < DB > ,
44+ clickhouse : clickhouse:: Client ,
45+ cache : Arc < Cache > ,
46+ queue : Arc < MessageQueue > ,
47+ ) {
3248 // Add retry logic with exponential backoff for connection failures
3349 let get_receiver = || async {
3450 queue
@@ -87,13 +103,13 @@ async fn inner_process_browser_events(clickhouse: clickhouse::Client, queue: Arc
87103 continue ;
88104 }
89105
90- let insert_browser_events = || async {
91- insert_browser_events ( & clickhouse, project_id, & batch) . await . map_err ( |e| {
106+ let insert_browser_events_fn = || async {
107+ let bytes_written = insert_browser_events ( & clickhouse, project_id, & batch) . await . map_err ( |e| {
92108 log:: error!( "Failed attempt to insert browser events. Will retry according to backoff policy. Error: {:?}" , e) ;
93109 backoff:: Error :: transient ( e)
94110 } ) ?;
95111
96- Ok :: < ( ) , backoff:: Error < clickhouse:: error:: Error > > ( ( ) )
112+ Ok :: < usize , backoff:: Error < clickhouse:: error:: Error > > ( bytes_written )
97113 } ;
98114 // Starting with 1 second delay, delay multiplies by random factor between 1 and 2
99115 // up to 1 minute and until the total elapsed time is 1 minute
@@ -106,11 +122,30 @@ async fn inner_process_browser_events(clickhouse: clickhouse::Client, queue: Arc
106122 . with_max_elapsed_time ( Some ( std:: time:: Duration :: from_secs ( 1 * 60 ) ) )
107123 . build ( ) ;
108124
109- match backoff:: future:: retry ( exponential_backoff, insert_browser_events ) . await {
110- Ok ( _ ) => {
125+ match backoff:: future:: retry ( exponential_backoff, insert_browser_events_fn ) . await {
126+ Ok ( bytes_written ) => {
111127 if let Err ( e) = acker. ack ( ) . await {
112128 log:: error!( "Failed to ack MQ delivery (browser events): {:?}" , e) ;
113129 }
130+
131+ // Update workspace limits cache
132+ if is_feature_enabled ( Feature :: UsageLimit ) {
133+ if let Err ( e) = update_workspace_limit_exceeded_by_project_id (
134+ db. clone ( ) ,
135+ clickhouse. clone ( ) ,
136+ cache. clone ( ) ,
137+ project_id,
138+ bytes_written,
139+ )
140+ . await
141+ {
142+ log:: error!(
143+ "Failed to update workspace limit exceeded for project [{}]: {:?}" ,
144+ project_id,
145+ e
146+ ) ;
147+ }
148+ }
114149 }
115150 Err ( e) => {
116151 log:: error!(
0 commit comments