Skip to content

Commit e4ee95a

Browse files
authored
Merge pull request #76 from SigmaHQ/pysigma-1.0
Updated to pySigma 1.0 and added cache management commands for MITRE content
2 parents 227bdb1 + 95a396c commit e4ee95a

File tree

3 files changed

+302
-10
lines changed

3 files changed

+302
-10
lines changed

sigma/cli/main.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
from .check import check
3535
from .plugin import plugin_group
3636
from .analyze import analyze_group
37-
from .pysigma import check_pysigma_command
37+
from .pysigma import pysigma_group
3838

3939

4040
CONTEXT_SETTINGS={
@@ -73,10 +73,10 @@ def version():
7373
def main():
7474
cli.add_command(analyze_group)
7575
cli.add_command(plugin_group)
76+
cli.add_command(pysigma_group)
7677
cli.add_command(list_group)
7778
cli.add_command(convert)
7879
cli.add_command(check)
79-
cli.add_command(check_pysigma_command)
8080
cli.add_command(version)
8181
cli()
8282

sigma/cli/pysigma.py

Lines changed: 242 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
import importlib.metadata
22
import subprocess
33
import sys
4+
import os
5+
from datetime import datetime
46
import click
57
from packaging.specifiers import SpecifierSet
8+
from prettytable import PrettyTable
69

710
def get_pysigma_requirement():
811
requires = importlib.metadata.requires("sigma-cli")
@@ -18,8 +21,15 @@ def check_pysigma_version():
1821
version_specifier = SpecifierSet(requires_pysgima.split(" ")[1][1:-1])
1922
return importlib.metadata.version("pysigma") in version_specifier
2023

21-
@click.command(
22-
name="check-pysigma",
24+
@click.group(
25+
name="pysigma",
26+
help="pySigma library management commands."
27+
)
28+
def pysigma_group():
29+
pass
30+
31+
@pysigma_group.command(
32+
name="check-version",
2333
help="Check if the installed version of pysigma is compatible with the version required by sigma-cli."
2434
)
2535
@click.option(
@@ -28,7 +38,7 @@ def check_pysigma_version():
2838
default=False,
2939
help="Suppress output if check passes.",
3040
)
31-
def check_pysigma_command(quiet):
41+
def check_version_command(quiet):
3242
check_pysigma(quiet)
3343

3444
def check_pysigma(quiet=False):
@@ -61,4 +71,232 @@ def check_pysigma(quiet=False):
6171
)
6272
click.echo("pySigma successfully reinstalled")
6373
else:
64-
click.echo("Incompatible pySigma version was keeped. You can rerun the check with: " + click.style("sigma check-pysigma", fg="green"))
74+
click.echo("Incompatible pySigma version was keeped. You can rerun the check with: " + click.style("sigma pysigma check-version", fg="green"))
75+
76+
77+
@pysigma_group.command(
78+
name="list-cache",
79+
help="List cached data versions and timestamps."
80+
)
81+
def list_cache_command():
82+
"""List the cached versions of pySigma data and their timestamps."""
83+
try:
84+
from sigma.data import mitre_attack, mitre_d3fend
85+
86+
# Configuration for datasets to check
87+
datasets = [
88+
{
89+
'name': 'MITRE ATT&CK',
90+
'module': mitre_attack,
91+
'cache_key': 'mitre_attack_data_default',
92+
'version_key': 'mitre_attack_version'
93+
},
94+
{
95+
'name': 'MITRE D3FEND',
96+
'module': mitre_d3fend,
97+
'cache_key': 'mitre_d3fend_data_default',
98+
'version_key': 'mitre_d3fend_version'
99+
}
100+
]
101+
102+
table = PrettyTable()
103+
table.field_names = ["Dataset", "Version", "Cached Date"]
104+
table.align = "l"
105+
106+
for dataset in datasets:
107+
cache = dataset['module']._get_cache()
108+
109+
# Check if cache directory exists and has the key
110+
if not os.path.exists(cache.directory) or dataset['cache_key'] not in cache:
111+
table.add_row([dataset['name'], "Not cached", "-"])
112+
else:
113+
# Get cached data without triggering download
114+
data = cache.get(dataset['cache_key'], read=True)
115+
version = data.get(dataset['version_key'], 'Unknown')
116+
117+
# Get timestamp from cache files
118+
cache_files = [f for f in os.listdir(cache.directory) if not f.startswith('.')]
119+
if cache_files:
120+
newest_mtime = max(os.path.getmtime(os.path.join(cache.directory, f)) for f in cache_files)
121+
timestamp = datetime.fromtimestamp(newest_mtime).strftime("%Y-%m-%d %H:%M:%S")
122+
else:
123+
timestamp = "Unknown"
124+
125+
table.add_row([dataset['name'], version, timestamp])
126+
127+
click.echo(table)
128+
129+
except ImportError:
130+
click.echo(click.style("Error: Unable to import pySigma data modules.", fg="red"))
131+
click.echo("Make sure pySigma is installed correctly.")
132+
except Exception as e:
133+
click.echo(click.style(f"Error accessing cache: {str(e)}", fg="red"))
134+
135+
136+
@pysigma_group.command(
137+
name="clear-cache",
138+
help="Delete all cached data."
139+
)
140+
@click.option(
141+
"--yes",
142+
"-y",
143+
is_flag=True,
144+
help="Skip confirmation prompt.",
145+
)
146+
def clear_cache_command(yes):
147+
"""Delete the cached data for all datasets."""
148+
try:
149+
from sigma.data import mitre_attack, mitre_d3fend
150+
151+
datasets = [
152+
{'name': 'MITRE ATT&CK', 'module': mitre_attack},
153+
{'name': 'MITRE D3FEND', 'module': mitre_d3fend}
154+
]
155+
156+
# Check what's cached
157+
cached_datasets = []
158+
total_size = 0
159+
total_entries = 0
160+
161+
for dataset in datasets:
162+
cache = dataset['module']._get_cache()
163+
if os.path.exists(cache.directory):
164+
keys = list(cache.iterkeys())
165+
if keys:
166+
size = cache.volume()
167+
cached_datasets.append({
168+
'name': dataset['name'],
169+
'entries': len(keys),
170+
'size': size
171+
})
172+
total_entries += len(keys)
173+
total_size += size
174+
175+
if not cached_datasets:
176+
click.echo(click.style("No cached data found. Nothing to clear.", fg="yellow"))
177+
return
178+
179+
# Confirm deletion
180+
if not yes:
181+
for cached in cached_datasets:
182+
click.echo(f"{cached['name']}: {cached['entries']} entries, {cached['size']} bytes")
183+
click.echo(f"Total: {total_entries} entries, {total_size} bytes")
184+
if not click.confirm(click.style("Are you sure you want to clear all cached data?", fg="yellow")):
185+
click.echo("Cache clearing cancelled.")
186+
return
187+
188+
# Clear all caches
189+
cleared_count = 0
190+
for dataset in datasets:
191+
cache = dataset['module']._get_cache()
192+
if os.path.exists(cache.directory):
193+
keys = list(cache.iterkeys())
194+
if keys:
195+
dataset['module'].clear_cache()
196+
cleared_count += 1
197+
198+
click.echo(click.style(f"✓ Cache cleared successfully for {cleared_count} dataset(s).", fg="green"))
199+
click.echo(f"Removed {total_entries} cache entries ({total_size} bytes)")
200+
201+
except ImportError:
202+
click.echo(click.style("Error: Unable to import pySigma data modules.", fg="red"))
203+
click.echo("Make sure pySigma is installed correctly.")
204+
except Exception as e:
205+
click.echo(click.style(f"Error clearing cache: {str(e)}", fg="red"))
206+
207+
208+
@pysigma_group.command(
209+
name="update-cache",
210+
help="Update cache by clearing and re-caching data."
211+
)
212+
@click.option(
213+
"--yes",
214+
"-y",
215+
is_flag=True,
216+
help="Skip confirmation prompt.",
217+
)
218+
def update_cache_command(yes):
219+
"""Update the cache by deleting it and re-caching data for all datasets."""
220+
try:
221+
from sigma.data import mitre_attack, mitre_d3fend
222+
223+
datasets = [
224+
{
225+
'name': 'MITRE ATT&CK',
226+
'module': mitre_attack,
227+
'trigger_attr': 'mitre_attack_techniques_tactics_mapping'
228+
},
229+
{
230+
'name': 'MITRE D3FEND',
231+
'module': mitre_d3fend,
232+
'trigger_attr': 'mitre_d3fend_techniques'
233+
}
234+
]
235+
236+
# Get current cache info
237+
cached_datasets = []
238+
total_size = 0
239+
total_entries = 0
240+
241+
for dataset in datasets:
242+
cache = dataset['module']._get_cache()
243+
if os.path.exists(cache.directory):
244+
keys = list(cache.iterkeys())
245+
if keys:
246+
size = cache.volume()
247+
cached_datasets.append({
248+
'name': dataset['name'],
249+
'entries': len(keys),
250+
'size': size
251+
})
252+
total_entries += len(keys)
253+
total_size += size
254+
255+
# Confirm update
256+
if not yes:
257+
if cached_datasets:
258+
click.echo("Current cache:")
259+
for cached in cached_datasets:
260+
click.echo(f" {cached['name']}: {cached['entries']} entries, {cached['size']} bytes")
261+
click.echo(f"Total: {total_entries} entries, {total_size} bytes")
262+
else:
263+
click.echo("No cached data found (will download fresh data)")
264+
265+
if not click.confirm(click.style("Update cache by clearing and re-downloading data?", fg="yellow")):
266+
click.echo("Cache update cancelled.")
267+
return
268+
269+
# Clear and update each dataset
270+
updated_count = 0
271+
new_total_size = 0
272+
new_total_entries = 0
273+
274+
for dataset in datasets:
275+
click.echo(f"Updating {dataset['name']}...")
276+
277+
# Clear cache
278+
dataset['module'].clear_cache()
279+
280+
# Trigger re-caching by accessing data
281+
_ = getattr(dataset['module'], dataset['trigger_attr'])
282+
283+
# Get new cache info
284+
cache = dataset['module']._get_cache()
285+
new_keys = list(cache.iterkeys())
286+
new_size = cache.volume()
287+
288+
click.echo(click.style(f" ✓ {dataset['name']} cached: {len(new_keys)} entries, {new_size} bytes", fg="green"))
289+
290+
updated_count += 1
291+
new_total_entries += len(new_keys)
292+
new_total_size += new_size
293+
294+
click.echo()
295+
click.echo(click.style(f"✓ Cache updated successfully for {updated_count} dataset(s).", fg="green"))
296+
click.echo(f"Total: {new_total_entries} entries, {new_total_size} bytes")
297+
298+
except ImportError:
299+
click.echo(click.style("Error: Unable to import pySigma data modules.", fg="red"))
300+
click.echo("Make sure pySigma is installed correctly.")
301+
except Exception as e:
302+
click.echo(click.style(f"Error updating cache: {str(e)}", fg="red"))

tests/test_pysigma.py

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import importlib
22
import re
3-
from sigma.cli.pysigma import check_pysigma_command, check_pysigma_version
3+
from sigma.cli.pysigma import pysigma_group, check_pysigma_version
44
from click.testing import CliRunner
55
import pytest
66

@@ -30,13 +30,67 @@ def test_check_pysigma_version_incompatible(monkeypatch, pysigma_expected_versio
3030
)
3131
def test_check_pysigma():
3232
cli = CliRunner()
33-
result = cli.invoke(check_pysigma_command)
33+
result = cli.invoke(pysigma_group, ["check-version"])
3434
assert "pySigma version is compatible with sigma-cli" in result.output
3535

3636
@pytest.mark.skip(reason="This test is not working")
3737
def test_check_pysigma_incompatible(monkeypatch):
3838
monkeypatch.setattr('importlib.metadata.version', lambda x: "0.0.1")
3939
cli = CliRunner()
40-
result = cli.invoke(check_pysigma_command, input="y\n")
40+
result = cli.invoke(pysigma_group, ["check-version"], input="y\n")
4141
assert "pySigma version is not compatible" in result.output
42-
assert "pySigma successfully reinstalled" in result.output
42+
assert "pySigma successfully reinstalled" in result.output
43+
44+
45+
def test_list_cache():
46+
"""Test list-cache command shows cache information."""
47+
cli = CliRunner()
48+
result = cli.invoke(pysigma_group, ["list-cache"])
49+
assert result.exit_code == 0
50+
# Check that the output contains the expected table headers and dataset names
51+
assert "Dataset" in result.output
52+
assert "Version" in result.output
53+
assert "Cached Date" in result.output
54+
assert ("MITRE ATT&CK" in result.output or "Not cached" in result.output)
55+
56+
57+
def test_clear_cache_help():
58+
"""Test clear-cache command help."""
59+
cli = CliRunner()
60+
result = cli.invoke(pysigma_group, ["clear-cache", "--help"])
61+
assert result.exit_code == 0
62+
assert "Delete all cached data" in result.output
63+
assert "--yes" in result.output or "-y" in result.output
64+
65+
66+
def test_clear_cache_with_confirmation_cancel():
67+
"""Test clear-cache command cancellation."""
68+
cli = CliRunner()
69+
result = cli.invoke(pysigma_group, ["clear-cache"], input="n\n")
70+
assert result.exit_code == 0
71+
assert "cancelled" in result.output.lower() or "empty" in result.output.lower() or "No cache directory found" in result.output
72+
73+
74+
def test_clear_cache_with_yes_flag():
75+
"""Test clear-cache command with -y flag."""
76+
cli = CliRunner()
77+
result = cli.invoke(pysigma_group, ["clear-cache", "-y"])
78+
assert result.exit_code == 0
79+
assert "cleared" in result.output.lower() or "empty" in result.output.lower() or "No cache directory found" in result.output
80+
81+
82+
def test_update_cache_help():
83+
"""Test update-cache command help."""
84+
cli = CliRunner()
85+
result = cli.invoke(pysigma_group, ["update-cache", "--help"])
86+
assert result.exit_code == 0
87+
assert "Update cache" in result.output
88+
assert "--yes" in result.output or "-y" in result.output
89+
90+
91+
def test_update_cache_with_confirmation_cancel():
92+
"""Test update-cache command cancellation."""
93+
cli = CliRunner()
94+
result = cli.invoke(pysigma_group, ["update-cache"], input="n\n")
95+
assert result.exit_code == 0
96+
assert "cancelled" in result.output.lower()

0 commit comments

Comments
 (0)