Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions config/main.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,21 @@ config_version = 1
_queue_name = "science_data_headers"
name = "HAMMA2 AGS science data header input queue"

# Step for daily compression of data files on /media/pi
# Works during quiet hours only, compressing whatever is eligible
# Uses hamma.compression for optimized HAMMA trigger data compression
[steps.compress_media_data]
_preset = "compress_data.outputs.plugin"
name = "Compress Media Data"
source_path = "/media/pi"
output_subdir = "compressed" # Archives stored in /media/pi/compressed/
min_age_days = 1 # Only compress files older than 1 day
delete_originals = true
method = "quantize" # 'lossless' (~25%) or 'quantize' (smaller)
step = 8 # Quantization step: 2=~18%, 4=~12%, 8=~7% of original
quiet_start = 2 # Only run between 2 AM and 5 AM (quiet hours)
quiet_end = 5


# List pipelines to run here under the [pipelines] key
[pipelines]
Expand Down Expand Up @@ -247,3 +262,18 @@ config_version = 1
output_steps = [
"science_csv_output",
]

# Daily compression of data files on /media/pi
[pipelines.daily_compression]
_builder = "pipeline"
_enabled = true
period_s = 3600  # Check every hour; actual compression gated by the step's quiet-hours settings
name = "Daily Data Compression"
input_steps = [
"builtins.inputs.current_time",
]
process_steps = [
]
output_steps = [
"compress_media_data",
]
339 changes: 339 additions & 0 deletions plugins/compress_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,339 @@
"""
Plugin to compress HAMMA trigger data files during quiet hours.

Uses the hamma.compression module for optimized compression of trigger data.
Only runs during configured quiet hours, deferring CPU and I/O to other processes.
"""

# Standard library imports
import datetime
from pathlib import Path

# Local imports
import brokkr.pipeline.base

# HAMMA compression module
from hamma.compression import compress_file


class CompressData(brokkr.pipeline.base.OutputStep):
    """Compress HAMMA trigger data files during quiet hours."""

    def __init__(self,
                 source_path,
                 output_subdir="compressed",
                 min_age_days=1,
                 delete_originals=True,
                 method="quantize",
                 step=8,
                 quiet_start=2,
                 quiet_end=5,
                 **output_step_kwargs):
        """
        Compress HAMMA trigger data files during quiet hours.

        Parameters
        ----------
        source_path : str
            The path containing files to compress (e.g., /media/pi).
        output_subdir : str
            Subdirectory name within source_path for compressed archives.
            Default is "compressed". Archives are stored here to keep them
            on the same drive as the source data.
        min_age_days : int
            Minimum age in days for files to be compressed.
            Files newer than this are skipped to avoid compressing
            files still being written. Default is 1 day.
        delete_originals : bool
            Whether to delete original files after successful compression.
            Default is True.
        method : str
            Compression method for hamma.compression:
            - 'lossless': Lossless LZMA compression (~25% of original)
            - 'quantize': Quantization + LZMA (smaller, configurable fidelity)
            Default is 'quantize'.
        step : int
            Quantization step size (only used if method='quantize'):
            - step=1: lossless
            - step=2: ~18% of original, RMSE<1 (virtually lossless)
            - step=4: ~12% of original, RMSE~2 (high fidelity)
            - step=8: ~7% of original, RMSE~4
            Default is 8.
        quiet_start : int
            Hour (0-23) when quiet period starts. Compression only runs
            during quiet hours. Default is 2 (2 AM).
        quiet_end : int
            Hour (0-23) when quiet period ends. Default is 5 (5 AM).
        output_step_kwargs : **kwargs, optional
            Keyword arguments to pass to the OutputStep constructor.

        """
        super().__init__(**output_step_kwargs)

        self.source_path = Path(source_path)
        self.output_subdir = output_subdir
        self.min_age_days = min_age_days
        self.delete_originals = delete_originals
        self.method = method
        self.step = step
        self.quiet_start = quiet_start
        self.quiet_end = quiet_end

    def _is_quiet_time(self, current_time):
        """
        Check if current time is within the quiet period.

        Parameters
        ----------
        current_time : datetime
            The current time to check.

        Returns
        -------
        bool
            True if within quiet hours, False otherwise.
        """
        current_hour = current_time.hour

        # Handle wraparound (e.g., quiet_start=22, quiet_end=5)
        if self.quiet_start <= self.quiet_end:
            return self.quiet_start <= current_hour < self.quiet_end
        else:
            return (current_hour >= self.quiet_start
                    or current_hour < self.quiet_end)

    def _get_output_path(self):
        """
        Get the output directory for compressed archives.

        Creates the directory if it doesn't exist.

        Returns
        -------
        Path
            Path to the output directory.
        """
        output_path = self.source_path / self.output_subdir
        output_path.mkdir(parents=True, exist_ok=True)
        return output_path

    def execute(self, input_data=None):
        """
        Execute compression if within quiet hours.

        Parameters
        ----------
        input_data : any, optional
            Per iteration input data passed from previous PipelineSteps.
            Expected to contain 'time' key with current timestamp.

        Returns
        -------
        input_data : same as input_data
            Input data passed through for further steps to consume.
        """
        try:
            current_time = input_data['time'].value

            # Only work during quiet hours
            if not self._is_quiet_time(current_time):
                self.logger.debug(
                    "Outside quiet hours (%02d:00-%02d:00), skipping",
                    self.quiet_start, self.quiet_end)
                return input_data

            # Compress eligible files during quiet hours
            compressed_count, skipped_count, error_count = (
                self.compress_old_files())

            # Stay silent on no-op passes to avoid log noise
            if compressed_count > 0 or error_count > 0:
                self.logger.info(
                    "Compression pass: %d compressed, %d already done, %d errors",
                    compressed_count, skipped_count, error_count)

        except Exception as e:
            # Top-level boundary: never let an error kill the pipeline
            self.logger.error(
                "%s in %s on step %s: %s",
                type(e).__name__, type(self), self.name, e)
            self.logger.info("Error details:", exc_info=True)

        return input_data

    def compress_old_files(self):
        """
        Find and compress trigger files older than min_age_days.

        Returns
        -------
        tuple
            (compressed_count, skipped_count, error_count)
        """
        compressed_count = 0
        skipped_count = 0
        error_count = 0

        if not self.source_path.exists():
            self.logger.warning(
                "Source path does not exist: %s", self.source_path)
            return compressed_count, skipped_count, error_count

        # Get/create output directory
        output_path = self._get_output_path()

        cutoff_time = datetime.datetime.now() - datetime.timedelta(
            days=self.min_age_days)

        # Scan the hourly data subdirectories (e.g. 2024-01-06T12)
        for data_dir in self.source_path.iterdir():
            if not data_dir.is_dir():
                continue

            # Skip the output subdirectory itself
            if data_dir.name == self.output_subdir:
                continue

            try:
                # Check if directory is old enough based on modification time
                dir_mtime = datetime.datetime.fromtimestamp(
                    data_dir.stat().st_mtime)
                if dir_mtime > cutoff_time:
                    self.logger.debug(
                        "Skipping directory (too recent): %s", data_dir.name)
                    continue

                # Process .bin files in this directory
                result = self._compress_directory_files(data_dir, output_path)
                compressed_count += result[0]
                skipped_count += result[1]
                error_count += result[2]

            except Exception as e:
                # Best-effort: log and move on to the next directory
                self.logger.error(
                    "Error processing directory %s: %s", data_dir.name, e)
                error_count += 1

        return compressed_count, skipped_count, error_count

    def _compress_directory_files(self, data_dir, output_path):
        """
        Compress all .bin files in a directory.

        Parameters
        ----------
        data_dir : Path
            The directory containing .bin files.
        output_path : Path
            The directory where compressed files will be stored.

        Returns
        -------
        tuple
            (compressed_count, skipped_count, error_count)
        """
        compressed_count = 0
        skipped_count = 0
        error_count = 0

        # Find all .bin files in the directory
        bin_files = list(data_dir.glob("*.bin"))

        # Mirror the source directory structure under the output path.
        # Only create the subdirectory when there is actually something
        # to process, so empty archive directories don't accumulate.
        out_subdir = output_path / data_dir.name
        if bin_files:
            out_subdir.mkdir(parents=True, exist_ok=True)

        for bin_file in bin_files:
            # Skip files that already have a compressed counterpart
            hmc_file = out_subdir / (bin_file.stem + ".hmc")
            if hmc_file.exists():
                skipped_count += 1
                continue

            # Compress the file
            if self._compress_file(bin_file, out_subdir):
                compressed_count += 1
                # Delete original if configured
                if self.delete_originals:
                    self._remove_file(bin_file)
            else:
                error_count += 1

        # If the directory is now empty and delete_originals is True,
        # remove it. Use iterdir() rather than glob("*"): glob patterns
        # do not match dot-files, so a directory holding only hidden
        # files would otherwise be wrongly treated as empty.
        if self.delete_originals and not any(data_dir.iterdir()):
            self._remove_directory(data_dir)

        return compressed_count, skipped_count, error_count

    def _compress_file(self, input_file, output_dir):
        """
        Compress a single trigger file using hamma.compression.

        Parameters
        ----------
        input_file : Path
            The .bin file to compress.
        output_dir : Path
            The directory where the .hmc file will be stored.

        Returns
        -------
        bool
            True if compression was successful, False otherwise.
        """
        self.logger.debug("Compressing file: %s", input_file.name)

        try:
            # Use hamma.compression.compress_file
            results = compress_file(
                str(input_file),
                output_dir=str(output_dir),
                method=self.method,
                step=self.step
            )

            if results:
                result = results[0]
                self.logger.debug(
                    "Compressed %s: %.1f%% (method=%s)",
                    input_file.name,
                    result['ratio'] * 100,
                    result['method'])
                return True
            else:
                self.logger.error(
                    "Compression returned no results for %s", input_file.name)
                return False

        except Exception as e:
            # Best-effort: record the failure and let the caller count it
            self.logger.error(
                "Error compressing %s: %s", input_file.name, e)
            return False

    def _remove_file(self, file_path):
        """
        Safely remove a file.

        Parameters
        ----------
        file_path : Path
            The file to remove.
        """
        try:
            file_path.unlink()
        except Exception as e:
            self.logger.warning(
                "Error removing file %s: %s", file_path, e)

    def _remove_directory(self, directory):
        """
        Safely remove an empty directory.

        Parameters
        ----------
        directory : Path
            The directory to remove.
        """
        try:
            directory.rmdir()
        except Exception as e:
            self.logger.warning(
                "Error removing directory %s: %s", directory, e)
Loading