Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions mellea_contribs/reqlib/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Requirements library for mellea-contribs."""

from mellea_contribs.reqlib.python import (
python_executable,
python_executable_unsafe,
python_executable_sandbox,
python_syntax_valid,
python_files_accessible,
python_imports_resolved,
python_columns_accessible,
python_code_formatted,
python_packages_installed,
python_paths_fixed,
python_auto_fix,
)

# Public API of the package: every name imported above from
# mellea_contribs.reqlib.python is re-exported here. Keep the two lists in sync.
__all__ = [
    # Python verifiers
    "python_syntax_valid",
    "python_executable",
    "python_executable_unsafe",
    "python_executable_sandbox",
    # Auto-fixing requirements
    "python_files_accessible",
    "python_imports_resolved",
    "python_columns_accessible",
    "python_code_formatted",
    "python_packages_installed",
    "python_paths_fixed",
    "python_auto_fix",
]
120 changes: 120 additions & 0 deletions mellea_contribs/reqlib/data_generators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""Data generation utilities for auto-fixing Python Requirements.

This module provides random data generators used to create dummy data
when auto-fixing missing files and DataFrame columns.
"""

import random
from datetime import datetime
from typing import Any, Callable, Dict

try:
import pycountry
except ImportError:
pycountry = None

try:
import lorem
except ImportError:
lorem = None


def random_datetime() -> datetime:
    """Return a random naive datetime between 2000-01-01 and 2024-01-01.

    The value is drawn uniformly over the POSIX-timestamp range spanned by the
    two bounds and converted back with ``datetime.fromtimestamp``.
    """
    earliest = datetime.fromisoformat("2000-01-01T00:00:00").timestamp()
    latest = datetime.fromisoformat("2024-01-01T00:00:00").timestamp()
    picked_timestamp = random.uniform(earliest, latest)
    return datetime.fromtimestamp(picked_timestamp)


def random_year() -> int:
    """Return a random year from 2020 through 2024, both ends included."""
    first_year, last_year = 2020, 2024
    return random.randint(first_year, last_year)


def random_month() -> int:
    """Return a random month number from 1 through 12, both ends included."""
    january, december = 1, 12
    return random.randint(january, december)


def random_day() -> int:
    """Return a random day-of-month from 1 through 31, both ends included.

    The value is not checked against any particular month, so it can exceed
    the length of a shorter month.
    """
    first_day, last_day = 1, 31
    return random.randint(first_day, last_day)


def random_hour() -> int:
    """Return a random hour of the day from 0 through 23, both ends included."""
    midnight, last_hour = 0, 23
    return random.randint(midnight, last_hour)


def random_minute() -> int:
    """Return a random minute from 0 through 59, both ends included."""
    lowest, highest = 0, 59
    return random.randint(lowest, highest)


def random_second() -> int:
    """Return a random second from 0 through 59, both ends included."""
    lowest, highest = 0, 59
    return random.randint(lowest, highest)


def random_int() -> int:
    """Return a random integer from 0 through 10, both ends included."""
    lowest, highest = 0, 10
    return random.randint(lowest, highest)


def random_country() -> str:
    """Return the name of a randomly chosen country.

    When the optional ``pycountry`` package was imported successfully, the
    choice is made over its full country list; otherwise a small built-in
    list of ten country names is used.
    """
    if pycountry is not None:
        known_countries = list(pycountry.countries)
        return random.choice(known_countries).name

    # pycountry is unavailable: fall back to a fixed shortlist.
    fallback_countries = [
        "United States", "Canada", "United Kingdom", "Germany",
        "France", "Japan", "Australia", "Brazil", "India", "China",
    ]
    return random.choice(fallback_countries)


def random_name() -> str:
    """Return a first name picked at random from a fixed list of twelve."""
    candidate_names = [
        "Masataro", "Jason", "Nathan", "Shun", "Xiaojie", "Zhangfan",
        "Alice", "Bob", "Carol", "David", "Emma", "Frank",
    ]
    return random.choice(candidate_names)


def lorem_paragraph() -> str:
    """Return one paragraph of lorem-ipsum filler text.

    Delegates to ``lorem.paragraph()`` when the optional ``lorem`` package was
    imported successfully; otherwise returns a fixed three-sentence paragraph.
    """
    if lorem is not None:
        return lorem.paragraph()

    # lorem is unavailable: use a canned paragraph instead.
    fallback_text = (
        "Lorem ipsum dolor sit amet, consectetur adipiscing elit. "
        "Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. "
        "Ut enim ad minim veniam, quis nostrud exercitation ullamco."
    )
    return fallback_text


# Mapping from lowercase column names to the zero-argument generator that
# produces one dummy value for that column. get_generator_for_column()
# lowercases the requested name before looking it up here and falls back to
# random_int for any name that is not listed.
COLUMN_GENERATORS: Dict[str, Callable[[], Any]] = {
    "date": random_datetime,
    "year": random_year,
    "month": random_month,
    "day": random_day,
    "hour": random_hour,
    "minute": random_minute,
    "second": random_second,
    "country": random_country,
    "name": random_name,
}


def get_generator_for_column(column_name: str) -> Callable[[], Any]:
    """Look up the value generator registered for a column name.

    Args:
        column_name: Column name; matched case-insensitively against the keys
            of ``COLUMN_GENERATORS``.

    Returns:
        The registered zero-argument generator, or ``random_int`` when the
        name has no entry.
    """
    lookup_key = column_name.lower()
    return COLUMN_GENERATORS.get(lookup_key, random_int)


def generate_dummy_data(column_name: str, num_rows: int) -> list[Any]:
    """Build a list of dummy values suitable for the named column.

    Args:
        column_name: Column name used to select the generator.
        num_rows: Number of values to produce; zero or a negative number
            yields an empty list.

    Returns:
        A list with one independently generated value per row.
    """
    make_value = get_generator_for_column(column_name)
    return [make_value() for _row in range(num_rows)]
197 changes: 197 additions & 0 deletions mellea_contribs/reqlib/file_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
"""File I/O utilities for auto-fixing Python Requirements.

This module provides file type predicates and I/O functions for
creating dummy files when auto-fixing missing file dependencies.
"""

import os
from pathlib import Path
from typing import Optional
import numpy as np

try:
import pandas as pd
except ImportError:
pd = None

try:
import imageio.v3 as imageio
except ImportError:
imageio = None

from .data_generators import lorem_paragraph


def is_table(path: str) -> bool:
    """Return True when the path's extension is .csv, .tsv, .xlsx or .json.

    Only the final suffix is inspected, and the comparison ignores case.
    """
    table_suffixes = (".csv", ".tsv", ".xlsx", ".json")
    suffix = Path(path).suffix.lower()
    return suffix in table_suffixes


def is_image(path: str) -> bool:
    """Return True when the path's extension is .png, .jpeg, .jpg, .tiff or .gif.

    Only the final suffix is inspected, and the comparison ignores case.
    """
    image_suffixes = (".png", ".jpeg", ".jpg", ".tiff", ".gif")
    suffix = Path(path).suffix.lower()
    return suffix in image_suffixes


def is_audio(path: str) -> bool:
    """Return True when the path's extension is .wav, .mp3, .mp4 or .ogg.

    Only the final suffix is inspected, and the comparison ignores case.
    """
    audio_suffixes = (".wav", ".mp3", ".mp4", ".ogg")
    suffix = Path(path).suffix.lower()
    return suffix in audio_suffixes


def is_structured(path: str) -> bool:
    """Return True when the path's extension is .xml, .html, .json or .yaml.

    Only the final suffix is inspected, and the comparison ignores case.
    Note that the short ``.yml`` spelling is not in the accepted set.
    """
    structured_suffixes = (".xml", ".html", ".json", ".yaml")
    suffix = Path(path).suffix.lower()
    return suffix in structured_suffixes


def read_table(path: str) -> Optional[object]:
    """Load a table file into a pandas DataFrame.

    Args:
        path: File to read; the reader is chosen from the lowercase extension
            (.csv, .tsv, .xlsx or .json).

    Returns:
        The loaded DataFrame, or None when pandas is unavailable, the
        extension is not one of the supported four, or reading fails for any
        reason.
    """
    if pd is None:
        return None

    suffix = Path(path).suffix.lower()
    # One zero-argument loader per supported extension.
    loaders = {
        ".csv": lambda: pd.read_csv(path),
        ".tsv": lambda: pd.read_csv(path, sep="\t"),
        ".xlsx": lambda: pd.read_excel(path),
        ".json": lambda: pd.read_json(path),
    }
    load = loaders.get(suffix)
    if load is None:
        return None

    try:
        return load()
    except Exception:
        # Best-effort read: any failure is reported as "no table".
        return None


def write_table(path: str, df: object) -> bool:
    """Save a DataFrame to a table file chosen by the path's extension.

    Args:
        path: Destination file; .csv, .tsv, .xlsx and .json are supported
            (matched case-insensitively).
        df: DataFrame to write. The index is omitted for the CSV, TSV and
            XLSX writers.

    Returns:
        True when the file was written; False when pandas is unavailable,
        ``df`` is None, the extension is unsupported, or writing fails.
    """
    if pd is None or df is None:
        return False

    suffix = Path(path).suffix.lower()
    # One zero-argument writer per supported extension.
    writers = {
        ".csv": lambda: df.to_csv(path, index=False),
        ".tsv": lambda: df.to_csv(path, index=False, sep="\t"),
        ".xlsx": lambda: df.to_excel(path, index=False),
        ".json": lambda: df.to_json(path),
    }
    save = writers.get(suffix)
    if save is None:
        return False

    try:
        save()
    except Exception:
        # Best-effort write: any failure is reported as False.
        return False
    return True


def create_dummy_table(path: str, num_rows: int = 5) -> bool:
    """Write a placeholder table containing a single ``id`` column.

    Args:
        path: Destination file; the format is chosen by ``write_table`` from
            the extension.
        num_rows: Number of rows; the ``id`` column holds 0 .. num_rows - 1.

    Returns:
        True when the table was written, False when pandas is unavailable or
        building/writing the table fails.
    """
    if pd is None:
        return False

    try:
        row_ids = list(range(num_rows))
        frame = pd.DataFrame({"id": row_ids})
        return write_table(path, frame)
    except Exception:
        return False


def create_dummy_image(path: str, width: int = 100, height: int = 100) -> bool:
    """Write an all-black RGB placeholder image.

    Args:
        path: Destination image file.
        width: Image width in pixels.
        height: Image height in pixels.

    Returns:
        True when the image was written, False when imageio is unavailable or
        creating/writing the image fails.
    """
    if imageio is None:
        return False

    try:
        # uint8 zeros with shape (rows, columns, 3 channels) -> black pixels.
        black_pixels = np.zeros((height, width, 3), dtype=np.uint8)
        imageio.imwrite(path, black_pixels)
    except Exception:
        return False
    return True


def create_dummy_text(path: str) -> bool:
    """Write a placeholder text file containing one lorem-ipsum paragraph.

    Args:
        path: Destination file; an existing file is overwritten.

    Returns:
        True when the file was written, False when any step fails.
    """
    try:
        with open(path, "w") as handle:
            paragraph = lorem_paragraph()
            handle.write(paragraph)
    except Exception:
        return False
    return True


def create_dummy_file(path: str) -> bool:
    """Create a placeholder file whose content suits the path's extension.

    Table extensions get a dummy table, image extensions a dummy image,
    ``.txt`` a lorem-ipsum paragraph, and anything else an empty file. The
    table check runs first, so ``.json`` is created as a table.

    Args:
        path: File to create. Missing parent directories are created first.
            A bare filename with no directory part is created relative to the
            current working directory.

    Returns:
        True when the file was created, False when the parent directory or
        the file itself could not be created.
    """
    # os.path.dirname returns "" for a bare filename, and os.makedirs("")
    # raises FileNotFoundError, so only create a parent when there is one.
    parent_dir = os.path.dirname(path)
    if parent_dir:
        try:
            os.makedirs(parent_dir, exist_ok=True)
        except OSError:
            # Report failure the same way every other step here does.
            return False

    if is_table(path):
        return create_dummy_table(path)
    elif is_image(path):
        return create_dummy_image(path)
    elif Path(path).suffix.lower() == ".txt":
        return create_dummy_text(path)
    else:
        # Create empty file for unknown types
        try:
            Path(path).touch()
            return True
        except Exception:
            return False


def add_column_to_table(path: str, column_name: str, values: list) -> bool:
    """Set a column on the table stored at ``path`` and write the table back.

    The values are fitted to the table's row count: a shorter list is repeated
    from its start until every row is covered, and a longer list is truncated.

    Args:
        path: Existing table file, read with ``read_table``.
        column_name: Name of the column to assign.
        values: Values for the column.

    Returns:
        True when the updated table was written; False when pandas is
        unavailable, the table cannot be read, or any step fails (including
        an empty ``values`` list for a table that has rows).
    """
    if pd is None:
        return False

    try:
        frame = read_table(path)
        if frame is None:
            return False

        row_count = len(frame)
        value_count = len(values)
        if value_count < row_count:
            # Tile the list enough times to cover every row, then trim.
            repeats = (row_count // value_count) + 1
            values = (values * repeats)[:row_count]
        elif value_count > row_count:
            values = values[:row_count]

        frame[column_name] = values
        return write_table(path, frame)
    except Exception:
        return False


def get_all_files_by_type(directory: str = "data", predicate_func=None) -> list[str]:
    """List the regular files directly inside a directory, optionally filtered.

    Subdirectories are not descended into.

    Args:
        directory: Directory to scan.
        predicate_func: Optional filter called with each bare filename (not
            the joined path), e.g. ``is_table``. When None, every file is kept.

    Returns:
        Paths of the matching files, each joined onto ``directory``. The list
        is empty when the directory does not exist or cannot be listed.
    """
    matches = []
    if not os.path.exists(directory):
        return matches

    try:
        for entry_name in os.listdir(directory):
            candidate = os.path.join(directory, entry_name)
            if not os.path.isfile(candidate):
                continue
            if predicate_func is not None and not predicate_func(entry_name):
                continue
            matches.append(candidate)
    except OSError:
        # Unreadable directory: return whatever was collected so far.
        pass

    return matches
Loading