Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions examples/feed-generator/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@
import json
import os
from pymisp import ExpandedPyMISP
from settings import url, key, ssl, outputdir, filters, valid_attribute_distribution_levels
from settings import url, key, ssl, outputdir, filters, valid_attribute_distribution_levels, with_signatures
try:
from settings import with_distribution
except ImportError:
with_distribution = False

try:
from settings import with_signatures
except ImportError:
with_signatures = False

try:
from settings import with_local_tags
except ImportError:
Expand Down Expand Up @@ -39,14 +44,31 @@ def init():
return ExpandedPyMISP(url, key, ssl)


def saveEvent(event):
def saveEvent(event, misp):
stringified_event = json.dumps(event, indent=2)
if with_signatures and event['Event'].get('protected'):
signature = getSignature(stringified_event, misp)
try:
with open(os.path.join(outputdir, f'{event["Event"]["uuid"]}.asc'), 'w') as f:
f.write(signature)
except Exception as e:
print(e)
sys.exit('Could not create the event signature dump.')

try:
with open(os.path.join(outputdir, f'{event["Event"]["uuid"]}.json'), 'w') as f:
json.dump(event, f, indent=2)
f.write(stringified_event)
except Exception as e:
print(e)
sys.exit('Could not create the event dump.')

def getSignature(stringified_event, misp):
try:
signature = misp.direct_call('/cryptographicKeys/serverSign', stringified_event)
return signature
except Exception as e:
print(e)
sys.exit('Could not get the signature for the event from the MISP instance. Perhaps the user does not have the necessary permissions.')

def saveHashes(hashes):
try:
Expand Down Expand Up @@ -79,6 +101,7 @@ def saveManifest(manifest):
sys.exit("No events returned.")
manifest = {}
hashes = []
signatures = []
counter = 1
total = len(events)
for event in events:
Expand All @@ -97,7 +120,7 @@ def saveManifest(manifest):
continue
hashes += [[h, e.uuid] for h in e_feed['Event'].pop('_hashes')]
manifest.update(e_feed['Event'].pop('_manifest'))
saveEvent(e_feed)
saveEvent(e_feed, misp)
print("Event " + str(counter) + "/" + str(total) + " exported.")
counter += 1
saveManifest(manifest)
Expand Down
3 changes: 3 additions & 0 deletions examples/feed-generator/settings.default.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,6 @@

# Include the exportable local tags along with the global tags. The default is True.
with_local_tags = True

# Include signatures for protected events. This will allow MISP to ingest and update protected events from the feed. It requires perm_server_sign to be set to true in the user's role on MISP's side.
with_signatures = False
34 changes: 32 additions & 2 deletions pymisp/mispevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1559,7 +1559,8 @@ def __repr__(self) -> str:
class MISPEvent(AnalystDataBehaviorMixin):

_fields_for_feed: set[str] = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',
'publish_timestamp', 'published', 'date', 'extends_uuid'}
'publish_timestamp', 'published', 'date', 'extends_uuid',
'protected'}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note: we need to add in def _set_default an entry self.protected = False, to keep consistency with the other required fields.

I'm assuming protected set to false par default is the expected value.


_analyst_data_object_type = 'Event'

Expand All @@ -1581,6 +1582,7 @@ def __init__(self, describe_types: dict[str, Any] | None = None, strict_validati
self.EventReport: list[MISPEventReport] = []
self.Tag: list[MISPTag] = []
self.Galaxy: list[MISPGalaxy] = []
self.CryptographicKey: list[MISPCryptographicKey] = []

self.publish_timestamp: float | int | datetime
self.timestamp: float | int | datetime
Expand Down Expand Up @@ -1649,13 +1651,14 @@ def attributes_hashes(self, algorithm: str = 'sha512') -> list[str]:
to_return += attribute.hash_values(algorithm)
return to_return

def to_feed(self, valid_distributions: list[int] = [0, 1, 2, 3, 4, 5], with_meta: bool = False, with_distribution: bool=False, with_local_tags: bool = True, with_event_reports: bool = True) -> dict[str, Any]:
def to_feed(self, valid_distributions: list[int] = [0, 1, 2, 3, 4, 5], with_meta: bool = False, with_distribution: bool=False, with_local_tags: bool = True, with_event_reports: bool = True, with_cryptographic_keys: bool = True) -> dict[str, Any]:
""" Generate a json output for MISP Feed.

:param valid_distributions: only makes sense if the distribution key is set; i.e., the event is exported from a MISP instance.
:param with_distribution: exports distribution and Sharing Group info; otherwise all SharingGroup information is discarded (protecting privacy)
:param with_local_tags: tag export includes local exportable tags along with global exportable tags
:param with_event_reports: include event reports in the returned MISP event
:param with_cryptographic_keys: include the associated cryptographic keys in the returned protected MISP event
"""
required = ['info', 'Orgc']
for r in required:
Expand Down Expand Up @@ -1719,6 +1722,13 @@ def to_feed(self, valid_distributions: list[int] = [0, 1, 2, 3, 4, 5], with_meta
event_report.pop('SharingGroup', None)
event_report.pop('sharing_group_id', None)
to_return['EventReport'].append(event_report.to_dict())

if with_cryptographic_keys and self.cryptographic_keys:
to_return['CryptographicKey'] = []
for cryptographic_key in self.cryptographic_keys:
cryptographic_key.pop('parent_id', None)
cryptographic_key.pop('id', None)
to_return['CryptographicKey'].append(cryptographic_key.to_dict())

return {'Event': to_return}

Expand Down Expand Up @@ -1755,6 +1765,10 @@ def attributes(self, attributes: list[MISPAttribute]) -> None:
@property
def event_reports(self) -> list[MISPEventReport]:
return self.EventReport

@property
def cryptographic_keys(self) -> list[MISPCryptographicKey]:
return self.CryptographicKey

@property
def shadow_attributes(self) -> list[MISPShadowAttribute]:
Expand Down Expand Up @@ -1891,6 +1905,8 @@ def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
[self.add_galaxy(**e) for e in kwargs.pop('Galaxy')]
if kwargs.get('EventReport'):
[self.add_event_report(**e) for e in kwargs.pop('EventReport')]
if kwargs.get('CryptographicKey'):
[self.add_cryprographic_key(**e) for e in kwargs.pop('CryptographicKey')]

# All other keys
if kwargs.get('id'):
Expand Down Expand Up @@ -2041,6 +2057,15 @@ def add_event_report(self, name: str, content: str, **kwargs) -> MISPEventReport
self.edited = True
return event_report

def add_cryprographic_key(self, parent_type: str, key_data: str, type: str, uuid: str, fingerprint: str, timestamp: str, **kwargs) -> MISPCryptographicKey: # type: ignore[no-untyped-def]
"""Add a Cryptographic Key. parent_type, key_data, type, uuid, fingerprint, timestamp are required but you can pass all
other parameters supported by MISPEventReport"""
cryptographic_key = MISPCryptographicKey()
cryptographic_key.from_dict(parent_type=parent_type, key_data=key_data, type=type, uuid=uuid, fingerprint=fingerprint, timestamp=timestamp, **kwargs)
self.cryptographic_keys.append(cryptographic_key)
self.edited = True
return cryptographic_key

def add_galaxy(self, galaxy: MISPGalaxy | dict[str, Any] | None = None, **kwargs) -> MISPGalaxy: # type: ignore[no-untyped-def]
"""Add a galaxy and sub-clusters into an event, either by passing
a MISPGalaxy or a dictionary.
Expand Down Expand Up @@ -2225,6 +2250,11 @@ def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
kwargs = kwargs['Warninglist']
super().from_dict(**kwargs)

class MISPCryptographicKey(AbstractMISP):
def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
if 'CryptographicKey' in kwargs:
kwargs = kwargs['CryptographicKey']
super().from_dict(**kwargs)

class MISPTaxonomy(AbstractMISP):

Expand Down
Loading