|
| 1 | +import base64 |
1 | 2 | import json |
2 | 3 | import os |
| 4 | +import re |
3 | 5 | import sched |
4 | 6 | import ssl |
5 | 7 | import tempfile |
@@ -455,6 +457,11 @@ def match_alert_element_fuzzy(self, signature_value, alert_values, fuzzy_scoring |
455 | 457 | return False |
456 | 458 |
|
457 | 459 | def match_alert_elements(self, signatures, alert_data): |
| 460 | + return self._match_alert_elements_original( |
| 461 | + signatures, alert_data |
| 462 | + ) or self._match_alert_elements_for_command_line(signatures, alert_data) |
| 463 | + |
| 464 | + def _match_alert_elements_original(self, signatures, alert_data): |
458 | 465 | # Example for alert_data |
459 | 466 | # {"process_name": {"list": ["xx", "yy"], "fuzzy": 90}} |
460 | 467 | relevant_signatures = [ |
@@ -484,3 +491,44 @@ def match_alert_elements(self, signatures, alert_data): |
484 | 491 | if signatures_number == matching_number: |
485 | 492 | return True |
486 | 493 | return False |
| 494 | + |
| 495 | + def _match_alert_elements_for_command_line(self, signatures, alert_data): |
| 496 | + command_line_signatures = [ |
| 497 | + signature |
| 498 | + for signature in signatures |
| 499 | + if signature.get("type") == "command_line" |
| 500 | + ] |
| 501 | + if len(command_line_signatures) == 0: |
| 502 | + return False |
| 503 | + key_types = ["command_line", "process_name", "file_name"] |
| 504 | + alert_datas = [alert_data.get(key) for key in key_types if key in alert_data] |
| 505 | + for signature in command_line_signatures: |
| 506 | + signature_result = False |
| 507 | + signature_value = self._decode_value(signature["value"]).strip().lower() |
| 508 | + for alert_data in alert_datas: |
| 509 | + trimmed_lowered_datas = [s.strip().lower() for s in alert_data["data"]] |
| 510 | + signature_result = any( |
| 511 | + data in signature_value for data in trimmed_lowered_datas |
| 512 | + ) |
| 513 | + if signature_result: |
| 514 | + return True |
| 515 | + return False |
| 516 | + |
| 517 | + def _decode_value(self, signature_value): |
| 518 | + if _is_base64_encoded(signature_value): |
| 519 | + try: |
| 520 | + decoded_bytes = base64.b64decode(signature_value) |
| 521 | + decoded_str = decoded_bytes.decode("utf-8") |
| 522 | + return decoded_str |
| 523 | + except Exception as e: |
| 524 | + self.logger.error(str(e)) |
| 525 | + else: |
| 526 | + return signature_value |
| 527 | + |
| 528 | + |
| 529 | +def _is_base64_encoded(str_maybe_base64): |
| 530 | + # Check if the length is a multiple of 4 and matches the Base64 character set |
| 531 | + base64_pattern = re.compile(r"^[A-Za-z0-9+/]*={0,2}$") |
| 532 | + return len(str_maybe_base64) % 4 == 0 and bool( |
| 533 | + base64_pattern.match(str_maybe_base64) |
| 534 | + ) |
0 commit comments