@@ -60,89 +60,89 @@ func (c *Client) getClientConfig() *protocol.ClientConfig {
6060 }
6161}
6262
63- func (c * Client ) handleActionChangeConfig (stream protocol.UI_NotificationsClient , notification * protocol.Notification ) {
63+ func (c * Client ) handleActionChangeConfig (stream protocol.UI_NotificationsClient , ntf * protocol.Notification ) {
6464 log .Info ("[notification] Reloading configuration" )
6565 // Parse received configuration first, to get the new proc monitor method.
66- newConf , err := config .Parse (notification .Data )
66+ newConf , err := config .Parse (ntf .Data )
6767 if err != nil {
68- log .Warning ("[notification] error parsing received config: %v" , notification .Data )
69- c .sendNotificationReply (stream , notification .Id , "" , err )
68+ log .Warning ("[notification] error parsing received config: %v" , ntf .Data )
69+ c .sendNotificationReply (stream , ntf . Type , ntf .Id , "" , err )
7070 return
7171 }
7272
7373 if err := c .reloadConfiguration (true , & newConf ); err != nil {
74- c .sendNotificationReply (stream , notification .Id , "" , err .Msg )
74+ c .sendNotificationReply (stream , ntf . Type , ntf .Id , "" , err .Msg )
7575 return
7676 }
7777
7878 // this save operation triggers a regular re-loadConfiguration()
79- err = config .Save (configFile , notification .Data )
79+ err = config .Save (configFile , ntf .Data )
8080 if err != nil {
8181 log .Warning ("[notification] CHANGE_CONFIG not applied %s" , err )
8282 }
8383
84- c .sendNotificationReply (stream , notification .Id , "" , err )
84+ c .sendNotificationReply (stream , ntf . Type , ntf .Id , "" , err )
8585}
8686
87- func (c * Client ) handleActionEnableRule (stream protocol.UI_NotificationsClient , notification * protocol.Notification ) {
87+ func (c * Client ) handleActionEnableRule (stream protocol.UI_NotificationsClient , ntf * protocol.Notification ) {
8888 var err error
89- for _ , rul := range notification .Rules {
89+ for _ , rul := range ntf .Rules {
9090 log .Info ("[notification] enable rule: %s" , rul .Name )
9191 // protocol.Rule(protobuf) != rule.Rule(json)
9292 r , _ := rule .Deserialize (rul )
9393 r .Enabled = true
9494 // save to disk only if the duration is rule.Always
9595 err = c .rules .Replace (r , r .Duration == rule .Always )
9696 }
97- c .sendNotificationReply (stream , notification .Id , "" , err )
97+ c .sendNotificationReply (stream , ntf . Type , ntf .Id , "" , err )
9898}
9999
100- func (c * Client ) handleActionDisableRule (stream protocol.UI_NotificationsClient , notification * protocol.Notification ) {
100+ func (c * Client ) handleActionDisableRule (stream protocol.UI_NotificationsClient , ntf * protocol.Notification ) {
101101 var err error
102- for _ , rul := range notification .Rules {
102+ for _ , rul := range ntf .Rules {
103103 log .Info ("[notification] disable rule: %s" , rul )
104104 r , _ := rule .Deserialize (rul )
105105 r .Enabled = false
106106 err = c .rules .Replace (r , r .Duration == rule .Always )
107107 }
108- c .sendNotificationReply (stream , notification .Id , "" , err )
108+ c .sendNotificationReply (stream , ntf . Type , ntf .Id , "" , err )
109109}
110110
111- func (c * Client ) handleActionChangeRule (stream protocol.UI_NotificationsClient , notification * protocol.Notification ) {
111+ func (c * Client ) handleActionChangeRule (stream protocol.UI_NotificationsClient , ntf * protocol.Notification ) {
112112 var rErr error
113- for _ , rul := range notification .Rules {
113+ for _ , rul := range ntf .Rules {
114114 r , err := rule .Deserialize (rul )
115115 if r == nil {
116116 rErr = fmt .Errorf ("Invalid rule, %s" , err )
117117 continue
118118 }
119- log .Info ("[notification] change rule: %s %d" , r , notification .Id )
119+ log .Info ("[notification] change rule: %s %d" , r , ntf .Id )
120120 if err := c .rules .Replace (r , r .Duration == rule .Always ); err != nil {
121121 log .Warning ("[notification] Error changing rule: %s %s" , err , r )
122122 rErr = err
123123 }
124124 }
125- c .sendNotificationReply (stream , notification .Id , "" , rErr )
125+ c .sendNotificationReply (stream , ntf . Type , ntf .Id , "" , rErr )
126126}
127127
128- func (c * Client ) handleActionDeleteRule (stream protocol.UI_NotificationsClient , notification * protocol.Notification ) {
128+ func (c * Client ) handleActionDeleteRule (stream protocol.UI_NotificationsClient , ntf * protocol.Notification ) {
129129 var err error
130- for _ , rul := range notification .Rules {
131- log .Info ("[notification] delete rule: %s %d" , rul .Name , notification .Id )
130+ for _ , rul := range ntf .Rules {
131+ log .Info ("[notification] delete rule: %s %d" , rul .Name , ntf .Id )
132132 err = c .rules .Delete (rul .Name )
133133 if err != nil {
134134 log .Error ("[notification] Error deleting rule: %s %s" , err , rul )
135135 }
136136 }
137- c .sendNotificationReply (stream , notification .Id , "" , err )
137+ c .sendNotificationReply (stream , ntf . Type , ntf .Id , "" , err )
138138}
139139
140- func (c * Client ) handleActionTaskStart (stream protocol.UI_NotificationsClient , notification * protocol.Notification ) {
140+ func (c * Client ) handleActionTaskStart (stream protocol.UI_NotificationsClient , ntf * protocol.Notification ) {
141141 var taskConf base.TaskNotification
142- err := json .Unmarshal ([]byte (notification .Data ), & taskConf )
142+ err := json .Unmarshal ([]byte (ntf .Data ), & taskConf )
143143 if err != nil {
144- log .Error ("parsing TaskStart, err: %s, %s" , err , notification .Data )
145- c .sendNotificationReply (stream , notification .Id , "" , err )
144+ log .Error ("parsing TaskStart, err: %s, %s" , err , ntf .Data )
145+ c .sendNotificationReply (stream , ntf . Type , ntf .Id , "" , err )
146146 return
147147 }
148148 switch taskConf .Name {
@@ -158,32 +158,32 @@ func (c *Client) handleActionTaskStart(stream protocol.UI_NotificationsClient, n
158158 pid , err := strconv .Atoi (conf ["pid" ].(string ))
159159 if err != nil {
160160 log .Error ("[pidmon] TaskStart.Data, PID err: %s, %v" , err , taskConf )
161- c .sendNotificationReply (stream , notification .Id , "" , err )
161+ c .sendNotificationReply (stream , ntf . Type , ntf .Id , "" , err )
162162 return
163163 }
164164 interval , _ := conf ["interval" ].(string )
165- c .monitorProcessDetails (pid , interval , stream , notification )
165+ c .monitorProcessDetails (pid , interval , stream , ntf )
166166 case nodemonitor .Name :
167167 conf , ok := taskConf .Data .(map [string ]interface {})
168168 if ! ok {
169169 log .Error ("[nodemon] TaskStart.Data, \" node\" err (string expected): %v" , taskConf )
170170 return
171171 }
172- c .monitorNode (conf ["node" ].(string ), conf ["interval" ].(string ), stream , notification )
172+ c .monitorNode (conf ["node" ].(string ), conf ["interval" ].(string ), stream , ntf )
173173 case socketsmonitor .Name :
174- c .monitorSockets (taskConf .Data , stream , notification )
174+ c .monitorSockets (taskConf .Data , stream , ntf )
175175 default :
176176 log .Debug ("TaskStart, unknown task: %v" , taskConf )
177- //c.sendNotificationReply(stream, notification .Id, "", err)
177+ //c.sendNotificationReply(stream, ntf.Type, ntf .Id, "", err)
178178 }
179179}
180180
181- func (c * Client ) handleActionTaskStop (stream protocol.UI_NotificationsClient , notification * protocol.Notification ) {
181+ func (c * Client ) handleActionTaskStop (stream protocol.UI_NotificationsClient , ntf * protocol.Notification ) {
182182 var taskConf base.TaskNotification
183- err := json .Unmarshal ([]byte (notification .Data ), & taskConf )
183+ err := json .Unmarshal ([]byte (ntf .Data ), & taskConf )
184184 if err != nil {
185- log .Error ("parsing TaskStop, err: %s, %s" , err , notification .Data )
186- c .sendNotificationReply (stream , notification . Id , "" , fmt .Errorf ("Error stopping task: %s" , notification .Data ))
185+ log .Error ("parsing TaskStop, err: %s, %s" , err , ntf .Data )
186+ c .sendNotificationReply (stream , ntf . Type , ntf . Id , "" , fmt .Errorf ("Error stopping task: %s" , ntf .Data ))
187187 return
188188 }
189189 switch taskConf .Name {
@@ -195,8 +195,8 @@ func (c *Client) handleActionTaskStop(stream protocol.UI_NotificationsClient, no
195195 }
196196 pid , err := strconv .Atoi (conf ["pid" ].(string ))
197197 if err != nil {
198- log .Error ("TaskStop.Data, err: %s, %s, %v+, %q" , err , notification .Data , taskConf .Data , taskConf .Data )
199- c .sendNotificationReply (stream , notification .Id , "" , err )
198+ log .Error ("TaskStop.Data, err: %s, %s, %v+, %q" , err , ntf .Data , taskConf .Data , taskConf .Data )
199+ c .sendNotificationReply (stream , ntf . Type , ntf .Id , "" , err )
200200 return
201201 }
202202 TaskMgr .RemoveTask (fmt .Sprint (taskConf .Name , "-" , pid ))
@@ -214,47 +214,47 @@ func (c *Client) handleActionTaskStop(stream protocol.UI_NotificationsClient, no
214214
215215 default :
216216 log .Debug ("TaskStop, unknown task: %v" , taskConf )
217- //c.sendNotificationReply(stream, notification .Id, "", err)
217+ //c.sendNotificationReply(stream, ntf.Type, ntf .Id, "", err)
218218 }
219219}
220220
221- func (c * Client ) handleActionEnableInterception (stream protocol.UI_NotificationsClient , notification * protocol.Notification ) {
221+ func (c * Client ) handleActionEnableInterception (stream protocol.UI_NotificationsClient , ntf * protocol.Notification ) {
222222 log .Info ("[notification] starting interception" )
223223 if err := monitor .ReconfigureMonitorMethod (c .config .ProcMonitorMethod , c .config .Ebpf , c .config .Audit ); err != nil && err .What > monitor .NoError {
224224 log .Warning ("[notification] error enabling monitor (%s): %s" , c .config .ProcMonitorMethod , err .Msg )
225- c .sendNotificationReply (stream , notification .Id , "" , err .Msg )
225+ c .sendNotificationReply (stream , ntf . Type , ntf .Id , "" , err .Msg )
226226 return
227227 }
228228 if err := firewall .EnableInterception (); err != nil {
229229 log .Warning ("[notification] firewall.EnableInterception() error: %s" , err )
230- c .sendNotificationReply (stream , notification .Id , "" , err )
230+ c .sendNotificationReply (stream , ntf . Type , ntf .Id , "" , err )
231231 return
232232 }
233- c .sendNotificationReply (stream , notification .Id , "" , nil )
233+ c .sendNotificationReply (stream , ntf . Type , ntf .Id , "" , nil )
234234}
235235
236- func (c * Client ) handleActionDisableInterception (stream protocol.UI_NotificationsClient , notification * protocol.Notification ) {
236+ func (c * Client ) handleActionDisableInterception (stream protocol.UI_NotificationsClient , ntf * protocol.Notification ) {
237237 log .Info ("[notification] stopping interception" )
238238 monitor .End ()
239239 if err := firewall .DisableInterception (); err != nil {
240240 log .Warning ("firewall.DisableInterception() error: %s" , err )
241- c .sendNotificationReply (stream , notification .Id , "" , err )
241+ c .sendNotificationReply (stream , ntf . Type , ntf .Id , "" , err )
242242 return
243243 }
244- c .sendNotificationReply (stream , notification .Id , "" , nil )
244+ c .sendNotificationReply (stream , ntf . Type , ntf .Id , "" , nil )
245245}
246246
247- func (c * Client ) handleActionReloadFw (stream protocol.UI_NotificationsClient , notification * protocol.Notification ) {
247+ func (c * Client ) handleActionReloadFw (stream protocol.UI_NotificationsClient , ntf * protocol.Notification ) {
248248 log .Info ("[notification] reloading firewall" )
249249
250- sysfw , err := firewall .Deserialize (notification .SysFirewall )
250+ sysfw , err := firewall .Deserialize (ntf .SysFirewall )
251251 if err != nil {
252252 log .Warning ("firewall.Deserialize() error: %s" , err )
253- c .sendNotificationReply (stream , notification .Id , "" , fmt .Errorf ("Error reloading firewall, invalid rules" ))
253+ c .sendNotificationReply (stream , ntf . Type , ntf .Id , "" , fmt .Errorf ("Error reloading firewall, invalid rules" ))
254254 return
255255 }
256256 if err := firewall .SaveConfiguration (sysfw ); err != nil {
257- c .sendNotificationReply (stream , notification .Id , "" , fmt .Errorf ("Error saving system firewall rules: %s" , err ))
257+ c .sendNotificationReply (stream , ntf . Type , ntf .Id , "" , fmt .Errorf ("Error saving system firewall rules: %s" , err ))
258258 return
259259 }
260260 // TODO:
@@ -273,62 +273,62 @@ func (c *Client) handleActionReloadFw(stream protocol.UI_NotificationsClient, no
273273 // FIXME: can this operation last longer than 2s? if there're more than.. 100...10000 rules?
274274 case <- time .After (2 * time .Second ):
275275 log .Debug ("[notification] reload firewall. timeout fired, no errors?" )
276- c .sendNotificationReply (stream , notification .Id , "" , nil )
276+ c .sendNotificationReply (stream , ntf . Type , ntf .Id , "" , nil )
277277 goto Exit
278278
279279 }
280280 }
281281 ExitWithError:
282- c .sendNotificationReply (stream , notification .Id , "" , fmt .Errorf ("%s" , errors ))
282+ c .sendNotificationReply (stream , ntf . Type , ntf .Id , "" , fmt .Errorf ("%s" , errors ))
283283 Exit:
284284 }(c )
285285
286286}
287287
288- func (c * Client ) handleNotification (stream protocol.UI_NotificationsClient , notification * protocol.Notification ) {
288+ func (c * Client ) handleNotification (stream protocol.UI_NotificationsClient , ntf * protocol.Notification ) {
289289 switch {
290- case notification .Type == protocol .Action_TASK_START :
291- c .handleActionTaskStart (stream , notification )
290+ case ntf .Type == protocol .Action_TASK_START :
291+ c .handleActionTaskStart (stream , ntf )
292292
293- case notification .Type == protocol .Action_TASK_STOP :
294- c .handleActionTaskStop (stream , notification )
293+ case ntf .Type == protocol .Action_TASK_STOP :
294+ c .handleActionTaskStop (stream , ntf )
295295
296- case notification .Type == protocol .Action_CHANGE_CONFIG :
297- c .handleActionChangeConfig (stream , notification )
296+ case ntf .Type == protocol .Action_CHANGE_CONFIG :
297+ c .handleActionChangeConfig (stream , ntf )
298298
299- case notification .Type == protocol .Action_ENABLE_INTERCEPTION :
300- c .handleActionEnableInterception (stream , notification )
299+ case ntf .Type == protocol .Action_ENABLE_INTERCEPTION :
300+ c .handleActionEnableInterception (stream , ntf )
301301
302- case notification .Type == protocol .Action_DISABLE_INTERCEPTION :
303- c .handleActionDisableInterception (stream , notification )
302+ case ntf .Type == protocol .Action_DISABLE_INTERCEPTION :
303+ c .handleActionDisableInterception (stream , ntf )
304304
305- case notification .Type == protocol .Action_RELOAD_FW_RULES :
306- c .handleActionReloadFw (stream , notification )
305+ case ntf .Type == protocol .Action_RELOAD_FW_RULES :
306+ c .handleActionReloadFw (stream , ntf )
307307
308308 // ENABLE_RULE just replaces the rule on disk
309- case notification .Type == protocol .Action_ENABLE_RULE :
310- c .handleActionEnableRule (stream , notification )
309+ case ntf .Type == protocol .Action_ENABLE_RULE :
310+ c .handleActionEnableRule (stream , ntf )
311311
312- case notification .Type == protocol .Action_DISABLE_RULE :
313- c .handleActionDisableRule (stream , notification )
312+ case ntf .Type == protocol .Action_DISABLE_RULE :
313+ c .handleActionDisableRule (stream , ntf )
314314
315- case notification .Type == protocol .Action_DELETE_RULE :
316- c .handleActionDeleteRule (stream , notification )
315+ case ntf .Type == protocol .Action_DELETE_RULE :
316+ c .handleActionDeleteRule (stream , ntf )
317317
318318 // CHANGE_RULE can add() or replace() an existing rule.
319- case notification .Type == protocol .Action_CHANGE_RULE :
320- c .handleActionChangeRule (stream , notification )
319+ case ntf .Type == protocol .Action_CHANGE_RULE :
320+ c .handleActionChangeRule (stream , ntf )
321321 }
322322}
323323
324- func (c * Client ) sendNotificationReply (stream protocol.UI_NotificationsClient , nID uint64 , data string , err error ) error {
324+ func (c * Client ) sendNotificationReply (stream protocol.UI_NotificationsClient , nType protocol. Action , nID uint64 , data string , err error ) error {
325325 reply := NewReply (nID , protocol .NotificationReplyCode_OK , data )
326326 if err != nil {
327327 reply .Code = protocol .NotificationReplyCode_ERROR
328328 reply .Data = fmt .Sprint (err )
329329 }
330330 if err := stream .Send (reply ); err != nil {
331- log .Error ("Error replying to notification: %s %d" , err , reply .Id )
331+ log .Error ("Error replying to notification, type : %d, id: %d , err: %s" , nType , reply .Id , err )
332332 return err
333333 }
334334
@@ -371,7 +371,9 @@ func (c *Client) listenForNotifications() {
371371 var err error
372372 // open the stream channel
373373 streamReply := & protocol.NotificationReply {Id : 0 , Code : protocol .NotificationReplyCode_OK }
374+ c .Lock ()
374375 c .streamNotifications , err = c .client .Notifications (ctx )
376+ c .Unlock ()
375377 if err != nil {
376378 log .Error ("establishing notifications channel %s" , err )
377379 return
0 commit comments