@@ -19,7 +19,7 @@ def log(msg, *args):
1919 sys .stderr .write (msg + " " .join ([str (a ) for a in args ]) + "\n " )
2020
2121
22- def attach_resp_to_event ( event , data , ipfield , allowed_fields = None ):
22+ def attach_resp_to_record ( record , data , ipfield , allowed_fields = None ):
2323 allowed = set (allowed_fields ) if allowed_fields else None
2424
2525 prefix = f"crowdsec_{ ipfield } _"
@@ -78,12 +78,11 @@ def attach_resp_to_event(event, data, ipfield, allowed_fields=None):
7878 for field , value in mapped_fields .items ():
7979 short_field = field [len (prefix ):]
8080 if allowed is None or short_field in allowed :
81- event [field ] = value
81+ record [field ] = value
8282
83- return event
83+ return record
8484
85-
86- @Configuration ()
85+ @Configuration (distributed = False )
8786class CsSmokeCommand (StreamingCommand ):
8887
8988 """%(synopsis)
@@ -112,7 +111,7 @@ class CsSmokeCommand(StreamingCommand):
112111 require = False ,
113112 )
114113
115- def stream (self , events ):
114+ def stream (self , records ):
116115 api_key = ""
117116 for passw in self .service .storage_passwords .list ():
118117 if passw .name == "crowdsec-splunk-app_realm:api_key:" :
@@ -137,41 +136,104 @@ def stream(self, events):
137136 batching_enabled , batch_size = self ._load_batching_settings ()
138137
139138 max_batch_size = batch_size if batching_enabled else 1
140- yield from self ._process_events (events , headers , allowed_fields , max_batch_size )
141-
139+ yield from self ._process_records (records , headers , allowed_fields , max_batch_size )
140+
141+ def _add_default_fields_to_record (self , record , allowed_fields ):
142+ allowed = set (allowed_fields ) if allowed_fields else None
143+ prefix = f"crowdsec_{ self .ipfield } _"
144+
145+ default_fields = {
146+ f"{ prefix } reputation" : "" ,
147+ f"{ prefix } confidence" : "" ,
148+ f"{ prefix } ip_range_score" : "" ,
149+ f"{ prefix } ip" : "" ,
150+ f"{ prefix } ip_range" : "" ,
151+ f"{ prefix } ip_range_24" : "" ,
152+ f"{ prefix } ip_range_24_reputation" : "" ,
153+ f"{ prefix } ip_range_24_score" : "" ,
154+ f"{ prefix } as_name" : "" ,
155+ f"{ prefix } as_num" : "" ,
156+ f"{ prefix } country" : "" ,
157+ f"{ prefix } city" : "" ,
158+ f"{ prefix } latitude" : "" ,
159+ f"{ prefix } longitude" : "" ,
160+ f"{ prefix } reverse_dns" : "" ,
161+ f"{ prefix } behaviors" : "" ,
162+ f"{ prefix } mitre_techniques" : "" ,
163+ f"{ prefix } cves" : "" ,
164+ f"{ prefix } first_seen" : "" ,
165+ f"{ prefix } last_seen" : "" ,
166+ f"{ prefix } full_age" : "" ,
167+ f"{ prefix } days_age" : "" ,
168+ f"{ prefix } false_positives" : "" ,
169+ f"{ prefix } classifications" : "" ,
170+ f"{ prefix } attack_details" : "" ,
171+ f"{ prefix } target_countries" : "" ,
172+ f"{ prefix } background_noise" : "" ,
173+ f"{ prefix } background_noise_score" : "" ,
174+ f"{ prefix } overall_aggressiveness" : "" ,
175+ f"{ prefix } overall_threat" : "" ,
176+ f"{ prefix } overall_trust" : "" ,
177+ f"{ prefix } overall_anomaly" : "" ,
178+ f"{ prefix } overall_total" : "" ,
179+ f"{ prefix } last_day_aggressiveness" : "" ,
180+ f"{ prefix } last_day_threat" : "" ,
181+ f"{ prefix } last_day_trust" : "" ,
182+ f"{ prefix } last_day_anomaly" : "" ,
183+ f"{ prefix } last_day_total" : "" ,
184+ f"{ prefix } last_week_aggressiveness" : "" ,
185+ f"{ prefix } last_week_threat" : "" ,
186+ f"{ prefix } last_week_trust" : "" ,
187+ f"{ prefix } last_week_anomaly" : "" ,
188+ f"{ prefix } last_week_total" : "" ,
189+ f"{ prefix } last_month_aggressiveness" : "" ,
190+ f"{ prefix } last_month_threat" : "" ,
191+ f"{ prefix } last_month_trust" : "" ,
192+ f"{ prefix } last_month_anomaly" : "" ,
193+ f"{ prefix } last_month_total" : "" ,
194+ f"{ prefix } references" : "" ,
195+ }
142196
197+ for field , value in default_fields .items ():
198+ short_field = field [len (prefix ):]
199+ if allowed is None or short_field in allowed :
200+ record [field ] = value
143201
144- def _enrich_single_event (self , event , event_dest_ip , headers , allowed_fields ):
202+ def _enrich_single_record (self , record , record_dest_ip , headers , allowed_fields ):
145203 params = (
146- ("ipAddress" , event_dest_ip ),
204+ ("ipAddress" , record_dest_ip ),
147205 ("verbose" , "" ),
148206 )
149207 response = req .get (
150- f"https://cti.api.crowdsec.net/v2/smoke/{ event_dest_ip } " ,
208+ f"https://cti.api.crowdsec.net/v2/smoke/{ record_dest_ip } " ,
151209 headers = headers ,
152210 params = params ,
153211 )
154212 if response .status_code == 200 :
155213 data = response .json ()
156- event = attach_resp_to_event ( event , data , self .ipfield , allowed_fields )
214+ record = attach_resp_to_record ( record , data , self .ipfield , allowed_fields )
157215 elif response .status_code == 429 :
158- event [f"crowdsec_{ self .ipfield } _error" ] = '"Quota exceeded for CrowdSec CTI API. Please visit https://www.crowdsec.net/pricing to upgrade your plan."'
216+ record [f"crowdsec_{ self .ipfield } _error" ] = '"Quota exceeded for CrowdSec CTI API. Please visit https://www.crowdsec.net/pricing to upgrade your plan."'
159217 elif response .status_code == 404 :
160- event [f"crowdsec_{ self .ipfield } _reputation" ] = "unknown"
161- event [f"crowdsec_{ self .ipfield } _confidence" ] = "none"
218+ record [f"crowdsec_{ self .ipfield } _reputation" ] = "unknown"
219+ record [f"crowdsec_{ self .ipfield } _confidence" ] = "none"
162220 else :
163- event [f"crowdsec_{ self .ipfield } _error" ] = f"Error { response .status_code } : { response .text } "
164- return event
221+ record [f"crowdsec_{ self .ipfield } _error" ] = f"Error { response .status_code } : { response .text } "
222+ return record
165223
166- def _process_events (self , events , headers , allowed_fields , batch_size ):
224+ def _process_records (self , records , headers , allowed_fields , batch_size ):
167225 buffer = []
168- for event in events :
169- event_dest_ip = event .get (self .ipfield )
170- if not event_dest_ip :
171- event [f"crowdsec_{ self .ipfield } _error" ] = f"Field { self .ipfield } not found on event"
172- yield event
226+ first_record = True
227+ for record in records :
228+ if first_record :
229+ self ._add_default_fields_to_record (record , allowed_fields )
230+ first_record = False
231+ record_dest_ip = record .get (self .ipfield )
232+ if not record_dest_ip :
233+ record [f"crowdsec_{ self .ipfield } _error" ] = f"Field { self .ipfield } not found in record"
234+ yield record
173235 continue
174- buffer .append ((event , event_dest_ip ))
236+ buffer .append ((record , record_dest_ip ))
175237 if len (buffer ) >= batch_size :
176238 yield from self ._execute_batch (buffer , headers , allowed_fields )
177239 buffer = []
@@ -181,8 +243,8 @@ def _process_events(self, events, headers, allowed_fields, batch_size):
181243
182244 def _execute_batch (self , buffer , headers , allowed_fields ):
183245 if len (buffer ) == 1 :
184- event , ip = buffer [0 ]
185- yield self ._enrich_single_event ( event , ip , headers , allowed_fields )
246+ record , ip = buffer [0 ]
247+ yield self ._enrich_single_record ( record , ip , headers , allowed_fields )
186248 return
187249
188250 ips = [ip for _ , ip in buffer ]
@@ -195,26 +257,26 @@ def _execute_batch(self, buffer, headers, allowed_fields):
195257
196258 if response .status_code == 200 :
197259 payload = self ._normalize_batch_response (response .json ())
198- for event , ip in buffer :
260+ for record , ip in buffer :
199261 for entry in payload :
200262 if entry .get ("ip" ) == ip :
201- attach_resp_to_event ( event , entry , self .ipfield , allowed_fields )
202- yield event
263+ attach_resp_to_record ( record , entry , self .ipfield , allowed_fields )
264+ yield record
203265 break
204266 else :
205- event [f"crowdsec_{ self .ipfield } _reputation" ] = "unknown"
206- event [f"crowdsec_{ self .ipfield } _confidence" ] = "none"
207- yield event
267+ record [f"crowdsec_{ self .ipfield } _reputation" ] = "unknown"
268+ record [f"crowdsec_{ self .ipfield } _confidence" ] = "none"
269+ yield record
208270 elif response .status_code == 429 :
209271 error_msg = '"Quota exceeded for CrowdSec CTI API. Please visit https://www.crowdsec.net/pricing to upgrade your plan."'
210- for event , _ in buffer :
211- event [f"crowdsec_{ self .ipfield } _error" ] = error_msg
212- yield event
272+ for record , _ in buffer :
273+ record [f"crowdsec_{ self .ipfield } _error" ] = error_msg
274+ yield record
213275 else :
214276 error_msg = f"Error { response .status_code } : { response .text } "
215- for event , _ in buffer :
216- event [f"crowdsec_{ self .ipfield } _error" ] = error_msg
217- yield event
277+ for record , _ in buffer :
278+ record [f"crowdsec_{ self .ipfield } _error" ] = error_msg
279+ yield record
218280
219281 def _normalize_batch_response (self , data ):
220282 if isinstance (data , dict ) and "items" in data and isinstance (data ["items" ], list ):
0 commit comments