Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
name: e2e tests

on: [pull_request]

jobs:
e2e:
runs-on: ubuntu-latest
env:
YETI_ENDPOINT: "http://localhost:80"
strategy:
matrix:
os: [ubuntu-latest]
python-version: ["3.10"]
steps:
- uses: actions/checkout@v4

- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y python3-pip git
sudo pip3 install poetry
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
cache: poetry

- name: Install a Yeti prod deployment
run: |
git clone https://github.com/yeti-platform/yeti-docker
cd yeti-docker/prod
./init.sh
sleep 10
- name: Create test Yeti user
run: |
cd yeti-docker/prod
echo "YETI_API_KEY=$(docker compose run --rm api create-user test test --admin | awk -F'test:' '{print $2}')" >> $GITHUB_ENV
- name: Install Python dependencies
run: poetry install --no-root

- name: e2e testing
run: |
YETI_API_KEY=$YETI_API_KEY poetry run python -m unittest tests/e2e.py
2 changes: 1 addition & 1 deletion .github/workflows/unittests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ jobs:
run: poetry install --no-root
- name: Test with unittest
run: |
poetry run python -m unittest discover -s tests/ -p '*.py'
poetry run python -m unittest tests/api.py
88 changes: 88 additions & 0 deletions tests/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,94 @@ def test_get_yara_bundle_with_overlays(self, mock_post):
},
)

@patch("yeti.api.requests.Session.get")
def test_find_indicator(self, mock_get):
mock_response = MagicMock()
mock_response.content = b'{"id": "found_indicator"}'
mock_get.return_value = mock_response

result = self.api.find_indicator(name="test_indicator", type="indicator.test")
self.assertEqual(result, {"id": "found_indicator"})
mock_get.assert_called_with(
"http://fake-url/api/v2/indicators/?name=test_indicator&type=indicator.test",
)

# Test 404 case
mock_exception_with_status_code = requests.exceptions.HTTPError()
mock_exception_with_status_code.response = MagicMock()
mock_exception_with_status_code.response.status_code = 404
mock_response.raise_for_status.side_effect = mock_exception_with_status_code
mock_get.return_value = mock_response

result = self.api.find_indicator(name="not_found", type="indicator.test")
self.assertIsNone(result)

@patch("yeti.api.requests.Session.get")
def test_find_entity(self, mock_get):
mock_response = MagicMock()
mock_response.content = b'{"id": "found_entity"}'
mock_get.return_value = mock_response

result = self.api.find_entity(name="test_entity", type="entity.test")
self.assertEqual(result, {"id": "found_entity"})
mock_get.assert_called_with(
"http://fake-url/api/v2/entities/?name=test_entity&type=entity.test",
)

# Test 404 case
mock_exception_with_status_code = requests.exceptions.HTTPError()
mock_exception_with_status_code.response = MagicMock()
mock_exception_with_status_code.response.status_code = 404
mock_response.raise_for_status.side_effect = mock_exception_with_status_code
mock_get.return_value = mock_response

result = self.api.find_entity(name="not_found", type="entity.test")
self.assertIsNone(result)

@patch("yeti.api.requests.Session.get")
def test_find_observable(self, mock_get):
mock_response = MagicMock()
mock_response.content = b'{"id": "found_observable"}'
mock_get.return_value = mock_response

result = self.api.find_observable(value="test_value", type="observable.test")
self.assertEqual(result, {"id": "found_observable"})
mock_get.assert_called_with(
"http://fake-url/api/v2/observables/?value=test_value&type=observable.test",
)

# Test 404 case
mock_exception_with_status_code = requests.exceptions.HTTPError()
mock_exception_with_status_code.response = MagicMock()
mock_exception_with_status_code.response.status_code = 404
mock_response.raise_for_status.side_effect = mock_exception_with_status_code
mock_get.return_value = mock_response

result = self.api.find_observable(value="not_found", type="observable.test")
self.assertIsNone(result)

@patch("yeti.api.requests.Session.get")
def test_find_dfiq(self, mock_get):
mock_response = MagicMock()
mock_response.content = b'{"id": "found_dfiq"}'
mock_get.return_value = mock_response

result = self.api.find_dfiq(name="test_dfiq", dfiq_type="scenario")
self.assertEqual(result, {"id": "found_dfiq"})
mock_get.assert_called_with(
"http://fake-url/api/v2/dfiq/?name=test_dfiq&type=scenario",
)

# Test 404 case
mock_exception_with_status_code = requests.exceptions.HTTPError()
mock_exception_with_status_code.response = MagicMock()
mock_exception_with_status_code.response.status_code = 404
mock_response.raise_for_status.side_effect = mock_exception_with_status_code
mock_get.return_value = mock_response

result = self.api.find_dfiq(name="not_found", dfiq_type="scenario")
self.assertIsNone(result)


if __name__ == "__main__":
unittest.main()
83 changes: 83 additions & 0 deletions tests/e2e.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import os
import time
import unittest
from unittest.mock import MagicMock, patch

import requests

from yeti import errors
from yeti.api import YetiApi


class YetiEndToEndTest(unittest.TestCase):
def setUp(self):
self.api = YetiApi(os.getenv("YETI_ENDPOINT"))

def test_auth_api_key(self):
self.api.auth_api_key(os.getenv("YETI_API_KEY"))
self.api.search_indicators(name="test")

def test_no_auth(self):
with self.assertRaises(errors.YetiAuthError) as error:
self.api.search_indicators(name="test")
self.assertIn(
"401 Client Error: Unauthorized for url: ",
str(error.exception),
)

def test_new_indicator(self):
self.api.auth_api_key(os.getenv("YETI_API_KEY"))
indicator = self.api.new_indicator(
{
"name": "test",
"type": "regex",
"description": "test",
"pattern": "test[0-9]",
"diamond": "victim",
}
)
self.assertEqual(indicator["name"], "test")
self.assertRegex(indicator["id"], r"[0-9]+")

def test_auth_refresh(self):
self.api.auth_api_key(os.getenv("YETI_API_KEY"))
self.api.search_indicators(name="test")

time.sleep(3)

self.api.search_indicators(name="test")

def test_search_indicators(self):
self.api.auth_api_key(os.getenv("YETI_API_KEY"))
self.api.auth_api_key(os.getenv("YETI_API_KEY"))
self.api.new_indicator(
{
"name": "testSearch",
"type": "regex",
"description": "test",
"pattern": "test[0-9]",
"diamond": "victim",
}
)
time.sleep(5)
result = self.api.search_indicators(name="testSear")
self.assertEqual(len(result), 1, result)
self.assertEqual(result[0]["name"], "testSearch")

def test_find_indicator(self):
self.api.auth_api_key(os.getenv("YETI_API_KEY"))
self.api.auth_api_key(os.getenv("YETI_API_KEY"))
self.api.new_indicator(
{
"name": "testGet",
"type": "regex",
"description": "test",
"pattern": "test[0-9]",
"diamond": "victim",
}
)
time.sleep(5)
indicator = self.api.find_indicator(name="testGet", type="regex")

self.assertEqual(indicator["name"], "testGet")
self.assertEqual(indicator["pattern"], "test[0-9]")
97 changes: 95 additions & 2 deletions yeti/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json
import logging
import urllib.parse
from typing import Any, Sequence

import requests
Expand Down Expand Up @@ -63,16 +64,18 @@ def do_request(
body: bytes | None = None,
headers: dict[str, Any] | None = None,
retries: int = 3,
params: dict[str, Any] | None = None,
) -> bytes:
"""Issues a request to the given URL.

Args:
method: The HTTP method to use.
url: The URL to issue the request to.
json: The JSON payload to include in the request.
json_data: The JSON payload to include in the request.
body: The body to include in the request.
headers: Extra headers to include in the request.
retries: The number of times to retry the request.
params: The query parameters to include in the request.

Returns:
The response from the API; a bytes object.
Expand All @@ -90,6 +93,8 @@ def do_request(
request_kwargs["json"] = json_data
if body:
request_kwargs["body"] = body
if params:
url = f"{url}?{urllib.parse.urlencode(params)}"

try:
if method == "POST":
Expand Down Expand Up @@ -145,6 +150,28 @@ def refresh_auth(self):
else:
logger.warning("No auth function set, cannot refresh auth.")

def find_indicator(self, name: str, type: str) -> YetiObject | None:
"""Finds an indicator in Yeti by name and type.

Args:
name: The name of the indicator to find.
type: The type of the indicator to find.

Returns:
The response from the API; a dict representing the indicator.
"""
try:
response = self.do_request(
"GET",
f"{self._url_root}/api/v2/indicators/",
params={"name": name, "type": type},
)
except errors.YetiApiError as e:
if e.status_code == 404:
return None
raise
return json.loads(response)

def search_indicators(
self,
name: str | None = None,
Expand All @@ -163,7 +190,7 @@ def search_indicators(
tags: The tags of the indicator to search for.

Returns:
The response from the API; a dict representing the indicator.
The response from the API; a list of dicts representing indicators.
"""

if not any([name, indicator_type, pattern, tags]):
Expand All @@ -188,6 +215,28 @@ def search_indicators(
)
return json.loads(response)["indicators"]

def find_entity(self, name: str, type: str) -> YetiObject | None:
"""Finds an entity in Yeti by name.

Args:
name: The name of the entity to find.
type: The type of the entity to find.

Returns:
The response from the API; a dict representing the entity.
"""
try:
response = self.do_request(
"GET",
f"{self._url_root}/api/v2/entities/",
params={"name": name, "type": type},
)
except errors.YetiApiError as e:
if e.status_code == 404:
return None
raise
return json.loads(response)

def search_entities(self, name: str) -> list[YetiObject]:
params = {"query": {"name": name}, "count": 0}
response = self.do_request(
Expand All @@ -197,6 +246,28 @@ def search_entities(self, name: str) -> list[YetiObject]:
)
return json.loads(response)["entities"]

def find_observable(self, value: str, type: str) -> YetiObject | None:
"""Finds an observable in Yeti by value and type.

Args:
value: The value of the observable to find.
type: The type of the observable to find.

Returns:
The response from the API; a dict representing the observable.
"""
try:
response = self.do_request(
"GET",
f"{self._url_root}/api/v2/observables/",
params={"value": value, "type": type},
)
except errors.YetiApiError as e:
if e.status_code == 404:
return None
raise
return json.loads(response)

def search_observables(self, value: str) -> list[YetiObject]:
"""Searches for an observable in Yeti.

Expand Down Expand Up @@ -330,6 +401,28 @@ def get_yara_bundle_with_overlays(

return json.loads(result)

def find_dfiq(self, name: str, dfiq_type: str) -> YetiObject | None:
"""Finds a DFIQ in Yeti by name and type.

Args:
name: The name of the DFIQ to find.
dfiq_type: The type of the DFIQ to find.

Returns:
The response from the API; a dict representing the DFIQ object.
"""
try:
response = self.do_request(
"GET",
f"{self._url_root}/api/v2/dfiq/",
params={"name": name, "type": dfiq_type},
)
except errors.YetiApiError as e:
if e.status_code == 404:
return None
raise
return json.loads(response)

def search_dfiq(self, name: str, dfiq_type: str | None = None) -> list[YetiObject]:
"""Searches for a DFIQ in Yeti.

Expand Down