Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 87 additions & 23 deletions pymisp/mispevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,28 +60,37 @@ def relationships(self) -> list[MISPRelationship]:

def add_note(self, note: str, language: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
the_note = MISPNote()
the_note.from_dict(note=note, language=language,
object_uuid=self.uuid, object_type=self.analyst_data_object_type,
**kwargs)
object_uuid = kwargs.pop('object_uuid', self.uuid)
object_type = kwargs.pop('object_type', self.analyst_data_object_type)
the_note.from_dict(
note=note, language=language, object_uuid=object_uuid,
object_type=object_type, contained=True, parent=self, **kwargs
)
self.notes.append(the_note)
self.edited = True
return the_note

def add_opinion(self, opinion: int, comment: str | None = None, **kwargs) -> MISPOpinion: # type: ignore[no-untyped-def]
the_opinion = MISPOpinion()
the_opinion.from_dict(opinion=opinion, comment=comment,
object_uuid=self.uuid, object_type=self.analyst_data_object_type,
**kwargs)
object_uuid = kwargs.pop('object_uuid', self.uuid)
object_type = kwargs.pop('object_type', self.analyst_data_object_type)
the_opinion.from_dict(
opinion=opinion, comment=comment, object_uuid=object_uuid,
object_type=object_type, contained=True, parent=self, **kwargs
)
self.opinions.append(the_opinion)
self.edited = True
return the_opinion

def add_relationship(self, related_object_type: AbstractMISP | str, related_object_uuid: str | None, relationship_type: str, **kwargs) -> MISPRelationship: # type: ignore[no-untyped-def]
the_relationship = MISPRelationship()
the_relationship.from_dict(related_object_type=related_object_type, related_object_uuid=related_object_uuid,
relationship_type=relationship_type,
object_uuid=self.uuid, object_type=self.analyst_data_object_type,
**kwargs)
the_relationship.from_dict(
related_object_type=related_object_type,
related_object_uuid=related_object_uuid,
relationship_type=relationship_type, object_uuid=self.uuid,
object_type=self.analyst_data_object_type, contained=True,
parent=self, **kwargs
)
self.relationships.append(the_relationship)
self.edited = True
return the_relationship
Expand All @@ -93,12 +102,8 @@ def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
relationships = kwargs.pop('Relationship', [])
super().from_dict(**kwargs)
for note in notes:
note.pop('object_uuid', None)
note.pop('object_type', None)
self.add_note(**note)
for opinion in opinions:
opinion.pop('object_uuid', None)
opinion.pop('object_type', None)
self.add_opinion(**opinion)
for relationship in relationships:
relationship.pop('object_uuid', None)
Expand Down Expand Up @@ -2523,6 +2528,10 @@ class MISPAnalystData(AbstractMISP):
'Object', 'Note', 'Opinion', 'Relationship', 'Organisation',
'SharingGroup'}

@property
def analyst_data_object_type(self) -> str:
return self._analyst_data_object_type

@property
def org(self) -> MISPOrganisation:
return self.Org
Expand All @@ -2538,6 +2547,10 @@ def orgc(self, orgc: MISPOrganisation) -> None:
else:
raise PyMISPError('Orgc must be of type MISPOrganisation.')

@property
def parent(self) -> MISPAttribute | MISPEvent | MISPEventReport | MISPObject:
return self.__parent

def __new__(cls, *args, **kwargs):
if cls is MISPAnalystData:
raise TypeError(f"only children of '{cls.__name__}' may be instantiated")
Expand All @@ -2552,8 +2565,54 @@ def __init__(self, **kwargs: dict[str, Any]) -> None:
self.created: float | int | datetime
self.modified: float | int | datetime
self.SharingGroup: MISPSharingGroup
self._analyst_data_object_type: str # Must be defined in the child class

def add_note(self, note: str, language: str | None = None, object_uuid: str | None = None, object_type: str | None = None, parent: MISPEvent | MISPAttribute | MISPObject | MISPEventReport | None = None, **kwargs: dict[str, Any]) -> MISPNote:
misp_note = MISPNote()
if object_uuid is None:
object_uuid = self.uuid
if object_type is None:
object_type = self.analyst_data_object_type
if parent is None and hasattr(self, 'parent'):
parent = self.parent
misp_note.from_dict(
note=note, language=language, object_uuid=object_uuid,
object_type=object_type, parent=parent, contained=True, **kwargs
)
if parent is None:
if not hasattr(self, 'Note'):
self.Note: list[MISPNote] = []
self.Note.append(misp_note)
else:
self.parent.notes.append(misp_note)
self.edited = True
return misp_note

def add_opinion(self, opinion: int, comment: str | None = None, object_uuid: str | None = None, object_type: str | None = None, parent: MISPEvent | MISPAttribute | MISPObject | MISPEventReport | None = None, **kwargs: dict[str, Any]) -> MISPOpinion:
misp_opinion = MISPOpinion()
if object_uuid is None:
object_uuid = self.uuid
if object_type is None:
object_type = self.analyst_data_object_type
if parent is None and hasattr(self, 'parent'):
parent = self.parent
misp_opinion.from_dict(
opinion=opinion, comment=comment, object_uuid=object_uuid,
object_type=object_type, parent=parent, contained=True, **kwargs
)
if parent is None:
if not hasattr(self, 'Opinion'):
self.Opinion: list[MISPOpinion] = []
self.Opinion.append(misp_opinion)
else:
self.parent.opinions.append(misp_opinion)
self.edited = True
return misp_opinion

def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
notes = kwargs.pop('Note', [])
opinions = kwargs.pop('Opinion', [])
self.__parent = kwargs.pop('parent', None)
self.distribution = kwargs.pop('distribution', None)
if self.distribution is not None:
self.distribution = int(self.distribution)
Expand Down Expand Up @@ -2607,14 +2666,19 @@ def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]

super().from_dict(**kwargs)

for note in notes:
self.add_note(**note)
for opinion in opinions:
self.add_opinion(**opinion)

def _set_default(self) -> None:
if not hasattr(self, 'created'):
self.created = datetime.timestamp(datetime.now())
if not hasattr(self, 'modified'):
self.modified = self.created


class MISPNote(AnalystDataBehaviorMixin, MISPAnalystData):
class MISPNote(MISPAnalystData):

_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'note', 'language'})

Expand All @@ -2625,8 +2689,8 @@ def __init__(self, **kwargs: dict[str, Any]) -> None:
self.language: str
super().__init__(**kwargs)

def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
if 'Note' in kwargs:
def from_dict(self, contained=False, **kwargs) -> None: # type: ignore[no-untyped-def]
if not contained and 'Note' in kwargs:
kwargs = kwargs['Note']
self.note = kwargs.pop('note', None)
if self.note is None:
Expand All @@ -2639,7 +2703,7 @@ def __repr__(self) -> str:
return f'<{self.__class__.__name__}(NotInitialized)'


class MISPOpinion(AnalystDataBehaviorMixin, MISPAnalystData):
class MISPOpinion(MISPAnalystData):

_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'opinion', 'comment'})

Expand All @@ -2650,8 +2714,8 @@ def __init__(self, **kwargs: dict[str, Any]) -> None:
self.comment: str
super().__init__(**kwargs)

def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
if 'Opinion' in kwargs:
def from_dict(self, contained=False, **kwargs) -> None: # type: ignore[no-untyped-def]
if not contained and 'Opinion' in kwargs:
kwargs = kwargs['Opinion']
self.opinion = kwargs.pop('opinion', None)
if self.opinion is not None:
Expand All @@ -2673,7 +2737,7 @@ def __repr__(self) -> str:
return f'<{self.__class__.__name__}(NotInitialized)'


class MISPRelationship(AnalystDataBehaviorMixin, MISPAnalystData):
class MISPRelationship(MISPAnalystData):

_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'related_object_uuid', 'related_object_type', 'relationship_type'})

Expand All @@ -2685,8 +2749,8 @@ def __init__(self, **kwargs: dict[str, Any]) -> None:
self.relationship_type: str
super().__init__(**kwargs)

def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
if 'Relationship' in kwargs:
def from_dict(self, contained=False, **kwargs) -> None: # type: ignore[no-untyped-def]
if not contained and 'Relationship' in kwargs:
kwargs = kwargs['Relationship']
self.related_object_type = kwargs.pop('related_object_type', None)
if self.related_object_type is None:
Expand Down
122 changes: 122 additions & 0 deletions tests/test_analyst_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#!/usr/bin/env python

import unittest
from pymisp import (MISPAttribute, MISPEvent, MISPEventReport, MISPNote,
MISPObject, MISPOpinion)
from uuid import uuid4


class TestAnalystData(unittest.TestCase):
def setUp(self) -> None:
self.note_dict = {
"uuid": uuid4(),
"note": "note3"
}
self.opinion_dict = {
"uuid": uuid4(),
"opinion": 75,
"comment": "Agree"
}

def test_analyst_data_on_attribute(self) -> None:
attribute = MISPAttribute()
attribute.from_dict(type='filename', value='foo.exe')
self._attach_analyst_data(attribute)

def test_analyst_data_on_attribute_alternative(self) -> None:
event = MISPEvent()
event.info = 'Test on Attribute'
event.add_attribute('domain', 'foo.bar')
self._attach_analyst_data(event.attributes[0])

def test_analyst_data_on_event(self) -> None:
event = MISPEvent()
event.info = 'Test Event'
self._attach_analyst_data(event)

def test_analyst_data_on_event_report(self) -> None:
event_report = MISPEventReport()
event_report.from_dict(name='Test Report', content='This is a report')
self._attach_analyst_data(event_report)

def test_analyst_data_on_event_report_alternative(self) -> None:
event = MISPEvent()
event.info = 'Test on Event Report'
event.add_event_report('Test Report', 'This is a report')
self._attach_analyst_data(event.event_reports[0])

def test_analyst_data_on_object(self) -> None:
misp_object = MISPObject('file')
misp_object.add_attribute('filename', 'foo.exe')
self._attach_analyst_data(misp_object)

def test_analyst_data_on_object_alternative(self) -> None:
event = MISPEvent()
event.info = 'Test on Object'
misp_object = MISPObject('file')
misp_object.add_attribute('filename', 'foo.exe')
event.add_object(misp_object)
self._attach_analyst_data(event.objects[0])

def test_analyst_data_on_object_attribute(self) -> None:
misp_object = MISPObject('file')
object_attribute = misp_object.add_attribute('filename', 'foo.exe')
self._attach_analyst_data(object_attribute)

def test_analyst_data_object_object_attribute_alternative(self) -> None:
misp_object = MISPObject('file')
misp_object.add_attribute('filename', 'foo.exe')
self._attach_analyst_data(misp_object.attributes[0])

def _attach_analyst_data(
self, container: MISPAttribute | MISPEvent | MISPEventReport | MISPObject) -> None:
object_type = container._analyst_data_object_type
note1 = container.add_note(note='note1')
opinion1 = note1.add_opinion(opinion=25, comment='Disagree')
opinion2 = container.add_opinion(opinion=50, comment='Neutral')
note2 = opinion2.add_note(note='note2')

dict_note = MISPNote()
dict_note.from_dict(
object_type=object_type, object_uuid=container.uuid, **self.note_dict
)
note3 = container.add_note(**dict_note)
dict_opinion = MISPOpinion()
dict_opinion.from_dict(
object_type='Note', object_uuid=note3.uuid, **self.opinion_dict
)
container.add_opinion(**dict_opinion)

self.assertEqual(len(container.notes), 3)
self.assertEqual(len(container.opinions), 3)

misp_note1, misp_note2, misp_note3 = container.notes
misp_opinion1, misp_opinion2, misp_opinion3 = container.opinions

self.assertEqual(misp_note1.object_type, object_type)
self.assertEqual(misp_note1.object_uuid, container.uuid)
self.assertEqual(misp_note1.note, 'note1')

self.assertEqual(misp_note2.object_type, 'Opinion')
self.assertEqual(misp_note2.object_uuid, opinion2.uuid)
self.assertEqual(misp_note2.note, 'note2')

self.assertEqual(misp_note3.object_type, object_type)
self.assertEqual(misp_note3.object_uuid, container.uuid)
self.assertEqual(misp_note3.note, 'note3')

self.assertEqual(misp_opinion1.object_type, 'Note')
self.assertEqual(misp_opinion1.object_uuid, note1.uuid)
self.assertEqual(misp_opinion1.opinion, 25)
self.assertEqual(misp_opinion1.comment, 'Disagree')

self.assertEqual(misp_opinion2.object_type, object_type)
self.assertEqual(misp_opinion2.object_uuid, container.uuid)
self.assertEqual(misp_opinion2.opinion, 50)
self.assertEqual(misp_opinion2.comment, 'Neutral')

self.assertEqual(misp_opinion3.object_type, 'Note')
self.assertEqual(misp_opinion3.object_uuid, note3.uuid)
self.assertEqual(misp_opinion3.opinion, 75)
self.assertEqual(misp_opinion3.comment, 'Agree')

Loading