diff --git a/reproschema/redcap2reproschema.py b/reproschema/redcap2reproschema.py
index a4d8c21..8afa0d8 100644
--- a/reproschema/redcap2reproschema.py
+++ b/reproschema/redcap2reproschema.py
@@ -78,406 +78,233 @@
ADDITIONAL_NOTES_LIST = ["Field Note", "Question Number (surveys only)"]
-def clean_dict_nans(obj):
- """
- Recursively remove NaN values from nested dictionaries and lists.
- Returns None if the cleaned object would be empty.
- """
- if isinstance(obj, dict):
- cleaned = {}
- for key, value in obj.items():
- cleaned_value = clean_dict_nans(value)
- if cleaned_value is not None:
- cleaned[key] = cleaned_value
- return cleaned if cleaned else None
-
- elif isinstance(obj, list):
- cleaned = [clean_dict_nans(item) for item in obj]
- cleaned = [item for item in cleaned if item is not None]
- return cleaned if cleaned else None
-
- elif pd.isna(obj):
- return None
-
- return obj
+def clean_header(header):
+ cleaned_header = {}
+ for k, v in header.items():
+ # Strip BOM, whitespace, and enclosing quotation marks if present
+ cleaned_key = (
+ k.lstrip("\ufeff").strip().strip('"') if isinstance(k, str) else k
+ )
+ cleaned_header[cleaned_key] = v
+ return cleaned_header
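+# Illustrative example (hypothetical header key; values pass through unchanged):
+#   clean_header({'\ufeff"Variable / Field Name"': "age"})
+#   -> {"Variable / Field Name": "age"}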
# TODO: normalized condition should depend on the field type, e.g., for SQL
def normalize_condition(condition_str, field_type=None):
# Regular expressions for various pattern replacements
# TODO: function doesn't remove tags
-
- try:
- # Handle boolean values
- if isinstance(condition_str, bool):
- return condition_str
- elif (
- isinstance(condition_str, str) and condition_str.lower() == "true"
- ):
- return True
- elif (
- isinstance(condition_str, str) and condition_str.lower() == "false"
- ):
- return False
-
- # Handle empty/null values
- if condition_str is None or pd.isna(condition_str):
- return None
-
- # Convert non-string types to string
- if not isinstance(condition_str, str):
- try:
- condition_str = str(condition_str)
- except:
- return None
-
- # Remove HTML tags if present
- soup = BeautifulSoup(condition_str, "html.parser")
- condition_str = soup.get_text()
-
- # Define regex patterns
- patterns = {
- "parentheses": (r"\(([0-9]*)\)", r"___\1"),
- "non_gt_lt_equal": (r"([^>|<])=", r"\1 =="),
- "brackets": (r"\[([^\]]*)\]", r" \1 "),
- "or_operator": (r"\bor\b", "||"),
- "and_operator": (r"\band\b", "&&"),
- "extra_spaces": (r"\s+", " "),
- "double_quotes": (r'"', "'"),
- }
-
- # Apply transformations
- for pattern, replacement in patterns.items():
- if isinstance(replacement, tuple):
- condition_str = re.sub(
- replacement[0], replacement[1], condition_str
- )
- else:
- condition_str = re.sub(pattern, replacement, condition_str)
-
- # Handle SQL and calc type conditions differently if specified
- if field_type in ["sql", "calc"]:
- # Add specific handling for SQL/calc expressions if needed
- pass
-
- # Validate the final condition
- condition_str = condition_str.strip()
- if not condition_str:
- return None
-
+ if isinstance(condition_str, bool):
return condition_str
-
- except Exception as e:
- print(f"Error normalizing condition: {str(e)}")
+ elif isinstance(condition_str, str) and condition_str.lower() == "true":
+ return True
+ elif isinstance(condition_str, str) and condition_str.lower() == "false":
+ return False
+ elif condition_str is None:
return None
+ elif not isinstance(condition_str, str):
+ # Convert non-string types to string, or return as is if conversion doesn't make sense
+ try:
+ condition_str = str(condition_str)
+        except Exception:
+ return condition_str
+ re_parentheses = re.compile(r"\(([0-9]*)\)")
+ re_non_gt_lt_equal = re.compile(r"([^>|<])=")
+ re_brackets = re.compile(r"\[([^\]]*)\]")
+ re_extra_spaces = re.compile(r"\s+")
+ re_double_quotes = re.compile(r'"')
+    re_or = re.compile(r"\bor\b")  # Match 'or' as a whole word
-def process_field_properties(data):
- """
- Extract and process field properties from REDCap data.
+ # Apply regex replacements
+ condition_str = re_parentheses.sub(r"___\1", condition_str)
+ condition_str = re_non_gt_lt_equal.sub(r"\1 ==", condition_str)
+ condition_str = re_brackets.sub(r" \1 ", condition_str)
- Args:
- data (dict): Dictionary containing field data from REDCap
+ # Replace 'or' with '||', ensuring not to replace '||'
+ condition_str = re_or.sub("||", condition_str)
- Returns:
- dict: Processed field properties
- """
- try:
- # Validate input
- if not isinstance(data, dict):
- raise ValueError("Input must be a dictionary")
-
- var_name = data.get("Variable / Field Name")
- if not var_name or pd.isna(var_name):
- raise ValueError("Variable / Field Name is required")
-
- # Initialize properties object
- prop_obj = {
- "variableName": str(var_name).strip(),
- "isAbout": f"items/{str(var_name).strip()}",
- "isVis": True, # Default value
- }
-
- # Process branching logic
- condition = data.get("Branching Logic (Show field only if...)")
- if pd.notna(condition):
- normalized_condition = normalize_condition(condition)
- if normalized_condition:
- prop_obj["isVis"] = normalized_condition
-
- # Process field annotation
- annotation = data.get("Field Annotation")
- if pd.notna(annotation):
- annotation = str(annotation).upper()
- if any(
- marker in annotation
- for marker in ["@READONLY", "@HIDDEN", "@CALCTEXT"]
- ):
- prop_obj["isVis"] = False
-
- # Process required field
- required_field = data.get("Required Field?")
- if pd.notna(required_field):
- required_field = str(required_field).strip().lower()
- if required_field == "y":
- prop_obj["valueRequired"] = True
- elif required_field not in ["", "n"]:
- print(
- f"Warning: Unexpected Required Field value '{required_field}' for {var_name}"
- )
+ # Replace 'and' with '&&'
+ condition_str = condition_str.replace(" and ", " && ")
- # Process matrix properties if present
- matrix_group = data.get("Matrix Group Name")
- matrix_ranking = data.get("Matrix Ranking?")
+ # Trim extra spaces and replace double quotes with single quotes
+ condition_str = re_extra_spaces.sub(
+ " ", condition_str
+ ).strip() # Reduce multiple spaces to a single space
+ condition_str = re_double_quotes.sub(
+ "'", condition_str
+ ) # Replace double quotes with single quotes
- if pd.notna(matrix_group):
- prop_obj["matrixGroupName"] = str(matrix_group).strip()
- if pd.notna(matrix_ranking):
- prop_obj["matrixRanking"] = matrix_ranking
+ return condition_str.strip()
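+# Worked example (hypothetical REDCap branching logic, for illustration only):
+#   normalize_condition('[age] >= 18 and [consent(1)] = "1"')
+#   -> "age >= 18 && consent___1 == '1'"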
- return prop_obj
- except Exception as e:
- print(
- f"Error processing field properties for {data.get('Variable / Field Name', 'unknown field')}: {str(e)}"
+def process_field_properties(data):
+ """Getting information about the item that will be used in the Activity schema"""
+ condition = data.get("Branching Logic (Show field only if...)")
+ if condition:
+ condition = normalize_condition(condition)
+ else:
+ condition = True
+
+ # Check Field Annotation for special flags - safely handle non-string values
+ annotation = (
+ str(data.get("Field Annotation", "")).upper()
+ if data.get("Field Annotation") is not None
+ else ""
+ )
+ if (
+ condition
+ and isinstance(annotation, str)
+ and (
+ "@READONLY" in annotation
+ or "@HIDDEN" in annotation
+ or "@CALCTEXT" in annotation
)
- # Return basic properties to allow processing to continue
- return {
- "variableName": str(data.get("Variable / Field Name", "unknown")),
- "isAbout": f"items/{str(data.get('Variable / Field Name', 'unknown'))}",
- "isVis": True,
- }
-
-
-def parse_field_type_and_value(field):
- """
- Parse field type and determine appropriate value type.
+ ):
+ condition = False
- Args:
- field: Dictionary containing field information
+ prop_obj = {
+ "variableName": data["Variable / Field Name"],
+ "isAbout": f"items/{data['Variable / Field Name']}",
+ "isVis": condition,
+ }
- Returns:
- tuple: (input_type, value_type)
- """
- try:
- # Get and validate field type
- field_type = field.get("Field Type", "")
- if pd.isna(field_type):
- field_type = ""
- field_type = str(field_type).strip().lower()
-
- # Validate field type
- if field_type and field_type not in INPUT_TYPE_MAP:
+ # Handle Required Field check, accounting for NaN values and empty strings
+ required_field = data.get("Required Field?")
+ if (
+ pd.notna(required_field) and str(required_field).strip()
+ ): # Check if value is not NaN and not empty
+ if str(required_field).lower() == "y":
+ prop_obj["valueRequired"] = True
+ elif str(required_field).lower() not in [
+ "",
+ "n",
+ ]: # Only raise error for unexpected values
raise ValueError(
- f"Field type '{field_type}' is not currently supported, "
- f"supported types are: {', '.join(INPUT_TYPE_MAP.keys())}"
+ f"value {required_field} not supported yet for redcap:Required Field?"
)
+ return prop_obj
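+# Illustrative result for a hypothetical required field "age" with branching
+# logic [consent] = 1:
+#   {"variableName": "age", "isAbout": "items/age",
+#    "isVis": "consent == 1", "valueRequired": True}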
- input_type = INPUT_TYPE_MAP.get(field_type, "text")
- value_type = "xsd:string" # Default value type
- # Get validation type
- validation_type = field.get(
- "Text Validation Type OR Show Slider Number"
+def parse_field_type_and_value(field):
+ field_type = field.get("Field Type", "")
+ if field_type not in INPUT_TYPE_MAP:
+ raise Exception(
+ f"Field type {field_type} is not currently supported, "
+ f"supported types are {INPUT_TYPE_MAP.keys()}"
)
- if pd.notna(validation_type):
- validation_type = str(validation_type).strip().lower()
-
- if validation_type:
- if validation_type not in VALUE_TYPE_MAP:
- raise ValueError(
- f"Validation type '{validation_type}' is not supported, "
- f"supported types are: {', '.join(VALUE_TYPE_MAP.keys())}"
- )
-
- value_type = VALUE_TYPE_MAP[validation_type]
-
- # Adjust input type based on validation
- if validation_type == "integer" and field_type == "text":
- input_type = "number"
- elif (
- validation_type in ["float", "number"]
- and field_type == "text"
- ):
- input_type = "float"
- elif validation_type == "email" and field_type == "text":
- input_type = "email"
- elif validation_type == "signature" and field_type == "text":
- input_type = "sign"
- elif value_type == "xsd:date" and field_type == "text":
- input_type = "date"
-
- elif field_type == "yesno":
- value_type = "xsd:boolean"
- elif field_type in COMPUTE_LIST:
- value_type = "xsd:integer"
-
- # Handle radio/select fields with choices
- if input_type in ["radio", "select", "slider"]:
- choices = field.get("Choices, Calculations, OR Slider Labels")
- if pd.notna(choices):
- _, value_types = process_choices(
- choices, field.get("Variable / Field Name", "unknown")
- )
- if value_types:
- value_type = value_types[
- 0
- ] # Use first value type if multiple exist
-
- return input_type, value_type
-
- except Exception as e:
- print(f"Error parsing field type: {str(e)}")
- return "text", "xsd:string" # Return defaults on error
+ input_type = INPUT_TYPE_MAP.get(field_type)
+
+ # Get the validation type from the field, if available
+ validation_type = field.get(
+ "Text Validation Type OR Show Slider Number", ""
+ ).strip()
+
+ if validation_type:
+ # Map the validation type to an XSD type
+ if validation_type not in VALUE_TYPE_MAP:
+ raise Exception(
+ f"Validation type {validation_type} is not currently supported, "
+ f"supported types are {VALUE_TYPE_MAP.keys()}"
+ )
+ value_type = VALUE_TYPE_MAP.get(validation_type)
+        # there are some specific input types in Reproschema that can be used instead of "text"
+ if validation_type == "integer" and field_type == "text":
+ input_type = "number"
+ elif validation_type in ["float", "number"] and field_type == "text":
+ input_type = "float"
+ elif validation_type == "email" and field_type == "text":
+ input_type = "email"
+ elif validation_type == "signature" and field_type == "text":
+ input_type = "sign"
+ elif value_type == "xsd:date" and field_type == "text":
+ input_type = "date"
+ elif field_type == "yesno":
+ value_type = "xsd:boolean"
+ elif field_type in COMPUTE_LIST:
+ value_type = "xsd:integer"
+ else: # set the default value type as string
+ value_type = "xsd:string"
+ return input_type, value_type
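+# Illustrative examples (assuming the usual map entries, e.g.
+# VALUE_TYPE_MAP["integer"] == "xsd:integer"):
+#   "text" field with validation "integer" -> ("number", "xsd:integer")
+#   "yesno" field, no validation -> (INPUT_TYPE_MAP["yesno"], "xsd:boolean")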
def process_choices(choices_str, field_name):
- """
- Process REDCap choice options into structured format.
+ if len(choices_str.split("|")) < 2:
+ print(f"WARNING: I found only one option for choice: {choices_str}")
- Args:
- choices_str: String containing choice options
- field_name: Field name for error reporting
-
- Returns:
- tuple: (choices list, value types list) or (None, None) if invalid
- """
- try:
- if pd.isna(choices_str) or not isinstance(choices_str, str):
- return None, None
+ choices = []
+ choices_value_type = []
+ for choice in choices_str.split("|"):
+ choice = choice.strip()
- choices_str = choices_str.strip()
- if not choices_str:
- return None, None
+ # Split only on the first comma to separate value from label
+ first_comma_split = choice.split(",", 1)
+ value_part = first_comma_split[0].strip()
- choices = []
- choices_value_type = set()
-
- # Split choices by pipe
- choice_items = [c.strip() for c in choices_str.split("|") if c.strip()]
-
- if len(choice_items) < 1:
- print(f"Warning: No valid choices found in {field_name}")
- return None, None
-
- for choice in choice_items:
- # Split on first comma only
- parts = choice.split(",", 1)
- if len(parts) < 2:
- print(
- f"Warning: Invalid choice format '{choice}' in {field_name}"
- )
- continue
-
- value_part = parts[0].strip()
- label_part = parts[1].strip()
-
- if not label_part:
+ # Get the full label part (keeping all commas and equals signs)
+ if len(first_comma_split) > 1:
+ label_part = first_comma_split[1].strip()
+ else:
+ # Handle cases where there's no comma
+ if choice.endswith(","):
+ label_part = ""
+ else:
print(
- f"Warning: Empty label in choice '{choice}' in {field_name}"
+ f"Warning: Invalid choice format '{choice}' in {field_name} field"
)
- continue
-
- # Determine value type and convert value
- if value_part == "0":
- value = 0
- value_type = "xsd:integer"
- elif value_part.isdigit() and value_part[0] == "0":
+ label_part = choice
+
+ # Determine value type
+ if value_part == "0":
+ value = 0
+ choices_value_type.append("xsd:integer")
+ elif value_part.isdigit() and value_part[0] == "0":
+ value = value_part
+ choices_value_type.append("xsd:string")
+ else:
+ try:
+ value = int(value_part)
+ choices_value_type.append("xsd:integer")
+ except ValueError:
value = value_part
- value_type = "xsd:string"
- else:
- try:
- value = int(value_part)
- value_type = "xsd:integer"
- except ValueError:
- try:
- value = float(value_part)
- value_type = "xsd:decimal"
- except ValueError:
- value = value_part
- value_type = "xsd:string"
-
- choices_value_type.add(value_type)
-
- # Create choice object
- choice_obj = {
- "name": parse_html(label_part) or {"en": label_part},
- "value": value,
- }
- choices.append(choice_obj)
+ choices_value_type.append("xsd:string")
- return (choices, list(choices_value_type)) if choices else (None, None)
+ choice_obj = {
+ "name": {"en": label_part},
+ "value": value,
+ }
+ choices.append(choice_obj)
- except Exception as e:
- print(f"Error processing choices for {field_name}: {str(e)}")
- return None, None
+ return choices, list(set(choices_value_type))
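+# Illustrative example (hypothetical choice string):
+#   process_choices("1, Yes | 2, No", "consent")
+#   -> ([{"name": {"en": "Yes"}, "value": 1},
+#        {"name": {"en": "No"}, "value": 2}], ["xsd:integer"])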
def parse_html(input_string, default_language="en"):
- """
- Parse HTML content and extract language-specific text.
+ result = {}
- Args:
- input_string: The HTML string to parse
- default_language: Default language code (default: "en")
-
- Returns:
- dict: Dictionary of language codes to text content, or None if invalid
- """
- try:
- if pd.isna(input_string):
- return None
+ # Handle non-string input
+ if not isinstance(input_string, str):
+ if pd.isna(input_string): # Handle NaN values
+ return {default_language: ""}
+ try:
+ input_string = str(input_string)
+        except Exception:
+            return {default_language: ""}
- result = {}
+ soup = BeautifulSoup(input_string, "html.parser")
- # Handle non-string input
- if not isinstance(input_string, str):
- try:
- input_string = str(input_string)
- except:
- return None
-
- # Clean input string
- input_string = input_string.strip()
- if not input_string:
- return None
-
- # Parse HTML
- soup = BeautifulSoup(input_string, "html.parser")
-
- # Find elements with lang attribute
- lang_elements = soup.find_all(True, {"lang": True})
-
- if lang_elements:
- # Process elements with language tags
- for element in lang_elements:
- lang = element.get("lang", default_language).lower()
- text = element.get_text(strip=True)
- if text:
- result[lang] = text
-
- # If no text was extracted but elements exist, try getting default text
- if not result:
- text = soup.get_text(strip=True)
- if text:
- result[default_language] = text
- else:
- # No language tags found, use default language
- text = soup.get_text(strip=True)
+ lang_elements = soup.find_all(True, {"lang": True})
+ if lang_elements:
+ for element in lang_elements:
+ lang = element.get("lang", default_language)
+ text = element.get_text(strip=True)
if text:
- result[default_language] = text
-
- return result if result else None
-
- except Exception as e:
- print(f"Error parsing HTML: {str(e)}")
- # Try to return plain text if HTML parsing fails
- try:
- if isinstance(input_string, str) and input_string.strip():
- return {default_language: input_string.strip()}
- except:
- pass
- return None
+ result[lang] = text
+ if not result: # If no text was extracted
+ result[default_language] = soup.get_text(strip=True)
+ else:
+ result[default_language] = soup.get_text(
+ strip=True
+ ) # Use the entire text as default language text
+ return result
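+# Illustrative examples:
+#   parse_html('<p lang="en">Pain</p><p lang="es">Dolor</p>')
+#   -> {"en": "Pain", "es": "Dolor"}
+#   parse_html("Plain text") -> {"en": "Plain text"}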
def process_row(
@@ -485,7 +312,7 @@ def process_row(
schema_context_url,
form_name,
field,
    add_preamble=True,
):
"""Process a row of the REDCap data and generate the jsonld file for the item."""
item_id = field.get(
@@ -498,25 +325,20 @@ def process_row(
# "description": {"en": f"{item_id} of {form_name}"},
}
- field_type = field.get("Field Type")
- if pd.isna(field_type):
- field_type = ""
+ field_type = field.get("Field Type", "")
input_type, value_type = parse_field_type_and_value(field)
# Initialize ui object with common properties
ui_obj = {"inputType": input_type}
# Handle readonly status first - this affects UI behavior
- annotation = field.get("Field Annotation")
- if annotation is not None and not pd.isna(annotation):
- annotation = str(annotation).upper()
- if (
- "@READONLY" in annotation
- or "@HIDDEN" in annotation
- or "@CALCTEXT" in annotation
- or field_type in COMPUTE_LIST
- ):
- ui_obj["readonlyValue"] = True
+ annotation = str(field.get("Field Annotation", "")).upper()
+ if (
+ field_type in COMPUTE_LIST
+ or "@READONLY" in annotation
+ or "@CALCTEXT" in annotation
+ ):
+ ui_obj["readonlyValue"] = True
rowData["ui"] = ui_obj
rowData["responseOptions"] = {"valueType": [value_type]}
@@ -531,65 +353,61 @@ def process_row(
rowData["responseOptions"]["multipleChoice"] = True
for key, value in field.items():
- if pd.isna(value):
- continue
- schema_key = SCHEMA_MAP.get(key)
- if not schema_key:
- continue
-
- if schema_key in ["question", "description"]:
- parsed_value = parse_html(value)
- if parsed_value:
- rowData[schema_key] = parsed_value
-
- elif schema_key == "preamble" and add_preamble:
- parsed_value = parse_html(value)
- if parsed_value:
- rowData[schema_key] = parsed_value
-
- elif schema_key == "allow":
- ui_obj["allow"] = value.split(", ")
-
+ if SCHEMA_MAP.get(key) in ["question", "description"] and value:
+ rowData.update({SCHEMA_MAP[key]: parse_html(value)})
+ elif SCHEMA_MAP.get(key) == "preamble" and value and add_preable:
+ rowData.update({SCHEMA_MAP[key]: parse_html(value)})
+ elif SCHEMA_MAP.get(key) == "allow" and value:
+ rowData["ui"].update({"allow": value.split(", ")})
# choices are only for some input_types
- elif schema_key == "choices" and input_type in [
- "radio",
- "select",
- "slider",
- ]:
- choices, choices_val_type_l = process_choices(
- value, field_name=field["Variable / Field Name"]
- )
- if choices is not None:
- if input_type == "slider":
- rowData["responseOptions"].update(
- {
- "choices": choices,
- "valueType": choices_val_type_l,
- "minValue": 0,
- "maxValue": 100,
- }
- )
- else:
- rowData["responseOptions"].update(
- {
- "choices": choices,
- "valueType": choices_val_type_l,
- }
- )
+ elif (
+ SCHEMA_MAP.get(key) == "choices"
+ and value
+ and input_type in ["radio", "select", "slider"]
+ ):
+ if input_type == "slider":
+ # For sliders, add both choices and min/max values
+ choices, choices_val_type_l = process_choices(
+ value, field_name=field["Variable / Field Name"]
+ )
+ rowData["responseOptions"].update(
+ {
+ "choices": choices,
+ "valueType": choices_val_type_l,
+ "minValue": 0, # hardcoded for redcap/now
+ "maxValue": 100, # hardcoded for redcap/now
+ }
+ )
+ else:
+ # For radio and select, just process choices normally
+ choices, choices_val_type_l = process_choices(
+ value, field_name=field["Variable / Field Name"]
+ )
+ rowData["responseOptions"].update(
+ {
+ "choices": choices,
+ "valueType": choices_val_type_l,
+ }
+ )
# for now adding only for numerics, sometimes can be string or date.. TODO
- elif schema_key in RESPONSE_COND and value_type in [
- "xsd:integer",
- "xsd:decimal",
- ]:
- try:
- if value_type == "xsd:integer":
- parsed_value = int(value)
- else:
- parsed_value = float(value)
- rowData["responseOptions"][schema_key] = parsed_value
- except ValueError:
- print(f"Warning: Value {value} is not a valid {value_type}")
- continue
+ elif (
+ SCHEMA_MAP.get(key) in RESPONSE_COND
+ and value
+ and value_type in ["xsd:integer", "xsd:decimal"]
+ ):
+ if value_type == "xsd:integer":
+ try:
+ value = int(value)
+ except ValueError:
+ print(f"Warning: Value {value} is not an integer")
+ continue
+ elif value_type == "xsd:decimal":
+ try:
+ value = float(value)
+ except ValueError:
+ print(f"Warning: Value {value} is not a decimal")
+ continue
+ rowData["responseOptions"].update({SCHEMA_MAP[key]: value})
# elif key == "Identifier?" and value:
# identifier_val = value.lower() == "y"
@@ -601,19 +419,9 @@ def process_row(
# }
# )
- elif key in ADDITIONAL_NOTES_LIST:
- value_str = str(value).strip()
- if value_str:
- notes_obj = {
- "source": "redcap",
- "column": key,
- "value": f'"{value_str}"',
- }
- rowData.setdefault("additionalNotesObj", []).append(notes_obj)
-
- cleaned_data = clean_dict_nans(rowData)
- if not cleaned_data or "id" not in cleaned_data:
- raise ValueError(f"Missing required fields for item {item_id}")
+ elif key in ADDITIONAL_NOTES_LIST and value:
+ notes_obj = {"source": "redcap", "column": key, "value": value}
+ rowData.setdefault("additionalNotesObj", []).append(notes_obj)
it = Item(**rowData)
file_path_item = os.path.join(
@@ -634,110 +442,48 @@ def create_form_schema(
redcap_version,
form_name,
activity_display_name,
+ activity_description,
order,
bl_list,
- matrix_list,
+ matrix_list, # TODO: in the future
compute_list,
    preamble=None,
):
- """
- Create the JSON-LD schema for an Activity.
-
- Args:
- abs_folder_path (str/Path): Path to the output directory
- schema_context_url (str): URL for the schema context
- redcap_version (str): Version of REDCap being used
- form_name (str): Name of the form
- activity_display_name (str): Display name for the activity
- order (list): List of items in order
- bl_list (list): List of branching logic properties
- matrix_list (list): List of matrix group properties
- compute_list (list): List of computation fields
- preamble (str, optional): Form preamble text
- """
- try:
- # Validate inputs
- if not form_name or pd.isna(form_name):
- raise ValueError("Form name is required")
-
- if not activity_display_name or pd.isna(activity_display_name):
- activity_display_name = form_name.replace("_", " ").title()
-
- # Clean and validate order list
- clean_order = []
- if order:
- clean_order = [
- str(item).strip() for item in order if pd.notna(item)
- ]
- clean_order = list(
- dict.fromkeys(clean_order)
- ) # Remove duplicates while preserving order
-
- # Clean and validate bl_list
- clean_bl_list = []
- if bl_list:
- clean_bl_list = [
- prop for prop in bl_list if prop and isinstance(prop, dict)
- ]
-
- # Initialize schema
- json_ld = {
- "category": "reproschema:Activity",
- "id": f"{form_name}_schema",
- "prefLabel": {"en": activity_display_name},
- "schemaVersion": get_context_version(schema_context_url),
- "version": redcap_version,
- "ui": {
- "order": clean_order,
- "addProperties": clean_bl_list,
- "shuffle": False,
- },
- }
+ """Create the JSON-LD schema for the Activity."""
+ # Use a set to track unique items and preserve order
+ unique_order = list(dict.fromkeys(order))
+
+ # Construct the JSON-LD structure
+ json_ld = {
+ "category": "reproschema:Activity",
+ "id": f"{form_name}_schema",
+ "prefLabel": {"en": activity_display_name},
+ # "description": {"en": activity_description},
+ "schemaVersion": get_context_version(schema_context_url),
+ "version": redcap_version,
+ "ui": {
+ "order": unique_order,
+ "addProperties": bl_list,
+ "shuffle": False,
+ },
+ }
+    if preamble:
+        json_ld["preamble"] = parse_html(preamble)
+ if compute_list:
+ json_ld["compute"] = compute_list
- # Process preamble if present
- if preamble is not None and pd.notna(preamble):
- parsed_preamble = parse_html(preamble)
- if parsed_preamble:
- json_ld["preamble"] = parsed_preamble
-
- # Process compute list
- if compute_list:
- valid_compute = []
- for comp in compute_list:
- if isinstance(comp, dict) and comp.get("jsExpression"):
- valid_compute.append(comp)
- if valid_compute:
- json_ld["compute"] = valid_compute
-
- # Process matrix list if needed
- if matrix_list:
- valid_matrix = []
- for matrix in matrix_list:
- if isinstance(matrix, dict) and matrix.get("matrixGroupName"):
- valid_matrix.append(matrix)
- if valid_matrix:
- json_ld["matrixInfo"] = valid_matrix
-
- # Clean any remaining NaN values
- cleaned_json_ld = clean_dict_nans(json_ld)
- if not cleaned_json_ld:
- raise ValueError(f"All data was NaN for form {form_name}")
-
- # Create Activity object and write to file
- act = Activity(**cleaned_json_ld)
- path = Path(abs_folder_path) / "activities" / form_name
- path.mkdir(parents=True, exist_ok=True)
-
- write_obj_jsonld(
- act,
- path / f"{form_name}_schema",
- contextfile_url=schema_context_url,
- )
+ act = Activity(**json_ld)
+ # TODO (future): remove or fix matrix info
+ # remove matrixInfo to pass validation
+ # if matrix_list:
+ # json_ld["matrixInfo"] = matrix_list
- except Exception as e:
- raise Exception(
- f"Error creating form schema for {form_name}: {str(e)}"
- )
+ path = os.path.join(f"{abs_folder_path}", "activities", form_name)
+ os.makedirs(path, exist_ok=True)
+ filename = f"{form_name}_schema"
+ file_path = os.path.join(path, filename)
+ write_obj_jsonld(act, file_path, contextfile_url=schema_context_url)
+ print(f"{form_name} Instrument schema created")
def process_activities(activity_name, protocol_visibility_obj, protocol_order):
@@ -799,140 +545,87 @@ def create_protocol_schema(
print(f"Protocol schema created in {file_path}")
-def process_csv(csv_file, abs_folder_path, protocol_name):
+def parse_language_iso_codes(input_string):
+ soup = BeautifulSoup(input_string, "lxml")
+ return [
+ element.get("lang") for element in soup.find_all(True, {"lang": True})
+ ]
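+# Illustrative example:
+#   parse_language_iso_codes('<p lang="en">Hi</p><p lang="es">Hola</p>')
+#   -> ["en", "es"]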
+
+
+def process_csv(csv_file, abs_folder_path, schema_context_url, protocol_name):
datas = {}
order = {}
compute = {}
+ languages = []
- # TODO: add languages
+ # Read CSV with explicit BOM handling, and maintain original order
+ df = pd.read_csv(
+ csv_file, encoding="utf-8-sig"
+ ) # utf-8-sig handles BOM automatically
- try:
- # Read CSV with explicit BOM handling, and maintain original order
- df = pd.read_csv(
- csv_file, encoding="utf-8-sig"
- ) # utf-8-sig handles BOM automatically
+ # Clean column names (headers)
+ df.columns = df.columns.map(
+ lambda x: x.strip().strip('"').lstrip("\ufeff")
+ )
- # Clean column names (headers)
- df.columns = df.columns.map(
- lambda x: x.strip().strip('"').lstrip("\ufeff")
+ # Clean string values in the dataframe
+ df = df.astype(str).replace("nan", "")
+
+ # Initialize structures for each unique form
+ unique_forms = df["Form Name"].unique()
+ for form_name in unique_forms:
+ datas[form_name] = []
+ order[form_name] = []
+ compute[form_name] = []
+ os.makedirs(
+ f"{abs_folder_path}/activities/{form_name}/items", exist_ok=True
)
- # Validate required columns
- required_columns = ["Form Name", "Variable / Field Name", "Field Type"]
- missing_columns = [
- col for col in required_columns if col not in df.columns
- ]
- if missing_columns:
- raise ValueError(
- f"Missing required columns: {', '.join(missing_columns)}"
- )
-
- # Initialize structures for each unique form
- unique_forms = [f for f in df["Form Name"].unique() if not pd.isna(f)]
- if len(unique_forms) == 0:
- raise ValueError("No valid form names found in the CSV")
+ # TODO: should we bring back the language
+ # if not languages:
+ # languages = parse_language_iso_codes(row["Field Label"])
- for form_name in unique_forms:
- if pd.isna(form_name) or not str(form_name).strip():
- continue
+ # Process rows in original order
+ for _, row in df.iterrows():
+ form_name = row["Form Name"]
+ field_name = row["Variable / Field Name"]
+ field_type = row.get("Field Type", "")
+ field_annotation = row.get("Field Annotation")
- form_name = str(form_name).strip()
- datas[form_name] = []
- order[form_name] = []
- compute[form_name] = []
+ # Add row data to datas dictionary
+ datas[form_name].append(row.to_dict())
- form_dir = (
- Path(abs_folder_path) / "activities" / form_name / "items"
+ if field_type in COMPUTE_LIST:
+ condition = normalize_condition(
+ row["Choices, Calculations, OR Slider Labels"],
+ field_type=field_type,
)
- form_dir.mkdir(parents=True, exist_ok=True)
-
- # TODO: should we bring back the language
- # if not languages:
- # languages = parse_language_iso_codes(row["Field Label"])
-
- for idx, row in df.iterrows():
- try:
- form_name = row["Form Name"]
- field_name = row["Variable / Field Name"]
-
- # Skip rows with missing essential data
- if pd.isna(form_name) or pd.isna(field_name):
- print(
- f"Warning: Skipping row {idx+2} with missing form name or field name"
- )
- continue
-
- form_name = str(form_name).strip()
- field_name = str(field_name).strip()
-
- # Convert row to dict and clean NaN values
- row_dict = clean_dict_nans(row.to_dict())
- if not row_dict:
- print(f"Warning: Skipping empty row {idx+2}")
- continue
-
- datas[form_name].append(row_dict)
-
- # Handle compute fields
- field_type = row.get("Field Type", "")
- field_annotation = row.get("Field Annotation", "")
-
- if (
- pd.notna(field_type)
- and str(field_type).strip() in COMPUTE_LIST
- ):
- calculations = row.get(
- "Choices, Calculations, OR Slider Labels"
- )
- if pd.notna(calculations):
- condition = normalize_condition(calculations)
- if condition:
- compute[form_name].append(
- {
- "variableName": field_name,
- "jsExpression": condition,
- }
- )
- elif pd.notna(field_annotation):
- field_annotation = str(field_annotation).upper()
- if "@CALCTEXT" in field_annotation:
- match = re.search(
- r"@CALCTEXT\((.*)\)", field_annotation
- )
- if match:
- js_expression = normalize_condition(match.group(1))
- if js_expression:
- compute[form_name].append(
- {
- "variableName": field_name,
- "jsExpression": js_expression,
- }
- )
- else:
- order[form_name].append(f"items/{field_name}")
-
- except Exception as e:
- print(f"Warning: Error processing row {idx+2}: {str(e)}")
- continue
-
- for form_name in datas:
- if not datas[form_name]:
- print(f"Warning: Form '{form_name}' has no valid fields")
- if not order[form_name] and not compute[form_name]:
- print(
- f"Warning: Form '{form_name}' has no order or compute fields"
+ compute[form_name].append(
+ {
+ "variableName": field_name,
+ "jsExpression": condition,
+ }
+ )
+ elif (
+ isinstance(field_annotation, str)
+ and "@CALCTEXT" in field_annotation.upper()
+ ):
+ calc_text = field_annotation
+ match = re.search(r"@CALCTEXT\((.*)\)", calc_text)
+ if match:
+ js_expression = match.group(1)
+ js_expression = normalize_condition(js_expression)
+ compute[form_name].append(
+ {
+ "variableName": field_name,
+ "jsExpression": js_expression,
+ }
)
+ else:
+ order[form_name].append(f"items/{field_name}")
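+        # e.g. (illustrative) a non-computed field "age" in form "demographics"
+        # appends "items/age" to order["demographics"]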
- # Create protocol directory
- protocol_dir = Path(abs_folder_path) / protocol_name
- protocol_dir.mkdir(parents=True, exist_ok=True)
-
- return datas, order, compute
-
- except pd.errors.EmptyDataError:
- raise ValueError("The CSV file is empty")
- except Exception as e:
- raise Exception(f"Error processing CSV file: {str(e)}")
+ os.makedirs(f"{abs_folder_path}/{protocol_name}", exist_ok=True)
+ return datas, order, compute, languages
# todo adding output path
@@ -942,184 +635,119 @@ def redcap2reproschema(
"""
Convert a REDCap data dictionary to Reproschema format.
- Args:
- csv_file (str/Path): Path to the REDCap CSV file
- yaml_file (str/Path): Path to the YAML configuration file
- output_path (str/Path): Path to the output directory
- schema_context_url (str, optional): URL for the schema context
-
- Raises:
- ValueError: If required files are missing or invalid
- FileNotFoundError: If input files cannot be found
- Exception: For other processing errors
+ :param csv_file: Path to the REDCap CSV file.
+    :param yaml_file: Path to the YAML configuration file.
+    :param output_path: Path to the output directory, where the protocol directory will be created.
+ :param schema_context_url: URL of the schema context. Optional.
"""
- try:
- # Validate input files exist
- csv_path = Path(csv_file)
- yaml_path = Path(yaml_file)
- output_dir = Path(output_path)
-
- if not csv_path.exists():
- raise FileNotFoundError(f"CSV file not found: {csv_file}")
- if not yaml_path.exists():
- raise FileNotFoundError(f"YAML file not found: {yaml_file}")
-
- # Read and validate YAML configuration
- try:
- with open(yaml_path, "r", encoding="utf-8") as f:
- protocol = yaml.safe_load(f)
- except yaml.YAMLError as e:
- raise ValueError(f"Invalid YAML file: {str(e)}")
-
- # Extract and validate protocol information
- protocol_name = protocol.get("protocol_name", "").strip()
- if not protocol_name:
- raise ValueError("Protocol name not specified in the YAML file")
-
- protocol_display_name = protocol.get(
- "protocol_display_name", protocol_name
- )
- protocol_description = protocol.get("protocol_description", "")
- redcap_version = protocol.get("redcap_version", "1.0.0")
-
- # Set up output directory
- protocol_name = protocol_name.replace(" ", "_")
- abs_folder_path = output_dir / protocol_name
- abs_folder_path.mkdir(parents=True, exist_ok=True)
-
- # Set schema context URL
- if schema_context_url is None:
- schema_context_url = CONTEXTFILE_URL
-
- # Process CSV file
- print(f"Processing CSV file: {csv_path}")
- datas, order, compute = process_csv(
- csv_path, abs_folder_path, protocol_name
- )
-
- if not datas:
- raise ValueError("No valid data found in CSV file")
-
- # Initialize protocol variables
- protocol_visibility_obj = {}
- protocol_order = []
-
- # Process each form
- for form_name, rows in datas.items():
- print(f"\nProcessing form: {form_name}")
- if not rows:
- print(f"Warning: Empty form {form_name}, skipping")
- continue
-
- # Initialize form-level collections
- bl_list = []
- matrix_list = []
- preambles_list = []
-
- # Process fields in the form
- for field in rows:
- # Validate field data
- if (
- not isinstance(field, dict)
- or "Variable / Field Name" not in field
- ):
- print(
- f"Warning: Invalid field data in form {form_name}, skipping"
- )
- continue
- # Process field properties
- field_properties = process_field_properties(field)
- if field_properties:
- bl_list.append(field_properties)
-
- # Handle matrix groups
- matrix_group = field.get("Matrix Group Name")
- matrix_ranking = field.get("Matrix Ranking?")
- if pd.notna(matrix_group) or pd.notna(matrix_ranking):
- matrix_info = {
+ # Read the YAML configuration
+ with open(yaml_file, "r") as f:
+ protocol = yaml.safe_load(f)
+
+ protocol_name = protocol.get("protocol_name")
+ protocol_display_name = protocol.get("protocol_display_name")
+ protocol_description = protocol.get("protocol_description")
+ redcap_version = protocol.get("redcap_version")
+ # we can add reproschema version here (or automatically extract)
+
+ if not protocol_name:
+ raise ValueError("Protocol name not specified in the YAML file.")
+
+ protocol_name = protocol_name.replace(
+ " ", "_"
+ ) # Replacing spaces with underscores
+ abs_folder_path = Path(output_path) / protocol_name
+ abs_folder_path.mkdir(parents=True, exist_ok=True)
+
+ if schema_context_url is None:
+ schema_context_url = CONTEXTFILE_URL
+
+ # Process the CSV file
+ datas, order, compute, _ = process_csv(
+ csv_file,
+ abs_folder_path,
+ schema_context_url,
+ protocol_name,
+ )
+ # Initialize other variables for protocol context and schema
+ protocol_visibility_obj = {}
+ protocol_order = []
+
+ # Create form schemas and process activities
+ for form_name, rows in datas.items():
+ bl_list = []
+ matrix_list = []
+ preambles_list = []
+
+ for field in rows:
+            # TODO (future): this could probably be done in process_csv so we don't have to run the loop again
+            # TODO: depends on how the Matrix group should be treated
+ field_properties = process_field_properties(field)
+ bl_list.append(field_properties)
+ if field.get("Matrix Group Name") or field.get("Matrix Ranking?"):
+ matrix_list.append(
+ {
"variableName": field["Variable / Field Name"],
+ "matrixGroupName": field["Matrix Group Name"],
+ "matrixRanking": field["Matrix Ranking?"],
}
- if pd.notna(matrix_group):
- matrix_info["matrixGroupName"] = matrix_group
- if pd.notna(matrix_ranking):
- matrix_info["matrixRanking"] = matrix_ranking
- matrix_list.append(matrix_info)
-
- # Handle preambles (section headers)
- preamble = field.get("Section Header")
- if pd.notna(preamble):
- preamble = str(preamble).strip()
- if preamble:
- preambles_list.append(preamble)
-
- # Determine preamble handling strategy
- unique_preambles = set(preambles_list)
- if len(unique_preambles) == 1:
- # Single preamble for the whole form
- preamble_act = preambles_list[0]
- preamble_itm = False
- elif len(unique_preambles) == 0:
- # No preambles
- preamble_act = None
- preamble_itm = False
- else:
- # Multiple preambles, handle at item level
- preamble_act = None
- preamble_itm = True
-
- # Get form display name
- activity_display_name = rows[0].get("Form Name", form_name)
-
- # Create form schema
- print(f"Creating schema for form: {form_name}")
- create_form_schema(
- abs_folder_path=abs_folder_path,
- schema_context_url=schema_context_url,
- redcap_version=redcap_version,
- form_name=form_name,
- activity_display_name=activity_display_name,
- order=order[form_name],
- bl_list=bl_list,
- matrix_list=matrix_list,
- compute_list=compute[form_name],
- preamble=preamble_act, # Note: using correct parameter name
- )
-
- # Process individual items
- for field in rows:
- field_name = field["Variable / Field Name"]
- print(f"Processing field: {field_name}")
- process_row(
- abs_folder_path=abs_folder_path,
- schema_context_url=schema_context_url,
- form_name=form_name,
- field=field,
- add_preamble=preamble_itm, # Note: consistent parameter naming
)
+ preamble = field.get("Section Header", "").strip()
+ if preamble:
+ preambles_list.append(preamble)
+
+ if len(set(preambles_list)) == 1:
+ preamble_act = preambles_list[0]
+ preamble_itm = False
+ elif len(set(preambles_list)) == 0:
+ preamble_act = None
+ preamble_itm = False
+ else:
+ preamble_act = None
+ preamble_itm = True
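+    # e.g. (illustrative): if every row shares Section Header "Demographics",
+    # it becomes a single form-level preamble; mixed headers are instead
+    # attached to the individual items below.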
- # Process form-level activities
- print(f"Processing activities for form: {form_name}")
- process_activities(
- form_name, protocol_visibility_obj, protocol_order
- )
-
- # Create final protocol schema
- print("\nCreating protocol schema")
- create_protocol_schema(
- abs_folder_path=abs_folder_path,
- schema_context_url=schema_context_url,
- redcap_version=redcap_version,
- protocol_name=protocol_name,
- protocol_display_name=protocol_display_name,
- protocol_description=protocol_description,
- protocol_order=protocol_order,
- protocol_visibility_obj=protocol_visibility_obj,
+ activity_display_name = rows[0]["Form Name"]
+    # TODO: there is no form note in the CSV
+ activity_description = (
+ "" # rows[0].get("Form Note", "Default description")
)
- print(
- f"\nConversion completed successfully. Output directory: {abs_folder_path}"
+ create_form_schema(
+ abs_folder_path,
+ schema_context_url,
+ redcap_version,
+ form_name,
+ activity_display_name,
+ activity_description,
+ order[form_name],
+ bl_list,
+ matrix_list,
+ compute[form_name],
+        preamble=preamble_act,
)
- except Exception as e:
- raise Exception(f"Error during conversion: {str(e)}") from e
+    # Process items once we know whether the preamble belongs to the form or to individual items
+ for field in rows:
+ field_name = field["Variable / Field Name"]
+ print("Processing field: ", field_name, " in form: ", form_name)
+ process_row(
+ abs_folder_path,
+ schema_context_url,
+ form_name,
+ field,
+            add_preamble=preamble_itm,
+ )
+ print("Processing activities", form_name)
+ process_activities(form_name, protocol_visibility_obj, protocol_order)
+ # Create protocol schema
+ create_protocol_schema(
+ abs_folder_path,
+ schema_context_url,
+ redcap_version,
+ protocol_name,
+ protocol_display_name,
+ protocol_description,
+ protocol_order,
+ protocol_visibility_obj,
+ )
diff --git a/reproschema/reproschema2redcap.py b/reproschema/reproschema2redcap.py
index 4f00f07..5650e82 100644
--- a/reproschema/reproschema2redcap.py
+++ b/reproschema/reproschema2redcap.py
@@ -97,18 +97,30 @@ def process_item(
):
"""
Process an item in JSON format and extract relevant information into a dictionary.
- Only includes non-empty/non-None values to match clean_dict_nans behavior.
+
+ Args:
+        item (Item): The item object representing the REDCap field.
+ activity_name (str): The name of the activity.
+
+ Returns:
+ dict: A dictionary containing the extracted information.
"""
if activity_name.endswith("_schema"):
activity_name = activity_name[:-7]
-
- # Initialize with only required fields
row_data = {
- "var_name": item.id,
+ "val_min": "",
+ "val_max": "",
+ "choices": "",
+ "required": "",
+ "field_notes": "",
+ "var_name": "",
"activity": activity_name,
+ "field_label": "",
+ "isVis_logic": "",
}
- # Extract and add non-empty response option values
+ # Extract min and max values from response options, if available
+    # loading additional files if responseOptions is a URL
if isinstance(item.responseOptions, str):
resp = load_file(
item.responseOptions,
@@ -127,82 +139,42 @@ def process_item(
)
else:
response_options = item.responseOptions
+ row_data["val_min"] = response_options.minValue if response_options else ""
+ row_data["val_max"] = response_options.maxValue if response_options else ""
+
+ # 'choices' processing is now handled in 'find_Ftype_and_colH' if it's a URL
+ choices = response_options.choices if response_options else ""
+ if choices and not isinstance(choices, str):
+ if isinstance(choices, list):
+ item_choices = [
+ f"{ch.value}, {ch.name.get('en', '')}" for ch in choices
+ ]
+ row_data["choices"] = " | ".join(item_choices)
- # Only add values if they exist
- if response_options:
- if response_options.minValue is not None:
- row_data["val_min"] = response_options.minValue
- if response_options.maxValue is not None:
- row_data["val_max"] = response_options.maxValue
-
- # Handle choices
- choices = response_options.choices
- if choices and not isinstance(choices, str):
- if isinstance(choices, list):
- item_choices = [
- f"{ch.value}, {ch.name.get('en', '')}"
- for ch in choices
- if ch.value is not None
- ]
- if item_choices:
- row_data["choices"] = " | ".join(item_choices)
-
- # Add valueRequired if explicitly True
- if (
- item_properties
- and "valueRequired" in item_properties
- and item_properties["valueRequired"] is True
- ):
+ if item_properties.get("valueRequired", "") is True:
row_data["required"] = "y"
-
- var_name = str(item.id).split("/")[-1] # Get the last part of the id path
- if var_name.endswith("_total_score"):
- row_data["isVis_logic"] = False # This will make the field hidden
- # Regular isVis handling for other fields
- elif "isVis" in item_properties and item_properties["isVis"] is not True:
+ if "isVis" in item_properties and item_properties["isVis"] is not True:
row_data["isVis_logic"] = item_properties["isVis"]
+ row_data["field_notes"] = item.description.get("en", "")
+ row_data["preamble"] = item.preamble.get("en", activity_preamble)
+ row_data["var_name"] = item.id
- # Handle description
- if (
- item.description
- and "en" in item.description
- and item.description["en"]
- ):
- row_data["field_notes"] = item.description["en"]
-
- # Handle preamble
- if item.preamble and "en" in item.preamble and item.preamble["en"]:
- row_data["preamble"] = item.preamble["en"]
- elif activity_preamble:
- row_data["preamble"] = activity_preamble
-
- # Handle question/field label
if compute_item:
+ # for compute items there are no questions
question = item.description
else:
question = item.question
-
- if isinstance(question, dict) and "en" in question and question["en"]:
- row_data["field_label"] = question["en"]
- elif isinstance(question, str) and question:
+ if isinstance(question, dict):
+ row_data["field_label"] = question.get("en", "")
+ elif isinstance(question, str):
row_data["field_label"] = question
- # Handle compute items
if compute_item and compute_expr:
- print(f"\nDebug - Compute Item: {var_name}")
- print(f"Compute Expression: {compute_expr}")
row_data["choices"] = compute_expr
row_data["field_type"] = "calc"
- # For computed fields, we may need to set visibility to false by default
- if any(score_type in var_name for score_type in ["_score", "_total"]):
- row_data["isVis_logic"] = False
else:
- # Use find_Ftype_and_colH but only add non-empty values
- field_info = find_Ftype_and_colH(item, {}, response_options)
- if field_info.get("field_type"):
- row_data["field_type"] = field_info["field_type"]
- if field_info.get("val_type_OR_slider"):
- row_data["val_type_OR_slider"] = field_info["val_type_OR_slider"]
+ # Call helper function to find field type and validation type (if any) and update row_data
+ row_data = find_Ftype_and_colH(item, row_data, response_options)
return row_data
@@ -248,16 +220,6 @@ def get_csv_data(dir_path, contextfile, http_kwargs):
el["variableName"]: el
for el in parsed_activity_json["ui"]["addProperties"]
}
-
- # Get activity name without adding extra _schema
- activity_name = act.id.split("/")[-1]
- if activity_name.endswith("_schema.jsonld"):
- activity_name = activity_name[
- :-12
- ] # Remove _schema.jsonld
- elif activity_name.endswith(".jsonld"):
- activity_name = activity_name[:-7] # Remove .jsonld
-
items_properties.update(
{
el["isAbout"]: el
@@ -271,81 +233,56 @@ def get_csv_data(dir_path, contextfile, http_kwargs):
item_order = [("ord", el) for el in act.ui.order]
item_calc = [("calc", el) for el in act.compute]
- computed_fields = {
- calc_item.variableName
- for _, calc_item in item_calc
- }
-
for tp, item in item_order + item_calc:
- try:
- if tp == "calc":
- js_expr = item.jsExpression
- var_name = item.variableName
-
- # Find the corresponding item properties
- if var_name in items_properties:
- item = items_properties[var_name][
- "isAbout"
- ]
- # Ensure computed fields are marked as hidden
- items_properties[var_name][
- "isVis"
- ] = False
- else:
- print(
- f"WARNING: no item properties found for computed field {var_name} in {activity_name}"
- )
- continue
- item_calc = True
+ if tp == "calc":
+ js_expr = item.jsExpression
+ if item.variableName in items_properties:
+ item = items_properties[item.variableName][
+ "isAbout"
+ ]
else:
- item_calc = False
- js_expr = None
- it_prop = items_properties.get(item)
- if not _is_url(item):
- item = Path(activity_path).parent / item
-
- try:
- item_json = load_file(
- item,
- started=True,
- http_kwargs=http_kwargs,
- fixoldschema=True,
- compact=True,
- compact_context=contextfile,
+ print(
+ "WARNING: no item properties found for",
+ item.variableName,
+ activity_name,
)
- item_json.pop("@context", "")
- itm = Item(**item_json)
- except Exception as e:
- print(f"Error loading item: {item}")
- print(f"Error details: {str(e)}")
continue
-
- activity_name = act.id.split("/")[-1].split(
- "."
- )[0]
- activity_preamble = (
- act.preamble.get("en", "").strip()
- if hasattr(act, "preamble")
- else ""
- )
-
- row_data = process_item(
- itm,
- it_prop,
- activity_name,
- activity_preamble,
- contextfile,
- http_kwargs,
- item_calc,
- js_expr,
- )
- csv_data.append(row_data)
-
- except Exception as e:
- print(
- f"Error processing item {item}: {str(e)}"
+ item_calc = True
+ else:
+ item_calc = False
+ js_expr = None
+ it_prop = items_properties.get(item)
+ if not _is_url(item):
+ item = Path(activity_path).parent / item
+ try:
+ item_json = load_file(
+ item,
+ started=True,
+ http_kwargs=http_kwargs,
+ fixoldschema=True,
+ compact=True,
+ compact_context=contextfile,
)
+ except Exception:
+ print(f"Error loading item: {item}")
continue
+ item_json.pop("@context", "")
+ itm = Item(**item_json)
+ activity_name = act.id.split("/")[-1].split(".")[0]
+ activity_preamble = act.preamble.get(
+ "en", ""
+ ).strip()
+ row_data = process_item(
+ itm,
+ it_prop,
+ activity_name,
+ activity_preamble,
+ contextfile,
+ http_kwargs,
+ item_calc,
+ js_expr,
+ )
+ csv_data.append(row_data)
# Break after finding the first _schema file
break
return csv_data
@@ -360,7 +297,7 @@ def write_to_csv(csv_data, output_csv_filename):
"Field Type",
"Field Label",
"Choices, Calculations, OR Slider Labels",
- "Field Note",
+ "Field Note", # TODO: is this description?
"Text Validation Type OR Show Slider Number",
"Text Validation Min",
"Text Validation Max",
@@ -379,69 +316,37 @@ def write_to_csv(csv_data, output_csv_filename):
output_csv_filename, "w", newline="", encoding="utf-8"
) as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=headers)
- writer.writeheader()
+        # Map the data from the internal row format to the REDCap format
+ redcap_data = []
for row in csv_data:
- redcap_row = {}
-
- # Handle var_name URL conversion
var_name = row["var_name"]
if _is_url(var_name):
var_name = var_name.split("/")[-1].split(".")[0]
- redcap_row["Variable / Field Name"] = var_name
-
- # Handle form name
- activity_name = row["activity"]
- if activity_name.endswith("_schema"):
- activity_name = activity_name[:-7]
- redcap_row["Form Name"] = activity_name
-
- # Map remaining fields
- field_mappings = {
- "preamble": "Section Header",
- "field_type": "Field Type",
- "field_label": "Field Label",
- "choices": "Choices, Calculations, OR Slider Labels",
- "field_notes": "Field Note",
- "val_type_OR_slider": "Text Validation Type OR Show Slider Number",
- "val_min": "Text Validation Min",
- "val_max": "Text Validation Max",
- "required": "Required Field?",
- "isVis_logic": "Branching Logic (Show field only if...)",
- "field_annotation": "Field Annotation",
- "matrix_group": "Matrix Group Name",
- "matrix_ranking": "Matrix Ranking?",
+ redcap_row = {
+ "Variable / Field Name": var_name,
+ "Form Name": row["activity"],
+ "Section Header": row[
+ "preamble"
+ ], # Update this if your data includes section headers
+ "Field Type": row["field_type"],
+ "Field Label": row["field_label"],
+ "Choices, Calculations, OR Slider Labels": row["choices"],
+ "Field Note": row["field_notes"],
+ "Text Validation Type OR Show Slider Number": row.get(
+ "val_type_OR_slider", ""
+ ),
+ "Required Field?": row["required"],
+ "Text Validation Min": row["val_min"],
+ "Text Validation Max": row["val_max"],
+ "Branching Logic (Show field only if...)": row["isVis_logic"],
+ # Add other fields as necessary based on your data
}
+ redcap_data.append(redcap_row)
- # Add mapped fields only if they exist and aren't empty
- for src_key, dest_key in field_mappings.items():
- if (
- src_key in row
- and row[src_key] is not None
- and row[src_key] != ""
- ):
- # Special handling for visibility logic
- if src_key == "isVis_logic":
- if (
- row[src_key] is not True
- ): # Only add if not default True
- redcap_row[dest_key] = row[src_key]
- # Special handling for required field
- elif src_key == "required":
- redcap_row[dest_key] = "y" if row[src_key] else "n"
- # Special handling for field annotation
- elif src_key == "field_annotation":
- current_annotation = redcap_row.get(dest_key, "")
- if current_annotation:
- redcap_row[dest_key] = (
- f"{current_annotation} {row[src_key]}"
- )
- else:
- redcap_row[dest_key] = row[src_key]
- else:
- redcap_row[dest_key] = row[src_key]
-
- writer.writerow(redcap_row)
+ writer.writeheader()
+ for row in redcap_data:
+ writer.writerow(row)
print("The CSV file was written successfully")
diff --git a/reproschema/tests/test_process_choices.py b/reproschema/tests/test_process_choices.py
index 694487e..620c157 100644
--- a/reproschema/tests/test_process_choices.py
+++ b/reproschema/tests/test_process_choices.py
@@ -87,6 +87,7 @@ def test_process_choices_incomplete_values():
choices, value_types = process_choices(choices_str, "incomplete_values")
assert choices == [
{"name": {"en": "Yes"}, "value": 1},
+ {"name": {"en": ""}, "value": 2},
{"name": {"en": "No"}, "value": 3},
]
assert value_types == ["xsd:integer"]
diff --git a/reproschema/tests/test_rs2redcap_redcap2rs.py b/reproschema/tests/test_rs2redcap_redcap2rs.py
index 02c8955..34ffb44 100644
--- a/reproschema/tests/test_rs2redcap_redcap2rs.py
+++ b/reproschema/tests/test_rs2redcap_redcap2rs.py
@@ -220,59 +220,40 @@ def compare_protocols(prot_tree_orig, prot_tree_final):
)
)
else:
- print(
- f"Activity {act_name}: addProperties have different elements"
- )
errors_list.append(
- f"Activity {act_name}: addProperties have different elements"
+ print_return_msg(
+ f"Activity {act_name}: addProperties have different elements, orig: {act_props_orig} and final: {act_props_final}"
+ )
)
else:
for nm, el in act_props_final.items():
for key in ["isVis", "valueRequired"]:
error = False
- orig_value = getattr(act_props_orig[nm], key)
- final_value = getattr(el, key)
-
- if key == "valueRequired":
- # Debug print
- print(f"\nDebug - Activity: {act_name}, Item: {nm}")
- print(
- f"Original valueRequired: {orig_value}, type: {type(orig_value)}"
+ if (getattr(act_props_orig[nm], key) is not None) and (
+ normalize_condition(getattr(el, key))
+ != normalize_condition(
+ getattr(act_props_orig[nm], key)
)
- print(
- f"Final valueRequired: {final_value}, type: {type(final_value)}"
- )
-
- # Compare only True values
- if orig_value is True:
- if final_value is not True:
- error = True
- print(
- f"Error case 1: orig=True, final={final_value}"
- )
- elif final_value is True:
- if orig_value is not True:
- error = True
- print(
- f"Error case 2: orig={orig_value}, final=True"
- )
-
- elif key == "isVis":
- # Original isVis handling
- if orig_value is not None:
- if normalize_condition(
- orig_value
- ) != normalize_condition(final_value):
- error = True
+ ):
+ error = True
+ elif (
+ getattr(el, key)
+ and getattr(act_props_orig[nm], key) is None
+ ):
+ if (
+ key == "isVis"
+ and normalize_condition(getattr(el, key)) != True
+ ):
+ error = True
elif (
- final_value is not None and final_value is not True
+ key == "valueRequired"
+ and normalize_condition(getattr(el, key)) != False
):
error = True
-
if error:
errors_list.append(
- print(
- f"Activity {act_name}: addProperties {nm} have different {key}"
+ print_return_msg(
+ f"Activity {act_name}: addProperties {nm} have different {key}, orig: {getattr(act_props_orig[nm], key)}, final: {normalize_condition(getattr(el, key))}"
)
)
# check compute
@@ -286,9 +267,10 @@ def compare_protocols(prot_tree_orig, prot_tree_final):
)
)
else:
- print(f"Activity {act_name}: compute have different elements")
errors_list.append(
- f"Activity {act_name}: compute have different elements"
+ print_return_msg(
+ f"Activity {act_name}: compute have different elements, orig: {act_comp_orig}, final: {act_comp_final}"
+ )
)
else:
for nm, el in act_comp_final.items():
@@ -298,7 +280,7 @@ def compare_protocols(prot_tree_orig, prot_tree_final):
getattr(act_comp_orig[nm], "jsExpression")
):
errors_list.append(
- print(
+ print_return_msg(
f"Activity {act_name}: compute {nm} have different jsExpression"
)
)
@@ -314,7 +296,7 @@ def compare_protocols(prot_tree_orig, prot_tree_final):
else:
errors_list.append(
print_return_msg(
- f"Activity {act_name}: items have different elements"
+ f"Activity {act_name}: items have different elements, orig: {act_items_orig}, final: {act_items_final}"
)
)
else:
@@ -335,33 +317,20 @@ def compare_protocols(prot_tree_orig, prot_tree_final):
) != normalize_condition(
act_items_orig[nm]["obj"].question.get("en", "")
):
- # Handle cases where one might be NaN/None and the other empty string
- orig_q = act_items_orig[nm]["obj"].question.get("en", "")
- final_q = el["obj"].question.get("en", "")
-
- # Convert None/NaN to empty string for comparison
- orig_q = (
- "" if pd.isna(orig_q) or orig_q is None else orig_q
- )
- final_q = (
- "" if pd.isna(final_q) or final_q is None else final_q
- )
-
- if normalize_condition(orig_q) != normalize_condition(
- final_q
+ if "
" in normalize_condition(
+ act_items_orig[nm]["obj"].question.get("en", "")
):
- if "
" in normalize_condition(orig_q):
- warnings_list.append(
- print_return_msg(
- f"Activity {act_name}: items {nm} have different question, FIX normalized function!!!"
- )
+ warnings_list.append(
+ print_return_msg(
+ f"Activity {act_name}: items {nm} have different question, FIX normalized function!!!"
)
- else:
- errors_list.append(
- print_return_msg(
- f"Activity {act_name}: items {nm} have different question"
- )
+ )
+ else:
+ errors_list.append(
+ print_return_msg(
+ f"Activity {act_name}: items {nm} have different question"
)
+ )
elif (
el["obj"].ui.inputType
!= act_items_orig[nm]["obj"].ui.inputType