diff --git a/.github/workflows/cdci.yml b/.github/workflows/cdci.yml
index 9fd742c..1fa82e6 100644
--- a/.github/workflows/cdci.yml
+++ b/.github/workflows/cdci.yml
@@ -17,6 +17,8 @@ jobs:
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
steps:
- uses: actions/checkout@v4
+ - uses: psf/black@stable
+ - uses: isort/isort-action@v1
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
diff --git a/docs/conf.py b/docs/conf.py
index 6a86ef6..61c767d 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -17,7 +17,9 @@
project = "vuegen"
copyright = "2024, Multiomics-Analytics-Group"
-author = "Multiomics-Analytics-Group, Sebastián Ayala Ruano, Henry Webel, Alberto Santos"
+author = (
+ "Multiomics-Analytics-Group, Sebastián Ayala Ruano, Henry Webel, Alberto Santos"
+)
PACKAGE_VERSION = metadata.version("vuegen")
version = PACKAGE_VERSION
release = PACKAGE_VERSION
diff --git a/pyproject.toml b/pyproject.toml
index 6e4b088..0e0a006 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -60,3 +60,6 @@ docs = ["sphinx", "sphinx-book-theme", "myst-nb", "ipywidgets", "sphinx-new-tab-
[tool.poetry.scripts]
# https://python-poetry.org/docs/pyproject/#scripts
vuegen = "vuegen.__main__:main"
+
+[tool.isort]
+profile = "black"
diff --git a/src/vuegen/__init__.py b/src/vuegen/__init__.py
index d538f87..5becc17 100644
--- a/src/vuegen/__init__.py
+++ b/src/vuegen/__init__.py
@@ -1 +1 @@
-__version__ = "1.0.0"
\ No newline at end of file
+__version__ = "1.0.0"
diff --git a/src/vuegen/__main__.py b/src/vuegen/__main__.py
index 1304235..5c853d5 100644
--- a/src/vuegen/__main__.py
+++ b/src/vuegen/__main__.py
@@ -26,11 +26,11 @@ def main():
# https://docs.python.org/3/library/argparse.html#printing-help
parser.print_help()
sys.exit(1)
-
+
if config_path and dir_path:
print("Please provide only one of configuration file or directory path:\n")
parser.print_help()
- sys.exit(1) # otherwise could resort to either or ?
+ sys.exit(1) # otherwise could resort to either or ?
# Define logger suffix based on report type and name
logger_suffix = f"{report_type}_report_{str(report_name)}"
@@ -39,11 +39,14 @@ def main():
logger = get_logger(f"{logger_suffix}")
# Generate the report
- report_generator.get_report(report_type=report_type,
- logger=logger,
- config_path=config_path,
- dir_path=dir_path,
- streamlit_autorun=streamlit_autorun)
+ report_generator.get_report(
+ report_type=report_type,
+ logger=logger,
+ config_path=config_path,
+ dir_path=dir_path,
+ streamlit_autorun=streamlit_autorun,
+ )
+
-if __name__ == '__main__':
+if __name__ == "__main__":
main()
diff --git a/src/vuegen/config_manager.py b/src/vuegen/config_manager.py
index 8b05b39..4832b08 100644
--- a/src/vuegen/config_manager.py
+++ b/src/vuegen/config_manager.py
@@ -12,7 +12,8 @@ class ConfigManager:
"""
Class for handling metadata of reports from YAML config file and creating report objects.
"""
- def __init__(self, logger: Optional[logging.Logger]=None):
+
+ def __init__(self, logger: Optional[logging.Logger] = None):
"""
Initializes the ConfigManager with a logger.
@@ -67,25 +68,44 @@ def _create_component_config_fromfile(self, file_path: Path) -> Dict[str, str]:
component_config["caption"] = ""
# Infer component config
- if file_ext in [r.DataFrameFormat.CSV.value_with_dot, r.DataFrameFormat.TXT.value_with_dot]:
+ if file_ext in [
+ r.DataFrameFormat.CSV.value_with_dot,
+ r.DataFrameFormat.TXT.value_with_dot,
+ ]:
# Check for CSVNetworkFormat keywords
if "edgelist" in file_path.stem.lower():
component_config["component_type"] = r.ComponentType.PLOT.value
component_config["plot_type"] = r.PlotType.INTERACTIVE_NETWORK.value
- component_config ["csv_network_format"] = r.CSVNetworkFormat.EDGELIST.value
+ component_config["csv_network_format"] = (
+ r.CSVNetworkFormat.EDGELIST.value
+ )
elif "adjlist" in file_path.stem.lower():
- component_config ["component_type"] = r.ComponentType.PLOT.value
- component_config ["plot_type"] = r.PlotType.INTERACTIVE_NETWORK.value
- component_config ["csv_network_format"] = r.CSVNetworkFormat.ADJLIST.value
+ component_config["component_type"] = r.ComponentType.PLOT.value
+ component_config["plot_type"] = r.PlotType.INTERACTIVE_NETWORK.value
+ component_config["csv_network_format"] = (
+ r.CSVNetworkFormat.ADJLIST.value
+ )
# Fill the config with dataframe content
else:
- component_config ["component_type"] = r.ComponentType.DATAFRAME.value
- component_config ["file_format"] = r.DataFrameFormat.CSV.value if file_ext == r.DataFrameFormat.CSV.value_with_dot else r.DataFrameFormat.TXT.value
- component_config ["delimiter"] = "," if file_ext == r.DataFrameFormat.CSV.value_with_dot else "\\t"
+ component_config["component_type"] = r.ComponentType.DATAFRAME.value
+ component_config["file_format"] = (
+ r.DataFrameFormat.CSV.value
+ if file_ext == r.DataFrameFormat.CSV.value_with_dot
+ else r.DataFrameFormat.TXT.value
+ )
+ component_config["delimiter"] = (
+ "," if file_ext == r.DataFrameFormat.CSV.value_with_dot else "\\t"
+ )
# Check other DataframeFormats than csv and txt
- elif file_ext in [fmt.value_with_dot for fmt in r.DataFrameFormat if fmt not in [r.DataFrameFormat.CSV, r.DataFrameFormat.TXT]]:
- component_config ["component_type"] = r.ComponentType.DATAFRAME.value
- component_config ["file_format"] = next(fmt.value for fmt in r.DataFrameFormat if fmt.value_with_dot == file_ext)
+ elif file_ext in [
+ fmt.value_with_dot
+ for fmt in r.DataFrameFormat
+ if fmt not in [r.DataFrameFormat.CSV, r.DataFrameFormat.TXT]
+ ]:
+ component_config["component_type"] = r.ComponentType.DATAFRAME.value
+ component_config["file_format"] = next(
+ fmt.value for fmt in r.DataFrameFormat if fmt.value_with_dot == file_ext
+ )
elif file_ext == ".html":
if is_pyvis_html(file_path):
component_config["component_type"] = r.ComponentType.PLOT.value
@@ -94,19 +114,19 @@ def _create_component_config_fromfile(self, file_path: Path) -> Dict[str, str]:
component_config["component_type"] = r.ComponentType.HTML.value
# Check for network formats
elif file_ext in [fmt.value_with_dot for fmt in r.NetworkFormat]:
- component_config ["component_type"] = r.ComponentType.PLOT.value
+ component_config["component_type"] = r.ComponentType.PLOT.value
if file_ext in [
r.NetworkFormat.PNG.value_with_dot,
r.NetworkFormat.JPG.value_with_dot,
r.NetworkFormat.JPEG.value_with_dot,
r.NetworkFormat.SVG.value_with_dot,
]:
- component_config ["plot_type"] = r.PlotType.STATIC.value
+ component_config["plot_type"] = r.PlotType.STATIC.value
else:
- component_config ["plot_type"] = r.PlotType.INTERACTIVE_NETWORK.value
- # Check for interactive plots
+ component_config["plot_type"] = r.PlotType.INTERACTIVE_NETWORK.value
+ # Check for interactive plots
elif file_ext == ".json":
- component_config ["component_type"] = r.ComponentType.PLOT.value
+ component_config["component_type"] = r.ComponentType.PLOT.value
try:
with open(file_path, "r", encoding="utf-8") as f:
json_data = json.load(f)
@@ -118,12 +138,14 @@ def _create_component_config_fromfile(self, file_path: Path) -> Dict[str, str]:
self.logger.warning(f"Could not parse JSON file {file_path}: {e}")
component_config["plot_type"] = "unknown"
elif file_ext == ".md":
- component_config ["component_type"] = r.ComponentType.MARKDOWN.value
+ component_config["component_type"] = r.ComponentType.MARKDOWN.value
else:
- self.logger.error(f"Unsupported file extension: {file_ext}. Skipping file: {file_path}\n")
+ self.logger.error(
+ f"Unsupported file extension: {file_ext}. Skipping file: {file_path}\n"
+ )
return None
- return component_config
+ return component_config
def _sort_paths_by_numprefix(self, paths: List[Path]) -> List[Path]:
"""
@@ -139,26 +161,27 @@ def _sort_paths_by_numprefix(self, paths: List[Path]) -> List[Path]:
List[Path]
The sorted list of Path objects.
"""
+
def get_sort_key(path: Path) -> tuple:
parts = path.name.split("_", 1)
if parts[0].isdigit():
numeric_prefix = int(parts[0])
else:
# Non-numeric prefixes go to the end
- numeric_prefix = float('inf')
- return numeric_prefix, path.name.lower()
+ numeric_prefix = float("inf")
+ return numeric_prefix, path.name.lower()
return sorted(paths, key=get_sort_key)
-
+
def _read_description_file(self, folder_path: Path) -> str:
"""
Reads the content of a description.md file if it exists in the given folder.
-
+
Parameters
----------
folder_path : Path
Path to the folder where description.md might be located.
-
+
Returns
-------
str
@@ -166,11 +189,13 @@ def _read_description_file(self, folder_path: Path) -> str:
"""
description_file = folder_path / "description.md"
if description_file.exists():
- ret = description_file.read_text().strip().replace('\n', '\n ')
+ ret = description_file.read_text().strip().replace("\n", "\n ")
return f"{ret}\n"
return ""
- def _create_subsect_config_fromdir(self, subsection_dir_path: Path) -> Dict[str, Union[str, List[Dict]]]:
+ def _create_subsect_config_fromdir(
+ self, subsection_dir_path: Path
+ ) -> Dict[str, Union[str, List[Dict]]]:
"""
Creates subsection config from a directory.
@@ -185,18 +210,20 @@ def _create_subsect_config_fromdir(self, subsection_dir_path: Path) -> Dict[str,
The subsection config.
"""
# Sort files by number prefix
- sorted_files = self._sort_paths_by_numprefix(list(subsection_dir_path.iterdir()))
+ sorted_files = self._sort_paths_by_numprefix(
+ list(subsection_dir_path.iterdir())
+ )
components = []
for file in sorted_files:
if file.is_file():
component_config = self._create_component_config_fromfile(file)
- # Skip unsupported files
+ # Skip unsupported files
if component_config is None:
continue
- # Add component config to list
+ # Add component config to list
components.append(component_config)
-
+
subsection_config = {
"title": self._create_title_fromdir(subsection_dir_path.name),
"description": self._read_description_file(subsection_dir_path),
@@ -204,7 +231,9 @@ def _create_subsect_config_fromdir(self, subsection_dir_path: Path) -> Dict[str,
}
return subsection_config
- def _create_sect_config_fromdir(self, section_dir_path: Path) -> Dict[str, Union[str, List[Dict]]]:
+ def _create_sect_config_fromdir(
+ self, section_dir_path: Path
+ ) -> Dict[str, Union[str, List[Dict]]]:
"""
Creates section config from a directory.
@@ -218,8 +247,10 @@ def _create_sect_config_fromdir(self, section_dir_path: Path) -> Dict[str, Union
Dict[str, Union[str, List[Dict]]]
The section config.
"""
- # Sort subsections by number prefix
- sorted_subsections = self._sort_paths_by_numprefix(list(section_dir_path.iterdir()))
+ # Sort subsections by number prefix
+ sorted_subsections = self._sort_paths_by_numprefix(
+ list(section_dir_path.iterdir())
+ )
subsections = []
for subsection_dir in sorted_subsections:
@@ -233,7 +264,9 @@ def _create_sect_config_fromdir(self, section_dir_path: Path) -> Dict[str, Union
}
return section_config
- def create_yamlconfig_fromdir(self, base_dir: str) -> Tuple[Dict[str, Union[str, List[Dict]]], Path]:
+ def create_yamlconfig_fromdir(
+ self, base_dir: str
+ ) -> Tuple[Dict[str, Union[str, List[Dict]]], Path]:
"""
Generates a YAML-compatible config file from a directory. It also returns the resolved folder path.
@@ -264,10 +297,12 @@ def create_yamlconfig_fromdir(self, base_dir: str) -> Tuple[Dict[str, Union[str,
# Sort sections by their number prefix
sorted_sections = self._sort_paths_by_numprefix(list(base_dir_path.iterdir()))
- # Generate sections and subsections config
+ # Generate sections and subsections config
for section_dir in sorted_sections:
if section_dir.is_dir():
- yaml_config["sections"].append(self._create_sect_config_fromdir(section_dir))
+ yaml_config["sections"].append(
+ self._create_sect_config_fromdir(section_dir)
+ )
return yaml_config, base_dir_path
@@ -294,20 +329,22 @@ def initialize_report(self, config: dict) -> tuple[r.Report, dict]:
"""
# Create a Report object from metadata
report = r.Report(
- title = config['report']['title'],
- logger = self.logger,
- sections = [],
- description = config['report'].get('description'),
- graphical_abstract = config['report'].get('graphical_abstract'),
- logo = config['report'].get('logo')
+ title=config["report"]["title"],
+ logger=self.logger,
+ sections=[],
+ description=config["report"].get("description"),
+ graphical_abstract=config["report"].get("graphical_abstract"),
+ logo=config["report"].get("logo"),
)
# Create sections and subsections
- for section_data in config.get('sections', []):
+ for section_data in config.get("sections", []):
section = self._create_section(section_data)
report.sections.append(section)
- self.logger.info(f"Report '{report.title}' initialized with {len(report.sections)} sections.")
+ self.logger.info(
+ f"Report '{report.title}' initialized with {len(report.sections)} sections."
+ )
return report, config
def _create_section(self, section_data: dict) -> r.Section:
@@ -326,16 +363,16 @@ def _create_section(self, section_data: dict) -> r.Section:
"""
# Initialize the Section object
section = r.Section(
- title = section_data['title'],
- subsections = [],
- description = section_data.get('description')
+ title=section_data["title"],
+ subsections=[],
+ description=section_data.get("description"),
)
# Create subsections
- for subsection_data in section_data.get('subsections', []):
+ for subsection_data in section_data.get("subsections", []):
subsection = self._create_subsection(subsection_data)
section.subsections.append(subsection)
-
+
return section
def _create_subsection(self, subsection_data: dict) -> r.Subsection:
@@ -354,13 +391,13 @@ def _create_subsection(self, subsection_data: dict) -> r.Subsection:
"""
# Initialize the Subsection object
subsection = r.Subsection(
- title = subsection_data['title'],
- components = [],
- description = subsection_data.get('description')
+ title=subsection_data["title"],
+ components=[],
+ description=subsection_data.get("description"),
)
# Create components
- for component_data in subsection_data.get('components', []):
+ for component_data in subsection_data.get("components", []):
component = self._create_component(component_data)
subsection.components.append(component)
@@ -381,7 +418,9 @@ def _create_component(self, component_data: dict) -> r.Component:
A Component object (Plot, DataFrame, or Markdown) populated with the provided metadata.
"""
# Determine the component type
- component_type = assert_enum_value(r.ComponentType, component_data['component_type'], self.logger)
+ component_type = assert_enum_value(
+ r.ComponentType, component_data["component_type"], self.logger
+ )
# Dispatch to the corresponding creation method
if component_type == r.ComponentType.PLOT:
@@ -412,17 +451,26 @@ def _create_plot_component(self, component_data: dict) -> r.Plot:
A Plot object populated with the provided metadata.
"""
# Validate enum fields
- plot_type = assert_enum_value(r.PlotType, component_data['plot_type'], self.logger)
- csv_network_format = (assert_enum_value(r.CSVNetworkFormat, component_data.get('csv_network_format', ''), self.logger)
- if component_data.get('csv_network_format') else None)
+ plot_type = assert_enum_value(
+ r.PlotType, component_data["plot_type"], self.logger
+ )
+ csv_network_format = (
+ assert_enum_value(
+ r.CSVNetworkFormat,
+ component_data.get("csv_network_format", ""),
+ self.logger,
+ )
+ if component_data.get("csv_network_format")
+ else None
+ )
return r.Plot(
- title = component_data['title'],
- logger = self.logger,
- file_path = component_data['file_path'],
- plot_type = plot_type,
- csv_network_format = csv_network_format,
- caption = component_data.get('caption')
+ title=component_data["title"],
+ logger=self.logger,
+ file_path=component_data["file_path"],
+ plot_type=plot_type,
+ csv_network_format=csv_network_format,
+ caption=component_data.get("caption"),
)
def _create_dataframe_component(self, component_data: dict) -> r.DataFrame:
@@ -438,17 +486,19 @@ def _create_dataframe_component(self, component_data: dict) -> r.DataFrame:
-------
DataFrame
A DataFrame object populated with the provided metadata.
- """
+ """
# Validate enum field and return dataframe
- file_format = assert_enum_value(r.DataFrameFormat, component_data['file_format'], self.logger)
-
+ file_format = assert_enum_value(
+ r.DataFrameFormat, component_data["file_format"], self.logger
+ )
+
return r.DataFrame(
- title = component_data['title'],
- logger = self.logger,
- file_path = component_data['file_path'],
- file_format = file_format,
- delimiter = component_data.get('delimiter'),
- caption = component_data.get('caption')
+ title=component_data["title"],
+ logger=self.logger,
+ file_path=component_data["file_path"],
+ file_format=file_format,
+ delimiter=component_data.get("delimiter"),
+ caption=component_data.get("caption"),
)
def _create_markdown_component(self, component_data: dict) -> r.Markdown:
@@ -466,12 +516,12 @@ def _create_markdown_component(self, component_data: dict) -> r.Markdown:
A Markdown object populated with the provided metadata.
"""
return r.Markdown(
- title = component_data['title'],
- logger = self.logger,
- file_path = component_data['file_path'],
- caption = component_data.get('caption')
+ title=component_data["title"],
+ logger=self.logger,
+ file_path=component_data["file_path"],
+ caption=component_data.get("caption"),
)
-
+
def _create_html_component(self, component_data: dict) -> r.Html:
"""
Creates an Html component.
@@ -487,12 +537,12 @@ def _create_html_component(self, component_data: dict) -> r.Html:
An Html object populated with the provided metadata.
"""
return r.Html(
- title = component_data['title'],
- logger = self.logger,
- file_path = component_data['file_path'],
- caption = component_data.get('caption')
+ title=component_data["title"],
+ logger=self.logger,
+ file_path=component_data["file_path"],
+ caption=component_data.get("caption"),
)
-
+
def _create_apicall_component(self, component_data: dict) -> r.APICall:
"""
Creates an APICall component.
@@ -508,14 +558,14 @@ def _create_apicall_component(self, component_data: dict) -> r.APICall:
An APICall object populated with the provided metadata.
"""
return r.APICall(
- title = component_data['title'],
- logger = self.logger,
- api_url = component_data['api_url'],
- caption = component_data.get('caption'),
- headers = component_data.get('headers'),
- params = component_data.get('params')
+ title=component_data["title"],
+ logger=self.logger,
+ api_url=component_data["api_url"],
+ caption=component_data.get("caption"),
+ headers=component_data.get("headers"),
+ params=component_data.get("params"),
)
-
+
def _create_chatbot_component(self, component_data: dict) -> r.ChatBot:
"""
Creates a ChatBot component.
@@ -531,11 +581,11 @@ def _create_chatbot_component(self, component_data: dict) -> r.ChatBot:
A chatbot object populated with the provided metadata.
"""
return r.ChatBot(
- title = component_data['title'],
- logger = self.logger,
- api_url = component_data['api_url'],
- model = component_data['model'],
- caption = component_data.get('caption'),
- headers = component_data.get('headers'),
- params = component_data.get('params')
+ title=component_data["title"],
+ logger=self.logger,
+ api_url=component_data["api_url"],
+ model=component_data["model"],
+ caption=component_data.get("caption"),
+ headers=component_data.get("headers"),
+ params=component_data.get("params"),
)
diff --git a/src/vuegen/quarto_reportview.py b/src/vuegen/quarto_reportview.py
index e68be0c..c82f3af 100644
--- a/src/vuegen/quarto_reportview.py
+++ b/src/vuegen/quarto_reportview.py
@@ -1,7 +1,7 @@
import os
import subprocess
-from typing import List
from pathlib import Path
+from typing import List
import networkx as nx
import pandas as pd
@@ -15,13 +15,15 @@ class QuartoReportView(r.ReportView):
A ReportView subclass for generating Quarto reports.
"""
- BASE_DIR = 'quarto_report'
- STATIC_FILES_DIR = os.path.join(BASE_DIR, 'static')
+ BASE_DIR = "quarto_report"
+ STATIC_FILES_DIR = os.path.join(BASE_DIR, "static")
def __init__(self, report: r.Report, report_type: r.ReportType):
- super().__init__(report = report, report_type = report_type)
+ super().__init__(report=report, report_type=report_type)
- def generate_report(self, output_dir: str = BASE_DIR, static_dir: str = STATIC_FILES_DIR) -> None:
+ def generate_report(
+ self, output_dir: str = BASE_DIR, static_dir: str = STATIC_FILES_DIR
+ ) -> None:
"""
Generates the qmd file of the quarto report. It creates code for rendering each section and its subsections with all components.
@@ -32,80 +34,115 @@ def generate_report(self, output_dir: str = BASE_DIR, static_dir: str = STATIC_F
static_dir : str, optional
The folder where the static files will be saved (default is STATIC_FILES_DIR).
"""
- self.report.logger.debug(f"Generating '{self.report_type}' report in directory: '{output_dir}'")
+ self.report.logger.debug(
+ f"Generating '{self.report_type}' report in directory: '{output_dir}'"
+ )
# Create the output folder
if create_folder(output_dir):
self.report.logger.debug(f"Created output directory: '{output_dir}'")
else:
- self.report.logger.debug(f"Output directory already existed: '{output_dir}'")
+ self.report.logger.debug(
+ f"Output directory already existed: '{output_dir}'"
+ )
# Create the static folder
if create_folder(static_dir):
- self.report.logger.info(f"Created output directory for static content: '{static_dir}'")
+ self.report.logger.info(
+ f"Created output directory for static content: '{static_dir}'"
+ )
else:
- self.report.logger.info(f"Output directory for static content already existed: '{static_dir}'")
-
+ self.report.logger.info(
+ f"Output directory for static content already existed: '{static_dir}'"
+ )
+
try:
# Create variable to check if the report is static or revealjs
- is_report_static = self.report_type in {r.ReportType.PDF, r.ReportType.DOCX, r.ReportType.ODT, r.ReportType.PPTX}
+ is_report_static = self.report_type in {
+ r.ReportType.PDF,
+ r.ReportType.DOCX,
+ r.ReportType.ODT,
+ r.ReportType.PPTX,
+ }
is_report_revealjs = self.report_type == r.ReportType.REVEALJS
-
+
# Define the YAML header for the quarto report
yaml_header = self._create_yaml_header()
-
- # Create qmd content and imports for the report
+
+ # Create qmd content and imports for the report
qmd_content = []
report_imports = []
# Add description of the report
if self.report.description:
- qmd_content.append(f'''{self.report.description}''')
+ qmd_content.append(f"""{self.report.description}""")
# If available add the graphical abstract
if self.report.graphical_abstract:
- qmd_content.append(self._generate_image_content(self.report.graphical_abstract))
+ qmd_content.append(
+ self._generate_image_content(self.report.graphical_abstract)
+ )
# Add the sections and subsections to the report
self.report.logger.info("Starting to generate sections for the report.")
for section in self.report.sections:
- self.report.logger.debug(f"Processing section: '{section.title}' - {len(section.subsections)} subsection(s)")
+ self.report.logger.debug(
+ f"Processing section: '{section.title}' - {len(section.subsections)} subsection(s)"
+ )
# Add section header and description
- qmd_content.append(f'# {section.title}')
+ qmd_content.append(f"# {section.title}")
if section.description:
- qmd_content.append(f'''{section.description}\n''')
-
+ qmd_content.append(f"""{section.description}\n""")
+
if section.subsections:
# Iterate through subsections and integrate them into the section file
for subsection in section.subsections:
- self.report.logger.debug(f"Processing subsection: '{subsection.title}' - {len(subsection.components)} component(s)")
+ self.report.logger.debug(
+ f"Processing subsection: '{subsection.title}' - {len(subsection.components)} component(s)"
+ )
# Generate content for the subsection
- subsection_content, subsection_imports = self._generate_subsection(subsection, is_report_static, is_report_revealjs)
+ subsection_content, subsection_imports = (
+ self._generate_subsection(
+ subsection, is_report_static, is_report_revealjs
+ )
+ )
qmd_content.extend(subsection_content)
report_imports.extend(subsection_imports)
else:
- self.report.logger.warning(f"No subsections found in section: '{section.title}'. To show content in the report, add subsections to the section.")
-
+ self.report.logger.warning(
+ f"No subsections found in section: '{section.title}'. To show content in the report, add subsections to the section."
+ )
+
# Flatten the subsection_imports into a single list
- flattened_report_imports = [imp for sublist in report_imports for imp in sublist]
-
+ flattened_report_imports = [
+ imp for sublist in report_imports for imp in sublist
+ ]
+
# Remove duplicated imports
report_unique_imports = list(set(flattened_report_imports))
# Format imports
report_formatted_imports = "\n".join(report_unique_imports)
-
+
# Write the navigation and general content to a Python file
- with open(os.path.join(output_dir, f"{self.BASE_DIR}.qmd"), 'w') as quarto_report:
+ with open(
+ os.path.join(output_dir, f"{self.BASE_DIR}.qmd"), "w"
+ ) as quarto_report:
quarto_report.write(yaml_header)
- quarto_report.write(f"""\n```{{python}}
+ quarto_report.write(
+ f"""\n```{{python}}
#| label: 'Imports'
{report_formatted_imports}
-```\n\n""")
+```\n\n"""
+ )
quarto_report.write("\n".join(qmd_content))
- self.report.logger.info(f"Created qmd script to render the app: {self.BASE_DIR}.qmd")
-
+ self.report.logger.info(
+ f"Created qmd script to render the app: {self.BASE_DIR}.qmd"
+ )
+
except Exception as e:
- self.report.logger.error(f"An error occurred while generating the report: {str(e)}")
+ self.report.logger.error(
+ f"An error occurred while generating the report: {str(e)}"
+ )
raise
def run_report(self, output_dir: str = BASE_DIR) -> None:
@@ -118,15 +155,31 @@ def run_report(self, output_dir: str = BASE_DIR) -> None:
The folder where the report was generated (default is 'sections').
"""
try:
- subprocess.run(["quarto", "render", os.path.join(output_dir, f"{self.BASE_DIR}.qmd")], check=True)
+ subprocess.run(
+ ["quarto", "render", os.path.join(output_dir, f"{self.BASE_DIR}.qmd")],
+ check=True,
+ )
if self.report_type == r.ReportType.JUPYTER:
- subprocess.run(["quarto", "convert", os.path.join(output_dir, f"{self.BASE_DIR}.qmd")], check=True)
- self.report.logger.info(f"'{self.report.title}' '{self.report_type}' report rendered")
+ subprocess.run(
+ [
+ "quarto",
+ "convert",
+ os.path.join(output_dir, f"{self.BASE_DIR}.qmd"),
+ ],
+ check=True,
+ )
+ self.report.logger.info(
+ f"'{self.report.title}' '{self.report_type}' report rendered"
+ )
except subprocess.CalledProcessError as e:
- self.report.logger.error(f"Error running '{self.report.title}' {self.report_type} report: {str(e)}")
+ self.report.logger.error(
+ f"Error running '{self.report.title}' {self.report_type} report: {str(e)}"
+ )
raise
except FileNotFoundError as e:
- self.report.logger.error(f"Quarto is not installed. Please install Quarto to run the report: {str(e)}")
+ self.report.logger.error(
+ f"Quarto is not installed. Please install Quarto to run the report: {str(e)}"
+ )
raise
def _create_yaml_header(self) -> str:
@@ -226,7 +279,7 @@ def _create_yaml_header(self) -> str:
pptx:
toc: false
output: true""",
-r.ReportType.JUPYTER: """
+ r.ReportType.JUPYTER: """
html:
toc: true
toc-location: left
@@ -252,7 +305,7 @@ def _create_yaml_header(self) -> str:
| © 2025 Multiomics Network Analytics Group (MoNA)
- """
+ """,
}
# Create a key based on the report type and format
key = self.report_type
@@ -269,9 +322,11 @@ def _create_yaml_header(self) -> str:
return yaml_header
- def _generate_subsection(self, subsection, is_report_static, is_report_revealjs) -> tuple[List[str], List[str]]:
+ def _generate_subsection(
+ self, subsection, is_report_static, is_report_revealjs
+ ) -> tuple[List[str], List[str]]:
"""
- Generate code to render components (plots, dataframes, markdown) in the given subsection,
+ Generate code to render components (plots, dataframes, markdown) in the given subsection,
creating imports and content for the subsection based on the component type.
Parameters
@@ -292,35 +347,51 @@ def _generate_subsection(self, subsection, is_report_static, is_report_revealjs)
subsection_imports = []
# Add subsection header and description
- subsection_content.append(f'## {subsection.title}')
- if subsection.description:
- subsection_content.append(f'''{subsection.description}\n''')
+ subsection_content.append(f"## {subsection.title}")
+ if subsection.description:
+ subsection_content.append(f"""{subsection.description}\n""")
if is_report_revealjs:
- subsection_content.append(f'::: {{.panel-tabset}}\n')
+ subsection_content.append(f"::: {{.panel-tabset}}\n")
for component in subsection.components:
component_imports = self._generate_component_imports(component)
subsection_imports.append(component_imports)
if component.component_type == r.ComponentType.PLOT:
- subsection_content.extend(self._generate_plot_content(component, is_report_static))
+ subsection_content.extend(
+ self._generate_plot_content(component, is_report_static)
+ )
elif component.component_type == r.ComponentType.DATAFRAME:
- subsection_content.extend(self._generate_dataframe_content(component, is_report_static))
- elif component.component_type == r.ComponentType.MARKDOWN and component.title.lower() != "description":
+ subsection_content.extend(
+ self._generate_dataframe_content(component, is_report_static)
+ )
+ elif (
+ component.component_type == r.ComponentType.MARKDOWN
+ and component.title.lower() != "description"
+ ):
subsection_content.extend(self._generate_markdown_content(component))
- elif component.component_type == r.ComponentType.HTML and not is_report_static:
+ elif (
+ component.component_type == r.ComponentType.HTML
+ and not is_report_static
+ ):
subsection_content.extend(self._generate_html_content(component))
else:
- self.report.logger.warning(f"Unsupported component type '{component.component_type}' in subsection: {subsection.title}")
-
+ self.report.logger.warning(
+ f"Unsupported component type '{component.component_type}' in subsection: {subsection.title}"
+ )
+
if is_report_revealjs:
- subsection_content.append(':::\n')
+ subsection_content.append(":::\n")
- self.report.logger.info(f"Generated content and imports for subsection: '{subsection.title}'")
+ self.report.logger.info(
+ f"Generated content and imports for subsection: '{subsection.title}'"
+ )
return subsection_content, subsection_imports
- def _generate_plot_content(self, plot, is_report_static, static_dir: str = STATIC_FILES_DIR) -> List[str]:
+ def _generate_plot_content(
+ self, plot, is_report_static, static_dir: str = STATIC_FILES_DIR
+ ) -> List[str]:
"""
Generate content for a plot component based on the report type.
@@ -330,7 +401,7 @@ def _generate_plot_content(self, plot, is_report_static, static_dir: str = STATI
The plot component to generate content for.
static_dir : str, optional
The folder where the static files will be saved (default is STATIC_FILES_DIR).
-
+
Returns
-------
list : List[str]
@@ -338,29 +409,39 @@ def _generate_plot_content(self, plot, is_report_static, static_dir: str = STATI
"""
plot_content = []
# Add title
- plot_content.append(f'### {plot.title}')
-
+ plot_content.append(f"### {plot.title}")
+
# Define plot path
if is_report_static:
- static_plot_path = os.path.join(static_dir, f"{plot.title.replace(' ', '_')}.png")
+ static_plot_path = os.path.join(
+ static_dir, f"{plot.title.replace(' ', '_')}.png"
+ )
else:
- html_plot_file = os.path.join(static_dir, f"{plot.title.replace(' ', '_')}.html")
+ html_plot_file = os.path.join(
+ static_dir, f"{plot.title.replace(' ', '_')}.html"
+ )
# Add content for the different plot types
try:
if plot.plot_type == r.PlotType.STATIC:
- plot_content.append(self._generate_image_content(plot.file_path, width=950))
+ plot_content.append(
+ self._generate_image_content(plot.file_path, width=950)
+ )
elif plot.plot_type == r.PlotType.PLOTLY:
plot_content.append(self._generate_plot_code(plot))
if is_report_static:
- plot_content.append(f"""fig_plotly.write_image("{os.path.abspath(static_plot_path)}")\n```\n""")
+ plot_content.append(
+ f"""fig_plotly.write_image("{os.path.abspath(static_plot_path)}")\n```\n"""
+ )
plot_content.append(self._generate_image_content(static_plot_path))
else:
plot_content.append(f"""fig_plotly.show()\n```\n""")
elif plot.plot_type == r.PlotType.ALTAIR:
plot_content.append(self._generate_plot_code(plot))
if is_report_static:
- plot_content.append(f"""fig_altair.save("{os.path.abspath(static_plot_path)}")\n```\n""")
+ plot_content.append(
+ f"""fig_altair.save("{os.path.abspath(static_plot_path)}")\n```\n"""
+ )
plot_content.append(self._generate_image_content(static_plot_path))
else:
plot_content.append(f"""fig_altair\n```\n""")
@@ -371,14 +452,16 @@ def _generate_plot_content(self, plot, is_report_static, static_dir: str = STATI
networkx_graph, html_plot_file = networkx_graph
elif isinstance(networkx_graph, nx.Graph) and not is_report_static:
# Get the pyvis object and create html
- pyvis_graph = plot.create_and_save_pyvis_network(networkx_graph, html_plot_file)
-
+ pyvis_graph = plot.create_and_save_pyvis_network(
+ networkx_graph, html_plot_file
+ )
+
# Add number of nodes and edges to the plor conetnt
num_nodes = networkx_graph.number_of_nodes()
num_edges = networkx_graph.number_of_edges()
- plot_content.append(f'**Number of nodes:** {num_nodes}\n')
- plot_content.append(f'**Number of edges:** {num_edges}\n')
-
+ plot_content.append(f"**Number of nodes:** {num_nodes}\n")
+ plot_content.append(f"**Number of edges:** {num_edges}\n")
+
# Add code to generate network depending on the report type
if is_report_static:
plot.save_netwrok_image(networkx_graph, static_plot_path, "png")
@@ -386,21 +469,25 @@ def _generate_plot_content(self, plot, is_report_static, static_dir: str = STATI
else:
plot_content.append(self._generate_plot_code(plot, html_plot_file))
else:
- self.report.logger.warning(f"Unsupported plot type: {plot.plot_type}")
+ self.report.logger.warning(f"Unsupported plot type: {plot.plot_type}")
except Exception as e:
- self.report.logger.error(f"Error generating content for '{plot.plot_type}' plot '{plot.id}' '{plot.title}': {str(e)}")
+ self.report.logger.error(
+ f"Error generating content for '{plot.plot_type}' plot '{plot.id}' '{plot.title}': {str(e)}"
+ )
raise
-
+
# Add caption if available
if plot.caption:
- plot_content.append(f'>{plot.caption}\n')
+ plot_content.append(f">{plot.caption}\n")
- self.report.logger.info(f"Successfully generated content for plot: '{plot.title}'")
+ self.report.logger.info(
+ f"Successfully generated content for plot: '{plot.title}'"
+ )
return plot_content
- def _generate_plot_code(self, plot, output_file = "") -> str:
+ def _generate_plot_code(self, plot, output_file="") -> str:
"""
- Create the plot code based on its visualization tool.
+ Create the plot code based on its visualization tool.
Parameters
----------
@@ -419,7 +506,7 @@ def _generate_plot_code(self, plot, output_file = "") -> str:
#| fig-cap: ""
"""
# If the file path is a URL, generate code to fetch content via requests
- if is_url(plot.file_path):
+ if is_url(plot.file_path):
plot_code += f"""
response = requests.get('{plot.file_path}')
response.raise_for_status()
@@ -427,7 +514,7 @@ def _generate_plot_code(self, plot, output_file = "") -> str:
else: # If it's a local file
plot_code += f"""
with open('{os.path.join("..", plot.file_path)}', 'r') as plot_file:
- plot_json = plot_file.read()\n"""
+ plot_json = plot_file.read()\n"""
# Add specific code for each visualization tool
if plot.plot_type == r.PlotType.PLOTLY:
plot_code += """
@@ -437,7 +524,7 @@ def _generate_plot_code(self, plot, output_file = "") -> str:
plot_code += """fig_altair = alt.Chart.from_json(plot_json).properties(width=900, height=400)"""
elif plot.plot_type == r.PlotType.INTERACTIVE_NETWORK:
# Generate the HTML embedding for interactive networks
- if is_url(plot.file_path) and plot.file_path.endswith('.html'):
+ if is_url(plot.file_path) and plot.file_path.endswith(".html"):
iframe_src = output_file
else:
iframe_src = os.path.join("..", output_file)
@@ -459,7 +546,7 @@ def _generate_dataframe_content(self, dataframe, is_report_static) -> List[str]:
The dataframe component to add to content.
is_report_static : bool
A boolean indicating whether the report is static or interactive.
-
+
Returns
-------
list : List[str]
@@ -467,45 +554,61 @@ def _generate_dataframe_content(self, dataframe, is_report_static) -> List[str]:
"""
dataframe_content = []
# Add title
- dataframe_content.append(f'### {dataframe.title}')
+ dataframe_content.append(f"### {dataframe.title}")
# Append header for DataFrame loading
- dataframe_content.append(f"""```{{python}}
+ dataframe_content.append(
+ f"""```{{python}}
#| label: '{dataframe.title} {dataframe.id}'
#| fig-cap: ""
-""")
+"""
+ )
# Mapping of file extensions to read functions
read_function_mapping = {
r.DataFrameFormat.CSV.value_with_dot: pd.read_csv,
r.DataFrameFormat.PARQUET.value_with_dot: pd.read_parquet,
r.DataFrameFormat.TXT.value_with_dot: pd.read_table,
r.DataFrameFormat.XLS.value_with_dot: pd.read_excel,
- r.DataFrameFormat.XLSX.value_with_dot: pd.read_excel
+ r.DataFrameFormat.XLSX.value_with_dot: pd.read_excel,
}
try:
# Check if the file extension matches any DataFrameFormat value
file_extension = os.path.splitext(dataframe.file_path)[1].lower()
- if not any(file_extension == fmt.value_with_dot for fmt in r.DataFrameFormat):
- self.report.logger.error(f"Unsupported file extension: {file_extension}. Supported extensions are: {', '.join(fmt.value for fmt in r.DataFrameFormat)}.")
-
+ if not any(
+ file_extension == fmt.value_with_dot for fmt in r.DataFrameFormat
+ ):
+ self.report.logger.error(
+ f"Unsupported file extension: {file_extension}. Supported extensions are: {', '.join(fmt.value for fmt in r.DataFrameFormat)}."
+ )
+
# Build the file path (URL or local file)
- file_path = dataframe.file_path if is_url(dataframe.file_path) else os.path.join("..", dataframe.file_path)
+ file_path = (
+ dataframe.file_path
+ if is_url(dataframe.file_path)
+ else os.path.join("..", dataframe.file_path)
+ )
# Load the DataFrame using the correct function
read_function = read_function_mapping[file_extension]
- dataframe_content.append(f"""df = pd.{read_function.__name__}('{file_path}')""")
+ dataframe_content.append(
+ f"""df = pd.{read_function.__name__}('{file_path}')"""
+ )
# Display the dataframe
dataframe_content.extend(self._show_dataframe(dataframe, is_report_static))
-
+
except Exception as e:
- self.report.logger.error(f"Error generating content for DataFrame: {dataframe.title}. Error: {str(e)}")
+ self.report.logger.error(
+ f"Error generating content for DataFrame: {dataframe.title}. Error: {str(e)}"
+ )
raise
# Add caption if available
if dataframe.caption:
- dataframe_content.append(f'>{dataframe.caption}\n')
+ dataframe_content.append(f">{dataframe.caption}\n")
- self.report.logger.info(f"Successfully generated content for DataFrame: '{dataframe.title}'")
+ self.report.logger.info(
+ f"Successfully generated content for DataFrame: '{dataframe.title}'"
+ )
return dataframe_content
def _generate_markdown_content(self, markdown) -> List[str]:
@@ -516,45 +619,55 @@ def _generate_markdown_content(self, markdown) -> List[str]:
----------
markdown : Markdown
The markdown component to add to content.
-
+
Returns
-------
list : List[str]
The list of content lines for the markdown.
- """
+ """
markdown_content = []
# Add title
- markdown_content.append(f'### {markdown.title}')
-
+ markdown_content.append(f"### {markdown.title}")
+
try:
# Initialize md code with common structure
- markdown_content.append(f"""
+ markdown_content.append(
+ f"""
```{{python}}
#| label: '{markdown.title} {markdown.id}'
-#| fig-cap: ""\n""")
+#| fig-cap: ""\n"""
+ )
# If the file path is a URL, generate code to fetch content via requests
- if is_url(markdown.file_path):
- markdown_content.append(f"""
+ if is_url(markdown.file_path):
+ markdown_content.append(
+ f"""
response = requests.get('{markdown.file_path}')
response.raise_for_status()
-markdown_content = response.text\n""")
- else: #If it's a local file
- markdown_content.append(f"""
+markdown_content = response.text\n"""
+ )
+ else: # If it's a local file
+ markdown_content.append(
+ f"""
with open('{os.path.join("..", markdown.file_path)}', 'r') as markdown_file:
- markdown_content = markdown_file.read()\n""")
-
+ markdown_content = markdown_file.read()\n"""
+ )
+
# Code to display md content
markdown_content.append(f"""display.Markdown(markdown_content)\n```\n""")
except Exception as e:
- self.report.logger.error(f"Error generating content for Markdown: {markdown.title}. Error: {str(e)}")
+ self.report.logger.error(
+ f"Error generating content for Markdown: {markdown.title}. Error: {str(e)}"
+ )
raise
-
+
# Add caption if available
if markdown.caption:
- markdown_content.append(f'>{markdown.caption}\n')
-
- self.report.logger.info(f"Successfully generated content for Markdown: '{markdown.title}'")
+ markdown_content.append(f">{markdown.caption}\n")
+
+ self.report.logger.info(
+ f"Successfully generated content for Markdown: '{markdown.title}'"
+ )
return markdown_content
def _generate_html_content(self, html) -> List[str]:
@@ -572,13 +685,17 @@ def _generate_html_content(self, html) -> List[str]:
The list of content lines for embedding the HTML.
"""
html_content = []
-
+
# Add title
- html_content.append(f'### {html.title}')
-
+ html_content.append(f"### {html.title}")
+
try:
# Embed the HTML in an iframe
- iframe_src = html.file_path if is_url(html.file_path) else os.path.join("..", html.file_path)
+ iframe_src = (
+ html.file_path
+ if is_url(html.file_path)
+ else os.path.join("..", html.file_path)
+ )
iframe_code = f"""
Number of nodes: {num_nodes}
", unsafe_allow_html=True) -st.markdown(f"Number of relationships: {num_edges}
", unsafe_allow_html=True)\n""") - +st.markdown(f"Number of relationships: {num_edges}
", unsafe_allow_html=True)\n""" + ) + # Add the specific code for visualization plot_content.append(self._generate_plot_code(plot)) else: self.report.logger.warning(f"Unsupported plot type: {plot.plot_type}") except Exception as e: - self.report.logger.error(f"Error generating content for '{plot.plot_type}' plot '{plot.id}' '{plot.title}': {str(e)}") - raise + self.report.logger.error( + f"Error generating content for '{plot.plot_type}' plot '{plot.id}' '{plot.title}': {str(e)}" + ) + raise - self.report.logger.info(f"Successfully generated content for plot '{plot.id}': '{plot.title}'") + self.report.logger.info( + f"Successfully generated content for plot '{plot.id}': '{plot.title}'" + ) return plot_content - + def _generate_plot_code(self, plot) -> str: """ - Create the plot code based on its visualization tool. + Create the plot code based on its visualization tool. Parameters ---------- @@ -402,7 +524,7 @@ def _generate_plot_code(self, plot) -> str: The generated plot code as a string. """ # If the file path is a URL, generate code to fetch content via requests - if is_url(plot.file_path): + if is_url(plot.file_path): plot_code = f""" response = requests.get('{plot.file_path}') response.raise_for_status() @@ -410,8 +532,8 @@ def _generate_plot_code(self, plot) -> str: else: # If it's a local file plot_code = f""" with open('{os.path.join(plot.file_path)}', 'r') as plot_file: - plot_json = json.load(plot_file)\n""" - + plot_json = json.load(plot_file)\n""" + # Add specific code for each visualization tool if plot.plot_type == r.PlotType.PLOTLY: plot_code += "st.plotly_chart(plot_json, use_container_width=True)\n" @@ -420,7 +542,7 @@ def _generate_plot_code(self, plot) -> str: plot_code += """ altair_plot = alt.Chart.from_dict(plot_json) st.vega_lite_chart(json.loads(altair_plot.to_json()), use_container_width=True)\n""" - + elif plot.plot_type == r.PlotType.INTERACTIVE_NETWORK: plot_code = """# Streamlit checkbox for controlling the layout control_layout = st.checkbox('Add panel to control layout', value=True) @@ -428,7 +550,7 @@ def _generate_plot_code(self, plot) -> str: # Load HTML into HTML component for display on Streamlit st.components.v1.html(html_data, height=net_html_height)\n""" return plot_code - + def _generate_dataframe_content(self, dataframe) -> List[str]: """ Generate content for a DataFrame component. @@ -445,29 +567,40 @@ def _generate_dataframe_content(self, dataframe) -> List[str]: """ dataframe_content = [] # Add title - dataframe_content.append(self._format_text(text=dataframe.title, type='header', level=4, color='#2b8cbe')) - + dataframe_content.append( + self._format_text( + text=dataframe.title, type="header", level=4, color="#2b8cbe" + ) + ) + # Mapping of file extensions to read functions read_function_mapping = { r.DataFrameFormat.CSV.value_with_dot: pd.read_csv, r.DataFrameFormat.PARQUET.value_with_dot: pd.read_parquet, r.DataFrameFormat.TXT.value_with_dot: pd.read_table, r.DataFrameFormat.XLS.value_with_dot: pd.read_excel, - r.DataFrameFormat.XLSX.value_with_dot: pd.read_excel + r.DataFrameFormat.XLSX.value_with_dot: pd.read_excel, } try: # Check if the file extension matches any DataFrameFormat value file_extension = os.path.splitext(dataframe.file_path)[1].lower() - if not any(file_extension == fmt.value_with_dot for fmt in r.DataFrameFormat): - self.report.logger.error(f"Unsupported file extension: {file_extension}. Supported extensions are: {', '.join(fmt.value for fmt in r.DataFrameFormat)}.") + if not any( + file_extension == fmt.value_with_dot for fmt in r.DataFrameFormat + ): + self.report.logger.error( + f"Unsupported file extension: {file_extension}. Supported extensions are: {', '.join(fmt.value for fmt in r.DataFrameFormat)}." + ) # Load the DataFrame using the correct function read_function = read_function_mapping[file_extension] - dataframe_content.append(f"""df = pd.{read_function.__name__}('{dataframe.file_path}')""") - + dataframe_content.append( + f"""df = pd.{read_function.__name__}('{dataframe.file_path}')""" + ) + # Displays a DataFrame using AgGrid with configurable options. - dataframe_content.append(""" + dataframe_content.append( + """ # Displays a DataFrame using AgGrid with configurable options. grid_builder = GridOptionsBuilder.from_dataframe(df) grid_builder.configure_default_column(editable=True, groupable=True) @@ -486,18 +619,27 @@ def _generate_dataframe_content(self, dataframe) -> List[str]: file_name=f"dataframe_{df_index}.csv", mime='text/csv', key=f"download_button_{df_index}") -df_index += 1""") +df_index += 1""" + ) except Exception as e: - self.report.logger.error(f"Error generating content for DataFrame: {dataframe.title}. Error: {str(e)}") + self.report.logger.error( + f"Error generating content for DataFrame: {dataframe.title}. Error: {str(e)}" + ) raise - + # Add caption if available if dataframe.caption: - dataframe_content.append(self._format_text(text=dataframe.caption, type='caption', text_align="left")) - - self.report.logger.info(f"Successfully generated content for DataFrame: '{dataframe.title}'") + dataframe_content.append( + self._format_text( + text=dataframe.caption, type="caption", text_align="left" + ) + ) + + self.report.logger.info( + f"Successfully generated content for DataFrame: '{dataframe.title}'" + ) return dataframe_content - + def _generate_markdown_content(self, markdown) -> List[str]: """ Generate content for a Markdown component. @@ -515,31 +657,49 @@ def _generate_markdown_content(self, markdown) -> List[str]: markdown_content = [] # Add title - markdown_content.append(self._format_text(text=markdown.title, type='header', level=4, color='#2b8cbe')) + markdown_content.append( + self._format_text( + text=markdown.title, type="header", level=4, color="#2b8cbe" + ) + ) try: # If the file path is a URL, generate code to fetch content via requests - if is_url(markdown.file_path): - markdown_content.append(f""" + if is_url(markdown.file_path): + markdown_content.append( + f""" response = requests.get('{markdown.file_path}') response.raise_for_status() -markdown_content = response.text\n""") - else: #If it's a local file - markdown_content.append(f""" +markdown_content = response.text\n""" + ) + else: # If it's a local file + markdown_content.append( + f""" with open('{os.path.join("..", markdown.file_path)}', 'r') as markdown_file: - markdown_content = markdown_file.read()\n""") + markdown_content = markdown_file.read()\n""" + ) # Code to display md content - markdown_content.append("st.markdown(markdown_content, unsafe_allow_html=True)\n") + markdown_content.append( + "st.markdown(markdown_content, unsafe_allow_html=True)\n" + ) except Exception as e: - self.report.logger.error(f"Error generating content for Markdown: {markdown.title}. Error: {str(e)}") + self.report.logger.error( + f"Error generating content for Markdown: {markdown.title}. Error: {str(e)}" + ) raise - + # Add caption if available if markdown.caption: - markdown_content.append(self._format_text(text=markdown.caption, type='caption', text_align="left")) - - self.report.logger.info(f"Successfully generated content for Markdown: '{markdown.title}'") + markdown_content.append( + self._format_text( + text=markdown.caption, type="caption", text_align="left" + ) + ) + + self.report.logger.info( + f"Successfully generated content for Markdown: '{markdown.title}'" + ) return markdown_content - + def _generate_html_content(self, html) -> List[str]: """ Generate content for an HTML component in a Streamlit app. @@ -557,35 +717,49 @@ def _generate_html_content(self, html) -> List[str]: html_content = [] # Add title - html_content.append(self._format_text(text=html.title, type='header', level=4, color='#2b8cbe')) + html_content.append( + self._format_text(text=html.title, type="header", level=4, color="#2b8cbe") + ) try: - if is_url(html.file_path): + if is_url(html.file_path): # If it's a URL, fetch content dynamically - html_content.append(f""" + html_content.append( + f""" response = requests.get('{html.file_path}') response.raise_for_status() -html_content = response.text\n""") +html_content = response.text\n""" + ) else: # If it's a local file - html_content.append(f""" + html_content.append( + f""" with open('{os.path.join("..", html.file_path)}', 'r', encoding='utf-8') as html_file: - html_content = html_file.read()\n""") + html_content = html_file.read()\n""" + ) # Display HTML content using Streamlit - html_content.append("st.components.v1.html(html_content, height=600, scrolling=True)\n") + html_content.append( + "st.components.v1.html(html_content, height=600, scrolling=True)\n" + ) except Exception as e: - self.report.logger.error(f"Error generating content for HTML: {html.title}. Error: {str(e)}") + self.report.logger.error( + f"Error generating content for HTML: {html.title}. Error: {str(e)}" + ) raise # Add caption if available if html.caption: - html_content.append(self._format_text(text=html.caption, type='caption', text_align="left")) + html_content.append( + self._format_text(text=html.caption, type="caption", text_align="left") + ) - self.report.logger.info(f"Successfully generated content for HTML: '{html.title}'") + self.report.logger.info( + f"Successfully generated content for HTML: '{html.title}'" + ) return html_content - + def _generate_apicall_content(self, apicall) -> List[str]: """ Generate content for a Markdown component. @@ -603,21 +777,33 @@ def _generate_apicall_content(self, apicall) -> List[str]: apicall_content = [] # Add tile - apicall_content.append(self._format_text(text=apicall.title, type='header', level=4, color='#2b8cbe')) + apicall_content.append( + self._format_text( + text=apicall.title, type="header", level=4, color="#2b8cbe" + ) + ) try: - apicall_response = apicall.make_api_request(method='GET') + apicall_response = apicall.make_api_request(method="GET") apicall_content.append(f"""st.write({apicall_response})\n""") except Exception as e: - self.report.logger.error(f"Error generating content for APICall: {apicall.title}. Error: {str(e)}") + self.report.logger.error( + f"Error generating content for APICall: {apicall.title}. Error: {str(e)}" + ) raise # Add caption if available if apicall.caption: - apicall_content.append(self._format_text(text=apicall.caption, type='caption', text_align="left")) - - self.report.logger.info(f"Successfully generated content for APICall: '{apicall.title}'") + apicall_content.append( + self._format_text( + text=apicall.caption, type="caption", text_align="left" + ) + ) + + self.report.logger.info( + f"Successfully generated content for APICall: '{apicall.title}'" + ) return apicall_content - + def _generate_chatbot_content(self, chatbot) -> List[str]: """ Generate content for a ChatBot component. @@ -635,10 +821,15 @@ def _generate_chatbot_content(self, chatbot) -> List[str]: chatbot_content = [] # Add title - chatbot_content.append(self._format_text(text=chatbot.title, type='header', level=4, color='#2b8cbe')) + chatbot_content.append( + self._format_text( + text=chatbot.title, type="header", level=4, color="#2b8cbe" + ) + ) # Chatbot logic for embedding in the web application - chatbot_content.append(f""" + chatbot_content.append( + f""" def generate_query(messages): response = requests.post( "{chatbot.api_call.api_url}", @@ -693,15 +884,22 @@ def response_generator(msg_content): st.session_state.messages.append(parsed_response) with st.chat_message("assistant"): st.write_stream(response_generator(parsed_response["content"])) - """) + """ + ) # Add caption if available if chatbot.caption: - chatbot_content.append(self._format_text(text=chatbot.caption, type='caption', text_align="left")) + chatbot_content.append( + self._format_text( + text=chatbot.caption, type="caption", text_align="left" + ) + ) - self.report.logger.info(f"Successfully generated content for ChatBot: '{chatbot.title}'") + self.report.logger.info( + f"Successfully generated content for ChatBot: '{chatbot.title}'" + ) return chatbot_content - + def _generate_component_imports(self, component: r.Component) -> List[str]: """ Generate necessary imports for a component of the report. @@ -712,7 +910,7 @@ def _generate_component_imports(self, component: r.Component) -> List[str]: The component for which to generate the required imports. The component can be of type: - PLOT - DATAFRAME - + Returns ------- list : List[str] @@ -720,31 +918,38 @@ def _generate_component_imports(self, component: r.Component) -> List[str]: """ # Dictionary to hold the imports for each component type components_imports = { - 'plot': { - r.PlotType.ALTAIR: ['import json', 'import altair as alt', 'import requests'], - r.PlotType.PLOTLY: ['import json', 'import requests'], - r.PlotType.INTERACTIVE_NETWORK: ['import requests'] + "plot": { + r.PlotType.ALTAIR: [ + "import json", + "import altair as alt", + "import requests", + ], + r.PlotType.PLOTLY: ["import json", "import requests"], + r.PlotType.INTERACTIVE_NETWORK: ["import requests"], }, - 'dataframe': ['import pandas as pd', 'from st_aggrid import AgGrid, GridOptionsBuilder'], - 'markdown': ['import requests'], - 'chatbot': ['import time', 'import json', 'import requests'] + "dataframe": [ + "import pandas as pd", + "from st_aggrid import AgGrid, GridOptionsBuilder", + ], + "markdown": ["import requests"], + "chatbot": ["import time", "import json", "import requests"], } component_type = component.component_type - component_imports = ['import streamlit as st'] + component_imports = ["import streamlit as st"] # Add relevant imports based on component type and visualization tool if component_type == r.ComponentType.PLOT: - plot_type = getattr(component, 'plot_type', None) - if plot_type in components_imports['plot']: - component_imports.extend(components_imports['plot'][plot_type]) + plot_type = getattr(component, "plot_type", None) + if plot_type in components_imports["plot"]: + component_imports.extend(components_imports["plot"][plot_type]) elif component_type == r.ComponentType.MARKDOWN: - component_imports.extend(components_imports['markdown']) + component_imports.extend(components_imports["markdown"]) elif component_type == r.ComponentType.CHATBOT: - component_imports.extend(components_imports['chatbot']) + component_imports.extend(components_imports["chatbot"]) elif component_type == r.ComponentType.DATAFRAME: - component_imports.extend(components_imports['dataframe']) - component_imports.append('df_index = 1') + component_imports.extend(components_imports["dataframe"]) + component_imports.append("df_index = 1") # Return the list of import statements - return component_imports \ No newline at end of file + return component_imports diff --git a/src/vuegen/utils.py b/src/vuegen/utils.py index 356cc11..6c85396 100644 --- a/src/vuegen/utils.py +++ b/src/vuegen/utils.py @@ -49,7 +49,10 @@ def check_path(filepath: str) -> bool: # Check if the path exists return os.path.exists(os.path.abspath(filepath)) -def assert_enum_value(enum_class: Type[StrEnum], value: str, logger: logging.Logger) -> StrEnum: + +def assert_enum_value( + enum_class: Type[StrEnum], value: str, logger: logging.Logger +) -> StrEnum: """ Validate that the given value is a valid member of the specified enumeration class. @@ -76,8 +79,13 @@ def assert_enum_value(enum_class: Type[StrEnum], value: str, logger: logging.Log return enum_class[value.upper()] except KeyError: expected_values = ", ".join([str(e.value) for e in enum_class]) - logger.error(f"Invalid value for {enum_class.__name__}: '{value}'. Expected values are: {expected_values}") - raise ValueError(f"Invalid {enum_class.__name__}: {value}. Expected values are: {expected_values}") + logger.error( + f"Invalid value for {enum_class.__name__}: '{value}'. Expected values are: {expected_values}" + ) + raise ValueError( + f"Invalid {enum_class.__name__}: {value}. Expected values are: {expected_values}" + ) + def is_url(filepath: str) -> bool: """ @@ -87,12 +95,12 @@ def is_url(filepath: str) -> bool: ---------- filepath : str The filepath to check. - + Returns ------- bool - True if the input path is a valid URL, meaning it contains both a scheme - (e.g., http, https, ftp) and a network location (e.g., example.com). + True if the input path is a valid URL, meaning it contains both a scheme + (e.g., http, https, ftp) and a network location (e.g., example.com). Returns False if either the scheme or the network location is missing or invalid. Raises @@ -107,6 +115,7 @@ def is_url(filepath: str) -> bool: parsed_url = urlparse(filepath) return bool(parsed_url.scheme and parsed_url.netloc) + def is_pyvis_html(filepath: str) -> bool: """ Check if the provided HTML file is a Pyvis network visualization. @@ -131,16 +140,17 @@ def is_pyvis_html(filepath: str) -> bool: # Validate both conditions pyvis_identifier_valid = bool(soup.find("div", {"id": "mynetwork"})) - + # Count top-level elements inside body_children = [tag.name for tag in soup.body.find_all(recursive=False)] - + # A pure Pyvis file should contain only "div" and "script" elements in body_structure_valid = set(body_children) <= {"div", "script"} # Both conditions must be true return pyvis_identifier_valid and body_structure_valid + ## FILE_SYSTEM def create_folder(directory_path: str, is_nested: bool = False) -> bool: """ @@ -178,6 +188,7 @@ def create_folder(directory_path: str, is_nested: bool = False) -> bool: except OSError as e: raise OSError(f"Error creating directory '{directory_path}': {e}") + def get_parser(prog_name: str, others: dict = {}) -> argparse.Namespace: """ Initiates argparse.ArgumentParser() and adds common arguments. @@ -211,35 +222,36 @@ def get_parser(prog_name: str, others: dict = {}) -> argparse.Namespace: parser.add_argument( "-c", "--config", - type = str, - default = None, - help = "Path to the YAML configuration file." + type=str, + default=None, + help="Path to the YAML configuration file.", ) parser.add_argument( "-dir", "--directory", - type = str, - default = None, - help = "Path to the directory from which the YAML config will be inferred." + type=str, + default=None, + help="Path to the directory from which the YAML config will be inferred.", ) parser.add_argument( "-rt", "--report_type", - type = str, - default = 'streamlit', - help = "Type of the report to generate (streamlit, html, pdf, docx, odt, revealjs, pptx, or jupyter)." + type=str, + default="streamlit", + help="Type of the report to generate (streamlit, html, pdf, docx, odt, revealjs, pptx, or jupyter).", ) parser.add_argument( - "-st_autorun", + "-st_autorun", "--streamlit_autorun", - action = "store_true", # Automatically sets True if the flag is passed - default = False, - help = "Automatically run the Streamlit app after report generation." + action="store_true", # Automatically sets True if the flag is passed + default=False, + help="Automatically run the Streamlit app after report generation.", ) # Parse arguments return parser + def fetch_file_stream(file_path: str) -> StringIO: """ Fetches a file-like stream from a given file path or URL. @@ -273,14 +285,19 @@ def fetch_file_stream(file_path: str) -> StringIO: response.raise_for_status() # Raise an exception for HTTP errors return StringIO(response.text) except requests.exceptions.RequestException as e: - raise ValueError(f"Error fetching content from URL: {file_path}. Error: {str(e)}") + raise ValueError( + f"Error fetching content from URL: {file_path}. Error: {str(e)}" + ) else: # Handle local file input if not os.path.exists(file_path): - raise FileNotFoundError(f"The file at {file_path} was not found or cannot be accessed.") - with open(file_path, 'r') as file: + raise FileNotFoundError( + f"The file at {file_path} was not found or cannot be accessed." + ) + with open(file_path, "r") as file: return StringIO(file.read()) + ## FILE_CONVERSION def cyjs_to_networkx(file_path: str, name: str = "name", ident: str = "id") -> nx.Graph: """ @@ -312,44 +329,44 @@ def cyjs_to_networkx(file_path: str, name: str = "name", ident: str = "id") -> n """ try: # If file_path is a file-like object (e.g., StringIO), read from it - if hasattr(file_path, 'read'): + if hasattr(file_path, "read"): data = json.load(file_path) else: # Otherwise, assume it's a file path and open the file - with open(file_path, 'r') as json_file: + with open(file_path, "r") as json_file: data = json.load(json_file) if name == ident: raise nx.NetworkXError("name and ident must be different.") - + multigraph = data.get("multigraph", False) directed = data.get("directed", False) - + if multigraph: graph = nx.MultiGraph() else: graph = nx.Graph() - + if directed: graph = graph.to_directed() - + graph.graph = dict(data.get("data", {})) - + # Add nodes with all attributes from the 'data' field of the JSON for d in data["elements"]["nodes"]: node_data = d["data"].copy() node = d["data"].get(ident) # Use 'id' (or other unique identifier) - + if node is None: raise ValueError("Each node must contain an 'id' key.") - + # Optionally include 'name' and 'id' attributes if present if name in d["data"]: node_data[name] = d["data"].get(name) - + graph.add_node(node) graph.nodes[node].update(node_data) - + # Add edges with all attributes from the 'data' field of the JSON for d in data["elements"]["edges"]: edge_data = d["data"].copy() @@ -357,7 +374,7 @@ def cyjs_to_networkx(file_path: str, name: str = "name", ident: str = "id") -> n targ = d["data"].get("target") if sour is None or targ is None: raise ValueError("Each edge must contain 'source' and 'target' keys.") - + if multigraph: key = d["data"].get("key", 0) graph.add_edge(sour, targ, key=key) @@ -366,10 +383,11 @@ def cyjs_to_networkx(file_path: str, name: str = "name", ident: str = "id") -> n graph.add_edge(sour, targ) graph.edges[sour, targ].update(edge_data) return graph - + except KeyError as e: raise ValueError(f"Missing required key in data: {e}") + def pyvishtml_to_networkx(html_file: str) -> nx.Graph: """ Converts a PyVis HTML file to a NetworkX graph. @@ -395,52 +413,59 @@ def pyvishtml_to_networkx(html_file: str) -> nx.Graph: html_content = html_file.getvalue() else: # Otherwise, treat it as a file path - with open(html_file, 'r', encoding='utf-8') as f: + with open(html_file, "r", encoding="utf-8") as f: html_content = f.read() - soup = BeautifulSoup(html_content, 'html.parser') + soup = BeautifulSoup(html_content, "html.parser") # Extract the network data from the JavaScript objects - script_tag = soup.find('script', text=lambda x: x and 'nodes = new vis.DataSet' in x) + script_tag = soup.find( + "script", text=lambda x: x and "nodes = new vis.DataSet" in x + ) if not script_tag: raise ValueError("Could not find network data in the provided HTML file.") - + # Parse the nodes and edges script_text = script_tag.string - nodes_json = json.loads(script_text.split('nodes = new vis.DataSet(')[1].split(');')[0]) - edges_json = json.loads(script_text.split('edges = new vis.DataSet(')[1].split(');')[0]) + nodes_json = json.loads( + script_text.split("nodes = new vis.DataSet(")[1].split(");")[0] + ) + edges_json = json.loads( + script_text.split("edges = new vis.DataSet(")[1].split(");")[0] + ) # Create a NetworkX graph graph = nx.Graph() # Add nodes for node in nodes_json: - node_id = node.pop('id', None) + node_id = node.pop("id", None) if node_id is None: raise ValueError("Node is missing an 'id' attribute.") - + graph.add_node(node_id, **node) # Add edges for edge in edges_json: - source = edge.pop('from') - target = edge.pop('to') + source = edge.pop("from") + target = edge.pop("to") graph.add_edge(source, target, **edge) # Relabel nodes to use 'name' as the identifier, or 'id' if 'name' is unavailable mapping = {} for node_id, data in graph.nodes(data=True): - name = data.get('name') + name = data.get("name") if name: mapping[node_id] = name else: # Fallback to the original ID if no 'name' exists mapping[node_id] = node_id - + graph = nx.relabel_nodes(graph, mapping) return graph + ## CONFIG def load_yaml_config(file_path: str) -> dict: """ @@ -468,7 +493,7 @@ def load_yaml_config(file_path: str) -> dict: raise FileNotFoundError(f"The config file at {file_path} was not found.") # Load the YAML configuration file - with open(file_path, 'r') as file: + with open(file_path, "r") as file: try: config = yaml.safe_load(file) except yaml.YAMLError as exc: @@ -476,6 +501,7 @@ def load_yaml_config(file_path: str) -> dict: return config + def write_yaml_config(yaml_data: dict, directory_path: Path) -> Path: """ Writes the generated YAML structure to a file. @@ -494,7 +520,7 @@ def write_yaml_config(yaml_data: dict, directory_path: Path) -> Path: """ assert isinstance(yaml_data, dict), "YAML data must be a dictionary." assert isinstance(directory_path, Path), "directory_path must be a Path object." - + # Generate the output YAML file path based on the folder name output_yaml = directory_path / (directory_path.name + "_config.yaml") @@ -509,6 +535,7 @@ def write_yaml_config(yaml_data: dict, directory_path: Path) -> Path: # Return the path to the written file return output_yaml + ## LOGGING def get_basename(fname: None | str = None) -> str: """ @@ -633,7 +660,9 @@ def generate_log_filename(folder: str = "logs", suffix: str = "") -> str: return log_filepath -def init_log(filename: str, display: bool = False, logger_id: str | None = None) -> logging.Logger: +def init_log( + filename: str, display: bool = False, logger_id: str | None = None +) -> logging.Logger: """ - Custom python logger configuration (basicConfig()) with two handlers (for stdout and for file) @@ -705,21 +734,22 @@ def get_logger(log_suffix): """ # Generate log file name log_file = generate_log_filename(suffix=log_suffix) - + # Initialize logger logger = init_log(log_file, display=True) - + # Log the path to the log file logger.info(f"Path to log file: {log_file}") return logger + ## REPORT FORMATTING def generate_footer() -> str: """ Generate an HTML footer for a report. - This function creates a styled HTML footer that includes a link to VueGen + This function creates a styled HTML footer that includes a link to VueGen and the Multiomics Network Analytics Group (MoNA). Returns @@ -727,7 +757,7 @@ def generate_footer() -> str: str A formatted HTML string representing the footer. """ - footer = '''