Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks

# Exclude CSV data files from all hooks so they are never auto-modified
exclude: ".*\\.csv$"

repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v5.0.0
Expand All @@ -26,7 +30,7 @@ repos:
rev: v2.4.0
hooks:
- id: codespell
args: [--toml, pyproject.toml, --skip="CHANGELOG.md"]
args: [--toml, pyproject.toml, "--skip=CHANGELOG.md"]
additional_dependencies: [tomli]

# Format TOML files
Expand Down
245 changes: 245 additions & 0 deletions reproschema/convertutils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
import re
from pathlib import Path
from typing import Any, Dict, List

import yaml
from bs4 import BeautifulSoup

from .context_url import CONTEXTFILE_URL
from .jsonldutils import get_context_version
from .models import Activity, Item, Protocol, write_obj_jsonld

# Keys that must be present at the top level of the protocol YAML
# configuration; read_check_yaml_config raises ValueError if any is missing.
PROTOCOL_KEYS_REQUIRED = [
"protocol_name",
"protocol_display_name",
"redcap_version",
]


def read_check_yaml_config(yaml_path: str) -> Dict[str, Any]:
    """Read the YAML configuration file and check its required keys.

    Args:
        yaml_path: Path of the YAML configuration file to load.

    Returns:
        The parsed configuration mapping.

    Raises:
        ValueError: If the file is not valid YAML, if its top-level content
            is not a mapping (for example when the file is empty), or if any
            key listed in PROTOCOL_KEYS_REQUIRED is missing.
    """
    try:
        with open(yaml_path, "r", encoding="utf-8") as f:
            protocol = yaml.safe_load(f)
    except yaml.YAMLError as e:
        raise ValueError(f"Invalid YAML file: {str(e)}") from e

    # safe_load yields None for an empty document and may yield a list or a
    # scalar; only a mapping can carry the required keys.
    if not isinstance(protocol, dict):
        raise ValueError(
            "Invalid YAML file: expected a mapping at the top level, "
            f"got {type(protocol).__name__}"
        )

    missing_keys = set(PROTOCOL_KEYS_REQUIRED) - set(protocol)
    if missing_keys:
        raise ValueError(
            f"Missing required keys in YAML file: {missing_keys}"
        )
    return protocol


def normalize_condition(condition_str, field_type=None):
    """Normalize condition strings with specific handling for calc fields.

    Args:
        condition_str: The condition to normalize. Booleans are returned
            unchanged, the strings "true"/"false" (any case) are converted
            to booleans, and None is returned as None. Any other non-string
            value is converted with str() before normalization.
        field_type: When "sql" or "calc", only the square brackets around
            field references are removed after operator spacing is
            normalized. For any other value the branching-logic rewrites
            are applied (see the inline comments below).

    Returns:
        A bool, None, or the normalized condition string.

    Raises:
        ValueError: If a non-string value cannot be converted to a string.
    """
    # None carries no condition. It has to be handled before the str()
    # conversion below, which would otherwise turn it into the text "None".
    if condition_str is None:
        return None

    # Handle boolean values
    if isinstance(condition_str, bool):
        return condition_str

    if isinstance(condition_str, str):
        lowered = condition_str.lower()
        if lowered == "true":
            return True
        if lowered == "false":
            return False
    else:
        # Convert to string if needed
        try:
            condition_str = str(condition_str)
        except Exception as e:
            raise ValueError("Condition must be a string or boolean") from e

    # Clean HTML
    condition_str = BeautifulSoup(condition_str, "html.parser").get_text()
    condition_str = condition_str.strip()

    # Common operator normalizations for all types
    operator_replacements = [
        (r"\s*\+\s*", " + "),  # Single space on each side of +
        (r"\s*-\s*", " - "),  # Single space on each side of -
        (r"\s*\*\s*", " * "),  # Single space on each side of *
        (r"\s*\/\s*", " / "),  # Single space on each side of /
        (r"\s*\(\s*", "("),  # Drop whitespace around opening parenthesis
        (r"\s*\)\s*", ")"),  # Drop whitespace around closing parenthesis
        (r"\s*,\s*", ","),  # Drop whitespace around commas
        (r"\s+", " "),  # Collapse remaining whitespace runs
    ]

    # Apply operator normalizations first
    for pattern, repl in operator_replacements:
        condition_str = re.sub(pattern, repl, condition_str)

    # Then apply type-specific replacements
    if field_type in ["sql", "calc"]:
        # For calc fields, just remove brackets from field references
        condition_str = re.sub(r"\[([^\]]+)\]", r"\1", condition_str)
    else:
        # For branching logic
        replacements = [
            (r"\(([0-9]*)\)", r"___\1"),  # "(1)" -> "___1"
            (r"([^>|<])=", r"\1=="),  # "=" not preceded by >, | or < -> "=="
            (r"\[([^\]]*)\]", r"\1"),  # Remove square brackets
            (r"\bor\b", "||"),
            (r"\band\b", "&&"),
            (r'"', "'"),  # Double quotes -> single quotes
        ]
        for pattern, repl in replacements:
            condition_str = re.sub(pattern, repl, condition_str)

    return condition_str.strip()


def parse_html(input_string, default_language="en"):
    """
    Parse HTML content and extract language-specific text.

    Elements carrying a ``lang`` attribute contribute their text under that
    (lower-cased) language code. When no such element yields any text, the
    text of the whole document is stored under ``default_language``.

    Args:
        input_string: The HTML string to parse. Non-string values other
            than None are converted with str() first.
        default_language: Default language code (default: "en")

    Returns:
        dict: Dictionary of language codes to text content, or None if the
        input is None, cannot be converted to a string, is empty or
        whitespace-only, or contains no text.
    """
    # None has no text; without this check str(None) would be reported as
    # the literal content "None".
    if input_string is None:
        return None

    # Handle non-string input
    if not isinstance(input_string, str):
        try:
            input_string = str(input_string)
        except Exception:
            return None

    # Clean input string
    input_string = input_string.strip()
    if not input_string:
        return None

    try:
        soup = BeautifulSoup(input_string, "html.parser")

        # Collect text from every element that declares a language
        result = {}
        for element in soup.find_all(True, {"lang": True}):
            lang = element.get("lang", default_language).lower()
            text = element.get_text(strip=True)
            if text:
                result[lang] = text

        # No language-tagged text was found: use the whole document text
        if not result:
            text = soup.get_text(strip=True)
            if text:
                result[default_language] = text

        return result if result else None

    except Exception as e:
        # Best effort: fall back to the stripped, non-empty plain text
        # rather than failing the conversion.
        print(f"Error parsing HTML: {str(e)}, trying plain text")
        return {default_language: input_string}


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def get_value_type(validation_type):
    """
    Map a REDCap validation type to the XSD value type for ReproSchema.

    Args:
        validation_type (str): Validation type from REDCap, or None

    Returns:
        str: "xsd:string" when validation_type is None; otherwise the XSD
        type for a matching date/datetime/time prefix, else the entry from
        VALUE_TYPE_MAP, defaulting to "xsd:string"
    """
    if validation_type is None:
        return "xsd:string"

    # Date and time formats are recognized by prefix, checked in this order
    prefix_to_xsd = (
        ("date_", "xsd:date"),
        ("datetime_", "xsd:dateTime"),
        ("time", "xsd:time"),
    )
    for prefix, xsd_type in prefix_to_xsd:
        if validation_type.startswith(prefix):
            return xsd_type

    # Everything else goes through the explicit mapping
    return VALUE_TYPE_MAP.get(validation_type, "xsd:string")

def create_activity_schema(
    activity_name: str,
    activity_data: Dict[str, Any],
    output_path: Path,
    redcap_version: str,
    contextfile_url: str = CONTEXTFILE_URL,
):
    """Write the JSON-LD schema for one activity and for each of its items.

    The activity schema is written to
    ``<output_path>/activities/<activity_name>/<activity_name>_schema`` and
    every item to ``<output_path>/activities/<activity_name>/items/<id>``.

    Args:
        activity_name: Name of the activity; used for the schema id, the
            English prefLabel and the output directory name.
        activity_data: Activity content. Must provide "order",
            "addProperties" and "items"; "compute" and "preamble" are
            optional and only emitted when non-empty.
        output_path: Root directory under which "activities" is created.
        redcap_version: Stored as the schema "version".
        contextfile_url: Context URL used for the schema version lookup and
            passed to write_obj_jsonld for the activity and its items.
    """
    json_ld = {
        "category": "reproschema:Activity",
        "id": f"{activity_name}_schema",
        "prefLabel": {"en": activity_name},
        "schemaVersion": get_context_version(contextfile_url),
        "version": redcap_version,
        "ui": {
            # TODO: check whether this is the "clean order" and
            # "clean bl list"?
            "order": activity_data["order"],
            "addProperties": activity_data["addProperties"],
            "shuffle": False,
        },
    }

    # Optional sections are looked up with .get so a missing key is treated
    # the same as an empty value.
    if activity_data.get("compute"):
        json_ld["compute"] = activity_data["compute"]
    if activity_data.get("preamble"):
        json_ld["preamble"] = activity_data["preamble"]

    act = Activity(**json_ld)
    path = output_path / "activities" / activity_name
    path.mkdir(parents=True, exist_ok=True)
    write_obj_jsonld(
        act,
        path / f"{activity_name}_schema",
        contextfile_url=contextfile_url,
    )

    items_path = path / "items"
    items_path.mkdir(parents=True, exist_ok=True)

    for item in activity_data["items"]:
        item_path = items_path / item["id"]
        # An item id may contain path separators, so make sure its parent
        # directory exists.
        item_path.parent.mkdir(parents=True, exist_ok=True)
        # Use the caller-supplied context URL so the items agree with the
        # activity schema written above.
        write_obj_jsonld(
            Item(**item), item_path, contextfile_url=contextfile_url
        )
    print(f"{activity_name} Instrument schema created")


def create_protocol_schema(
    protocol_data: Dict[str, Any],
    activities: List[str],
    output_path: Path,
    contextfile_url: str = CONTEXTFILE_URL,
):
    """Build the protocol schema and write it as JSON-LD.

    The protocol name is taken from ``protocol_data["protocol_name"]`` with
    surrounding whitespace stripped and spaces replaced by underscores. The
    schema is written to ``<output_path>/<protocol_name>/<protocol_name>_schema``
    and lists every activity, in the given order, as a visible property.

    Args:
        protocol_data: Must provide "protocol_name", "protocol_display_name"
            and "redcap_version"; "protocol_description" is optional.
        activities: Activity names to reference from the protocol.
        output_path: Directory under which the protocol directory is created.
        contextfile_url: Context URL used for the schema version lookup and
            passed to write_obj_jsonld.
    """
    protocol_name = protocol_data["protocol_name"].strip().replace(" ", "_")
    schema_id = f"{protocol_name}_schema"

    # Fields are resolved in the order they appear in the schema
    pref_label = {"en": protocol_data["protocol_display_name"]}
    description = {"en": protocol_data.get("protocol_description", "")}
    schema_version = get_context_version(contextfile_url)
    version = protocol_data["redcap_version"]

    add_properties = [
        {
            "isAbout": f"../activities/{activity}/{activity}_schema",
            "variableName": f"{activity}_schema",
            "prefLabel": {"en": activity.replace("_", " ").title()},
            "isVis": True,
        }
        for activity in activities
    ]
    order = [
        f"../activities/{activity}/{activity}_schema"
        for activity in activities
    ]

    protocol_schema = {
        "category": "reproschema:Protocol",
        "id": schema_id,
        "prefLabel": pref_label,
        "description": description,
        "schemaVersion": schema_version,
        "version": version,
        "ui": {
            "addProperties": add_properties,
            "order": order,
            "shuffle": False,
        },
    }

    protocol_dir = output_path / protocol_name
    protocol_dir.mkdir(parents=True, exist_ok=True)

    protocol = Protocol(**protocol_schema)
    write_obj_jsonld(
        protocol,
        protocol_dir / schema_id,
        contextfile_url=contextfile_url,
    )
    print(f"Protocol schema created in {protocol_dir}")
Loading