|
17 | 17 | import os |
18 | 18 | import re |
19 | 19 | import time |
20 | | -from collections.abc import Iterator |
| 20 | +from collections.abc import Generator, Iterator |
21 | 21 | from html import unescape |
22 | 22 | from pathlib import Path |
23 | 23 |
|
|
38 | 38 | from soar_sdk.logging import getLogger |
39 | 39 | from soar_sdk.models.artifact import Artifact |
40 | 40 | from soar_sdk.models.container import Container |
41 | | -from soar_sdk.params import OnPollParams |
| 41 | +from soar_sdk.models.finding import Finding, FindingAttachment |
| 42 | +from soar_sdk.params import OnESPollParams, OnPollParams |
42 | 43 |
|
43 | 44 | from .consts import ( |
44 | 45 | MSGOFFICE365_CONTAINER_DESCRIPTION, |
@@ -763,6 +764,106 @@ def on_poll( |
763 | 764 | state["first_run"] = False |
764 | 765 |
|
765 | 766 |
|
| 767 | +@app.on_es_poll() |
| 768 | +def on_es_poll( |
| 769 | + params: OnESPollParams, soar: SOARClient, asset: Asset |
| 770 | +) -> Generator[Finding, int | None]: |
| 771 | + """Poll for new emails and create ES findings for each email.""" |
| 772 | + helper = MsGraphHelper(soar, asset) |
| 773 | + helper.get_token() |
| 774 | + |
| 775 | + state = getattr(asset, "ingest_state", None) or {} |
| 776 | + email_address = asset.email_address |
| 777 | + if not email_address: |
| 778 | + raise ValueError("Email address is required for ES polling") |
| 779 | + |
| 780 | + folder = asset.folder or MSGOFFICE365_DEFAULT_FOLDER |
| 781 | + folder_id = folder |
| 782 | + if asset.get_folder_id: |
| 783 | + resolved_id = helper.get_folder_id(folder, email_address) |
| 784 | + if resolved_id: |
| 785 | + folder_id = resolved_id |
| 786 | + |
| 787 | + is_first_run = state.get("es_first_run", True) |
| 788 | + max_emails = asset.first_run_max_emails if is_first_run else asset.max_containers |
| 789 | + last_time = state.get("es_last_time") |
| 790 | + |
| 791 | + endpoint = f"/users/{email_address}/mailFolders/{folder_id}/messages" |
| 792 | + select_fields = ",".join(MSGOFFICE365_SELECT_PARAMETER_LIST) |
| 793 | + api_params = { |
| 794 | + "$select": select_fields, |
| 795 | + "$top": str(min(max_emails, MSGOFFICE365_PER_PAGE_COUNT)), |
| 796 | + "$orderby": MSGOFFICE365_ORDERBY_RECEIVED_DESC |
| 797 | + if asset.ingest_manner == "latest first" |
| 798 | + else "receivedDateTime asc", |
| 799 | + } |
| 800 | + |
| 801 | + if last_time: |
| 802 | + api_params["$filter"] = f"receivedDateTime gt {last_time}" |
| 803 | + |
| 804 | + emails_processed = 0 |
| 805 | + latest_time = last_time |
| 806 | + |
| 807 | + while emails_processed < max_emails: |
| 808 | + resp = helper.make_rest_call_helper(endpoint, params=api_params) |
| 809 | + emails = resp.get("value", []) |
| 810 | + |
| 811 | + if not emails: |
| 812 | + break |
| 813 | + |
| 814 | + for email_data in emails: |
| 815 | + if emails_processed >= max_emails: |
| 816 | + break |
| 817 | + |
| 818 | + email_time = email_data.get("receivedDateTime") |
| 819 | + if email_time and (not latest_time or email_time > latest_time): |
| 820 | + latest_time = email_time |
| 821 | + |
| 822 | + email_id = email_data.get("id") |
| 823 | + subject = email_data.get("subject") or email_id |
| 824 | + |
| 825 | + attachments = [] |
| 826 | + try: |
| 827 | + eml_content = helper.make_rest_call_helper( |
| 828 | + f"/users/{email_address}/messages/{email_id}/$value", |
| 829 | + download=True, |
| 830 | + ) |
| 831 | + if eml_content: |
| 832 | + if isinstance(eml_content, str): |
| 833 | + eml_content = eml_content.encode("utf-8") |
| 834 | + attachments.append( |
| 835 | + FindingAttachment( |
| 836 | + file_name=f"{subject[:50]}.eml", |
| 837 | + data=eml_content, |
| 838 | + ) |
| 839 | + ) |
| 840 | + except Exception as e: |
| 841 | + logger.warning(f"Failed to fetch email EML: {e}") |
| 842 | + |
| 843 | + yield Finding( |
| 844 | + rule_title=f"Email: {subject[:100]}" |
| 845 | + if subject |
| 846 | + else f"Email ID: {email_id}", |
| 847 | + attachments=attachments if attachments else None, |
| 848 | + ) |
| 849 | + |
| 850 | + emails_processed += 1 |
| 851 | + |
| 852 | + next_link = resp.get("@odata.nextLink") |
| 853 | + if not next_link or emails_processed >= max_emails: |
| 854 | + break |
| 855 | + api_params = None |
| 856 | + resp = helper.make_rest_call_helper(endpoint, nextLink=next_link) |
| 857 | + |
| 858 | + if latest_time: |
| 859 | + state["es_last_time"] = latest_time |
| 860 | + state["es_first_run"] = False |
| 861 | + if hasattr(asset, "ingest_state"): |
| 862 | + asset.ingest_state.put_all(state) |
| 863 | + |
| 864 | + logger.info(f"Processed {emails_processed} emails for ES findings") |
| 865 | + |
| 866 | + |
766 | 867 | # Import action modules to register them with the app |
767 | 868 | from .actions import ( # noqa: F401 |
768 | 869 | block_sender, |
|
0 commit comments