174 changes: 136 additions & 38 deletions classes/DetectionRuleConverter.py
@@ -1,46 +1,103 @@
 import subprocess
+import re
 from subprocess import DEVNULL
 
 
 class DetectionRuleConverter(object):
 
     @staticmethod
-    def convertSigmaRule(sigma_config_path, rule_path):
-        command = ['sigmac -t splunk -c ' + sigma_config_path + ' ' + rule_path]
+    def convertSigmaRule(sigma_path, sigma_config_path, rule_path):
+        command = [sigma_path + ' -t splunk ' + sigma_config_path + ' ' + rule_path]
         sigma_search = subprocess.run(
             command, shell=True, stdout=subprocess.PIPE, stderr=DEVNULL, universal_newlines=True)
         sigma_search_output = sigma_search.stdout
+        commandfieldlist = [sigma_path + ' -t fieldlist ' + sigma_config_path + ' ' + rule_path]
+        sigma_search_fieldlist = subprocess.run(
+            commandfieldlist, shell=True, stdout=subprocess.PIPE, stderr=DEVNULL, universal_newlines=True)
+        sigma_search_fields_output = sigma_search_fieldlist.stdout
 
         if sigma_search.returncode != 0:
             print("# Failure converting the Sigma File: " + rule_path)
             return "Converter Failure"
         else:
-            return sigma_search_output
+            sigma_search_dict = {'search': '', 'fields': []}
+            length = len(sigma_search_output.splitlines())
+            if length == 1:
+                sigma_search_dict['search'] = sigma_search_output
+            elif length > 1:
+                search = sigma_search_output.splitlines()[0]
+                table = ""
+                countCmd = search.count("|")
+                if countCmd > 1:
+                    sigma_search_multiline_output = search
+                elif countCmd==1 and "| table" in search:
+                    tableindex = search.find('| table')
+                    sigma_search_multiline_output = search[:tableindex]
+                    table = search[tableindex:]
+                else:
+                    sigma_search_multiline_output = " ( "+search+" ) "
+                for i in range(1, length):
+                    search = sigma_search_output.splitlines()[i]
+                    countCmd = search.count("|")
+                    if countCmd > 1:
+                        sigma_search_multiline_output = sigma_search_multiline_output + " | append [ search " + search + " ] "
+                    elif countCmd==1 and "| table" in search:
+                        tableindex = search.find('| table')
+                        sigma_search_multiline_output = sigma_search_multiline_output + " OR ( " + search[:tableindex] + " ) "
+                        table += search[tableindex:]
+                    else:
+                        sigma_search_multiline_output = sigma_search_multiline_output + " OR ( " + search + " ) "
+                if table:
+                    outputTable=[]
+                    for field in table.replace("| table ",",").split(","):
+                        if field and field not in outputTable:
+                            outputTable.append(field)
+                    sigma_search_multiline_output += "| table "+' '.join(outputTable)
+                sigma_search_dict['search'] = sigma_search_multiline_output
+            if sigma_search_fields_output:
+                sigma_search_dict['fields'] = sigma_search_fields_output.splitlines()
+            return sigma_search_dict
 
+    @staticmethod
+    def addSearchFilter(initial_filter, search, sigma_rule):
+        search = initial_filter + " " + search
+        if '| append [ search ' in search:
+            search = search.replace('| append [ search ','| append [ search ' + initial_filter)
+        search = search.replace("%RULE_NAME%",sigma_rule["title"])
+        return search
+
     @staticmethod
     def addToSummaryIndex(search, sigma2splunkalertconfig, sigma_rule):
         if "summary_index" in sigma2splunkalertconfig["alert_action"]:
-            if "tags" in sigma_rule:
-                summaryindexconfig = sigma2splunkalertconfig["alert_action"]["summary_index"]
-                search = search[:-1] + ' | collect index=' + \
-                    summaryindexconfig["name"] + ' '
-                if ("enrich_tags" in summaryindexconfig) or ("enrich_level" in summaryindexconfig):
-                    search = search + 'marker="'
-                if "enrich_tags" in summaryindexconfig:
-                    for tag in sigma_rule["tags"]:
-                        search = search + "sigma_tag=" + tag + ","
-                        if "attack.t" in tag:
-                            search = search + "attack_ID=" + tag[8:] + ","
+            summaryindexconfig = sigma2splunkalertconfig["alert_action"]["summary_index"]
+            search = search[:-1] + ' | rename _time as orig_time, rawHash as orig_rawHash, index as orig_index, sourcetype as orig_sourcetype, host as orig_host, count as orig_count | search NOT([| search `sigma_matches` source="'+sigma_rule["title"]+'" | stats count by orig_time orig_time_end orig_rawHash | fields orig_time orig_time_end orig_rawHash ]) | addinfo | rename info_search_time as _time | fields - info_* | collect index=' + \
+                summaryindexconfig["name"]
+            if ("enrich_tags" in summaryindexconfig) or ("enrich_level" in summaryindexconfig):
+                search = search + ' marker="'
+            if "enrich_tags" in summaryindexconfig:
+                if "tags" in sigma_rule:
+                    for tag in sigma_rule["tags"]:
+                        if re.match('attack.t[0-9]+$',tag):
+                            search = search + "attack_ID=" + tag[7:] + ","
+                        elif re.match('attack.g[0-9]+$',tag):
+                            search = search + "attack_group_id=" + tag[7:] + ","
+                        elif re.match('attack.[a-z_]+$',tag):
+                            search = search + "attack_tactics=" + tag[7:].replace("_","-") + ","
+                        else:
+                            search = search + "sigma_tag='" + tag + "',"
-                if "enrich_level" in summaryindexconfig:
-                    if "level" in sigma_rule:
-                        search = search + "level=" + sigma_rule["level"]
-                if search[-1:] == ",":
-                    search = search[:-1]
-                search = search + '"'
+            if "enrich_level" in summaryindexconfig:
+                if "level" in sigma_rule:
+                    search = search + "level=" + sigma_rule["level"]
+                else:
+                    search = search + "level=low"
+                    print("# Warning Sigma Rule: " + sigma_rule["title"] + " no level found default to low")
+            if search[-1:] == ",":
+                search = search[:-1]
+            search = search + '"'
         return search
 
     @staticmethod
-    def performSearchTransformation(transformations, search, sigma_rule):
+    def performSearchTransformation(transformations, search, fields, sigma_rule):
        for trans in transformations:
 
             # Search Transformation to add whitelist in front of table or transforming command (for better whitelisting)
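
A minimal sketch of the tag classification that the new `addToSummaryIndex` performs when building the `marker` field; the regexes are the ones from the hunk above, while the sample tags are hypothetical:

```python
import re

# Hypothetical Sigma tags; real values come from the rule's `tags` list.
sample_tags = ['attack.t1059', 'attack.g0032', 'attack.defense_evasion', 'car.2013-07-001']

marker = ""
for tag in sample_tags:
    if re.match('attack.t[0-9]+$', tag):
        marker += "attack_ID=" + tag[7:] + ","                          # attack_ID=t1059,
    elif re.match('attack.g[0-9]+$', tag):
        marker += "attack_group_id=" + tag[7:] + ","                    # attack_group_id=g0032,
    elif re.match('attack.[a-z_]+$', tag):
        marker += "attack_tactics=" + tag[7:].replace("_", "-") + ","   # attack_tactics=defense-evasion,
    else:
        marker += "sigma_tag='" + tag + "',"                            # sigma_tag='car.2013-07-001',

print(marker.rstrip(","))
```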
@@ -69,26 +126,67 @@ def performSearchTransformation(transformations, search, sigma_rule):
                     file_name = file_name.replace("(", "")
                     file_name = file_name.replace(")", "")
                     search = search[:-1] + " | search NOT [| inputlookup " + file_name + "] "
 
             if trans == "add_table" and not "| table" in search:
                 search = search[:-1] + " | table "
 
-            # Search Transformation to add host field
-            if trans == "add_host_field":
-                if '| table' in search:
-                    search = search[:-1] + ",host "
-
-            # Search Transformation to add source field
-            if trans == "add_source_field":
-                if '| table' in search:
-                    search = search[:-1] + ",source "
-
-            # Search Transformation to add sourcetype field
-            if trans == "add_sourcetype_field":
-                if '| table' in search:
-                    search = search[:-1] + ",sourcetype "
-
-            # Search Transformation to add transforming_command
+            # Search Transformation to add fields
+            if re.match(r"add_\w+_field", trans):
+                findTables = re.findall(r" \|\s+table\s+[^\|\]]*", search)
+                findTablesIdx = [(m.start(0), m.end(0)) for m in re.finditer(r" \|\s+table\s+[^\|\]]*", search)]
+                newTables = []
+                for table in findTables:
+                    if table:
+                        newTable = table.rstrip("\n\r")
+                        if trans == "add_time_field" and not re.match('.*[, ]_time[, ].*',table):
+                            newTable += ' _time'
+                        if trans == "add_host_field" and not re.match('.*[, ]host[, ].*',table):
+                            newTable += ' host'
+                        if trans == "add_source_field" and not re.match('.*[, ]source[, ].*',table):
+                            newTable += ' source'
+                        if trans == "add_sourcetype_field" and not re.match('.*[, ]sourcetype[, ].*',table):
+                            newTable += ' sourcetype'
+                        if trans == "add_index_field" and not re.match('.*[, ]index[, ].*',table):
+                            newTable += ' index'
+                        if trans == "add_rawHash_field" and not re.match('.*[, ]rawHash[, ].*',table):
+                            newTable += ' rawHash'
+                        if fields and trans == "add_FIELDLIST_field":
+                            for field in fields:
+                                if not re.match('.*[, ]'+field+'[, ].*',table):
+                                    newTable += ' '+field
+                        newTables.append(newTable)
+                updatedSearch = ""
+                offset = 0
+                for i in range(len(findTablesIdx)):
+                    updatedSearch = search[:findTablesIdx[i][0]+offset] + newTables[i] + search[findTablesIdx[i][1]+offset:]
+                    offset += len(newTables[i]) - (findTablesIdx[i][1]-findTablesIdx[i][0])
+
+                search = updatedSearch
 
             if trans == "add_transforming_command":
                 if not ('| table' in search):
                     search = search[:-1] + " | stats values(*) AS * by _time "
+                findTables = re.findall(r" \|\s+table\s+[^\|\]]+", search)
+                findTablesIdx = [(m.start(0), m.end(0)) for m in re.finditer(r"\|\s+table\s+[^\)\|\]]+", search)]
+                newTables = []
+                for table in findTables:
+                    if table:
+                        table = table.rstrip("\n\r").replace(","," ")
+                        fillnullCmd = table.replace(' table ',' fillnull value="" ')
+                        groupingFieldArray = table.replace(" | table ","").split(" ")
+                        groupingFieldArray.remove('_time')
+                        groupingFieldArray.remove('rawHash')
+                        groupingFieldStr = " ".join(groupingFieldArray)
+                        newTable = table + fillnullCmd + ' | stats count earliest(_time) as _time latest(_time) as orig_time_end earliest(rawHash) as rawHash by ' + groupingFieldStr + ' '
+                        newTables.append(newTable)
+
+                updatedSearch = ""
+                offset = 0
+                for i in range(len(findTablesIdx)):
+                    updatedSearch = search[:findTablesIdx[i][0]+offset] + newTables[i] + search[findTablesIdx[i][1]+offset:]
+                    offset += len(newTables[i]) - (findTablesIdx[i][1]-findTablesIdx[i][0])
+
+                search = updatedSearch
 
 
             # Add Custom Search Transformations here

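Both rewrite blocks above use a splice-with-offset pattern to update every `| table` segment in place. A minimal sketch on a hypothetical two-table search; note that each replacement must be applied to the accumulating string for the offset bookkeeping to stay valid:

```python
import re

# Hypothetical multi-line conversion result with two "| table" segments.
search = '( foo=1 ) | table a,b | append [ search bar=2 | table a,c ]'

spans = [(m.start(0), m.end(0)) for m in re.finditer(r"\|\s+table\s+[^\|\]]*", search)]
# Rewrite each table segment, e.g. add_host_field appending "host".
new_tables = [search[s:e].rstrip() + ' host ' for s, e in spans]

offset = 0
for (start, end), table in zip(spans, new_tables):
    search = search[:start + offset] + table + search[end + offset:]
    offset += len(table) - (end - start)

print(search)
# ( foo=1 ) | table a,b host | append [ search bar=2 | table a,c host ]
```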
17 changes: 12 additions & 5 deletions config/config.yml
@@ -1,19 +1,26 @@
 app: 'sigma_hunting_app'
 cron_schedule: '*/10 * * * *'
-earliest_time: '-10m'
+earliest_time: '0'

Review comment on `earliest_time`:

> Shouldn't you search for the last 10m when you have a cron schedule of every 10m?

Author:

> In fact, all my searches use indexed time, to prevent missing alerts if delays are observed on collect.
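For context, a hedged illustration of the indexed-time approach (not from this PR): SPL supports the `_index_earliest`/`_index_latest` time modifiers, so a search scheduled every 10 minutes can window on index time and still catch events that arrive late:

```python
# Hypothetical example: with earliest_time '0', the per-rule window can be
# expressed over index time instead of event time.
base_search = 'index=windows EventCode=4688'
scheduled = base_search + ' _index_earliest=-10m@m _index_latest=@m'
print(scheduled)
```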

 latest_time: 'now'
 schedule_window: auto
-allow_skew: '50%'
+allow_skew: '1h'
+search_initial_filter: '`sigma_hunt` | stats last(rule_next_filter) as rule_next_filter | return $rule_next_filter ]'
 search_transformations:
   - add_table
+  - add_time_field
   - add_host_field
   - add_sourcetype_field
+  - add_index_field
+  - add_rawHash_field
+  - add_FIELDLIST_field
+  - add_transforming_command
   - add_whitelist
   - add_whitelist_in_front
 alert_action:
   summary_index:
-    name: 'threat-hunting'
+    name: 'sigma_matches'
     enrich_tags: 1
     enrich_level: 1
   email:
     to: '[email protected]'
     subject: 'Splunk Alert: $name$'
-    message: 'Splunk Alert $name$ triggered |List of interesting fields: %fields% |title: %title% status: %status% |description: %description% |references: %references% |tags: %tags% |author: %author% |date: %date% |falsepositives: %falsepositives% |level: %level%'
+    message: 'Splunk Alert $name$ triggered |List of interesting fields: %fields% |title: %title% status: %status% |description: %description% |references: %references% |tags: %tags% |author: %author% |date: %date% |falsepositives: %falsepositives% |level: %level%'
68 changes: 57 additions & 11 deletions sigma2splunkalert
100755 → 100644
@@ -8,24 +8,38 @@ import argparse
 import os
 import yaml
 import subprocess
+import re
 from subprocess import DEVNULL
 from jinja2 import Environment, FileSystemLoader
+import binascii
 from classes.UseCase import UseCase
 from classes.DetectionRuleConverter import DetectionRuleConverter
 
+def CRC32_from_string(string):
+    buf = (binascii.crc32(string.encode('utf8')) & 0xFFFFFFFF)
+    return "%08X" % buf
+
+def CRC32_from_file(filename):
+    buf = open(filename,'rb').read()
+    buf = (binascii.crc32(buf) & 0xFFFFFFFF)
+    return "%08X" % buf
+
 def main(argv):
     # parse input variables
     parser = argparse.ArgumentParser(
         description='Convert Sigma rules to Splunk Alerts savedsearches.conf configuration.')
     parser.add_argument('rules', action='store', metavar='N', nargs='+',
                         help='folder or file containing the Sigma rules')
+    parser.add_argument('--sigmac-path', '-p', action='store', dest='sigmac',
+                        help='Sigmac path')
     parser.add_argument('--config', '-c', action='store', dest='config',
                         help='Sigma2SplunkAlert configuration file')
-    parser.add_argument('--sigma-config', '-sc', action='store', dest='sigma_config',
+    parser.add_argument('--sigma-config', '-sc', action='store', dest='sigma_configs', nargs='+', type=str,
                         help='Sigma configuration with field name and index name mapping')
     parser.add_argument('--template', '-t', action='store', dest='template',
                         help='file containing the savedsearches.conf template')
+    parser.add_argument('--filter', '-f', action='store', dest='filter_file',
+                        help='file containing path (or part of path) of rules to ignore')
 
     cmdargs = parser.parse_args()
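
A quick illustration of the title-suffix scheme these CRC32 helpers support (used in `openSigmaDetectionRule` below); the rule id is hypothetical, and the 8-hex-digit suffix depends on it:

```python
import binascii

def CRC32_from_string(string):
    # Same logic as the helper above: CRC32 of the UTF-8 bytes as 8 hex digits.
    return "%08X" % (binascii.crc32(string.encode('utf8')) & 0xFFFFFFFF)

rule_id = '5638f7c0-ac70-491d-8465-2a65075e0d86'  # hypothetical Sigma rule id
print('Suspicious Process Creation' + ' - ' + CRC32_from_string(rule_id))
# Two rules sharing a title now yield distinct saved-search stanza names.
```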

@@ -34,16 +48,33 @@ def main(argv):
         converter_config_path = cmdargs.config
     else:
         converter_config_path = 'config/config.yml'
 
+    # cmdargs Sigmac Path
+    if cmdargs.sigmac:
+        sigma_path = cmdargs.sigmac
+    else:
+        sigma_path = 'sigmac'
+
     # cmdargs Sigma Configuration
-    if cmdargs.sigma_config:
-        sigma_config_path = cmdargs.sigma_config
+    sigma_config_path = ''
+    if cmdargs.sigma_configs:
+        for sigma_config in cmdargs.sigma_configs:
+            sigma_config_path += "-c {} ".format(sigma_config)
     else:
-        sigma_config_path = 'sigma_config/splunk-all.yml'
+        sigma_config_path = '-c ./sigma_config/splunk-all.yml'
 
     # cmdargs template Configuration
     if cmdargs.template:
         template_path = cmdargs.template
 
+    # cmdargs filter Configuration
+    filterList = []
+    if cmdargs.filter_file:
+        try:
+            with open(cmdargs.filter_file) as filterFile:
+                filterList = filterFile.read().splitlines()
+        except FileNotFoundError:
+            print("Filter File not found. No filters will be applied.")
+
     # Splunk Search Converter
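
For reference, a sketch of the sigmac invocation that the multi-config handling above produces; the paths are hypothetical:

```python
# Mirrors the flag-building loop above.
sigma_path = 'sigmac'
sigma_configs = ['sigma_config/splunk-all.yml', 'sigma_config/windows-extra.yml']

sigma_config_path = ''
for sigma_config in sigma_configs:
    sigma_config_path += "-c {} ".format(sigma_config)

# As assembled in DetectionRuleConverter.convertSigmaRule:
command = sigma_path + ' -t splunk ' + sigma_config_path + ' ' + 'rules/windows/proc_creation.yml'
print(command)
# sigmac -t splunk -c sigma_config/splunk-all.yml -c sigma_config/windows-extra.yml  rules/windows/proc_creation.yml
# (the doubled space is benign: sigma_config_path already ends with a separator)
```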

@@ -56,20 +87,27 @@
 
     # Iterate through N input folders
     for input_path in input_path_list:
-        files = loadSigmaRules(input_path)
+        files = loadSigmaRules(input_path, filterList)
         for file in files:
             # Load Sigma Rule
             sigma_rule = openSigmaDetectionRule(file)
 
             # Convert Sigma Rule to Splunk Search
-            splunk_search = DetectionRuleConverter.convertSigmaRule(sigma_config_path, file)
-            if splunk_search == "Converter Failure":
+            splunk_search_results = DetectionRuleConverter.convertSigmaRule(sigma_path, sigma_config_path, file)
+            if type(splunk_search_results) is not dict and "Converter Failure" in splunk_search_results:
                 continue
+            splunk_search = splunk_search_results['search']
+            splunk_search_fields = splunk_search_results['fields']
 
             # Perform Splunk Search transformations
             if "search_transformations" in sigma2splunkalertconfig:
                 splunk_search = DetectionRuleConverter.performSearchTransformation(
-                    sigma2splunkalertconfig["search_transformations"], splunk_search, sigma_rule)
+                    sigma2splunkalertconfig["search_transformations"], splunk_search, splunk_search_fields, sigma_rule)
 
+            # Add Splunk initial filter
+            if "search_initial_filter" in sigma2splunkalertconfig:
+                splunk_search = DetectionRuleConverter.addSearchFilter(
+                    sigma2splunkalertconfig["search_initial_filter"], splunk_search, sigma_rule)
 
             # Alert with Summary index
             splunk_search = DetectionRuleConverter.addToSummaryIndex(
@@ -122,19 +160,27 @@ def openSigmaDetectionRule(rule_path):
     except yaml.YAMLError as exc:
         print(exc)
         sys.exit(1)
+    if 'title' in sigma_uc:
+        if 'id' in sigma_uc:
+            sigma_uc['title'] = sigma_uc['title'] + ' - ' + CRC32_from_string(sigma_uc['id'])
+        else:
+            sigma_uc['title'] = sigma_uc['title'] + ' - ' + CRC32_from_file(rule_path)
     return sigma_uc
 
 
-def loadSigmaRules(path):
+def loadSigmaRules(path, filterList):
     files = []
     if os.path.isfile(path):
-        files.append(path)
+        if not filterList or (filterList and not any(s in path for s in filterList)):
+            files.append(path)
     else:
         # r=root, d=directories, f = files
         for r, d, f in os.walk(path):
             for file in f:
                 if '.yml' in file:
-                    files.append(os.path.join(r, file))
+                    fileName = os.path.join(r, file)
+                    if not filterList or (filterList and not any(s in fileName for s in filterList)):
+                        files.append(fileName)
     return files
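
And a short sketch of the new filter behaviour in `loadSigmaRules`; the paths and filter entries are hypothetical:

```python
# Keep a rule file only if no filter substring occurs in its path
# (same predicate as loadSigmaRules above).
filterList = ['deprecated/', 'rules/windows/dns']

candidates = [
    'rules/windows/process_creation/win_susp_whoami.yml',
    'rules/windows/dns/win_dns_c2.yml',
    'deprecated/old_rule.yml',
]

kept = [p for p in candidates
        if not filterList or not any(s in p for s in filterList)]
print(kept)  # ['rules/windows/process_creation/win_susp_whoami.yml']
```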

