Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions tests/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,38 @@ def test_tag_object(self, mock_post):
json={"tags": ["tag1"], "ids": ["1"]},
)

@patch("yeti.api.requests.Session.post")
def test_new_tag(self, mock_post):
mock_response = MagicMock()
mock_response.content = b'{"name": "testtag"}'
mock_post.return_value = mock_response

result = self.api.new_tag("testtag")
self.assertEqual(result, {"name": "testtag"})
mock_post.assert_called_with(
"http://fake-url/api/v2/tags/",
json={"name": "testtag"},
)

result = self.api.new_tag("wdesc", description="desc")
mock_post.assert_called_with(
"http://fake-url/api/v2/tags/",
json={"name": "wdesc", "description": "desc"},
)

@patch("yeti.api.requests.Session.post")
def test_search_tags(self, mock_post):
mock_response = MagicMock()
mock_response.content = b'{"tags": [{"name": "tag1"}]}'
mock_post.return_value = mock_response

result = self.api.search_tags("tag1")
self.assertEqual(result, [{"name": "tag1"}])
mock_post.assert_called_with(
"http://fake-url/api/v2/tags/search",
json={"name": "tag1", "count": 100, "page": 0},
)

@patch("yeti.api.requests.Session.post")
def test_link_objects(self, mock_post):
mock_response = MagicMock()
Expand Down
23 changes: 18 additions & 5 deletions tests/e2e.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import os
import time
import unittest
from unittest.mock import MagicMock, patch

import requests

from yeti import errors
from yeti.api import YetiApi
Expand Down Expand Up @@ -85,7 +82,7 @@ def test_find_indicator(self):
self.assertEqual(indicator["name"], "testGet")
self.assertEqual(indicator["pattern"], "test[0-9]")
self.assertEqual(indicator["tags"][0]["name"], "testtag")

def test_link_objects(self):
self.api.auth_api_key(os.getenv("YETI_API_KEY"))
indicator = self.api.new_indicator(
Expand Down Expand Up @@ -120,4 +117,20 @@ def test_link_objects(self):
self.assertEqual(len(neighbors["vertices"]), 1)
self.assertEqual(
neighbors["vertices"][f'entities/{malware["id"]}']["name"], "testMalware"
)
)

def test_new_tag(self):
self.api.auth_api_key(os.getenv("YETI_API_KEY"))
tag = self.api.new_tag("testTag", description="test")
self.assertEqual(tag["name"], "testTag")
self.assertEqual(tag["description"], "test")

def test_search_tags(self):
self.api.auth_api_key(os.getenv("YETI_API_KEY"))
self.api.new_tag("testSearchTag", description="testDesc")
time.sleep(5)

tags = self.api.search_tags(name="testSearch")
self.assertEqual(len(tags), 1)
self.assertEqual(tags[0]["name"], "testSearchTag")
self.assertEqual(tags[0]["description"], "testDesc")
37 changes: 37 additions & 0 deletions yeti/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,43 @@ def tag_object(
)
return json.loads(response)

def new_tag(self, name: str, description: str | None = None) -> dict[str, Any]:
"""Creates a new tag in Yeti.

Args:
name: The name of the tag to create.
description: An optional description for the tag.

Returns:
The response from the API; a dict representing the tag.
"""
params = {"name": name}
if description:
params["description"] = description
response = self.do_request(
"POST", f"{self._url_root}/api/v2/tags/", json_data=params
)
return json.loads(response)

def search_tags(self, name: str, count: int = 100, page: int = 0):
"""Searches for tags in Yeti.

Returns tag information based on a substring match of the tag name.

Args:
name: The name of the tag to search for (substring match).
count: The number of results to return (default is 100).
page: The page of results to return (default is 0, which means the first page).

Returns:
The response from the API; a list of dicts representing tags.
"""
params = {"name": name, "count": count, "page": page}
response = self.do_request(
"POST", f"{self._url_root}/api/v2/tags/search", json_data=params
)
return json.loads(response)["tags"]

def link_objects(
self,
source: YetiObject,
Expand Down