1818
1919use crate :: {
2020 option:: CONFIG ,
21- storage:: object_storage:: alert_json_path,
21+ storage:: { object_storage:: alert_json_path, ALERTS_ROOT_DIRECTORY , PARSEABLE_ROOT_DIRECTORY } ,
2222 sync:: schedule_alert_task,
23- utils:: { actix:: extract_session_key_from_req, uid :: Uid } ,
23+ utils:: actix:: extract_session_key_from_req,
2424} ;
2525use actix_web:: { web, HttpRequest , Responder } ;
2626use bytes:: Bytes ;
27- use tracing :: warn ;
27+ use relative_path :: RelativePathBuf ;
2828
29- use super :: { alerts_utils:: user_auth_for_query, AlertConfig , AlertError , AlertState , ALERTS } ;
29+ use super :: {
30+ alerts_utils:: user_auth_for_query, AlertConfig , AlertError , AlertRequest , AlertState , ALERTS ,
31+ } ;
3032
3133// GET /alerts
3234/// User needs at least a read access to the stream(s) that is being referenced in an alert
@@ -39,10 +41,11 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, AlertError> {
3941}
4042
4143// POST /alerts
42- pub async fn post ( req : HttpRequest , alert : AlertConfig ) -> Result < impl Responder , AlertError > {
44+ pub async fn post ( req : HttpRequest , alert : AlertRequest ) -> Result < impl Responder , AlertError > {
45+ let alert: AlertConfig = alert. into ( ) ;
4346 // validate the incoming alert query
4447 // does the user have access to these tables or not?
45- let session_key = extract_session_key_from_req ( & req) . unwrap ( ) ;
48+ let session_key = extract_session_key_from_req ( & req) ? ;
4649 user_auth_for_query ( & session_key, & alert. query ) . await ?;
4750
4851 // now that we've validated that the user can run this query
@@ -71,18 +74,27 @@ pub async fn get(req: HttpRequest) -> Result<impl Responder, AlertError> {
7174 . get ( "alert_id" )
7275 . ok_or ( AlertError :: Metadata ( "No alert ID Provided" ) ) ?;
7376
74- let alert = ALERTS . get_alert_by_id ( session_key, id) . await ?;
77+ let alert = ALERTS . get_alert_by_id ( id) . await ?;
78+ // validate that the user has access to the tables mentioned
79+ user_auth_for_query ( & session_key, & alert. query ) . await ?;
80+
7581 Ok ( web:: Json ( alert) )
7682}
7783
7884// DELETE /alerts/{alert_id}
7985/// Deletion should happen from disk, sheduled tasks, then memory
8086pub async fn delete ( req : HttpRequest ) -> Result < impl Responder , AlertError > {
87+ let session_key = extract_session_key_from_req ( & req) ?;
8188 let alert_id = req
8289 . match_info ( )
8390 . get ( "alert_id" )
8491 . ok_or ( AlertError :: Metadata ( "No alert ID Provided" ) ) ?;
8592
93+ let alert = ALERTS . get_alert_by_id ( alert_id) . await ?;
94+
95+ // validate that the user has access to the tables mentioned
96+ user_auth_for_query ( & session_key, & alert. query ) . await ?;
97+
8698 // delete from disk and memory
8799 ALERTS . delete ( alert_id) . await ?;
88100
@@ -95,31 +107,34 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, AlertError> {
95107// PUT /alerts/{alert_id}
96108/// first save on disk, then in memory
97109/// then modify scheduled task
98- pub async fn modify (
99- req : HttpRequest ,
100- mut alert : AlertConfig ,
101- ) -> Result < impl Responder , AlertError > {
110+ pub async fn modify ( req : HttpRequest , alert : AlertRequest ) -> Result < impl Responder , AlertError > {
102111 let session_key = extract_session_key_from_req ( & req) ?;
103112 let alert_id = req
104113 . match_info ( )
105114 . get ( "alert_id" )
106115 . ok_or ( AlertError :: Metadata ( "No alert ID Provided" ) ) ?;
107116
108- // ensure that the user doesn't unknowingly change the ID
109- if alert_id != alert. id . to_string ( ) {
110- warn ! ( "Alert modify request is trying to change Alert ID, reverting ID" ) ;
111- alert. id = Uid :: from_string ( alert_id)
112- . map_err ( |_| AlertError :: CustomError ( "Unable to get Uid from String" . to_owned ( ) ) ) ?;
113- }
117+ // check if alert id exists in map
118+ ALERTS . get_alert_by_id ( alert_id) . await ?;
114119
115120 // validate that the user has access to the tables mentioned
116121 user_auth_for_query ( & session_key, & alert. query ) . await ?;
117122
118- // // fetch the alert from this ID to get AlertState
119- // let state = ALERTS.get_alert_by_id(session_key, alert_id).await?.state;
120-
121123 let store = CONFIG . storage ( ) . get_object_store ( ) ;
122124
125+ // fetch the alert object for the relevant ID
126+ let old_alert_config: AlertConfig = serde_json:: from_slice (
127+ & store
128+ . get_object ( & RelativePathBuf :: from_iter ( [
129+ PARSEABLE_ROOT_DIRECTORY ,
130+ ALERTS_ROOT_DIRECTORY ,
131+ & format ! ( "{alert_id}.json" ) ,
132+ ] ) )
133+ . await ?,
134+ ) ?;
135+
136+ let alert = alert. modify ( old_alert_config) ;
137+
123138 // modify on disk
124139 store. put_alert ( & alert. id . to_string ( ) , & alert) . await ?;
125140
@@ -136,11 +151,18 @@ pub async fn modify(
136151
137152// PUT /alerts/{alert_id}/update_state
138153pub async fn update_state ( req : HttpRequest , state : String ) -> Result < impl Responder , AlertError > {
154+ let session_key = extract_session_key_from_req ( & req) ?;
139155 let alert_id = req
140156 . match_info ( )
141157 . get ( "alert_id" )
142158 . ok_or ( AlertError :: Metadata ( "No alert ID Provided" ) ) ?;
143159
160+ // check if alert id exists in map
161+ let alert = ALERTS . get_alert_by_id ( alert_id) . await ?;
162+
163+ // validate that the user has access to the tables mentioned
164+ user_auth_for_query ( & session_key, & alert. query ) . await ?;
165+
144166 // get current state
145167 let current_state = ALERTS . get_state ( alert_id) . await ?;
146168
0 commit comments