Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sigma/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from .check import check
from .plugin import plugin_group
from .analyze import analyze_group
from .pysigma import check_pysigma_command
from .pysigma import pysigma_group


CONTEXT_SETTINGS={
Expand Down Expand Up @@ -73,10 +73,10 @@ def version():
def main():
cli.add_command(analyze_group)
cli.add_command(plugin_group)
cli.add_command(pysigma_group)
cli.add_command(list_group)
cli.add_command(convert)
cli.add_command(check)
cli.add_command(check_pysigma_command)
cli.add_command(version)
cli()

Expand Down
246 changes: 242 additions & 4 deletions sigma/cli/pysigma.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import importlib.metadata
import subprocess
import sys
import os
from datetime import datetime
import click
from packaging.specifiers import SpecifierSet
from prettytable import PrettyTable

def get_pysigma_requirement():
requires = importlib.metadata.requires("sigma-cli")
Expand All @@ -18,8 +21,15 @@ def check_pysigma_version():
version_specifier = SpecifierSet(requires_pysgima.split(" ")[1][1:-1])
return importlib.metadata.version("pysigma") in version_specifier

@click.command(
name="check-pysigma",
@click.group(
name="pysigma",
help="pySigma library management commands."
)
def pysigma_group():
pass

@pysigma_group.command(
name="check-version",
help="Check if the installed version of pysigma is compatible with the version required by sigma-cli."
)
@click.option(
Expand All @@ -28,7 +38,7 @@ def check_pysigma_version():
default=False,
help="Suppress output if check passes.",
)
def check_pysigma_command(quiet):
def check_version_command(quiet):
check_pysigma(quiet)

def check_pysigma(quiet=False):
Expand Down Expand Up @@ -61,4 +71,232 @@ def check_pysigma(quiet=False):
)
click.echo("pySigma successfully reinstalled")
else:
click.echo("Incompatible pySigma version was keeped. You can rerun the check with: " + click.style("sigma check-pysigma", fg="green"))
click.echo("Incompatible pySigma version was keeped. You can rerun the check with: " + click.style("sigma pysigma check-version", fg="green"))


@pysigma_group.command(
name="list-cache",
help="List cached data versions and timestamps."
)
def list_cache_command():
"""List the cached versions of pySigma data and their timestamps."""
try:
from sigma.data import mitre_attack, mitre_d3fend

# Configuration for datasets to check
datasets = [
{
'name': 'MITRE ATT&CK',
'module': mitre_attack,
'cache_key': 'mitre_attack_data_default',
'version_key': 'mitre_attack_version'
},
{
'name': 'MITRE D3FEND',
'module': mitre_d3fend,
'cache_key': 'mitre_d3fend_data_default',
'version_key': 'mitre_d3fend_version'
}
]

table = PrettyTable()
table.field_names = ["Dataset", "Version", "Cached Date"]
table.align = "l"

for dataset in datasets:
cache = dataset['module']._get_cache()

# Check if cache directory exists and has the key
if not os.path.exists(cache.directory) or dataset['cache_key'] not in cache:
table.add_row([dataset['name'], "Not cached", "-"])
else:
# Get cached data without triggering download
data = cache.get(dataset['cache_key'], read=True)
version = data.get(dataset['version_key'], 'Unknown')

# Get timestamp from cache files
cache_files = [f for f in os.listdir(cache.directory) if not f.startswith('.')]
if cache_files:
newest_mtime = max(os.path.getmtime(os.path.join(cache.directory, f)) for f in cache_files)
timestamp = datetime.fromtimestamp(newest_mtime).strftime("%Y-%m-%d %H:%M:%S")
else:
timestamp = "Unknown"

table.add_row([dataset['name'], version, timestamp])

click.echo(table)

except ImportError:
click.echo(click.style("Error: Unable to import pySigma data modules.", fg="red"))
click.echo("Make sure pySigma is installed correctly.")
except Exception as e:
click.echo(click.style(f"Error accessing cache: {str(e)}", fg="red"))


@pysigma_group.command(
name="clear-cache",
help="Delete all cached data."
)
@click.option(
"--yes",
"-y",
is_flag=True,
help="Skip confirmation prompt.",
)
def clear_cache_command(yes):
"""Delete the cached data for all datasets."""
try:
from sigma.data import mitre_attack, mitre_d3fend

datasets = [
{'name': 'MITRE ATT&CK', 'module': mitre_attack},
{'name': 'MITRE D3FEND', 'module': mitre_d3fend}
]

# Check what's cached
cached_datasets = []
total_size = 0
total_entries = 0

for dataset in datasets:
cache = dataset['module']._get_cache()
if os.path.exists(cache.directory):
keys = list(cache.iterkeys())
if keys:
size = cache.volume()
cached_datasets.append({
'name': dataset['name'],
'entries': len(keys),
'size': size
})
total_entries += len(keys)
total_size += size

if not cached_datasets:
click.echo(click.style("No cached data found. Nothing to clear.", fg="yellow"))
return

# Confirm deletion
if not yes:
for cached in cached_datasets:
click.echo(f"{cached['name']}: {cached['entries']} entries, {cached['size']} bytes")
click.echo(f"Total: {total_entries} entries, {total_size} bytes")
if not click.confirm(click.style("Are you sure you want to clear all cached data?", fg="yellow")):
click.echo("Cache clearing cancelled.")
return

# Clear all caches
cleared_count = 0
for dataset in datasets:
cache = dataset['module']._get_cache()
if os.path.exists(cache.directory):
keys = list(cache.iterkeys())
if keys:
dataset['module'].clear_cache()
cleared_count += 1

click.echo(click.style(f"✓ Cache cleared successfully for {cleared_count} dataset(s).", fg="green"))
click.echo(f"Removed {total_entries} cache entries ({total_size} bytes)")

except ImportError:
click.echo(click.style("Error: Unable to import pySigma data modules.", fg="red"))
click.echo("Make sure pySigma is installed correctly.")
except Exception as e:
click.echo(click.style(f"Error clearing cache: {str(e)}", fg="red"))


@pysigma_group.command(
name="update-cache",
help="Update cache by clearing and re-caching data."
)
@click.option(
"--yes",
"-y",
is_flag=True,
help="Skip confirmation prompt.",
)
def update_cache_command(yes):
"""Update the cache by deleting it and re-caching data for all datasets."""
try:
from sigma.data import mitre_attack, mitre_d3fend

datasets = [
{
'name': 'MITRE ATT&CK',
'module': mitre_attack,
'trigger_attr': 'mitre_attack_techniques_tactics_mapping'
},
{
'name': 'MITRE D3FEND',
'module': mitre_d3fend,
'trigger_attr': 'mitre_d3fend_techniques'
}
]

# Get current cache info
cached_datasets = []
total_size = 0
total_entries = 0

for dataset in datasets:
cache = dataset['module']._get_cache()
if os.path.exists(cache.directory):
keys = list(cache.iterkeys())
if keys:
size = cache.volume()
cached_datasets.append({
'name': dataset['name'],
'entries': len(keys),
'size': size
})
total_entries += len(keys)
total_size += size

# Confirm update
if not yes:
if cached_datasets:
click.echo("Current cache:")
for cached in cached_datasets:
click.echo(f" {cached['name']}: {cached['entries']} entries, {cached['size']} bytes")
click.echo(f"Total: {total_entries} entries, {total_size} bytes")
else:
click.echo("No cached data found (will download fresh data)")

if not click.confirm(click.style("Update cache by clearing and re-downloading data?", fg="yellow")):
click.echo("Cache update cancelled.")
return

# Clear and update each dataset
updated_count = 0
new_total_size = 0
new_total_entries = 0

for dataset in datasets:
click.echo(f"Updating {dataset['name']}...")

# Clear cache
dataset['module'].clear_cache()

# Trigger re-caching by accessing data
_ = getattr(dataset['module'], dataset['trigger_attr'])

# Get new cache info
cache = dataset['module']._get_cache()
new_keys = list(cache.iterkeys())
new_size = cache.volume()

click.echo(click.style(f" ✓ {dataset['name']} cached: {len(new_keys)} entries, {new_size} bytes", fg="green"))

updated_count += 1
new_total_entries += len(new_keys)
new_total_size += new_size

click.echo()
click.echo(click.style(f"✓ Cache updated successfully for {updated_count} dataset(s).", fg="green"))
click.echo(f"Total: {new_total_entries} entries, {new_total_size} bytes")

except ImportError:
click.echo(click.style("Error: Unable to import pySigma data modules.", fg="red"))
click.echo("Make sure pySigma is installed correctly.")
except Exception as e:
click.echo(click.style(f"Error updating cache: {str(e)}", fg="red"))
62 changes: 58 additions & 4 deletions tests/test_pysigma.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import importlib
import re
from sigma.cli.pysigma import check_pysigma_command, check_pysigma_version
from sigma.cli.pysigma import pysigma_group, check_pysigma_version
from click.testing import CliRunner
import pytest

Expand Down Expand Up @@ -30,13 +30,67 @@ def test_check_pysigma_version_incompatible(monkeypatch, pysigma_expected_versio
)
def test_check_pysigma():
cli = CliRunner()
result = cli.invoke(check_pysigma_command)
result = cli.invoke(pysigma_group, ["check-version"])
assert "pySigma version is compatible with sigma-cli" in result.output

@pytest.mark.skip(reason="This test is not working")
def test_check_pysigma_incompatible(monkeypatch):
monkeypatch.setattr('importlib.metadata.version', lambda x: "0.0.1")
cli = CliRunner()
result = cli.invoke(check_pysigma_command, input="y\n")
result = cli.invoke(pysigma_group, ["check-version"], input="y\n")
assert "pySigma version is not compatible" in result.output
assert "pySigma successfully reinstalled" in result.output
assert "pySigma successfully reinstalled" in result.output


def test_list_cache():
"""Test list-cache command shows cache information."""
cli = CliRunner()
result = cli.invoke(pysigma_group, ["list-cache"])
assert result.exit_code == 0
# Check that the output contains the expected table headers and dataset names
assert "Dataset" in result.output
assert "Version" in result.output
assert "Cached Date" in result.output
assert ("MITRE ATT&CK" in result.output or "Not cached" in result.output)


def test_clear_cache_help():
"""Test clear-cache command help."""
cli = CliRunner()
result = cli.invoke(pysigma_group, ["clear-cache", "--help"])
assert result.exit_code == 0
assert "Delete all cached data" in result.output
assert "--yes" in result.output or "-y" in result.output


def test_clear_cache_with_confirmation_cancel():
"""Test clear-cache command cancellation."""
cli = CliRunner()
result = cli.invoke(pysigma_group, ["clear-cache"], input="n\n")
assert result.exit_code == 0
assert "cancelled" in result.output.lower() or "empty" in result.output.lower() or "No cache directory found" in result.output


def test_clear_cache_with_yes_flag():
"""Test clear-cache command with -y flag."""
cli = CliRunner()
result = cli.invoke(pysigma_group, ["clear-cache", "-y"])
assert result.exit_code == 0
assert "cleared" in result.output.lower() or "empty" in result.output.lower() or "No cache directory found" in result.output


def test_update_cache_help():
"""Test update-cache command help."""
cli = CliRunner()
result = cli.invoke(pysigma_group, ["update-cache", "--help"])
assert result.exit_code == 0
assert "Update cache" in result.output
assert "--yes" in result.output or "-y" in result.output


def test_update_cache_with_confirmation_cancel():
"""Test update-cache command cancellation."""
cli = CliRunner()
result = cli.invoke(pysigma_group, ["update-cache"], input="n\n")
assert result.exit_code == 0
assert "cancelled" in result.output.lower()