-
Notifications
You must be signed in to change notification settings - Fork 314
Expand file tree
/
Copy pathurl_extract_investigation.py
More file actions
88 lines (69 loc) · 3.09 KB
/
url_extract_investigation.py
File metadata and controls
88 lines (69 loc) · 3.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import logging
from datetime import timedelta
import json
import httpx
from core import taskmanager
from core.config.config import yeti_config
from core.schemas import observable, task
from core.schemas.entities import investigation
# Root URL of the agents HTTP service, looked up via
# yeti_config.get("agents", "http_root").
AGENT_HTTP_BASE = yeti_config.get("agents", "http_root")
# Streaming endpoint of the "ioc_analyzer" agent. Trailing slashes on the
# configured root are dropped so the URL never contains "//run_stream".
AGENT_STREAM_ENDPOINT = (
    f"{str(AGENT_HTTP_BASE).rstrip('/')}/run_stream?agent_name=ioc_analyzer"
)
# Tag marking URL observables that still need investigation extraction; it is
# expired on a URL once that URL has been processed successfully.
FILTER_TAG = "extract_investigation"
class UrlExtractInvestigation(task.AnalyticsTask):
    """Build Investigation entities from URLs tagged with FILTER_TAG via an LLM agent.

    Each tagged URL observable is sent to the agent behind
    AGENT_STREAM_ENDPOINT. The agent's last non-thought text answer is parsed
    as a JSON report with ``title``, ``summary`` and ``iocs``. That report
    becomes an Investigation linked to the URL and to every extracted IOC
    observable. The tag is only expired after a URL was processed
    successfully, so failed URLs are retried on the next run.
    """

    _defaults = {
        "name": "UrlExtractInvestigation",
        "description": f"Extracts investigation details (summaries, IOCs, etc.) from URLs tagged with '{FILTER_TAG}' using LLMs",
        "frequency": timedelta(hours=1),
    }

    def run(self):
        """Process every URL observable currently tagged with FILTER_TAG."""
        urls, _ = observable.Observable.filter(
            query_args={"tags.name": FILTER_TAG, "type": "url"}
        )
        # A single client is reused for all URLs. The 120 s timeout is
        # presumably generous because agent answers can be slow (assumption).
        with httpx.Client(timeout=120.0) as client:
            for url_obs in urls:
                self.process_url(client, AGENT_STREAM_ENDPOINT, url_obs)

    @staticmethod
    def _iter_answer_texts(lines):
        """Yield the non-thought text parts found in a server-sent-events stream.

        Args:
            lines: Iterable of decoded SSE lines (e.g. ``Response.iter_lines()``).

        Yields:
            Each ``text`` of every ``content.parts`` entry that is not flagged
            as ``thought``, in stream order.

        Only ``data:`` lines are parsed as JSON. Blank lines, comments or
        keep-alives, other SSE fields, and events without content parts are
        ignored.
        """
        prefix = "data:"
        for line in lines:
            if not line.startswith(prefix):
                continue
            data = line[len(prefix):].strip()
            if not data:
                continue
            event = json.loads(data)
            if not isinstance(event, dict):
                continue
            content = event.get("content") or {}
            for part in content.get("parts") or []:
                if "text" in part and not part.get("thought", False):
                    yield part["text"]

    def process_url(
        self, client: httpx.Client, endpoint: str, url_obs: observable.Observable
    ):
        """Ask the agent to analyze one URL and store the resulting report.

        Args:
            client: Open HTTP client used for the streaming POST request.
            endpoint: Agent streaming endpoint to POST to.
            url_obs: URL observable to analyze.

        Errors are logged rather than raised, so a single failing URL does not
        abort the whole run. FILTER_TAG is expired only after the report was
        stored.
        """
        payload = {
            "user_id": "analytics_task",
            "session_id": f"extract_investigation_{url_obs.id}",
            "text": f"Analyze {url_obs.value} as per your instructions.",
        }
        # Defined before the try so the except handlers can always log it.
        last_response = ""
        try:
            with client.stream("POST", endpoint, json=payload) as response:
                response.raise_for_status()
                # The final answer is the last non-thought text part streamed;
                # earlier parts are intermediate output.
                for text in self._iter_answer_texts(response.iter_lines()):
                    last_response = text
            if not last_response:
                raise ValueError("Agent returned no text response")
            parsed_report = json.loads(last_response)
            self.process_report(parsed_report, source=url_obs)
            # Tag as processed and remove the original tag
            url_obs.expire_tag(FILTER_TAG)
        except httpx.HTTPError:
            logging.exception(f"HTTP Error processing URL {url_obs.value} with Agent")
            logging.debug(last_response)
        except Exception:
            logging.exception(f"Error processing URL {url_obs.value} with Agent")
            logging.debug(last_response)

    def process_report(self, report, source: observable.Url):
        """Create an Investigation from an agent report and link its IOCs.

        Args:
            report: Parsed agent output. It must be a dict with ``title``,
                ``summary`` and an ``iocs`` list whose items carry ``value``
                and, optionally, ``description``.
            source: URL observable the report was generated from.

        Raises:
            ValueError: If ``report`` is not a dict or lacks a required key.
                This is checked before anything is saved, so a malformed answer
                leaves no partial Investigation behind.
        """
        if not isinstance(report, dict):
            raise ValueError(f"Agent report is not a JSON object: {report!r}")
        missing = [key for key in ("title", "summary", "iocs") if key not in report]
        if missing:
            raise ValueError(f"Agent report is missing required keys: {missing}")

        report_entity = investigation.Investigation(
            name=report["title"],
            description=report["summary"],
            reference=source.value,
        ).save()
        report_entity.link_to(source, "related_to", "source_url")

        linked = 0
        for ioc in report["iocs"] or []:
            value = ioc.get("value") if isinstance(ioc, dict) else None
            if not value:
                logging.warning(
                    f"Skipping malformed IOC in report for {source.value}: {ioc!r}"
                )
                continue
            description = ioc.get("description", "")
            # The Investigation already exists at this point. One bad IOC must
            # not abort the report: the tag would not be expired, and the next
            # run would create a duplicate Investigation.
            try:
                obs = observable.save(value=value)
                obs.add_context(source=self.name, context={"description": description})
                report_entity.link_to(obs, "contains", description)
            except Exception:
                logging.exception(
                    f"Failed to save IOC {value!r} from report for {source.value}"
                )
                continue
            linked += 1

        logging.info(f"Created investigation: {report_entity.id} with {linked} IOCs")
# Register the analytics task with taskmanager.TaskManager when this module is imported.
taskmanager.TaskManager.register_task(
    UrlExtractInvestigation,
)