diff --git a/reproschema/redcap2reproschema.py b/reproschema/redcap2reproschema.py
index a4d8c21..8043a25 100644
--- a/reproschema/redcap2reproschema.py
+++ b/reproschema/redcap2reproschema.py
@@ -2,6 +2,7 @@
import re
from pathlib import Path
+import numpy as np
import pandas as pd
import yaml
from bs4 import BeautifulSoup
@@ -79,93 +80,78 @@
def clean_dict_nans(obj):
- """
- Recursively remove NaN values from nested dictionaries and lists.
- Returns None if the cleaned object would be empty.
- """
- if isinstance(obj, dict):
- cleaned = {}
- for key, value in obj.items():
- cleaned_value = clean_dict_nans(value)
- if cleaned_value is not None:
- cleaned[key] = cleaned_value
- return cleaned if cleaned else None
-
- elif isinstance(obj, list):
- cleaned = [clean_dict_nans(item) for item in obj]
- cleaned = [item for item in cleaned if item is not None]
- return cleaned if cleaned else None
-
- elif pd.isna(obj):
- return None
-
- return obj
+ """Remove NaN values from a dictionary."""
+ if not isinstance(obj, dict):
+ return obj
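+    # e.g. (illustrative): {"a": 1, "b": float("nan")} -> {"a": 1}; unlike the
+    # old recursive version, only top-level values are checked.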
+ return {k: v for k, v in obj.items() if pd.notna(v)}
# TODO: normalized condition should depend on the field type, e.g., for SQL
def normalize_condition(condition_str, field_type=None):
- # Regular expressions for various pattern replacements
- # TODO: function doesn't remove tags
+ """Normalize condition strings with specific handling for calc fields."""
+ if condition_str is None or pd.isna(condition_str):
+ return None
- try:
- # Handle boolean values
- if isinstance(condition_str, bool):
- return condition_str
- elif (
- isinstance(condition_str, str) and condition_str.lower() == "true"
- ):
+ # Handle boolean values
+ if isinstance(condition_str, bool):
+ return condition_str
+ if isinstance(condition_str, str):
+ if condition_str.lower() == "true":
return True
- elif (
- isinstance(condition_str, str) and condition_str.lower() == "false"
- ):
+ if condition_str.lower() == "false":
return False
- # Handle empty/null values
- if condition_str is None or pd.isna(condition_str):
+ # Convert to string if needed
+ if not isinstance(condition_str, str):
+ try:
+ condition_str = str(condition_str)
+        except Exception:
return None
- # Convert non-string types to string
- if not isinstance(condition_str, str):
- try:
- condition_str = str(condition_str)
- except:
- return None
-
- # Remove HTML tags if present
- soup = BeautifulSoup(condition_str, "html.parser")
- condition_str = soup.get_text()
-
- # Define regex patterns
- patterns = {
- "parentheses": (r"\(([0-9]*)\)", r"___\1"),
- "non_gt_lt_equal": (r"([^>|<])=", r"\1 =="),
- "brackets": (r"\[([^\]]*)\]", r" \1 "),
- "or_operator": (r"\bor\b", "||"),
- "and_operator": (r"\band\b", "&&"),
- "extra_spaces": (r"\s+", " "),
- "double_quotes": (r'"', "'"),
- }
-
- # Apply transformations
- for pattern, replacement in patterns.items():
- if isinstance(replacement, tuple):
- condition_str = re.sub(
- replacement[0], replacement[1], condition_str
- )
- else:
- condition_str = re.sub(pattern, replacement, condition_str)
-
- # Handle SQL and calc type conditions differently if specified
- if field_type in ["sql", "calc"]:
- # Add specific handling for SQL/calc expressions if needed
- pass
+ try:
- # Validate the final condition
+ # Clean HTML
+ condition_str = BeautifulSoup(condition_str, "html.parser").get_text()
condition_str = condition_str.strip()
+
if not condition_str:
return None
- return condition_str
+        # Common operator normalizations for all types
+        operator_replacements = [
+            (r"\s*\+\s*", " + "),  # Normalize spacing around +
+            (r"\s*-\s*", " - "),  # Normalize spacing around -
+            (r"\s*\*\s*", " * "),  # Normalize spacing around *
+            (r"\s*\/\s*", " / "),  # Normalize spacing around /
+            (r"\s*\(\s*", "("),  # Remove spaces around opening parentheses
+            (r"\s*\)\s*", ")"),  # Remove spaces around closing parentheses
+            (r"\s*,\s*", ","),  # Normalize spaces around commas
+            (r"\s+", " "),  # Collapse runs of whitespace
+        ]
+
+        # Apply operator normalizations first
+        for pattern, repl in operator_replacements:
+            condition_str = re.sub(pattern, repl, condition_str)
+
+        # Then apply type-specific replacements
+        if field_type in ["sql", "calc"]:
+            # For calc fields, just remove brackets from field references
+            condition_str = re.sub(r"\[([^\]]+)\]", r"\1", condition_str)
+        else:
+            # For branching logic
+            replacements = [
+                (r"\(([0-9]*)\)", r"___\1"),
+                (r"([^>|<])=", r"\1=="),
+                (r"\[([^\]]*)\]", r"\1"),  # Remove brackets from field references
+                (r"\bor\b", "||"),
+                (r"\band\b", "&&"),
+                (r'"', "'"),
+            ]
+            for pattern, repl in replacements:
+                condition_str = re.sub(pattern, repl, condition_str)
+
+        result = condition_str.strip()
+        return result
except Exception as e:
print(f"Error normalizing condition: {str(e)}")
@@ -174,79 +160,80 @@ def normalize_condition(condition_str, field_type=None):
def process_field_properties(data):
"""
- Extract and process field properties from REDCap data.
+ Process field properties from REDCap data dictionary to create a property object.
+
+ This function extracts and processes field properties from a REDCap data dictionary row,
+ handling variable names, visibility conditions, field annotations, required fields,
+ and matrix group information.
Args:
- data (dict): Dictionary containing field data from REDCap
+ data (dict): A dictionary containing field data from the REDCap data dictionary.
+ Expected keys include:
+ - "Variable / Field Name": The field's variable name
+ - "Branching Logic (Show field only if...)": Conditional display logic
+ - "Field Annotation": Special field annotations (e.g., @READONLY, @HIDDEN)
+ - "Required Field?": Whether the field is required
+ - "Matrix Group Name": Matrix group identifier
+ - "Matrix Ranking?": Matrix ranking information
Returns:
- dict: Processed field properties
- """
- try:
- # Validate input
- if not isinstance(data, dict):
- raise ValueError("Input must be a dictionary")
-
- var_name = data.get("Variable / Field Name")
- if not var_name or pd.isna(var_name):
- raise ValueError("Variable / Field Name is required")
-
- # Initialize properties object
- prop_obj = {
- "variableName": str(var_name).strip(),
- "isAbout": f"items/{str(var_name).strip()}",
- "isVis": True, # Default value
- }
-
- # Process branching logic
- condition = data.get("Branching Logic (Show field only if...)")
- if pd.notna(condition):
- normalized_condition = normalize_condition(condition)
- if normalized_condition:
- prop_obj["isVis"] = normalized_condition
-
- # Process field annotation
- annotation = data.get("Field Annotation")
- if pd.notna(annotation):
- annotation = str(annotation).upper()
- if any(
- marker in annotation
- for marker in ["@READONLY", "@HIDDEN", "@CALCTEXT"]
- ):
- prop_obj["isVis"] = False
-
- # Process required field
- required_field = data.get("Required Field?")
- if pd.notna(required_field):
- required_field = str(required_field).strip().lower()
- if required_field == "y":
- prop_obj["valueRequired"] = True
- elif required_field not in ["", "n"]:
- print(
- f"Warning: Unexpected Required Field value '{required_field}' for {var_name}"
- )
+ dict: A property object containing processed field information with the following structure:
+ {
+ "variableName": str, # The field's variable name
+ "isAbout": str, # Reference to the item (e.g., "items/variable_name")
+ "isVis": str/bool, # Visibility condition or False if hidden
+ "valueRequired": bool, # Optional, present if field is required
+ "matrixGroupName": str,# Optional, present if field is part of a matrix
+ "matrixRanking": bool # Optional, present if matrix has ranking
+ }
- # Process matrix properties if present
- matrix_group = data.get("Matrix Group Name")
- matrix_ranking = data.get("Matrix Ranking?")
+ Examples:
+ >>> data = {
+ ... "Variable / Field Name": "age",
+ ... "Required Field?": "y",
+ ... "Branching Logic (Show field only if...)": "[gender] = '1'"
+ ... }
+ >>> process_field_properties(data)
+ {'variableName': 'age', 'isAbout': 'items/age', 'valueRequired': True, 'isVis': "gender == '1'"}
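+
+    An annotation such as @HIDDEN forces the field to be hidden, overriding any
+    branching logic (see test_process_field_properties_visibility):
+
+    >>> process_field_properties({"Variable / Field Name": "age",
+    ...     "Field Annotation": "@HIDDEN"})
+    {'variableName': 'age', 'isAbout': 'items/age', 'isVis': False}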
+ """
+ if not isinstance(data, dict):
+ return {"variableName": "unknown", "isAbout": "items/unknown"}
+
+ var_name = str(data.get("Variable / Field Name", "unknown")).strip()
+ prop_obj = {"variableName": var_name, "isAbout": f"items/{var_name}"}
+
+ # Handle required field consistently
+ if data.get("Required Field?", "").strip().lower() == "y":
+ prop_obj["valueRequired"] = True
+
+ # Set isVis only when needed
+ condition = data.get("Branching Logic (Show field only if...)")
+ if pd.notna(condition):
+ normalized = normalize_condition(condition)
+ if normalized:
+ prop_obj["isVis"] = normalized
+
+ # Handle field annotations that affect visibility
+ annotation = data.get("Field Annotation", "").strip().upper()
+ if annotation:
+ if (
+ "@HIDDEN" in annotation
+ or "@READONLY" in annotation
+ or "@CALCTEXT" in annotation
+ ):
+ prop_obj["isVis"] = False
- if pd.notna(matrix_group):
- prop_obj["matrixGroupName"] = str(matrix_group).strip()
- if pd.notna(matrix_ranking):
- prop_obj["matrixRanking"] = matrix_ranking
+ field_type = data.get("Field Type", "").strip().lower()
+ if field_type in ["calc", "sql"]:
+ prop_obj["isVis"] = False
- return prop_obj
+ matrix_group = data.get("Matrix Group Name")
+ if pd.notna(matrix_group):
+ prop_obj["matrixGroupName"] = str(matrix_group).strip()
+ if pd.notna(data.get("Matrix Ranking?")):
+ prop_obj["matrixRanking"] = data["Matrix Ranking?"]
- except Exception as e:
- print(
- f"Error processing field properties for {data.get('Variable / Field Name', 'unknown field')}: {str(e)}"
- )
- # Return basic properties to allow processing to continue
- return {
- "variableName": str(data.get("Variable / Field Name", "unknown")),
- "isAbout": f"items/{str(data.get('Variable / Field Name', 'unknown'))}",
- "isVis": True,
- }
+ return prop_obj
def parse_field_type_and_value(field):
@@ -379,34 +366,48 @@ def process_choices(choices_str, field_name):
continue
# Determine value type and convert value
- if value_part == "0":
- value = 0
- value_type = "xsd:integer"
- elif value_part.isdigit() and value_part[0] == "0":
- value = value_part
- value_type = "xsd:string"
- else:
- try:
- value = int(value_part)
+ try:
+ # First try integer conversion
+ if value_part == "0":
+ value = 0
value_type = "xsd:integer"
- except ValueError:
+                elif value_part.isdigit() and value_part[0] == "0":
+                    value = value_part
+                    value_type = "xsd:string"
+ else:
try:
- value = float(value_part)
- value_type = "xsd:decimal"
+ value = int(value_part)
+ value_type = "xsd:integer"
except ValueError:
- value = value_part
- value_type = "xsd:string"
+ try:
+ value = float(value_part)
+ value_type = "xsd:decimal"
+ except ValueError:
+ value = value_part
+ value_type = "xsd:string"
+
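+                # Resulting (value, type) pairs, illustratively: "0" -> (0, integer),
+                # "007" -> ("007", string; leading zero kept), "2.5" -> (2.5, decimal)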
+                choices_value_type.add(value_type)
+
+                # Create choice object
+ parsed_label = parse_html(label_part)
+ choice_obj = {
+ "name": (
+ parsed_label if parsed_label else {"en": label_part}
+ ),
+ "value": value,
+ }
+                choices.append(choice_obj)
- choices_value_type.add(value_type)
+ except (ValueError, TypeError) as e:
+ print(
+ f"Warning: Error processing choice '{choice}' in {field_name}: {str(e)}"
+ )
+ continue
- # Create choice object
- choice_obj = {
- "name": parse_html(label_part) or {"en": label_part},
- "value": value,
- }
- choices.append(choice_obj)
+ if not choices:
+ return None, None
- return (choices, list(choices_value_type)) if choices else (None, None)
+ return choices, list(choices_value_type)
except Exception as e:
print(f"Error processing choices for {field_name}: {str(e)}")
@@ -452,18 +453,18 @@ def parse_html(input_string, default_language="en"):
# Process elements with language tags
for element in lang_elements:
lang = element.get("lang", default_language).lower()
- text = element.get_text(strip=True)
+ text = element.get_text(strip=False)
if text:
result[lang] = text
# If no text was extracted but elements exist, try getting default text
if not result:
- text = soup.get_text(strip=True)
+ text = soup.get_text(strip=False)
if text:
result[default_language] = text
else:
# No language tags found, use default language
- text = soup.get_text(strip=True)
+ text = soup.get_text(strip=False)
if text:
result[default_language] = text
@@ -657,34 +658,48 @@ def create_form_schema(
"""
try:
# Validate inputs
- if not form_name or pd.isna(form_name):
+ if (
+ pd.isna(form_name).any()
+ if isinstance(form_name, pd.Series)
+ else pd.isna(form_name)
+ ):
raise ValueError("Form name is required")
- if not activity_display_name or pd.isna(activity_display_name):
- activity_display_name = form_name.replace("_", " ").title()
+ # Set default activity display name if not provided
+ if (
+ pd.isna(activity_display_name).any()
+ if isinstance(activity_display_name, pd.Series)
+ else pd.isna(activity_display_name)
+ ):
+ activity_display_name = str(form_name).replace("_", " ").title()
# Clean and validate order list
clean_order = []
- if order:
- clean_order = [
- str(item).strip() for item in order if pd.notna(item)
- ]
- clean_order = list(
- dict.fromkeys(clean_order)
- ) # Remove duplicates while preserving order
+ if order is not None:
+ if isinstance(order, (list, pd.Series, np.ndarray)):
+ clean_order = [
+ str(item).strip()
+ for item in order
+ if not (isinstance(item, pd.Series) and item.isna().any())
+ and not pd.isna(item)
+ ]
+ clean_order = list(dict.fromkeys(clean_order))
# Clean and validate bl_list
clean_bl_list = []
- if bl_list:
- clean_bl_list = [
- prop for prop in bl_list if prop and isinstance(prop, dict)
- ]
+ if bl_list is not None:
+ if isinstance(bl_list, (list, pd.Series, np.ndarray)):
+ clean_bl_list = [
+ prop
+ for prop in bl_list
+ if prop is not None and isinstance(prop, dict)
+ ]
# Initialize schema
json_ld = {
"category": "reproschema:Activity",
"id": f"{form_name}_schema",
- "prefLabel": {"en": activity_display_name},
+ "prefLabel": {"en": str(activity_display_name)},
"schemaVersion": get_context_version(schema_context_url),
"version": redcap_version,
"ui": {
@@ -695,37 +710,30 @@ def create_form_schema(
}
# Process preamble if present
- if preamble is not None and pd.notna(preamble):
- parsed_preamble = parse_html(preamble)
- if parsed_preamble:
- json_ld["preamble"] = parsed_preamble
-
- # Process compute list
- if compute_list:
- valid_compute = []
- for comp in compute_list:
- if isinstance(comp, dict) and comp.get("jsExpression"):
- valid_compute.append(comp)
- if valid_compute:
- json_ld["compute"] = valid_compute
-
- # Process matrix list if needed
- if matrix_list:
- valid_matrix = []
- for matrix in matrix_list:
- if isinstance(matrix, dict) and matrix.get("matrixGroupName"):
- valid_matrix.append(matrix)
- if valid_matrix:
- json_ld["matrixInfo"] = valid_matrix
-
- # Clean any remaining NaN values
- cleaned_json_ld = clean_dict_nans(json_ld)
- if not cleaned_json_ld:
- raise ValueError(f"All data was NaN for form {form_name}")
+ if preamble is not None:
+ if isinstance(preamble, pd.Series):
+ if not preamble.isna().all():
+ parsed_preamble = parse_html(
+ preamble.iloc[0] if len(preamble) > 0 else None
+ )
+ if parsed_preamble:
+ json_ld["preamble"] = parsed_preamble
+ elif not pd.isna(preamble):
+ parsed_preamble = parse_html(preamble)
+ if parsed_preamble:
+ json_ld["preamble"] = parsed_preamble
+
+ # Process matrix info if present
+ if matrix_list and len(matrix_list) > 0:
+ json_ld["matrixInfo"] = matrix_list
+
+ # Process compute list if present
+ if compute_list and len(compute_list) > 0:
+ json_ld["compute"] = compute_list
# Create Activity object and write to file
- act = Activity(**cleaned_json_ld)
- path = Path(abs_folder_path) / "activities" / form_name
+ act = Activity(**json_ld)
+ path = Path(abs_folder_path) / "activities" / str(form_name)
path.mkdir(parents=True, exist_ok=True)
write_obj_jsonld(
@@ -807,17 +815,11 @@ def process_csv(csv_file, abs_folder_path, protocol_name):
# TODO: add languages
try:
- # Read CSV with explicit BOM handling, and maintain original order
- df = pd.read_csv(
- csv_file, encoding="utf-8-sig"
- ) # utf-8-sig handles BOM automatically
-
- # Clean column names (headers)
+ df = pd.read_csv(csv_file, encoding="utf-8-sig")
df.columns = df.columns.map(
lambda x: x.strip().strip('"').lstrip("\ufeff")
)
- # Validate required columns
required_columns = ["Form Name", "Variable / Field Name", "Field Type"]
missing_columns = [
col for col in required_columns if col not in df.columns
@@ -828,15 +830,15 @@ def process_csv(csv_file, abs_folder_path, protocol_name):
)
# Initialize structures for each unique form
- unique_forms = [f for f in df["Form Name"].unique() if not pd.isna(f)]
+ unique_forms = df["Form Name"].dropna().unique()
if len(unique_forms) == 0:
raise ValueError("No valid form names found in the CSV")
for form_name in unique_forms:
- if pd.isna(form_name) or not str(form_name).strip():
+ form_name = str(form_name).strip()
+ if not form_name:
continue
- form_name = str(form_name).strip()
datas[form_name] = []
order[form_name] = []
compute[form_name] = []
@@ -851,88 +853,82 @@ def process_csv(csv_file, abs_folder_path, protocol_name):
# languages = parse_language_iso_codes(row["Field Label"])
for idx, row in df.iterrows():
- try:
- form_name = row["Form Name"]
- field_name = row["Variable / Field Name"]
+ form_name = row["Form Name"]
+ field_name = row["Variable / Field Name"]
- # Skip rows with missing essential data
- if pd.isna(form_name) or pd.isna(field_name):
- print(
- f"Warning: Skipping row {idx+2} with missing form name or field name"
- )
- continue
+ # Skip rows with missing essential data
+ if pd.isna(form_name) or pd.isna(field_name):
+ print(
+ f"Warning: Skipping row {idx+2} with missing form name or field name"
+ )
+ continue
- form_name = str(form_name).strip()
- field_name = str(field_name).strip()
+ form_name = str(form_name).strip()
+ field_name = str(field_name).strip()
- # Convert row to dict and clean NaN values
- row_dict = clean_dict_nans(row.to_dict())
- if not row_dict:
- print(f"Warning: Skipping empty row {idx+2}")
- continue
+ # Convert row to dict and clean NaN values
+ row_dict = {k: v for k, v in row.to_dict().items() if pd.notna(v)}
+ if not row_dict:
+ print(f"Warning: Skipping empty row {idx+2}")
+ continue
- datas[form_name].append(row_dict)
+ datas[form_name].append(row_dict)
+ field_path = f"items/{field_name}"
- # Handle compute fields
- field_type = row.get("Field Type", "")
- field_annotation = row.get("Field Annotation", "")
+ field_type = row_dict.get("Field Type", "").strip().lower()
+ field_annotation = row_dict.get("Field Annotation", "")
- if (
- pd.notna(field_type)
- and str(field_type).strip() in COMPUTE_LIST
- ):
- calculations = row.get(
- "Choices, Calculations, OR Slider Labels"
+ # Handle compute fields
+ is_compute = False
+
+ # Case 1: Field is calc type
+ if field_type in COMPUTE_LIST:
+ calc_value = row_dict.get(
+ "Choices, Calculations, OR Slider Labels", ""
+ )
+ if calc_value and str(calc_value).strip():
+ compute_expression = normalize_condition(
+ calc_value, field_type=field_type
)
- if pd.notna(calculations):
- condition = normalize_condition(calculations)
- if condition:
- compute[form_name].append(
- {
- "variableName": field_name,
- "jsExpression": condition,
- }
- )
- elif pd.notna(field_annotation):
- field_annotation = str(field_annotation).upper()
- if "@CALCTEXT" in field_annotation:
- match = re.search(
- r"@CALCTEXT\((.*)\)", field_annotation
+ if compute_expression:
+ is_compute = True
+ compute[form_name].append(
+ {
+ "variableName": field_name,
+ "jsExpression": compute_expression,
+ }
+ )
+ else:
+ print(
+ f"Warning: Could not normalize calc expression for {field_name}: {calc_value}"
)
- if match:
- js_expression = normalize_condition(match.group(1))
- if js_expression:
- compute[form_name].append(
- {
- "variableName": field_name,
- "jsExpression": js_expression,
- }
- )
- else:
- order[form_name].append(f"items/{field_name}")
-
- except Exception as e:
- print(f"Warning: Error processing row {idx+2}: {str(e)}")
- continue
- for form_name in datas:
- if not datas[form_name]:
- print(f"Warning: Form '{form_name}' has no valid fields")
- if not order[form_name] and not compute[form_name]:
- print(
- f"Warning: Form '{form_name}' has no order or compute fields"
- )
+ # Case 2: Field has @CALCTEXT
+ elif (
+ field_annotation
+ and "@CALCTEXT" in str(field_annotation).upper()
+ ):
+ match = re.search(r"@CALCTEXT\((.*)\)", field_annotation)
+ if match:
+ compute_expression = normalize_condition(match.group(1))
+ if compute_expression:
+ is_compute = True
+ compute[form_name].append(
+ {
+ "variableName": field_name,
+ "jsExpression": compute_expression,
+ }
+ )
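+        # Each compute entry pairs a variable with a normalized JS expression,
+        # e.g. {"variableName": "field3", "jsExpression": "3 * 3"} for a field
+        # annotated @CALCTEXT(3*3) (illustrative; see tests/test_process_csv.py)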
- # Create protocol directory
- protocol_dir = Path(abs_folder_path) / protocol_name
- protocol_dir.mkdir(parents=True, exist_ok=True)
+ # Add to order list only if not a compute field
+ if not is_compute:
+ order[form_name].append(field_path)
return datas, order, compute
- except pd.errors.EmptyDataError:
- raise ValueError("The CSV file is empty")
except Exception as e:
- raise Exception(f"Error processing CSV file: {str(e)}")
+ print(f"Error processing CSV: {str(e)}")
+ raise
# todo adding output path
diff --git a/reproschema/reproschema2redcap.py b/reproschema/reproschema2redcap.py
index 4f00f07..6f604c4 100644
--- a/reproschema/reproschema2redcap.py
+++ b/reproschema/reproschema2redcap.py
@@ -1,4 +1,5 @@
import csv
+import logging
from pathlib import Path
import requests
@@ -8,6 +9,8 @@
from .models import Activity, Item, Protocol, ResponseOption
from .utils import start_server, stop_server
+logger = logging.getLogger(__name__)
+
def fetch_choices_from_url(url):
try:
@@ -37,6 +40,17 @@ def fetch_choices_from_url(url):
def find_Ftype_and_colH(item, row_data, response_options):
+ """
+ Determine field type and column H value.
+
+ Args:
+ item: Item object containing UI information
+ row_data: Dictionary to store field data
+ response_options: Response options object
+
+ Returns:
+ dict: Updated row_data with field type and validation info
+ """
# Extract the input type from the item_json
f_type = item.ui.inputType
col_h = ""
@@ -58,16 +72,17 @@ def find_Ftype_and_colH(item, row_data, response_options):
f_type = "text"
col_h = "date_mdy"
elif f_type == "select":
- multiple_choice = response_options.multipleChoice
- print("mult", multiple_choice)
+ multiple_choice = getattr(response_options, "multipleChoice", False)
+ logger.debug(
+ f"Multiple choice setting for {item.id}: {multiple_choice}"
+ )
f_type = "checkbox" if multiple_choice else "dropdown"
elif f_type == "radio":
- if response_options.multipleChoice:
+ if getattr(response_options, "multipleChoice", False):
f_type = "checkbox"
- elif f_type.startswith("select"): # TODO: this should be reviewed
- # Adjusting for selectCountry, selectLanguage, selectState types
+ elif f_type.startswith("select"):
f_type = "radio"
- choices_url = response_options.choices
+ choices_url = getattr(response_options, "choices", None)
if choices_url and isinstance(choices_url, str):
choices_data = fetch_choices_from_url(choices_url)
if choices_data:
@@ -78,7 +93,6 @@ def find_Ftype_and_colH(item, row_data, response_options):
f_type = "text"
row_data["field_type"] = f_type.lower()
-
if col_h:
row_data["val_type_OR_slider"] = col_h.lower()
@@ -139,39 +153,71 @@ def process_item(
choices = response_options.choices
if choices and not isinstance(choices, str):
if isinstance(choices, list):
- item_choices = [
- f"{ch.value}, {ch.name.get('en', '')}"
- for ch in choices
- if ch.value is not None
- ]
+ # Handle the case where choices is a list
+ item_choices = []
+ for ch in choices:
+ if hasattr(ch, "value") and ch.value is not None:
+ name = (
+ ch.name.get("en", "")
+ if hasattr(ch, "name")
+ else ""
+ )
+ item_choices.append(f"{ch.value}, {name}")
if item_choices:
row_data["choices"] = " | ".join(item_choices)
# Add valueRequired if explicitly True
if (
item_properties
- and "valueRequired" in item_properties
- and item_properties["valueRequired"] is True
+ and isinstance(item_properties, dict) # Ensure it's a dictionary
+ and item_properties.get("valueRequired") is True
):
row_data["required"] = "y"
var_name = str(item.id).split("/")[-1] # Get the last part of the id path
+
+ # Handle compute items
+ if compute_item and compute_expr:
+ logger.debug(f"Processing compute item: {var_name}")
+ logger.debug(f"Compute expression: {compute_expr}")
+ row_data["choices"] = compute_expr
+ row_data["field_type"] = "calc"
+ # For computed fields, we may need to set visibility to false by default
+ if any(score_type in var_name for score_type in ["_score", "_total"]):
+ row_data["isVis_logic"] = False
+ else:
+ # Use find_Ftype_and_colH but only add non-empty values
+ field_info = find_Ftype_and_colH(item, {}, response_options)
+ if field_info.get("field_type"):
+ row_data["field_type"] = field_info["field_type"]
+ if field_info.get("val_type_OR_slider"):
+ row_data["val_type_OR_slider"] = field_info["val_type_OR_slider"]
+
+ # Handle visibility
if var_name.endswith("_total_score"):
- row_data["isVis_logic"] = False # This will make the field hidden
- # Regular isVis handling for other fields
- elif "isVis" in item_properties and item_properties["isVis"] is not True:
+ row_data["isVis_logic"] = False
+ elif (
+ item_properties
+ and isinstance(item_properties, dict) # Ensure it's a dictionary
+ and "isVis" in item_properties
+ and item_properties["isVis"] is not True
+ ):
row_data["isVis_logic"] = item_properties["isVis"]
# Handle description
if (
- item.description
- and "en" in item.description
- and item.description["en"]
+ hasattr(item, "description")
+ and isinstance(item.description, dict)
+ and item.description.get("en")
):
row_data["field_notes"] = item.description["en"]
# Handle preamble
- if item.preamble and "en" in item.preamble and item.preamble["en"]:
+ if (
+ hasattr(item, "preamble")
+ and isinstance(item.preamble, dict)
+ and item.preamble.get("en")
+ ):
row_data["preamble"] = item.preamble["en"]
elif activity_preamble:
row_data["preamble"] = activity_preamble
@@ -180,44 +226,23 @@ def process_item(
if compute_item:
question = item.description
else:
- question = item.question
+ question = item.question if hasattr(item, "question") else None
- if isinstance(question, dict) and "en" in question and question["en"]:
+ if isinstance(question, dict) and question.get("en"):
row_data["field_label"] = question["en"]
elif isinstance(question, str) and question:
row_data["field_label"] = question
- # Handle compute items
- if compute_item and compute_expr:
- print(f"\nDebug - Compute Item: {var_name}")
- print(f"Compute Expression: {compute_expr}")
- row_data["choices"] = compute_expr
- row_data["field_type"] = "calc"
- # For computed fields, we may need to set visibility to false by default
- if any(score_type in var_name for score_type in ["_score", "_total"]):
- row_data["isVis_logic"] = False
- else:
- # Use find_Ftype_and_colH but only add non-empty values
- field_info = find_Ftype_and_colH(item, {}, response_options)
- if field_info.get("field_type"):
- row_data["field_type"] = field_info["field_type"]
- if field_info.get("val_type_OR_slider"):
- row_data["val_type_OR_slider"] = field_info["val_type_OR_slider"]
-
return row_data
def get_csv_data(dir_path, contextfile, http_kwargs):
csv_data = []
- # Iterate over directories in dir_path
for protocol_dir in dir_path.iterdir():
if protocol_dir.is_dir():
- # Check for a _schema file in each directory
schema_file = next(protocol_dir.glob("*_schema"), None)
- print(f"Found schema file: {schema_file}")
if schema_file:
- # Process the found _schema file
parsed_protocol_json = load_file(
schema_file,
started=True,
@@ -234,6 +259,7 @@ def get_csv_data(dir_path, contextfile, http_kwargs):
for activity_path in activity_order:
if not _is_url(activity_path):
activity_path = protocol_dir / activity_path
+
parsed_activity_json = load_file(
activity_path,
started=True,
@@ -244,110 +270,78 @@ def get_csv_data(dir_path, contextfile, http_kwargs):
)
del parsed_activity_json["@context"]
act = Activity(**parsed_activity_json)
- items_properties = {
- el["variableName"]: el
- for el in parsed_activity_json["ui"]["addProperties"]
- }
- # Get activity name without adding extra _schema
+ # Get activity name
activity_name = act.id.split("/")[-1]
if activity_name.endswith("_schema.jsonld"):
- activity_name = activity_name[
- :-12
- ] # Remove _schema.jsonld
+ activity_name = activity_name[:-12]
elif activity_name.endswith(".jsonld"):
- activity_name = activity_name[:-7] # Remove .jsonld
-
- items_properties.update(
- {
- el["isAbout"]: el
- for el in parsed_activity_json["ui"][
- "addProperties"
- ]
+ activity_name = activity_name[:-7]
+
+ # Create a map of computed items
+ compute_map = {}
+ if hasattr(act, "compute"):
+ compute_map = {
+ comp.variableName: comp.jsExpression
+ for comp in act.compute
}
- )
- if parsed_activity_json:
- item_order = [("ord", el) for el in act.ui.order]
- item_calc = [("calc", el) for el in act.compute]
+ # Process each item defined in addProperties
+ for item_def in parsed_activity_json["ui"][
+ "addProperties"
+ ]:
+ item_path = item_def["isAbout"]
+ var_name = item_def["variableName"]
+
+ # Get the item file path
+ if not _is_url(item_path):
+ full_item_path = (
+ Path(activity_path).parent / item_path
+ )
+ else:
+ full_item_path = item_path
+
+ try:
+ item_json = load_file(
+ full_item_path,
+ started=True,
+ http_kwargs=http_kwargs,
+ fixoldschema=True,
+ compact=True,
+ compact_context=contextfile,
+ )
+ item_json.pop("@context", "")
+ item = Item(**item_json)
- computed_fields = {
- calc_item.variableName
- for _, calc_item in item_calc
- }
+ activity_preamble = (
+ act.preamble.get("en", "").strip()
+ if hasattr(act, "preamble")
+ else ""
+ )
+
+ # Check if this is a computed item
+ compute_expr = compute_map.get(var_name)
+ is_computed = compute_expr is not None
+
+ row_data = process_item(
+ item,
+ item_def,
+ activity_name,
+ activity_preamble,
+ contextfile,
+ http_kwargs,
+ is_computed,
+ compute_expr,
+ )
+ csv_data.append(row_data)
+
+ except Exception as e:
+ print(
+ f"Error processing item {item_path} for activity {activity_name}"
+ )
+ print(f"Error details: {str(e)}")
+ continue
- for tp, item in item_order + item_calc:
- try:
- if tp == "calc":
- js_expr = item.jsExpression
- var_name = item.variableName
-
- # Find the corresponding item properties
- if var_name in items_properties:
- item = items_properties[var_name][
- "isAbout"
- ]
- # Ensure computed fields are marked as hidden
- items_properties[var_name][
- "isVis"
- ] = False
- else:
- print(
- f"WARNING: no item properties found for computed field {var_name} in {activity_name}"
- )
- continue
- item_calc = True
- else:
- item_calc = False
- js_expr = None
- it_prop = items_properties.get(item)
- if not _is_url(item):
- item = Path(activity_path).parent / item
-
- try:
- item_json = load_file(
- item,
- started=True,
- http_kwargs=http_kwargs,
- fixoldschema=True,
- compact=True,
- compact_context=contextfile,
- )
- item_json.pop("@context", "")
- itm = Item(**item_json)
- except Exception as e:
- print(f"Error loading item: {item}")
- print(f"Error details: {str(e)}")
- continue
-
- activity_name = act.id.split("/")[-1].split(
- "."
- )[0]
- activity_preamble = (
- act.preamble.get("en", "").strip()
- if hasattr(act, "preamble")
- else ""
- )
-
- row_data = process_item(
- itm,
- it_prop,
- activity_name,
- activity_preamble,
- contextfile,
- http_kwargs,
- item_calc,
- js_expr,
- )
- csv_data.append(row_data)
-
- except Exception as e:
- print(
- f"Error processing item {item}: {str(e)}"
- )
- continue
- # Break after finding the first _schema file
- break
return csv_data
diff --git a/reproschema/tests/data_test_nimh-minimal/nimh_minimal/nimh_minimal/nimh_minimal_schema b/reproschema/tests/data_test_nimh-minimal/nimh_minimal/nimh_minimal/nimh_minimal_schema
index 448891d..f52c800 100644
--- a/reproschema/tests/data_test_nimh-minimal/nimh_minimal/nimh_minimal/nimh_minimal_schema
+++ b/reproschema/tests/data_test_nimh-minimal/nimh_minimal/nimh_minimal/nimh_minimal_schema
@@ -2,7 +2,7 @@
"@context": [
"https://raw.githubusercontent.com/ReproNim/reproschema/1.0.0-rc4/contexts/generic",
{
- "activity_path": "https://raw.githubusercontent.com/ReproNim/reproschema-library/a23a13875c7262c0bd0d77bd90c1ec296c6d1116/activities/"
+ "activity_path": "https://raw.githubusercontent.com/ReproNim/reproschema-library/main/activities/"
}
],
"@type": "reproschema:Protocol",
diff --git a/reproschema/tests/test_process_csv.py b/reproschema/tests/test_process_csv.py
new file mode 100644
index 0000000..f223e6b
--- /dev/null
+++ b/reproschema/tests/test_process_csv.py
@@ -0,0 +1,71 @@
+import tempfile
+from pathlib import Path
+
+import pytest
+
+from ..redcap2reproschema import normalize_condition, process_csv
+
+
+def test_process_csv():
+ csv_data = """Form Name,Variable / Field Name,Field Type,Field Annotation,"Choices, Calculations, OR Slider Labels"
+form1,field1,text,,
+form1,field2,calc,,[field1] + [field3]
+form1,field3,text,@CALCTEXT(3*3),
+form2,field4,text,,
+,field5,text,,"""
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ csv_path = Path(tmpdir) / "test.csv"
+ csv_path.write_text(csv_data)
+
+ datas, order, compute = process_csv(csv_path, tmpdir, "test_protocol")
+
+ assert set(datas.keys()) == {"form1", "form2"}
+ assert len(datas["form1"]) == 3
+ assert len(datas["form2"]) == 1
+
+ assert order["form1"] == [
+ "items/field1"
+ ] # both field2 and field3 go to compute
+ assert order["form2"] == ["items/field4"]
+
+ assert len(compute["form1"]) == 2
+ assert any(
+ item["variableName"] == "field2" for item in compute["form1"]
+ )
+ assert any(
+ item["variableName"] == "field3" for item in compute["form1"]
+ )
+
+
+def test_process_csv_missing_columns():
+ csv_data = "Column1,Column2\na,b"
+ with tempfile.TemporaryDirectory() as tmpdir:
+ csv_path = Path(tmpdir) / "test.csv"
+ csv_path.write_text(csv_data)
+
+ with pytest.raises(ValueError):
+ process_csv(csv_path, tmpdir, "test_protocol")
+
+
+def test_normalize_condition():
+ # Test calc expressions
+ assert (
+ normalize_condition("[field1] + [field2]", field_type="calc")
+ == "field1 + field2"
+ )
+ assert (
+ normalize_condition("[total]*100", field_type="calc") == "total * 100"
+ )
+ assert normalize_condition("2+2", field_type="calc") == "2 + 2"
+
+ # Test @CALCTEXT expressions
+ assert normalize_condition("3*3") == "3 * 3"
+
+ # Test branching logic
+ assert normalize_condition("[age] = 1") == "age == 1"
+ assert (
+ normalize_condition("[field1] = 1 or [field2] = 2")
+ == "field1 == 1 || field2 == 2"
+ )
diff --git a/reproschema/tests/test_redcap2reproschema.py b/reproschema/tests/test_redcap2reproschema.py
index ffbbe67..ff8d7b9 100644
--- a/reproschema/tests/test_redcap2reproschema.py
+++ b/reproschema/tests/test_redcap2reproschema.py
@@ -6,6 +6,7 @@
from click.testing import CliRunner
from ..cli import main
+from ..redcap2reproschema import process_field_properties
CSV_FILE_NAME = "redcap_dict.csv"
YAML_FILE_NAME = "redcap2rs.yaml"
@@ -51,3 +52,35 @@ def test_redcap2reproschema(tmpdir):
assert os.path.isdir(
protocol_name
), f"Expected output directory '{protocol_name}' does not exist"
+
+
+def test_process_field_properties_visibility():
+ # Test case 1: No branching logic or annotations
+ field_data = {"Variable / Field Name": "test_field"}
+ result = process_field_properties(field_data)
+ assert "isVis" not in result
+
+ # Test case 2: With branching logic
+ field_data = {
+ "Variable / Field Name": "test_field",
+ "Branching Logic (Show field only if...)": "[age] > 18",
+ }
+ result = process_field_properties(field_data)
+ assert result["isVis"] == "age > 18"
+
+ # Test case 3: With @HIDDEN annotation
+ field_data = {
+ "Variable / Field Name": "test_field",
+ "Field Annotation": "@HIDDEN",
+ }
+ result = process_field_properties(field_data)
+ assert result["isVis"] is False
+
+ # Test case 4: With both branching logic and @HIDDEN
+ field_data = {
+ "Variable / Field Name": "test_field",
+ "Branching Logic (Show field only if...)": "[age] > 18",
+ "Field Annotation": "@HIDDEN",
+ }
+ result = process_field_properties(field_data)
+ assert result["isVis"] is False
diff --git a/reproschema/tests/test_rs2redcap_redcap2rs.py b/reproschema/tests/test_rs2redcap_redcap2rs.py
index 02c8955..c953cab 100644
--- a/reproschema/tests/test_rs2redcap_redcap2rs.py
+++ b/reproschema/tests/test_rs2redcap_redcap2rs.py
@@ -220,59 +220,61 @@ def compare_protocols(prot_tree_orig, prot_tree_final):
)
)
else:
- print(
- f"Activity {act_name}: addProperties have different elements"
- )
errors_list.append(
- f"Activity {act_name}: addProperties have different elements"
+ print_return_msg(
+ f"Activity {act_name}: addProperties have different elements, orig: {act_props_orig} and final: {act_props_final}"
+ )
)
else:
for nm, el in act_props_final.items():
for key in ["isVis", "valueRequired"]:
error = False
- orig_value = getattr(act_props_orig[nm], key)
- final_value = getattr(el, key)
-
- if key == "valueRequired":
- # Debug print
- print(f"\nDebug - Activity: {act_name}, Item: {nm}")
- print(
- f"Original valueRequired: {orig_value}, type: {type(orig_value)}"
+ orig_val = getattr(act_props_orig[nm], key)
+ final_val = getattr(el, key)
+
+ if key == "isVis":
+ orig_norm = (
+ normalize_condition(orig_val)
+ if orig_val is not None
+ else None
)
- print(
- f"Final valueRequired: {final_value}, type: {type(final_value)}"
+ final_norm = (
+ normalize_condition(final_val)
+ if final_val is not None
+ else None
)
- # Compare only True values
- if orig_value is True:
- if final_value is not True:
+ # Case 1: original is True - final can be None or True
+ if orig_norm is True:
+ if not (final_norm is None or final_norm is True):
error = True
- print(
- f"Error case 1: orig=True, final={final_value}"
- )
- elif final_value is True:
- if orig_value is not True:
+ # Case 2: original is False - final must be False
+ elif orig_norm is False:
+ if final_norm is not False:
error = True
- print(
- f"Error case 2: orig={orig_value}, final=True"
- )
-
- elif key == "isVis":
- # Original isVis handling
- if orig_value is not None:
- if normalize_condition(
- orig_value
- ) != normalize_condition(final_value):
+ # Case 3: original is None - final can be None or True
+ elif orig_norm is None:
+ if not (final_norm is None or final_norm is True):
+ error = True
+ # Case 4: original is something else - must match exactly
+ else:
+ if orig_norm != final_norm:
error = True
- elif (
- final_value is not None and final_value is not True
+ else: # handle valueRequired
+ if (orig_val is not None) and (
+ normalize_condition(final_val)
+ != normalize_condition(orig_val)
):
error = True
+ elif final_val and orig_val is None:
+                            if normalize_condition(final_val) is not False:
+ error = True
if error:
errors_list.append(
- print(
- f"Activity {act_name}: addProperties {nm} have different {key}"
+ print_return_msg(
+ f"Activity {act_name}: addProperties {nm} have different {key}, "
+ f"orig: {orig_val}, final: {normalize_condition(final_val)}"
)
)
# check compute
@@ -286,9 +288,10 @@ def compare_protocols(prot_tree_orig, prot_tree_final):
)
)
else:
- print(f"Activity {act_name}: compute have different elements")
errors_list.append(
- f"Activity {act_name}: compute have different elements"
+ print_return_msg(
+ f"Activity {act_name}: compute have different elements, orig: {act_comp_orig}, final: {act_comp_final}"
+ )
)
else:
for nm, el in act_comp_final.items():
@@ -298,7 +301,7 @@ def compare_protocols(prot_tree_orig, prot_tree_final):
getattr(act_comp_orig[nm], "jsExpression")
):
errors_list.append(
- print(
+ print_return_msg(
f"Activity {act_name}: compute {nm} have different jsExpression"
)
)
@@ -314,7 +317,7 @@ def compare_protocols(prot_tree_orig, prot_tree_final):
else:
errors_list.append(
print_return_msg(
- f"Activity {act_name}: items have different elements"
+ f"Activity {act_name}: items have different elements, orig: {act_items_orig}, final: {act_items_final}"
)
)
else:
@@ -335,33 +338,20 @@ def compare_protocols(prot_tree_orig, prot_tree_final):
) != normalize_condition(
act_items_orig[nm]["obj"].question.get("en", "")
):
- # Handle cases where one might be NaN/None and the other empty string
- orig_q = act_items_orig[nm]["obj"].question.get("en", "")
- final_q = el["obj"].question.get("en", "")
-
- # Convert None/NaN to empty string for comparison
- orig_q = (
- "" if pd.isna(orig_q) or orig_q is None else orig_q
- )
- final_q = (
- "" if pd.isna(final_q) or final_q is None else final_q
- )
-
- if normalize_condition(orig_q) != normalize_condition(
- final_q
+ if "
" in normalize_condition(
+ act_items_orig[nm]["obj"].question.get("en", "")
):
- if "
" in normalize_condition(orig_q):
- warnings_list.append(
- print_return_msg(
- f"Activity {act_name}: items {nm} have different question, FIX normalized function!!!"
- )
+ warnings_list.append(
+ print_return_msg(
+ f"Activity {act_name}: items {nm} have different question, FIX normalized function!!!"
)
- else:
- errors_list.append(
- print_return_msg(
- f"Activity {act_name}: items {nm} have different question"
- )
+ )
+ else:
+ errors_list.append(
+ print_return_msg(
+ f"Activity {act_name}: items {nm} have different question"
)
+ )
elif (
el["obj"].ui.inputType
!= act_items_orig[nm]["obj"].ui.inputType