From 93efa0220689cae12f837404c6aa8bd0ea386375 Mon Sep 17 00:00:00 2001 From: Henry Webel Date: Mon, 24 Feb 2025 14:58:02 +0100 Subject: [PATCH 1/3] :art: sort imports, format codebase with black --- .github/workflows/cdci.yml | 2 + src/vuegen/__init__.py | 2 +- src/vuegen/__main__.py | 19 +- src/vuegen/config_manager.py | 244 +++++++------ src/vuegen/quarto_reportview.py | 421 ++++++++++++++-------- src/vuegen/report.py | 396 +++++++++++++++------ src/vuegen/report_generator.py | 24 +- src/vuegen/streamlit_reportview.py | 543 ++++++++++++++++++++--------- src/vuegen/utils.py | 146 +++++--- 9 files changed, 1198 insertions(+), 599 deletions(-) diff --git a/.github/workflows/cdci.yml b/.github/workflows/cdci.yml index 9fd742c..1fa82e6 100644 --- a/.github/workflows/cdci.yml +++ b/.github/workflows/cdci.yml @@ -17,6 +17,8 @@ jobs: python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] steps: - uses: actions/checkout@v4 + - uses: psf/black@stable + - uses: isort/isort-action@v1 - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 with: diff --git a/src/vuegen/__init__.py b/src/vuegen/__init__.py index d538f87..5becc17 100644 --- a/src/vuegen/__init__.py +++ b/src/vuegen/__init__.py @@ -1 +1 @@ -__version__ = "1.0.0" \ No newline at end of file +__version__ = "1.0.0" diff --git a/src/vuegen/__main__.py b/src/vuegen/__main__.py index 1304235..5c853d5 100644 --- a/src/vuegen/__main__.py +++ b/src/vuegen/__main__.py @@ -26,11 +26,11 @@ def main(): # https://docs.python.org/3/library/argparse.html#printing-help parser.print_help() sys.exit(1) - + if config_path and dir_path: print("Please provide only one of configuration file or directory path:\n") parser.print_help() - sys.exit(1) # otherwise could resort to either or ? + sys.exit(1) # otherwise could resort to either or ? # Define logger suffix based on report type and name logger_suffix = f"{report_type}_report_{str(report_name)}" @@ -39,11 +39,14 @@ def main(): logger = get_logger(f"{logger_suffix}") # Generate the report - report_generator.get_report(report_type=report_type, - logger=logger, - config_path=config_path, - dir_path=dir_path, - streamlit_autorun=streamlit_autorun) + report_generator.get_report( + report_type=report_type, + logger=logger, + config_path=config_path, + dir_path=dir_path, + streamlit_autorun=streamlit_autorun, + ) + -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/src/vuegen/config_manager.py b/src/vuegen/config_manager.py index 8b05b39..4832b08 100644 --- a/src/vuegen/config_manager.py +++ b/src/vuegen/config_manager.py @@ -12,7 +12,8 @@ class ConfigManager: """ Class for handling metadata of reports from YAML config file and creating report objects. """ - def __init__(self, logger: Optional[logging.Logger]=None): + + def __init__(self, logger: Optional[logging.Logger] = None): """ Initializes the ConfigManager with a logger. 
@@ -67,25 +68,44 @@ def _create_component_config_fromfile(self, file_path: Path) -> Dict[str, str]: component_config["caption"] = "" # Infer component config - if file_ext in [r.DataFrameFormat.CSV.value_with_dot, r.DataFrameFormat.TXT.value_with_dot]: + if file_ext in [ + r.DataFrameFormat.CSV.value_with_dot, + r.DataFrameFormat.TXT.value_with_dot, + ]: # Check for CSVNetworkFormat keywords if "edgelist" in file_path.stem.lower(): component_config["component_type"] = r.ComponentType.PLOT.value component_config["plot_type"] = r.PlotType.INTERACTIVE_NETWORK.value - component_config ["csv_network_format"] = r.CSVNetworkFormat.EDGELIST.value + component_config["csv_network_format"] = ( + r.CSVNetworkFormat.EDGELIST.value + ) elif "adjlist" in file_path.stem.lower(): - component_config ["component_type"] = r.ComponentType.PLOT.value - component_config ["plot_type"] = r.PlotType.INTERACTIVE_NETWORK.value - component_config ["csv_network_format"] = r.CSVNetworkFormat.ADJLIST.value + component_config["component_type"] = r.ComponentType.PLOT.value + component_config["plot_type"] = r.PlotType.INTERACTIVE_NETWORK.value + component_config["csv_network_format"] = ( + r.CSVNetworkFormat.ADJLIST.value + ) # Fill the config with dataframe content else: - component_config ["component_type"] = r.ComponentType.DATAFRAME.value - component_config ["file_format"] = r.DataFrameFormat.CSV.value if file_ext == r.DataFrameFormat.CSV.value_with_dot else r.DataFrameFormat.TXT.value - component_config ["delimiter"] = "," if file_ext == r.DataFrameFormat.CSV.value_with_dot else "\\t" + component_config["component_type"] = r.ComponentType.DATAFRAME.value + component_config["file_format"] = ( + r.DataFrameFormat.CSV.value + if file_ext == r.DataFrameFormat.CSV.value_with_dot + else r.DataFrameFormat.TXT.value + ) + component_config["delimiter"] = ( + "," if file_ext == r.DataFrameFormat.CSV.value_with_dot else "\\t" + ) # Check other DataframeFormats than csv and txt - elif file_ext in [fmt.value_with_dot for fmt in r.DataFrameFormat if fmt not in [r.DataFrameFormat.CSV, r.DataFrameFormat.TXT]]: - component_config ["component_type"] = r.ComponentType.DATAFRAME.value - component_config ["file_format"] = next(fmt.value for fmt in r.DataFrameFormat if fmt.value_with_dot == file_ext) + elif file_ext in [ + fmt.value_with_dot + for fmt in r.DataFrameFormat + if fmt not in [r.DataFrameFormat.CSV, r.DataFrameFormat.TXT] + ]: + component_config["component_type"] = r.ComponentType.DATAFRAME.value + component_config["file_format"] = next( + fmt.value for fmt in r.DataFrameFormat if fmt.value_with_dot == file_ext + ) elif file_ext == ".html": if is_pyvis_html(file_path): component_config["component_type"] = r.ComponentType.PLOT.value @@ -94,19 +114,19 @@ def _create_component_config_fromfile(self, file_path: Path) -> Dict[str, str]: component_config["component_type"] = r.ComponentType.HTML.value # Check for network formats elif file_ext in [fmt.value_with_dot for fmt in r.NetworkFormat]: - component_config ["component_type"] = r.ComponentType.PLOT.value + component_config["component_type"] = r.ComponentType.PLOT.value if file_ext in [ r.NetworkFormat.PNG.value_with_dot, r.NetworkFormat.JPG.value_with_dot, r.NetworkFormat.JPEG.value_with_dot, r.NetworkFormat.SVG.value_with_dot, ]: - component_config ["plot_type"] = r.PlotType.STATIC.value + component_config["plot_type"] = r.PlotType.STATIC.value else: - component_config ["plot_type"] = r.PlotType.INTERACTIVE_NETWORK.value - # Check for interactive plots + 
component_config["plot_type"] = r.PlotType.INTERACTIVE_NETWORK.value + # Check for interactive plots elif file_ext == ".json": - component_config ["component_type"] = r.ComponentType.PLOT.value + component_config["component_type"] = r.ComponentType.PLOT.value try: with open(file_path, "r", encoding="utf-8") as f: json_data = json.load(f) @@ -118,12 +138,14 @@ def _create_component_config_fromfile(self, file_path: Path) -> Dict[str, str]: self.logger.warning(f"Could not parse JSON file {file_path}: {e}") component_config["plot_type"] = "unknown" elif file_ext == ".md": - component_config ["component_type"] = r.ComponentType.MARKDOWN.value + component_config["component_type"] = r.ComponentType.MARKDOWN.value else: - self.logger.error(f"Unsupported file extension: {file_ext}. Skipping file: {file_path}\n") + self.logger.error( + f"Unsupported file extension: {file_ext}. Skipping file: {file_path}\n" + ) return None - return component_config + return component_config def _sort_paths_by_numprefix(self, paths: List[Path]) -> List[Path]: """ @@ -139,26 +161,27 @@ def _sort_paths_by_numprefix(self, paths: List[Path]) -> List[Path]: List[Path] The sorted list of Path objects. """ + def get_sort_key(path: Path) -> tuple: parts = path.name.split("_", 1) if parts[0].isdigit(): numeric_prefix = int(parts[0]) else: # Non-numeric prefixes go to the end - numeric_prefix = float('inf') - return numeric_prefix, path.name.lower() + numeric_prefix = float("inf") + return numeric_prefix, path.name.lower() return sorted(paths, key=get_sort_key) - + def _read_description_file(self, folder_path: Path) -> str: """ Reads the content of a description.md file if it exists in the given folder. - + Parameters ---------- folder_path : Path Path to the folder where description.md might be located. - + Returns ------- str @@ -166,11 +189,13 @@ def _read_description_file(self, folder_path: Path) -> str: """ description_file = folder_path / "description.md" if description_file.exists(): - ret = description_file.read_text().strip().replace('\n', '\n ') + ret = description_file.read_text().strip().replace("\n", "\n ") return f"{ret}\n" return "" - def _create_subsect_config_fromdir(self, subsection_dir_path: Path) -> Dict[str, Union[str, List[Dict]]]: + def _create_subsect_config_fromdir( + self, subsection_dir_path: Path + ) -> Dict[str, Union[str, List[Dict]]]: """ Creates subsection config from a directory. @@ -185,18 +210,20 @@ def _create_subsect_config_fromdir(self, subsection_dir_path: Path) -> Dict[str, The subsection config. 
""" # Sort files by number prefix - sorted_files = self._sort_paths_by_numprefix(list(subsection_dir_path.iterdir())) + sorted_files = self._sort_paths_by_numprefix( + list(subsection_dir_path.iterdir()) + ) components = [] for file in sorted_files: if file.is_file(): component_config = self._create_component_config_fromfile(file) - # Skip unsupported files + # Skip unsupported files if component_config is None: continue - # Add component config to list + # Add component config to list components.append(component_config) - + subsection_config = { "title": self._create_title_fromdir(subsection_dir_path.name), "description": self._read_description_file(subsection_dir_path), @@ -204,7 +231,9 @@ def _create_subsect_config_fromdir(self, subsection_dir_path: Path) -> Dict[str, } return subsection_config - def _create_sect_config_fromdir(self, section_dir_path: Path) -> Dict[str, Union[str, List[Dict]]]: + def _create_sect_config_fromdir( + self, section_dir_path: Path + ) -> Dict[str, Union[str, List[Dict]]]: """ Creates section config from a directory. @@ -218,8 +247,10 @@ def _create_sect_config_fromdir(self, section_dir_path: Path) -> Dict[str, Union Dict[str, Union[str, List[Dict]]] The section config. """ - # Sort subsections by number prefix - sorted_subsections = self._sort_paths_by_numprefix(list(section_dir_path.iterdir())) + # Sort subsections by number prefix + sorted_subsections = self._sort_paths_by_numprefix( + list(section_dir_path.iterdir()) + ) subsections = [] for subsection_dir in sorted_subsections: @@ -233,7 +264,9 @@ def _create_sect_config_fromdir(self, section_dir_path: Path) -> Dict[str, Union } return section_config - def create_yamlconfig_fromdir(self, base_dir: str) -> Tuple[Dict[str, Union[str, List[Dict]]], Path]: + def create_yamlconfig_fromdir( + self, base_dir: str + ) -> Tuple[Dict[str, Union[str, List[Dict]]], Path]: """ Generates a YAML-compatible config file from a directory. It also returns the resolved folder path. 
@@ -264,10 +297,12 @@ def create_yamlconfig_fromdir(self, base_dir: str) -> Tuple[Dict[str, Union[str, # Sort sections by their number prefix sorted_sections = self._sort_paths_by_numprefix(list(base_dir_path.iterdir())) - # Generate sections and subsections config + # Generate sections and subsections config for section_dir in sorted_sections: if section_dir.is_dir(): - yaml_config["sections"].append(self._create_sect_config_fromdir(section_dir)) + yaml_config["sections"].append( + self._create_sect_config_fromdir(section_dir) + ) return yaml_config, base_dir_path @@ -294,20 +329,22 @@ def initialize_report(self, config: dict) -> tuple[r.Report, dict]: """ # Create a Report object from metadata report = r.Report( - title = config['report']['title'], - logger = self.logger, - sections = [], - description = config['report'].get('description'), - graphical_abstract = config['report'].get('graphical_abstract'), - logo = config['report'].get('logo') + title=config["report"]["title"], + logger=self.logger, + sections=[], + description=config["report"].get("description"), + graphical_abstract=config["report"].get("graphical_abstract"), + logo=config["report"].get("logo"), ) # Create sections and subsections - for section_data in config.get('sections', []): + for section_data in config.get("sections", []): section = self._create_section(section_data) report.sections.append(section) - self.logger.info(f"Report '{report.title}' initialized with {len(report.sections)} sections.") + self.logger.info( + f"Report '{report.title}' initialized with {len(report.sections)} sections." + ) return report, config def _create_section(self, section_data: dict) -> r.Section: @@ -326,16 +363,16 @@ def _create_section(self, section_data: dict) -> r.Section: """ # Initialize the Section object section = r.Section( - title = section_data['title'], - subsections = [], - description = section_data.get('description') + title=section_data["title"], + subsections=[], + description=section_data.get("description"), ) # Create subsections - for subsection_data in section_data.get('subsections', []): + for subsection_data in section_data.get("subsections", []): subsection = self._create_subsection(subsection_data) section.subsections.append(subsection) - + return section def _create_subsection(self, subsection_data: dict) -> r.Subsection: @@ -354,13 +391,13 @@ def _create_subsection(self, subsection_data: dict) -> r.Subsection: """ # Initialize the Subsection object subsection = r.Subsection( - title = subsection_data['title'], - components = [], - description = subsection_data.get('description') + title=subsection_data["title"], + components=[], + description=subsection_data.get("description"), ) # Create components - for component_data in subsection_data.get('components', []): + for component_data in subsection_data.get("components", []): component = self._create_component(component_data) subsection.components.append(component) @@ -381,7 +418,9 @@ def _create_component(self, component_data: dict) -> r.Component: A Component object (Plot, DataFrame, or Markdown) populated with the provided metadata. 
""" # Determine the component type - component_type = assert_enum_value(r.ComponentType, component_data['component_type'], self.logger) + component_type = assert_enum_value( + r.ComponentType, component_data["component_type"], self.logger + ) # Dispatch to the corresponding creation method if component_type == r.ComponentType.PLOT: @@ -412,17 +451,26 @@ def _create_plot_component(self, component_data: dict) -> r.Plot: A Plot object populated with the provided metadata. """ # Validate enum fields - plot_type = assert_enum_value(r.PlotType, component_data['plot_type'], self.logger) - csv_network_format = (assert_enum_value(r.CSVNetworkFormat, component_data.get('csv_network_format', ''), self.logger) - if component_data.get('csv_network_format') else None) + plot_type = assert_enum_value( + r.PlotType, component_data["plot_type"], self.logger + ) + csv_network_format = ( + assert_enum_value( + r.CSVNetworkFormat, + component_data.get("csv_network_format", ""), + self.logger, + ) + if component_data.get("csv_network_format") + else None + ) return r.Plot( - title = component_data['title'], - logger = self.logger, - file_path = component_data['file_path'], - plot_type = plot_type, - csv_network_format = csv_network_format, - caption = component_data.get('caption') + title=component_data["title"], + logger=self.logger, + file_path=component_data["file_path"], + plot_type=plot_type, + csv_network_format=csv_network_format, + caption=component_data.get("caption"), ) def _create_dataframe_component(self, component_data: dict) -> r.DataFrame: @@ -438,17 +486,19 @@ def _create_dataframe_component(self, component_data: dict) -> r.DataFrame: ------- DataFrame A DataFrame object populated with the provided metadata. - """ + """ # Validate enum field and return dataframe - file_format = assert_enum_value(r.DataFrameFormat, component_data['file_format'], self.logger) - + file_format = assert_enum_value( + r.DataFrameFormat, component_data["file_format"], self.logger + ) + return r.DataFrame( - title = component_data['title'], - logger = self.logger, - file_path = component_data['file_path'], - file_format = file_format, - delimiter = component_data.get('delimiter'), - caption = component_data.get('caption') + title=component_data["title"], + logger=self.logger, + file_path=component_data["file_path"], + file_format=file_format, + delimiter=component_data.get("delimiter"), + caption=component_data.get("caption"), ) def _create_markdown_component(self, component_data: dict) -> r.Markdown: @@ -466,12 +516,12 @@ def _create_markdown_component(self, component_data: dict) -> r.Markdown: A Markdown object populated with the provided metadata. """ return r.Markdown( - title = component_data['title'], - logger = self.logger, - file_path = component_data['file_path'], - caption = component_data.get('caption') + title=component_data["title"], + logger=self.logger, + file_path=component_data["file_path"], + caption=component_data.get("caption"), ) - + def _create_html_component(self, component_data: dict) -> r.Html: """ Creates an Html component. @@ -487,12 +537,12 @@ def _create_html_component(self, component_data: dict) -> r.Html: An Html object populated with the provided metadata. 
""" return r.Html( - title = component_data['title'], - logger = self.logger, - file_path = component_data['file_path'], - caption = component_data.get('caption') + title=component_data["title"], + logger=self.logger, + file_path=component_data["file_path"], + caption=component_data.get("caption"), ) - + def _create_apicall_component(self, component_data: dict) -> r.APICall: """ Creates an APICall component. @@ -508,14 +558,14 @@ def _create_apicall_component(self, component_data: dict) -> r.APICall: An APICall object populated with the provided metadata. """ return r.APICall( - title = component_data['title'], - logger = self.logger, - api_url = component_data['api_url'], - caption = component_data.get('caption'), - headers = component_data.get('headers'), - params = component_data.get('params') + title=component_data["title"], + logger=self.logger, + api_url=component_data["api_url"], + caption=component_data.get("caption"), + headers=component_data.get("headers"), + params=component_data.get("params"), ) - + def _create_chatbot_component(self, component_data: dict) -> r.ChatBot: """ Creates a ChatBot component. @@ -531,11 +581,11 @@ def _create_chatbot_component(self, component_data: dict) -> r.ChatBot: A chatbot object populated with the provided metadata. """ return r.ChatBot( - title = component_data['title'], - logger = self.logger, - api_url = component_data['api_url'], - model = component_data['model'], - caption = component_data.get('caption'), - headers = component_data.get('headers'), - params = component_data.get('params') + title=component_data["title"], + logger=self.logger, + api_url=component_data["api_url"], + model=component_data["model"], + caption=component_data.get("caption"), + headers=component_data.get("headers"), + params=component_data.get("params"), ) diff --git a/src/vuegen/quarto_reportview.py b/src/vuegen/quarto_reportview.py index e68be0c..c82f3af 100644 --- a/src/vuegen/quarto_reportview.py +++ b/src/vuegen/quarto_reportview.py @@ -1,7 +1,7 @@ import os import subprocess -from typing import List from pathlib import Path +from typing import List import networkx as nx import pandas as pd @@ -15,13 +15,15 @@ class QuartoReportView(r.ReportView): A ReportView subclass for generating Quarto reports. """ - BASE_DIR = 'quarto_report' - STATIC_FILES_DIR = os.path.join(BASE_DIR, 'static') + BASE_DIR = "quarto_report" + STATIC_FILES_DIR = os.path.join(BASE_DIR, "static") def __init__(self, report: r.Report, report_type: r.ReportType): - super().__init__(report = report, report_type = report_type) + super().__init__(report=report, report_type=report_type) - def generate_report(self, output_dir: str = BASE_DIR, static_dir: str = STATIC_FILES_DIR) -> None: + def generate_report( + self, output_dir: str = BASE_DIR, static_dir: str = STATIC_FILES_DIR + ) -> None: """ Generates the qmd file of the quarto report. It creates code for rendering each section and its subsections with all components. @@ -32,80 +34,115 @@ def generate_report(self, output_dir: str = BASE_DIR, static_dir: str = STATIC_F static_dir : str, optional The folder where the static files will be saved (default is STATIC_FILES_DIR). 
""" - self.report.logger.debug(f"Generating '{self.report_type}' report in directory: '{output_dir}'") + self.report.logger.debug( + f"Generating '{self.report_type}' report in directory: '{output_dir}'" + ) # Create the output folder if create_folder(output_dir): self.report.logger.debug(f"Created output directory: '{output_dir}'") else: - self.report.logger.debug(f"Output directory already existed: '{output_dir}'") + self.report.logger.debug( + f"Output directory already existed: '{output_dir}'" + ) # Create the static folder if create_folder(static_dir): - self.report.logger.info(f"Created output directory for static content: '{static_dir}'") + self.report.logger.info( + f"Created output directory for static content: '{static_dir}'" + ) else: - self.report.logger.info(f"Output directory for static content already existed: '{static_dir}'") - + self.report.logger.info( + f"Output directory for static content already existed: '{static_dir}'" + ) + try: # Create variable to check if the report is static or revealjs - is_report_static = self.report_type in {r.ReportType.PDF, r.ReportType.DOCX, r.ReportType.ODT, r.ReportType.PPTX} + is_report_static = self.report_type in { + r.ReportType.PDF, + r.ReportType.DOCX, + r.ReportType.ODT, + r.ReportType.PPTX, + } is_report_revealjs = self.report_type == r.ReportType.REVEALJS - + # Define the YAML header for the quarto report yaml_header = self._create_yaml_header() - - # Create qmd content and imports for the report + + # Create qmd content and imports for the report qmd_content = [] report_imports = [] # Add description of the report if self.report.description: - qmd_content.append(f'''{self.report.description}''') + qmd_content.append(f"""{self.report.description}""") # If available add the graphical abstract if self.report.graphical_abstract: - qmd_content.append(self._generate_image_content(self.report.graphical_abstract)) + qmd_content.append( + self._generate_image_content(self.report.graphical_abstract) + ) # Add the sections and subsections to the report self.report.logger.info("Starting to generate sections for the report.") for section in self.report.sections: - self.report.logger.debug(f"Processing section: '{section.title}' - {len(section.subsections)} subsection(s)") + self.report.logger.debug( + f"Processing section: '{section.title}' - {len(section.subsections)} subsection(s)" + ) # Add section header and description - qmd_content.append(f'# {section.title}') + qmd_content.append(f"# {section.title}") if section.description: - qmd_content.append(f'''{section.description}\n''') - + qmd_content.append(f"""{section.description}\n""") + if section.subsections: # Iterate through subsections and integrate them into the section file for subsection in section.subsections: - self.report.logger.debug(f"Processing subsection: '{subsection.title}' - {len(subsection.components)} component(s)") + self.report.logger.debug( + f"Processing subsection: '{subsection.title}' - {len(subsection.components)} component(s)" + ) # Generate content for the subsection - subsection_content, subsection_imports = self._generate_subsection(subsection, is_report_static, is_report_revealjs) + subsection_content, subsection_imports = ( + self._generate_subsection( + subsection, is_report_static, is_report_revealjs + ) + ) qmd_content.extend(subsection_content) report_imports.extend(subsection_imports) else: - self.report.logger.warning(f"No subsections found in section: '{section.title}'. 
To show content in the report, add subsections to the section.") - + self.report.logger.warning( + f"No subsections found in section: '{section.title}'. To show content in the report, add subsections to the section." + ) + # Flatten the subsection_imports into a single list - flattened_report_imports = [imp for sublist in report_imports for imp in sublist] - + flattened_report_imports = [ + imp for sublist in report_imports for imp in sublist + ] + # Remove duplicated imports report_unique_imports = list(set(flattened_report_imports)) # Format imports report_formatted_imports = "\n".join(report_unique_imports) - + # Write the navigation and general content to a Python file - with open(os.path.join(output_dir, f"{self.BASE_DIR}.qmd"), 'w') as quarto_report: + with open( + os.path.join(output_dir, f"{self.BASE_DIR}.qmd"), "w" + ) as quarto_report: quarto_report.write(yaml_header) - quarto_report.write(f"""\n```{{python}} + quarto_report.write( + f"""\n```{{python}} #| label: 'Imports' {report_formatted_imports} -```\n\n""") +```\n\n""" + ) quarto_report.write("\n".join(qmd_content)) - self.report.logger.info(f"Created qmd script to render the app: {self.BASE_DIR}.qmd") - + self.report.logger.info( + f"Created qmd script to render the app: {self.BASE_DIR}.qmd" + ) + except Exception as e: - self.report.logger.error(f"An error occurred while generating the report: {str(e)}") + self.report.logger.error( + f"An error occurred while generating the report: {str(e)}" + ) raise def run_report(self, output_dir: str = BASE_DIR) -> None: @@ -118,15 +155,31 @@ def run_report(self, output_dir: str = BASE_DIR) -> None: The folder where the report was generated (default is 'sections'). """ try: - subprocess.run(["quarto", "render", os.path.join(output_dir, f"{self.BASE_DIR}.qmd")], check=True) + subprocess.run( + ["quarto", "render", os.path.join(output_dir, f"{self.BASE_DIR}.qmd")], + check=True, + ) if self.report_type == r.ReportType.JUPYTER: - subprocess.run(["quarto", "convert", os.path.join(output_dir, f"{self.BASE_DIR}.qmd")], check=True) - self.report.logger.info(f"'{self.report.title}' '{self.report_type}' report rendered") + subprocess.run( + [ + "quarto", + "convert", + os.path.join(output_dir, f"{self.BASE_DIR}.qmd"), + ], + check=True, + ) + self.report.logger.info( + f"'{self.report.title}' '{self.report_type}' report rendered" + ) except subprocess.CalledProcessError as e: - self.report.logger.error(f"Error running '{self.report.title}' {self.report_type} report: {str(e)}") + self.report.logger.error( + f"Error running '{self.report.title}' {self.report_type} report: {str(e)}" + ) raise except FileNotFoundError as e: - self.report.logger.error(f"Quarto is not installed. Please install Quarto to run the report: {str(e)}") + self.report.logger.error( + f"Quarto is not installed. 
Please install Quarto to run the report: {str(e)}" + ) raise def _create_yaml_header(self) -> str: @@ -226,7 +279,7 @@ def _create_yaml_header(self) -> str: pptx: toc: false output: true""", -r.ReportType.JUPYTER: """ + r.ReportType.JUPYTER: """ html: toc: true toc-location: left @@ -252,7 +305,7 @@ def _create_yaml_header(self) -> str: VueGen | © 2025 Multiomics Network Analytics Group (MoNA) - """ + """, } # Create a key based on the report type and format key = self.report_type @@ -269,9 +322,11 @@ def _create_yaml_header(self) -> str: return yaml_header - def _generate_subsection(self, subsection, is_report_static, is_report_revealjs) -> tuple[List[str], List[str]]: + def _generate_subsection( + self, subsection, is_report_static, is_report_revealjs + ) -> tuple[List[str], List[str]]: """ - Generate code to render components (plots, dataframes, markdown) in the given subsection, + Generate code to render components (plots, dataframes, markdown) in the given subsection, creating imports and content for the subsection based on the component type. Parameters @@ -292,35 +347,51 @@ def _generate_subsection(self, subsection, is_report_static, is_report_revealjs) subsection_imports = [] # Add subsection header and description - subsection_content.append(f'## {subsection.title}') - if subsection.description: - subsection_content.append(f'''{subsection.description}\n''') + subsection_content.append(f"## {subsection.title}") + if subsection.description: + subsection_content.append(f"""{subsection.description}\n""") if is_report_revealjs: - subsection_content.append(f'::: {{.panel-tabset}}\n') + subsection_content.append(f"::: {{.panel-tabset}}\n") for component in subsection.components: component_imports = self._generate_component_imports(component) subsection_imports.append(component_imports) if component.component_type == r.ComponentType.PLOT: - subsection_content.extend(self._generate_plot_content(component, is_report_static)) + subsection_content.extend( + self._generate_plot_content(component, is_report_static) + ) elif component.component_type == r.ComponentType.DATAFRAME: - subsection_content.extend(self._generate_dataframe_content(component, is_report_static)) - elif component.component_type == r.ComponentType.MARKDOWN and component.title.lower() != "description": + subsection_content.extend( + self._generate_dataframe_content(component, is_report_static) + ) + elif ( + component.component_type == r.ComponentType.MARKDOWN + and component.title.lower() != "description" + ): subsection_content.extend(self._generate_markdown_content(component)) - elif component.component_type == r.ComponentType.HTML and not is_report_static: + elif ( + component.component_type == r.ComponentType.HTML + and not is_report_static + ): subsection_content.extend(self._generate_html_content(component)) else: - self.report.logger.warning(f"Unsupported component type '{component.component_type}' in subsection: {subsection.title}") - + self.report.logger.warning( + f"Unsupported component type '{component.component_type}' in subsection: {subsection.title}" + ) + if is_report_revealjs: - subsection_content.append(':::\n') + subsection_content.append(":::\n") - self.report.logger.info(f"Generated content and imports for subsection: '{subsection.title}'") + self.report.logger.info( + f"Generated content and imports for subsection: '{subsection.title}'" + ) return subsection_content, subsection_imports - def _generate_plot_content(self, plot, is_report_static, static_dir: str = STATIC_FILES_DIR) -> List[str]: + def 
_generate_plot_content( + self, plot, is_report_static, static_dir: str = STATIC_FILES_DIR + ) -> List[str]: """ Generate content for a plot component based on the report type. @@ -330,7 +401,7 @@ def _generate_plot_content(self, plot, is_report_static, static_dir: str = STATI The plot component to generate content for. static_dir : str, optional The folder where the static files will be saved (default is STATIC_FILES_DIR). - + Returns ------- list : List[str] @@ -338,29 +409,39 @@ def _generate_plot_content(self, plot, is_report_static, static_dir: str = STATI """ plot_content = [] # Add title - plot_content.append(f'### {plot.title}') - + plot_content.append(f"### {plot.title}") + # Define plot path if is_report_static: - static_plot_path = os.path.join(static_dir, f"{plot.title.replace(' ', '_')}.png") + static_plot_path = os.path.join( + static_dir, f"{plot.title.replace(' ', '_')}.png" + ) else: - html_plot_file = os.path.join(static_dir, f"{plot.title.replace(' ', '_')}.html") + html_plot_file = os.path.join( + static_dir, f"{plot.title.replace(' ', '_')}.html" + ) # Add content for the different plot types try: if plot.plot_type == r.PlotType.STATIC: - plot_content.append(self._generate_image_content(plot.file_path, width=950)) + plot_content.append( + self._generate_image_content(plot.file_path, width=950) + ) elif plot.plot_type == r.PlotType.PLOTLY: plot_content.append(self._generate_plot_code(plot)) if is_report_static: - plot_content.append(f"""fig_plotly.write_image("{os.path.abspath(static_plot_path)}")\n```\n""") + plot_content.append( + f"""fig_plotly.write_image("{os.path.abspath(static_plot_path)}")\n```\n""" + ) plot_content.append(self._generate_image_content(static_plot_path)) else: plot_content.append(f"""fig_plotly.show()\n```\n""") elif plot.plot_type == r.PlotType.ALTAIR: plot_content.append(self._generate_plot_code(plot)) if is_report_static: - plot_content.append(f"""fig_altair.save("{os.path.abspath(static_plot_path)}")\n```\n""") + plot_content.append( + f"""fig_altair.save("{os.path.abspath(static_plot_path)}")\n```\n""" + ) plot_content.append(self._generate_image_content(static_plot_path)) else: plot_content.append(f"""fig_altair\n```\n""") @@ -371,14 +452,16 @@ def _generate_plot_content(self, plot, is_report_static, static_dir: str = STATI networkx_graph, html_plot_file = networkx_graph elif isinstance(networkx_graph, nx.Graph) and not is_report_static: # Get the pyvis object and create html - pyvis_graph = plot.create_and_save_pyvis_network(networkx_graph, html_plot_file) - + pyvis_graph = plot.create_and_save_pyvis_network( + networkx_graph, html_plot_file + ) + # Add number of nodes and edges to the plor conetnt num_nodes = networkx_graph.number_of_nodes() num_edges = networkx_graph.number_of_edges() - plot_content.append(f'**Number of nodes:** {num_nodes}\n') - plot_content.append(f'**Number of edges:** {num_edges}\n') - + plot_content.append(f"**Number of nodes:** {num_nodes}\n") + plot_content.append(f"**Number of edges:** {num_edges}\n") + # Add code to generate network depending on the report type if is_report_static: plot.save_netwrok_image(networkx_graph, static_plot_path, "png") @@ -386,21 +469,25 @@ def _generate_plot_content(self, plot, is_report_static, static_dir: str = STATI else: plot_content.append(self._generate_plot_code(plot, html_plot_file)) else: - self.report.logger.warning(f"Unsupported plot type: {plot.plot_type}") + self.report.logger.warning(f"Unsupported plot type: {plot.plot_type}") except Exception as e: - 
self.report.logger.error(f"Error generating content for '{plot.plot_type}' plot '{plot.id}' '{plot.title}': {str(e)}") + self.report.logger.error( + f"Error generating content for '{plot.plot_type}' plot '{plot.id}' '{plot.title}': {str(e)}" + ) raise - + # Add caption if available if plot.caption: - plot_content.append(f'>{plot.caption}\n') + plot_content.append(f">{plot.caption}\n") - self.report.logger.info(f"Successfully generated content for plot: '{plot.title}'") + self.report.logger.info( + f"Successfully generated content for plot: '{plot.title}'" + ) return plot_content - def _generate_plot_code(self, plot, output_file = "") -> str: + def _generate_plot_code(self, plot, output_file="") -> str: """ - Create the plot code based on its visualization tool. + Create the plot code based on its visualization tool. Parameters ---------- @@ -419,7 +506,7 @@ def _generate_plot_code(self, plot, output_file = "") -> str: #| fig-cap: "" """ # If the file path is a URL, generate code to fetch content via requests - if is_url(plot.file_path): + if is_url(plot.file_path): plot_code += f""" response = requests.get('{plot.file_path}') response.raise_for_status() @@ -427,7 +514,7 @@ def _generate_plot_code(self, plot, output_file = "") -> str: else: # If it's a local file plot_code += f""" with open('{os.path.join("..", plot.file_path)}', 'r') as plot_file: - plot_json = plot_file.read()\n""" + plot_json = plot_file.read()\n""" # Add specific code for each visualization tool if plot.plot_type == r.PlotType.PLOTLY: plot_code += """ @@ -437,7 +524,7 @@ def _generate_plot_code(self, plot, output_file = "") -> str: plot_code += """fig_altair = alt.Chart.from_json(plot_json).properties(width=900, height=400)""" elif plot.plot_type == r.PlotType.INTERACTIVE_NETWORK: # Generate the HTML embedding for interactive networks - if is_url(plot.file_path) and plot.file_path.endswith('.html'): + if is_url(plot.file_path) and plot.file_path.endswith(".html"): iframe_src = output_file else: iframe_src = os.path.join("..", output_file) @@ -459,7 +546,7 @@ def _generate_dataframe_content(self, dataframe, is_report_static) -> List[str]: The dataframe component to add to content. is_report_static : bool A boolean indicating whether the report is static or interactive. - + Returns ------- list : List[str] @@ -467,45 +554,61 @@ def _generate_dataframe_content(self, dataframe, is_report_static) -> List[str]: """ dataframe_content = [] # Add title - dataframe_content.append(f'### {dataframe.title}') + dataframe_content.append(f"### {dataframe.title}") # Append header for DataFrame loading - dataframe_content.append(f"""```{{python}} + dataframe_content.append( + f"""```{{python}} #| label: '{dataframe.title} {dataframe.id}' #| fig-cap: "" -""") +""" + ) # Mapping of file extensions to read functions read_function_mapping = { r.DataFrameFormat.CSV.value_with_dot: pd.read_csv, r.DataFrameFormat.PARQUET.value_with_dot: pd.read_parquet, r.DataFrameFormat.TXT.value_with_dot: pd.read_table, r.DataFrameFormat.XLS.value_with_dot: pd.read_excel, - r.DataFrameFormat.XLSX.value_with_dot: pd.read_excel + r.DataFrameFormat.XLSX.value_with_dot: pd.read_excel, } try: # Check if the file extension matches any DataFrameFormat value file_extension = os.path.splitext(dataframe.file_path)[1].lower() - if not any(file_extension == fmt.value_with_dot for fmt in r.DataFrameFormat): - self.report.logger.error(f"Unsupported file extension: {file_extension}. 
Supported extensions are: {', '.join(fmt.value for fmt in r.DataFrameFormat)}.") - + if not any( + file_extension == fmt.value_with_dot for fmt in r.DataFrameFormat + ): + self.report.logger.error( + f"Unsupported file extension: {file_extension}. Supported extensions are: {', '.join(fmt.value for fmt in r.DataFrameFormat)}." + ) + # Build the file path (URL or local file) - file_path = dataframe.file_path if is_url(dataframe.file_path) else os.path.join("..", dataframe.file_path) + file_path = ( + dataframe.file_path + if is_url(dataframe.file_path) + else os.path.join("..", dataframe.file_path) + ) # Load the DataFrame using the correct function read_function = read_function_mapping[file_extension] - dataframe_content.append(f"""df = pd.{read_function.__name__}('{file_path}')""") + dataframe_content.append( + f"""df = pd.{read_function.__name__}('{file_path}')""" + ) # Display the dataframe dataframe_content.extend(self._show_dataframe(dataframe, is_report_static)) - + except Exception as e: - self.report.logger.error(f"Error generating content for DataFrame: {dataframe.title}. Error: {str(e)}") + self.report.logger.error( + f"Error generating content for DataFrame: {dataframe.title}. Error: {str(e)}" + ) raise # Add caption if available if dataframe.caption: - dataframe_content.append(f'>{dataframe.caption}\n') + dataframe_content.append(f">{dataframe.caption}\n") - self.report.logger.info(f"Successfully generated content for DataFrame: '{dataframe.title}'") + self.report.logger.info( + f"Successfully generated content for DataFrame: '{dataframe.title}'" + ) return dataframe_content def _generate_markdown_content(self, markdown) -> List[str]: @@ -516,45 +619,55 @@ def _generate_markdown_content(self, markdown) -> List[str]: ---------- markdown : Markdown The markdown component to add to content. - + Returns ------- list : List[str] The list of content lines for the markdown. - """ + """ markdown_content = [] # Add title - markdown_content.append(f'### {markdown.title}') - + markdown_content.append(f"### {markdown.title}") + try: # Initialize md code with common structure - markdown_content.append(f""" + markdown_content.append( + f""" ```{{python}} #| label: '{markdown.title} {markdown.id}' -#| fig-cap: ""\n""") +#| fig-cap: ""\n""" + ) # If the file path is a URL, generate code to fetch content via requests - if is_url(markdown.file_path): - markdown_content.append(f""" + if is_url(markdown.file_path): + markdown_content.append( + f""" response = requests.get('{markdown.file_path}') response.raise_for_status() -markdown_content = response.text\n""") - else: #If it's a local file - markdown_content.append(f""" +markdown_content = response.text\n""" + ) + else: # If it's a local file + markdown_content.append( + f""" with open('{os.path.join("..", markdown.file_path)}', 'r') as markdown_file: - markdown_content = markdown_file.read()\n""") - + markdown_content = markdown_file.read()\n""" + ) + # Code to display md content markdown_content.append(f"""display.Markdown(markdown_content)\n```\n""") except Exception as e: - self.report.logger.error(f"Error generating content for Markdown: {markdown.title}. Error: {str(e)}") + self.report.logger.error( + f"Error generating content for Markdown: {markdown.title}. 
Error: {str(e)}" + ) raise - + # Add caption if available if markdown.caption: - markdown_content.append(f'>{markdown.caption}\n') - - self.report.logger.info(f"Successfully generated content for Markdown: '{markdown.title}'") + markdown_content.append(f">{markdown.caption}\n") + + self.report.logger.info( + f"Successfully generated content for Markdown: '{markdown.title}'" + ) return markdown_content def _generate_html_content(self, html) -> List[str]: @@ -572,13 +685,17 @@ def _generate_html_content(self, html) -> List[str]: The list of content lines for embedding the HTML. """ html_content = [] - + # Add title - html_content.append(f'### {html.title}') - + html_content.append(f"### {html.title}") + try: # Embed the HTML in an iframe - iframe_src = html.file_path if is_url(html.file_path) else os.path.join("..", html.file_path) + iframe_src = ( + html.file_path + if is_url(html.file_path) + else os.path.join("..", html.file_path) + ) iframe_code = f"""
@@ -586,13 +703,19 @@ def _generate_html_content(self, html) -> List[str]: html_content.append(iframe_code) except Exception as e: - self.report.logger.error(f"Error generating content for HTML: {html.title}. Error: {str(e)}") + self.report.logger.error( + f"Error generating content for HTML: {html.title}. Error: {str(e)}" + ) raise - - self.report.logger.info(f"Successfully generated content for HTML: '{html.title}'") + + self.report.logger.info( + f"Successfully generated content for HTML: '{html.title}'" + ) return html_content - def _generate_image_content(self, image_path: str, alt_text: str = "", width: int = 650, height: int = 400) -> str: + def _generate_image_content( + self, image_path: str, alt_text: str = "", width: int = 650, height: int = 400 + ) -> str: """ Adds an image to the content list in an HTML format with a specified width and height. @@ -606,7 +729,7 @@ def _generate_image_content(self, image_path: str, alt_text: str = "", width: in Height of the image in pixels (default is 400). alt_text : str, optional Alternative text for the image (default is an empty string). - + Returns ------- str @@ -614,12 +737,18 @@ def _generate_image_content(self, image_path: str, alt_text: str = "", width: in """ if is_url(image_path): src = image_path - return f"""![]({src}){{fig-alt={alt_text} width={width} height={height}}}\n""" + return ( + f"""![]({src}){{fig-alt={alt_text} width={width} height={height}}}\n""" + ) else: src = Path(image_path).resolve() - return f"""![](/{src}){{fig-alt={alt_text} width={width} height={height}}}\n""" - - def _show_dataframe(self, dataframe, is_report_static, static_dir: str = STATIC_FILES_DIR) -> List[str]: + return ( + f"""![](/{src}){{fig-alt={alt_text} width={width} height={height}}}\n""" + ) + + def _show_dataframe( + self, dataframe, is_report_static, static_dir: str = STATIC_FILES_DIR + ) -> List[str]: """ Appends either a static image or an interactive representation of a DataFrame to the content list. @@ -631,7 +760,7 @@ def _show_dataframe(self, dataframe, is_report_static, static_dir: str = STATIC_ Determines if the report is in a static format (e.g., PDF) or interactive (e.g., HTML). static_dir : str, optional The folder where the static files will be saved (default is STATIC_FILES_DIR). - + Returns ------- list : List[str] @@ -640,16 +769,22 @@ def _show_dataframe(self, dataframe, is_report_static, static_dir: str = STATIC_ dataframe_content = [] if is_report_static: # Generate path for the DataFrame image - df_image = os.path.join(static_dir, f"{dataframe.title.replace(' ', '_')}.png") - dataframe_content.append(f"dfi.export(df, '{os.path.abspath(df_image)}', max_rows=10, max_cols=5)\n```\n") + df_image = os.path.join( + static_dir, f"{dataframe.title.replace(' ', '_')}.png" + ) + dataframe_content.append( + f"dfi.export(df, '{os.path.abspath(df_image)}', max_rows=10, max_cols=5)\n```\n" + ) # Use helper method to add centered image content dataframe_content.append(self._generate_image_content(df_image)) else: # Append code to display the DataFrame interactively - dataframe_content.append(f"""show(df, classes="display nowrap compact", lengthMenu=[3, 5, 10])\n```\n""") - + dataframe_content.append( + f"""show(df, classes="display nowrap compact", lengthMenu=[3, 5, 10])\n```\n""" + ) + return dataframe_content - + def _generate_component_imports(self, component: r.Component) -> List[str]: """ Generate necessary imports for a component of the report. 
@@ -661,7 +796,7 @@ def _generate_component_imports(self, component: r.Component) -> List[str]: - PLOT - DATAFRAME - MARKDOWN - + Returns ------- list : List[str] @@ -669,27 +804,31 @@ def _generate_component_imports(self, component: r.Component) -> List[str]: """ # Dictionary to hold the imports for each component type components_imports = { - 'plot': { - r.PlotType.ALTAIR: ['import altair as alt', 'import requests'], - r.PlotType.PLOTLY: ['import plotly.io as pio', 'import requests'] + "plot": { + r.PlotType.ALTAIR: ["import altair as alt", "import requests"], + r.PlotType.PLOTLY: ["import plotly.io as pio", "import requests"], }, - 'dataframe': ['import pandas as pd', 'from itables import show', 'import dataframe_image as dfi'], - 'markdown': ['import IPython.display as display', 'import requests'] + "dataframe": [ + "import pandas as pd", + "from itables import show", + "import dataframe_image as dfi", + ], + "markdown": ["import IPython.display as display", "import requests"], } - # Iterate over sections and subsections to determine needed imports + # Iterate over sections and subsections to determine needed imports component_type = component.component_type component_imports = [] # Add relevant imports based on component type and visualization tool if component_type == r.ComponentType.PLOT: - plot_type = getattr(component, 'plot_type', None) - if plot_type in components_imports['plot']: - component_imports.extend(components_imports['plot'][plot_type]) + plot_type = getattr(component, "plot_type", None) + if plot_type in components_imports["plot"]: + component_imports.extend(components_imports["plot"][plot_type]) elif component_type == r.ComponentType.DATAFRAME: - component_imports.extend(components_imports['dataframe']) + component_imports.extend(components_imports["dataframe"]) elif component_type == r.ComponentType.MARKDOWN: - component_imports.extend(components_imports['markdown']) + component_imports.extend(components_imports["markdown"]) # Return the list of import statements return component_imports diff --git a/src/vuegen/report.py b/src/vuegen/report.py index 336b54c..8564a1d 100644 --- a/src/vuegen/report.py +++ b/src/vuegen/report.py @@ -29,7 +29,8 @@ class ReportType(StrEnum): ODT = auto() REVEALJS = auto() PPTX = auto() - JUPYTER = auto() + JUPYTER = auto() + class ComponentType(StrEnum): PLOT = auto() @@ -39,12 +40,14 @@ class ComponentType(StrEnum): APICALL = auto() CHATBOT = auto() + class PlotType(StrEnum): STATIC = auto() PLOTLY = auto() ALTAIR = auto() INTERACTIVE_NETWORK = auto() + class NetworkFormat(StrEnum): GML = auto() GRAPHML = auto() @@ -63,10 +66,12 @@ def value_with_dot(self): """Return the file extension with the dot.""" return f".{self.name.lower()}" + class CSVNetworkFormat(StrEnum): EDGELIST = auto() ADJLIST = auto() + class DataFrameFormat(StrEnum): CSV = auto() TXT = auto() @@ -79,10 +84,11 @@ def value_with_dot(self): """Return the file extension with the dot.""" return f".{self.name.lower()}" + @dataclass -class Component(): +class Component: """ - Base class for different components in a report subsection. It encapsulates elements like + Base class for different components in a report subsection. It encapsulates elements like plots, dataframes, markdown, or apicalls, providing a consistent structure for report generation. Attributes @@ -102,7 +108,8 @@ class Component(): caption : Optional[str] Caption providing additional context about the component (default: None). 
""" - _id_counter: ClassVar[int] = 0 + + _id_counter: ClassVar[int] = 0 id: int = field(init=False) title: str component_type: ComponentType @@ -118,6 +125,7 @@ def _generate_id(cls) -> int: cls._id_counter += 1 return cls._id_counter + class Plot(Component): """ A plot within a subsection of a report. @@ -129,23 +137,36 @@ class Plot(Component): csv_network_format : CSVNetworkFormat, optional The format of the CSV file for network plots (EDGELIST or ADJLIST) (default is None). """ - def __init__(self, title: str, logger: logging.Logger, plot_type: PlotType, file_path: str=None, - caption: str=None, csv_network_format: Optional[CSVNetworkFormat]=None): + + def __init__( + self, + title: str, + logger: logging.Logger, + plot_type: PlotType, + file_path: str = None, + caption: str = None, + csv_network_format: Optional[CSVNetworkFormat] = None, + ): """ Initializes a Plot object. """ # Call the constructor of the parent class (Component) to set common attributes - super().__init__(title = title, logger = logger, component_type = ComponentType.PLOT, - file_path = file_path, caption = caption) + super().__init__( + title=title, + logger=logger, + component_type=ComponentType.PLOT, + file_path=file_path, + caption=caption, + ) # Set specific attributes for the Plot class self.plot_type = plot_type self.csv_network_format = csv_network_format - + def read_network(self) -> nx.Graph: """ Reads the network file and returns a NetworkX graph object. - + Returns ------- G : networkx.Graph @@ -165,59 +186,86 @@ def read_network(self) -> nx.Graph: NetworkFormat.GML.value_with_dot: nx.read_gml, NetworkFormat.GRAPHML.value_with_dot: nx.read_graphml, NetworkFormat.GEXF.value_with_dot: nx.read_gexf, - NetworkFormat.CYJS.value_with_dot: cyjs_to_networkx + NetworkFormat.CYJS.value_with_dot: cyjs_to_networkx, } # Handle .csv and .txt files with custom delimiters based on the text format (edgelist or adjlist) try: # Fetch the file stream (local or URL) using fetch_file_stream file_stream = fetch_file_stream(self.file_path) - + # Determine the file extension and check if it is supported file_extension = os.path.splitext(self.file_path)[-1].lower() # Check if the file extension matches any Enum value if not any(file_extension == fmt.value_with_dot for fmt in NetworkFormat): - self.logger.error(f"Unsupported file extension: {file_extension}. Supported extensions are: {', '.join(fmt.value for fmt in NetworkFormat)}.") - + self.logger.error( + f"Unsupported file extension: {file_extension}. Supported extensions are: {', '.join(fmt.value for fmt in NetworkFormat)}." + ) + # Handle HTML files for pyvis interactive networks if file_extension == NetworkFormat.HTML.value_with_dot: G = pyvishtml_to_networkx(file_stream) return (G, self.file_path) - + # Handle CSV and TXT files with custom delimiters based on the text format (edgelist or adjlist) - if file_extension in [NetworkFormat.CSV.value_with_dot, NetworkFormat.TXT.value_with_dot] and self.csv_network_format: - delimiter = ',' if file_extension == '.csv' else '\\t' + if ( + file_extension + in [NetworkFormat.CSV.value_with_dot, NetworkFormat.TXT.value_with_dot] + and self.csv_network_format + ): + delimiter = "," if file_extension == ".csv" else "\\t" try: df_net = pd.read_csv(file_stream, delimiter=delimiter) except pd.errors.ParserError: - self.logger.error(f"Error parsing CSV/TXT file {self.file_path}. Please check the file format or delimiter.") + self.logger.error( + f"Error parsing CSV/TXT file {self.file_path}. Please check the file format or delimiter." 
+ ) if self.csv_network_format == CSVNetworkFormat.EDGELIST: # Assert that "source" and "target" columns are present in the DataFrame required_columns = {"source", "target"} if not required_columns.issubset(df_net.columns): - missing_cols = ", ".join(required_columns.difference(df_net.columns)) - self.logger.error(f"CSV network file must contain 'source' and 'target' columns. Missing columns: {missing_cols}.") - + missing_cols = ", ".join( + required_columns.difference(df_net.columns) + ) + self.logger.error( + f"CSV network file must contain 'source' and 'target' columns. Missing columns: {missing_cols}." + ) + # Use additional columns as edge attributes, excluding "source" and "target" - edge_attributes = [col for col in df_net.columns if col not in required_columns] - + edge_attributes = [ + col for col in df_net.columns if col not in required_columns + ] + # Return a NetworkX graph object from the edgelist if edge_attributes: - G = nx.from_pandas_edgelist(df_net, source="source", target="target", edge_attr=edge_attributes) + G = nx.from_pandas_edgelist( + df_net, + source="source", + target="target", + edge_attr=edge_attributes, + ) else: - G = nx.from_pandas_edgelist(df_net, source="source", target="target") - - self.logger.info(f"Successfully read network from file: {self.file_path}.") + G = nx.from_pandas_edgelist( + df_net, source="source", target="target" + ) + + self.logger.info( + f"Successfully read network from file: {self.file_path}." + ) return G elif self.csv_network_format == CSVNetworkFormat.ADJLIST: G = nx.from_pandas_adjacency(df_net) - self.logger.info(f"Successfully read network from file: {self.file_path}.") + self.logger.info( + f"Successfully read network from file: {self.file_path}." + ) return G else: - self.logger.error(f"Unsupported format for CSV/TXT file: {self.csv_network_format}.") - + self.logger.error( + f"Unsupported format for CSV/TXT file: {self.csv_network_format}." + ) + # Handle other formats using the mapping and return the NetworkX graph object from the specified network file G = file_extension_map[file_extension](file_stream) G = self._add_size_attribute(G) @@ -225,12 +273,16 @@ def read_network(self) -> nx.Graph: return G except Exception as e: self.logger.error(f"Error occurred while reading network file: {str(e)}.") - raise RuntimeError(f"An error occurred while reading the network file: {str(e)}") - - def save_netwrok_image(self, G: nx.Graph, output_file: str, format: str, dpi: int=300) -> None: + raise RuntimeError( + f"An error occurred while reading the network file: {str(e)}" + ) + + def save_netwrok_image( + self, G: nx.Graph, output_file: str, format: str, dpi: int = 300 + ) -> None: """ Saves a NetworkX graph as an image file in the specified format and resolution. - + Parameters ---------- G : networkx.Graph @@ -244,15 +296,23 @@ def save_netwrok_image(self, G: nx.Graph, output_file: str, format: str, dpi: in """ # Check if the output file path is valid if not os.path.isdir(os.path.dirname(output_file)): - self.logger.error(f"Directory for saving image does not exist: {os.path.dirname(output_file)}.") - raise FileNotFoundError(f"The directory for saving the file does not exist: {os.path.dirname(output_file)}.") - + self.logger.error( + f"Directory for saving image does not exist: {os.path.dirname(output_file)}." + ) + raise FileNotFoundError( + f"The directory for saving the file does not exist: {os.path.dirname(output_file)}." 
+ ) + # Validate image format - valid_formats = ['png', 'jpg', 'jpeg', 'svg'] + valid_formats = ["png", "jpg", "jpeg", "svg"] if format.lower() not in valid_formats: - self.logger.error(f"Invalid image format: {format}. Supported formats are: {', '.join(valid_formats)}.") - raise ValueError(f"Invalid format: {format}. Supported formats are: {', '.join(valid_formats)}.") - + self.logger.error( + f"Invalid image format: {format}. Supported formats are: {', '.join(valid_formats)}." + ) + raise ValueError( + f"Invalid format: {format}. Supported formats are: {', '.join(valid_formats)}." + ) + try: # Draw the graph and save it as an image file nx.draw(G, with_labels=False) @@ -266,7 +326,7 @@ def save_netwrok_image(self, G: nx.Graph, output_file: str, format: str, dpi: in def create_and_save_pyvis_network(self, G: nx.Graph, output_file: str) -> Network: """ Creates a PyVis network from a NetworkX graph object and saves it as an HTML file. - + Parameters ---------- G : networkx.Graph @@ -281,43 +341,59 @@ def create_and_save_pyvis_network(self, G: nx.Graph, output_file: str) -> Networ """ # Check if the network object and output file path are valid if not isinstance(G, nx.Graph): - self.logger.error(f"Provided object is not a valid NetworkX graph: {type(G)}.") - raise TypeError(f"The provided object is not a valid NetworkX graph: {type(G)}.") + self.logger.error( + f"Provided object is not a valid NetworkX graph: {type(G)}." + ) + raise TypeError( + f"The provided object is not a valid NetworkX graph: {type(G)}." + ) if not os.path.isdir(os.path.dirname(output_file)): - self.logger.error(f"Directory for saving PyVis network does not exist: {os.path.dirname(output_file)}.") - raise FileNotFoundError(f"The directory for saving the file does not exist: {os.path.dirname(output_file)}.") - + self.logger.error( + f"Directory for saving PyVis network does not exist: {os.path.dirname(output_file)}." + ) + raise FileNotFoundError( + f"The directory for saving the file does not exist: {os.path.dirname(output_file)}." 
+ ) + try: # Create a PyVis network object - net = Network(height='600px', width='100%', bgcolor='white', font_color='black') + net = Network( + height="600px", width="100%", bgcolor="white", font_color="black" + ) net.from_nx(G) # Customize the network visualization of nodes for node in net.nodes: - node_id = node['id'] + node_id = node["id"] node_data = G.nodes[node_id] - node['label'] = node_data.get('name', node_id) - node['font'] = {'size': 12} - node['borderWidth'] = 2 - node['borderWidthSelected'] = 2.5 + node["label"] = node_data.get("name", node_id) + node["font"] = {"size": 12} + node["borderWidth"] = 2 + node["borderWidthSelected"] = 2.5 # Apply the force_atlas_2based layout and show panel to control layout - net.force_atlas_2based(gravity=-30, central_gravity=0.005, spring_length=100, spring_strength=0.1, damping=0.4) - net.show_buttons(filter_=['physics']) - + net.force_atlas_2based( + gravity=-30, + central_gravity=0.005, + spring_length=100, + spring_strength=0.1, + damping=0.4, + ) + net.show_buttons(filter_=["physics"]) + # Save the network as an HTML file net.save_graph(output_file) self.logger.info(f"PyVis network created and saved as: {output_file}.") return net - + except Exception as e: self.logger.error(f"Failed to create and save PyVis network: {str(e)}.") raise RuntimeError(f"Failed to create and save the PyVis network: {str(e)}") - + def _add_size_attribute(self, G: nx.Graph) -> nx.Graph: """ Adds a 'size' attribute to the nodes of a NetworkX graph based on their degree centrality. - + Parameters ---------- G : networkx.Graph @@ -330,19 +406,19 @@ def _add_size_attribute(self, G: nx.Graph) -> nx.Graph: """ # Clean up edge attributes to avoid conflicts for u, v, data in G.edges(data=True): - data.pop('source', None) - data.pop('target', None) + data.pop("source", None) + data.pop("target", None) # Assign node labels as their IDs for node in G.nodes(data=True): - G.nodes[node[0]]['label'] = G.nodes[node[0]].get('name', node[0]) + G.nodes[node[0]]["label"] = G.nodes[node[0]].get("name", node[0]) # Obtain and set degree values for nodes degrees = {node: G.degree(node) for node in G.nodes()} - + # Assign sizes based on degrees - min_size = 5 - max_size = 30 + min_size = 5 + max_size = 30 min_degree = min(degrees.values()) max_degree = max(degrees.values()) @@ -353,11 +429,14 @@ def _add_size_attribute(self, G: nx.Graph) -> nx.Graph: elif degree == max_degree: size = max_size else: - size = min_size + (max_size - min_size) * ((degree - min_degree) / (max_degree - min_degree)) - - G.nodes[node]['size'] = size + size = min_size + (max_size - min_size) * ( + (degree - min_degree) / (max_degree - min_degree) + ) + + G.nodes[node]["size"] = size return G + class DataFrame(Component): """ A DataFrame within a subsection of a report. @@ -369,37 +448,77 @@ class DataFrame(Component): delimiter : Optional[str] The delimiter to use if the file is a delimited text format (e.g., ';', '\t', etc). """ - def __init__(self, title: str, logger: logging.Logger, file_format: DataFrameFormat, file_path: str=None, - caption: str=None, delimiter: Optional[str]=None): + + def __init__( + self, + title: str, + logger: logging.Logger, + file_format: DataFrameFormat, + file_path: str = None, + caption: str = None, + delimiter: Optional[str] = None, + ): """ Initializes a DataFrame object. 
""" - super().__init__(title = title, logger = logger, component_type=ComponentType.DATAFRAME, - file_path=file_path, caption=caption) + super().__init__( + title=title, + logger=logger, + component_type=ComponentType.DATAFRAME, + file_path=file_path, + caption=caption, + ) self.file_format = file_format self.delimiter = delimiter + class Markdown(Component): """ A Markdown text component within a subsection of a report. """ - def __init__(self, title: str, logger: logging.Logger, file_path: str=None, caption: str=None): + + def __init__( + self, + title: str, + logger: logging.Logger, + file_path: str = None, + caption: str = None, + ): """ Initializes a Markdown object. """ - super().__init__(title = title, logger = logger, component_type=ComponentType.MARKDOWN, - file_path=file_path, caption=caption) - + super().__init__( + title=title, + logger=logger, + component_type=ComponentType.MARKDOWN, + file_path=file_path, + caption=caption, + ) + + class Html(Component): """ An html component within a subsection of a report. """ - def __init__(self, title: str, logger: logging.Logger, file_path: str=None, caption: str=None): + + def __init__( + self, + title: str, + logger: logging.Logger, + file_path: str = None, + caption: str = None, + ): """ Initializes an html object. """ - super().__init__(title = title, logger = logger, component_type=ComponentType.HTML, - file_path=file_path, caption=caption) + super().__init__( + title=title, + logger=logger, + component_type=ComponentType.HTML, + file_path=file_path, + caption=caption, + ) + class APICall(Component): """ @@ -414,14 +533,29 @@ class APICall(Component): params : Optional[dict] Query parameters to include in the API request (default is None). """ - def __init__(self, title: str, logger: logging.Logger, api_url: str, caption: str = None, - headers: Optional[dict] = None, params: Optional[dict] = None): - super().__init__(title = title, logger = logger, component_type=ComponentType.APICALL, caption=caption) + + def __init__( + self, + title: str, + logger: logging.Logger, + api_url: str, + caption: str = None, + headers: Optional[dict] = None, + params: Optional[dict] = None, + ): + super().__init__( + title=title, + logger=logger, + component_type=ComponentType.APICALL, + caption=caption, + ) self.api_url = api_url self.headers = headers or {} self.params = params or {} - def make_api_request(self, method: str, request_body: Optional[dict] = None) -> Optional[dict]: + def make_api_request( + self, method: str, request_body: Optional[dict] = None + ) -> Optional[dict]: """ Sends an HTTP request to the specified API and returns the JSON response. @@ -439,14 +573,23 @@ def make_api_request(self, method: str, request_body: Optional[dict] = None) -> """ try: self.logger.info(f"Making {method} request to API: {self.api_url}") - response = requests.request(method, self.api_url, headers=self.headers, params=self.params, json=request_body) + response = requests.request( + method, + self.api_url, + headers=self.headers, + params=self.params, + json=request_body, + ) response.raise_for_status() - self.logger.info(f"Request successful with status code {response.status_code}.") + self.logger.info( + f"Request successful with status code {response.status_code}." + ) return response.json() except requests.exceptions.RequestException as e: self.logger.error(f"API request failed: {e}") return None - + + class ChatBot(Component): """ A component for creating a ChatBot that interacts with an API. 
@@ -463,23 +606,38 @@ class ChatBot(Component): params : Optional[dict] Query parameters to include in the API request (default is None). """ - def __init__(self, title: str, logger: logging.Logger, api_url: str, model: str, - caption: str = None, headers: Optional[dict] = None, params: Optional[dict] = None): - super().__init__(title=title, logger=logger, component_type=ComponentType.CHATBOT, caption=caption) + + def __init__( + self, + title: str, + logger: logging.Logger, + api_url: str, + model: str, + caption: str = None, + headers: Optional[dict] = None, + params: Optional[dict] = None, + ): + super().__init__( + title=title, + logger=logger, + component_type=ComponentType.CHATBOT, + caption=caption, + ) self.model = model self.api_call = APICall( - title=title, - logger=logger, - api_url=api_url, + title=title, + logger=logger, + api_url=api_url, caption=caption, - headers=headers, - params=params + headers=headers, + params=params, ) - + + @dataclass class Subsection: """ - A subsection within a section, containing multiple components (plots, dataFrames, + A subsection within a section, containing multiple components (plots, dataFrames, markdown text, apicals, etc). Attributes @@ -495,10 +653,11 @@ class Subsection: description : str, optional A description of the subsection (default is None). """ - _id_counter: ClassVar[int] = 0 + + _id_counter: ClassVar[int] = 0 id: int = field(init=False) title: str - components: List['Component'] = field(default_factory=list) + components: List["Component"] = field(default_factory=list) description: Optional[str] = None def __post_init__(self): @@ -509,6 +668,7 @@ def _generate_id(cls) -> int: cls._id_counter += 1 return cls._id_counter + @dataclass class Section: """ @@ -527,10 +687,11 @@ class Section: description : str, optional A description of the section (default is None). """ - _id_counter: ClassVar[int] = 0 + + _id_counter: ClassVar[int] = 0 id: int = field(init=False) title: str - subsections: List['Subsection'] = field(default_factory=list) + subsections: List["Subsection"] = field(default_factory=list) description: Optional[str] = None def __post_init__(self): @@ -541,6 +702,7 @@ def _generate_id(cls) -> int: cls._id_counter += 1 return cls._id_counter + @dataclass class Report: """ @@ -561,13 +723,15 @@ class Report: logo : str, optional The file path to the logo image (default is None). """ + title: str logger: logging.Logger - sections: List['Section'] = field(default_factory=list) + sections: List["Section"] = field(default_factory=list) description: Optional[str] = None graphical_abstract: Optional[str] = None logo: Optional[str] = None + class ReportView(ABC): """ An abstract base class for report view implementations. @@ -582,17 +746,18 @@ class ReportView(ABC): The report that this ABC is associated with. report_type : ReportType The report type. It should be one of the values of the ReportType Enum. - + """ - def __init__(self, report: 'Report', report_type: 'ReportType'): + + def __init__(self, report: "Report", report_type: "ReportType"): self.report = report self.report_type = report_type @abstractmethod - def generate_report(self, output_dir: str = 'sections') -> None: + def generate_report(self, output_dir: str = "sections") -> None: """ Generates the report and creates output files. 
- + Parameters ---------- output_dir : str, optional @@ -601,7 +766,7 @@ def generate_report(self, output_dir: str = 'sections') -> None: pass @abstractmethod - def run_report(self, output_dir: str = 'sections') -> None: + def run_report(self, output_dir: str = "sections") -> None: """ Runs the generated report. @@ -611,7 +776,7 @@ def run_report(self, output_dir: str = 'sections') -> None: The folder where the report was generated (default is 'sections'). """ pass - + @abstractmethod def _generate_component_imports(self) -> str: """ @@ -624,13 +789,14 @@ def _generate_component_imports(self) -> str: - PLOT - DATAFRAME - MARKDOWN - + Returns ------- str A str of import statements for the component. """ - pass + pass + class WebAppReportView(ReportView): """ @@ -641,7 +807,7 @@ class WebAppReportView(ReportView): def _format_text(self, text: str, type: str, level: int, color: str) -> str: """ Format text for the report view. - + Parameters ---------- text : str @@ -652,7 +818,7 @@ def _format_text(self, text: str, type: str, level: int, color: str) -> str: If the text is a header, the level of the header (e.g., 1 for h1, 2 for h2, etc.). color : str, optional The color of the header text. - + Returns ------- str @@ -664,7 +830,7 @@ def _format_text(self, text: str, type: str, level: int, color: str) -> str: def _generate_sections(self, output_dir: str) -> None: """ Creates sections and subsections for the report. - + Parameters ---------- output_dir : str @@ -677,9 +843,11 @@ def _generate_sections(self, output_dir: str) -> None: pass @abstractmethod - def _generate_subsection(self, subsection: Subsection) -> tuple[List[str], List[str]]: + def _generate_subsection( + self, subsection: Subsection + ) -> tuple[List[str], List[str]]: """ - Generate code to render components (plots, dataframes, markdown) in the given subsection, + Generate code to render components (plots, dataframes, markdown) in the given subsection, creating imports and content for the subsection based on the component type. Parameters diff --git a/src/vuegen/report_generator.py b/src/vuegen/report_generator.py index dc5ca70..846ad91 100644 --- a/src/vuegen/report_generator.py +++ b/src/vuegen/report_generator.py @@ -4,10 +4,17 @@ from .quarto_reportview import QuartoReportView from .report import ReportType from .streamlit_reportview import StreamlitReportView -from .utils import assert_enum_value, load_yaml_config, write_yaml_config, get_logger +from .utils import (assert_enum_value, get_logger, load_yaml_config, + write_yaml_config) -def get_report(report_type: str, logger: logging.Logger = get_logger("report"), config_path: str = None, dir_path: str = None, streamlit_autorun: bool = False) -> None: +def get_report( + report_type: str, + logger: logging.Logger = get_logger("report"), + config_path: str = None, + dir_path: str = None, + streamlit_autorun: bool = False, +) -> None: """ Generate and run a report based on the specified engine. 
@@ -31,7 +38,7 @@ def get_report(report_type: str, logger: logging.Logger = get_logger("report"), """ # Initialize the config manager object config_manager = ConfigManager(logger) - + if dir_path: # Generate configuration from the provided directory yaml_data, base_folder_path = config_manager.create_yamlconfig_fromdir(dir_path) @@ -39,7 +46,7 @@ def get_report(report_type: str, logger: logging.Logger = get_logger("report"), # Load the YAML configuration file with the report metadata report_config = load_yaml_config(config_path) - + # Load report object and metadata report, report_metadata = config_manager.initialize_report(report_config) @@ -49,17 +56,12 @@ def get_report(report_type: str, logger: logging.Logger = get_logger("report"), # Create and run ReportView object based on its type if report_type == ReportType.STREAMLIT: st_report = StreamlitReportView( - report = report, - report_type = report_type, - streamlit_autorun = streamlit_autorun + report=report, report_type=report_type, streamlit_autorun=streamlit_autorun ) st_report.generate_report() st_report.run_report() else: - quarto_report = QuartoReportView( - report = report, - report_type = report_type - ) + quarto_report = QuartoReportView(report=report, report_type=report_type) quarto_report.generate_report() quarto_report.run_report() diff --git a/src/vuegen/streamlit_reportview.py b/src/vuegen/streamlit_reportview.py index 789c73a..85d9a62 100644 --- a/src/vuegen/streamlit_reportview.py +++ b/src/vuegen/streamlit_reportview.py @@ -5,7 +5,7 @@ import pandas as pd from . import report as r -from .utils import create_folder, is_url, generate_footer +from .utils import create_folder, generate_footer, is_url class StreamlitReportView(r.WebAppReportView): @@ -13,16 +13,23 @@ class StreamlitReportView(r.WebAppReportView): A Streamlit-based implementation of the WebAppReportView abstract base class. """ - BASE_DIR = 'streamlit_report' - SECTIONS_DIR = os.path.join(BASE_DIR, 'sections') - STATIC_FILES_DIR = os.path.join(BASE_DIR, 'static') - REPORT_MANAG_SCRIPT = 'report_manager.py' - - def __init__(self, report: r.Report, report_type: r.ReportType, streamlit_autorun: bool = False): - super().__init__(report = report, report_type = report_type) + BASE_DIR = "streamlit_report" + SECTIONS_DIR = os.path.join(BASE_DIR, "sections") + STATIC_FILES_DIR = os.path.join(BASE_DIR, "static") + REPORT_MANAG_SCRIPT = "report_manager.py" + + def __init__( + self, + report: r.Report, + report_type: r.ReportType, + streamlit_autorun: bool = False, + ): + super().__init__(report=report, report_type=report_type) self.streamlit_autorun = streamlit_autorun - def generate_report(self, output_dir: str = SECTIONS_DIR, static_dir: str = STATIC_FILES_DIR) -> None: + def generate_report( + self, output_dir: str = SECTIONS_DIR, static_dir: str = STATIC_FILES_DIR + ) -> None: """ Generates the Streamlit report and creates Python files for each section and its subsections and plots. @@ -33,9 +40,11 @@ def generate_report(self, output_dir: str = SECTIONS_DIR, static_dir: str = STAT static_dir : str, optional The folder where the static files will be saved (default is STATIC_FILES_DIR). 
""" - self.report.logger.debug(f"Generating '{self.report_type}' report in directory: '{output_dir}'") + self.report.logger.debug( + f"Generating '{self.report_type}' report in directory: '{output_dir}'" + ) - # Create the output folder + # Create the output folder if create_folder(output_dir, is_nested=True): self.report.logger.info(f"Created output directory: '{output_dir}'") else: @@ -43,30 +52,42 @@ def generate_report(self, output_dir: str = SECTIONS_DIR, static_dir: str = STAT # Create the static folder if create_folder(static_dir): - self.report.logger.info(f"Created output directory for static content: '{static_dir}'") + self.report.logger.info( + f"Created output directory for static content: '{static_dir}'" + ) else: - self.report.logger.info(f"Output directory for static content already existed: '{static_dir}'") - + self.report.logger.info( + f"Output directory for static content already existed: '{static_dir}'" + ) + try: self.report.logger.debug("Processing app navigation code.") # Define the Streamlit imports and report manager content report_manag_content = [] - if self.report.logo: + if self.report.logo: report_manag_content.append( f"""import streamlit as st\n st.set_page_config(layout="wide", page_title="{self.report.title}", page_icon="{self.report.logo}") -st.logo("{self.report.logo}")""") +st.logo("{self.report.logo}")""" + ) else: report_manag_content.append( f"""import streamlit as st\n -st.set_page_config(layout="wide", page_title="{self.report.title}")""") - report_manag_content.append(self._format_text(text=self.report.title, type = 'header', level=1, color='#023858')) +st.set_page_config(layout="wide", page_title="{self.report.title}")""" + ) + report_manag_content.append( + self._format_text( + text=self.report.title, type="header", level=1, color="#023858" + ) + ) # Initialize a dictionary to store the navigation structure report_manag_content.append("\nsections_pages = {}") # Generate the home page and update the report manager content - self._generate_home_section(output_dir=output_dir, report_manag_content=report_manag_content) + self._generate_home_section( + output_dir=output_dir, report_manag_content=report_manag_content + ) for section in self.report.sections: # Create a folder for each section @@ -75,34 +96,52 @@ def generate_report(self, output_dir: str = SECTIONS_DIR, static_dir: str = STAT section_dir_path = os.path.join(output_dir, section_name_var) if create_folder(section_dir_path): - self.report.logger.debug(f"Created section directory: {section_dir_path}") + self.report.logger.debug( + f"Created section directory: {section_dir_path}" + ) else: - self.report.logger.debug(f"Section directory already existed: {section_dir_path}") - + self.report.logger.debug( + f"Section directory already existed: {section_dir_path}" + ) + for subsection in section.subsections: subsection_name_var = subsection.title.replace(" ", "_") - subsection_file_path = os.path.join(section_name_var, subsection_name_var + ".py") + subsection_file_path = os.path.join( + section_name_var, subsection_name_var + ".py" + ) # Create a Page object for each subsection and add it to the home page content - report_manag_content.append(f"{subsection_name_var} = st.Page('{subsection_file_path}', title='{subsection.title}')") + report_manag_content.append( + f"{subsection_name_var} = st.Page('{subsection_file_path}', title='{subsection.title}')" + ) subsection_page_vars.append(subsection_name_var) - + # Add all subsection Page objects to the corresponding section - 
report_manag_content.append(f"sections_pages['{section.title}'] = [{', '.join(subsection_page_vars)}]\n") + report_manag_content.append( + f"sections_pages['{section.title}'] = [{', '.join(subsection_page_vars)}]\n" + ) # Add navigation object to the home page content - report_manag_content.append(f"""report_nav = st.navigation(sections_pages) -report_nav.run()""") - + report_manag_content.append( + f"""report_nav = st.navigation(sections_pages) +report_nav.run()""" + ) + # Write the navigation and general content to a Python file - with open(os.path.join(output_dir, self.REPORT_MANAG_SCRIPT), 'w') as nav_manager: + with open( + os.path.join(output_dir, self.REPORT_MANAG_SCRIPT), "w" + ) as nav_manager: nav_manager.write("\n".join(report_manag_content)) - self.report.logger.info(f"Created app navigation script: {self.REPORT_MANAG_SCRIPT}") + self.report.logger.info( + f"Created app navigation script: {self.REPORT_MANAG_SCRIPT}" + ) # Create Python files for each section and its subsections and plots self._generate_sections(output_dir=output_dir) except Exception as e: - self.report.logger.error(f"An error occurred while generating the report: {str(e)}") + self.report.logger.error( + f"An error occurred while generating the report: {str(e)}" + ) raise def run_report(self, output_dir: str = SECTIONS_DIR) -> None: @@ -115,9 +154,18 @@ def run_report(self, output_dir: str = SECTIONS_DIR) -> None: The folder where the report was generated (default is SECTIONS_DIR). """ if self.streamlit_autorun: - self.report.logger.info(f"Running '{self.report.title}' {self.report_type} report.") + self.report.logger.info( + f"Running '{self.report.title}' {self.report_type} report." + ) try: - subprocess.run(["streamlit", "run", os.path.join(output_dir, self.REPORT_MANAG_SCRIPT)], check=True) + subprocess.run( + [ + "streamlit", + "run", + os.path.join(output_dir, self.REPORT_MANAG_SCRIPT), + ], + check=True, + ) except KeyboardInterrupt: print("Streamlit process interrupted.") except subprocess.CalledProcessError as e: @@ -125,20 +173,33 @@ def run_report(self, output_dir: str = SECTIONS_DIR) -> None: raise else: # If autorun is False, print instructions for manual execution - self.report.logger.info(f"All the scripts to build the Streamlit app are available at {output_dir}") - self.report.logger.info(f"To run the Streamlit app, use the following command:") - self.report.logger.info(f"streamlit run {os.path.join(output_dir, self.REPORT_MANAG_SCRIPT)}") + self.report.logger.info( + f"All the scripts to build the Streamlit app are available at {output_dir}" + ) + self.report.logger.info( + f"To run the Streamlit app, use the following command:" + ) + self.report.logger.info( + f"streamlit run {os.path.join(output_dir, self.REPORT_MANAG_SCRIPT)}" + ) msg = ( f"\nAll the scripts to build the Streamlit app are available at: {output_dir}\n\n" f"To run the Streamlit app, use the following command:\n\n" f"\tstreamlit run {os.path.join(output_dir, self.REPORT_MANAG_SCRIPT)}" - ) + ) print(msg) - def _format_text(self, text: str, type: str, level: int = 1, color: str = '#000000', text_align: str = 'center') -> str: + def _format_text( + self, + text: str, + type: str, + level: int = 1, + color: str = "#000000", + text_align: str = "center", + ) -> str: """ Generates a Streamlit markdown text string with the specified level and color. 
- + Parameters ---------- text : str @@ -157,14 +218,16 @@ def _format_text(self, text: str, type: str, level: int = 1, color: str = '#0000 str A formatted markdown string for the specified text. """ - if type == 'header': + if type == "header": tag = f"h{level}" - elif type == 'paragraph': - tag = 'p' + elif type == "paragraph": + tag = "p" return f"""st.markdown('''<{tag} style='text-align: {text_align}; color: {color};'>{text}''', unsafe_allow_html=True)""" - def _generate_home_section(self, output_dir: str, report_manag_content: list) -> None: + def _generate_home_section( + self, output_dir: str, report_manag_content: list + ) -> None: """ Generates the homepage for the report and updates the report manager content. @@ -180,18 +243,24 @@ def _generate_home_section(self, output_dir: str, report_manag_content: list) -> try: # Create folder for the home page home_dir_path = os.path.join(output_dir, "Home") - if create_folder(home_dir_path): + if create_folder(home_dir_path): self.report.logger.debug(f"Created home directory: {home_dir_path}") else: - self.report.logger.debug(f"Home directory already existed: {home_dir_path}") - + self.report.logger.debug( + f"Home directory already existed: {home_dir_path}" + ) + # Create the home page content home_content = [] home_content.append(f"import streamlit as st") if self.report.description: - home_content.append(self._format_text(text=self.report.description, type='paragraph')) + home_content.append( + self._format_text(text=self.report.description, type="paragraph") + ) if self.report.graphical_abstract: - home_content.append(f"\nst.image('{self.report.graphical_abstract}', use_column_width=True)") + home_content.append( + f"\nst.image('{self.report.graphical_abstract}', use_column_width=True)" + ) # Define the footer variable and add it to the home page content home_content.append("footer = '''" + generate_footer() + "'''\n") @@ -199,12 +268,14 @@ def _generate_home_section(self, output_dir: str, report_manag_content: list) -> # Write the home page content to a Python file home_page_path = os.path.join(home_dir_path, "Homepage.py") - with open(home_page_path, 'w') as home_page: + with open(home_page_path, "w") as home_page: home_page.write("\n".join(home_content)) self.report.logger.info(f"Home page content written to '{home_page_path}'.") # Add the home page to the report manager content - report_manag_content.append(f"homepage = st.Page('Home/Homepage.py', title='Homepage')") + report_manag_content.append( + f"homepage = st.Page('Home/Homepage.py', title='Homepage')" + ) report_manag_content.append(f"sections_pages['Home'] = [homepage]\n") self.report.logger.info("Home page added to the report manager content.") except Exception as e: @@ -214,7 +285,7 @@ def _generate_home_section(self, output_dir: str, report_manag_content: list) -> def _generate_sections(self, output_dir: str) -> None: """ Generates Python files for each section in the report, including subsections and its components (plots, dataframes, markdown). 
- + Parameters ---------- output_dir : str @@ -225,48 +296,66 @@ def _generate_sections(self, output_dir: str) -> None: try: for section in self.report.sections: section_name_var = section.title.replace(" ", "_") - self.report.logger.debug(f"Processing section '{section.id}': '{section.title}' - {len(section.subsections)} subsection(s)") + self.report.logger.debug( + f"Processing section '{section.id}': '{section.title}' - {len(section.subsections)} subsection(s)" + ) if section.subsections: # Iterate through subsections and integrate them into the section file for subsection in section.subsections: - self.report.logger.debug(f"Processing subsection '{subsection.id}': '{subsection.title} - {len(subsection.components)} component(s)'") + self.report.logger.debug( + f"Processing subsection '{subsection.id}': '{subsection.title} - {len(subsection.components)} component(s)'" + ) try: # Create subsection file - subsection_file_path = os.path.join(output_dir, section_name_var, subsection.title.replace(" ", "_") + ".py") - + subsection_file_path = os.path.join( + output_dir, + section_name_var, + subsection.title.replace(" ", "_") + ".py", + ) + # Generate content and imports for the subsection - subsection_content, subsection_imports = self._generate_subsection(subsection) + subsection_content, subsection_imports = ( + self._generate_subsection(subsection) + ) # Flatten the subsection_imports into a single list - flattened_subsection_imports = [imp for sublist in subsection_imports for imp in sublist] - + flattened_subsection_imports = [ + imp for sublist in subsection_imports for imp in sublist + ] + # Remove duplicated imports unique_imports = list(set(flattened_subsection_imports)) # Write everything to the subsection file - with open(subsection_file_path, 'w') as subsection_file: + with open(subsection_file_path, "w") as subsection_file: # Write imports at the top of the file - subsection_file.write("\n".join(unique_imports) + "\n\n") + subsection_file.write( + "\n".join(unique_imports) + "\n\n" + ) # Write the subsection content (descriptions, plots) subsection_file.write("\n".join(subsection_content)) - self.report.logger.info(f"Subsection file created: '{subsection_file_path}'") + self.report.logger.info( + f"Subsection file created: '{subsection_file_path}'" + ) except Exception as subsection_error: self.report.logger.error( f"Error processing subsection '{subsection.id}' '{subsection.title}' in section '{section.id}' '{section.title}': {str(subsection_error)}" ) raise else: - self.report.logger.warning(f"No subsections found in section: '{section.title}'. To show content in the report, add subsections to the section.") + self.report.logger.warning( + f"No subsections found in section: '{section.title}'. To show content in the report, add subsections to the section." + ) except Exception as e: self.report.logger.error(f"Error generating sections: {str(e)}") raise def _generate_subsection(self, subsection) -> tuple[List[str], List[str]]: """ - Generate code to render components (plots, dataframes, markdown) in the given subsection, + Generate code to render components (plots, dataframes, markdown) in the given subsection, creating imports and content for the subsection based on the component type. 
Parameters @@ -282,11 +371,17 @@ def _generate_subsection(self, subsection) -> tuple[List[str], List[str]]: """ subsection_content = [] subsection_imports = [] - + # Add subsection header and description - subsection_content.append(self._format_text(text=subsection.title, type='header', level=3, color='#023558')) + subsection_content.append( + self._format_text( + text=subsection.title, type="header", level=3, color="#023558" + ) + ) if subsection.description: - subsection_content.append(self._format_text(text=subsection.description, type='paragraph')) + subsection_content.append( + self._format_text(text=subsection.description, type="paragraph") + ) for component in subsection.components: # Write imports if not already done @@ -299,7 +394,10 @@ def _generate_subsection(self, subsection) -> tuple[List[str], List[str]]: elif component.component_type == r.ComponentType.DATAFRAME: subsection_content.extend(self._generate_dataframe_content(component)) # If md files is called "description.md", do not include it in the report - elif component.component_type == r.ComponentType.MARKDOWN and component.title.lower() != "description": + elif ( + component.component_type == r.ComponentType.MARKDOWN + and component.title.lower() != "description" + ): subsection_content.extend(self._generate_markdown_content(component)) elif component.component_type == r.ComponentType.HTML: subsection_content.extend(self._generate_html_content(component)) @@ -308,19 +406,25 @@ def _generate_subsection(self, subsection) -> tuple[List[str], List[str]]: elif component.component_type == r.ComponentType.CHATBOT: subsection_content.extend(self._generate_chatbot_content(component)) else: - self.report.logger.warning(f"Unsupported component type '{component.component_type}' in subsection: {subsection.title}") - + self.report.logger.warning( + f"Unsupported component type '{component.component_type}' in subsection: {subsection.title}" + ) + # Define the footer variable and add it to the home page content subsection_content.append("footer = '''" + generate_footer() + "'''\n") subsection_content.append("st.markdown(footer, unsafe_allow_html=True)\n") - - self.report.logger.info(f"Generated content and imports for subsection: '{subsection.title}'") + + self.report.logger.info( + f"Generated content and imports for subsection: '{subsection.title}'" + ) return subsection_content, subsection_imports - - def _generate_plot_content(self, plot, static_dir: str = STATIC_FILES_DIR) -> List[str]: + + def _generate_plot_content( + self, plot, static_dir: str = STATIC_FILES_DIR + ) -> List[str]: """ Generate content for a plot component based on the plot type (static or interactive). 
- + Parameters ---------- plot : Plot @@ -335,12 +439,16 @@ def _generate_plot_content(self, plot, static_dir: str = STATIC_FILES_DIR) -> Li """ plot_content = [] # Add title - plot_content.append(self._format_text(text=plot.title, type='header', level=4, color='#2b8cbe')) - + plot_content.append( + self._format_text(text=plot.title, type="header", level=4, color="#2b8cbe") + ) + # Add content for the different plot types try: if plot.plot_type == r.PlotType.STATIC: - plot_content.append(f"\nst.image('{plot.file_path}', caption='{plot.caption}', use_column_width=True)\n") + plot_content.append( + f"\nst.image('{plot.file_path}', caption='{plot.caption}', use_column_width=True)\n" + ) elif plot.plot_type == r.PlotType.PLOTLY: plot_content.append(self._generate_plot_code(plot)) elif plot.plot_type == r.PlotType.ALTAIR: @@ -352,43 +460,57 @@ def _generate_plot_content(self, plot, static_dir: str = STATIC_FILES_DIR) -> Li networkx_graph, html_plot_file = networkx_graph else: # Otherwise, create and save a new pyvis network from the netowrkx graph - html_plot_file = os.path.join(static_dir, f"{plot.title.replace(' ', '_')}.html") - pyvis_graph = plot.create_and_save_pyvis_network(networkx_graph, html_plot_file) - + html_plot_file = os.path.join( + static_dir, f"{plot.title.replace(' ', '_')}.html" + ) + pyvis_graph = plot.create_and_save_pyvis_network( + networkx_graph, html_plot_file + ) + # Add number of nodes and edges to the plor conetnt num_nodes = networkx_graph.number_of_nodes() num_edges = networkx_graph.number_of_edges() - + # Determine whether the file path is a URL or a local file if is_url(html_plot_file): - plot_content.append(f""" + plot_content.append( + f""" response = requests.get('{html_plot_file}') response.raise_for_status() -html_data = response.text\n""") +html_data = response.text\n""" + ) else: - plot_content.append(f""" + plot_content.append( + f""" with open('{html_plot_file}', 'r') as f: - html_data = f.read()\n""") + html_data = f.read()\n""" + ) # Append the code for additional information (nodes and edges count) - plot_content.append(f""" + plot_content.append( + f""" st.markdown(f"

Number of nodes: {num_nodes}", unsafe_allow_html=True)
-st.markdown(f"Number of relationships: {num_edges}", unsafe_allow_html=True)\n""")
- 
+st.markdown(f"Number of relationships: {num_edges}
", unsafe_allow_html=True)\n""" + ) + # Add the specific code for visualization plot_content.append(self._generate_plot_code(plot)) else: self.report.logger.warning(f"Unsupported plot type: {plot.plot_type}") except Exception as e: - self.report.logger.error(f"Error generating content for '{plot.plot_type}' plot '{plot.id}' '{plot.title}': {str(e)}") - raise + self.report.logger.error( + f"Error generating content for '{plot.plot_type}' plot '{plot.id}' '{plot.title}': {str(e)}" + ) + raise - self.report.logger.info(f"Successfully generated content for plot '{plot.id}': '{plot.title}'") + self.report.logger.info( + f"Successfully generated content for plot '{plot.id}': '{plot.title}'" + ) return plot_content - + def _generate_plot_code(self, plot) -> str: """ - Create the plot code based on its visualization tool. + Create the plot code based on its visualization tool. Parameters ---------- @@ -402,7 +524,7 @@ def _generate_plot_code(self, plot) -> str: The generated plot code as a string. """ # If the file path is a URL, generate code to fetch content via requests - if is_url(plot.file_path): + if is_url(plot.file_path): plot_code = f""" response = requests.get('{plot.file_path}') response.raise_for_status() @@ -410,8 +532,8 @@ def _generate_plot_code(self, plot) -> str: else: # If it's a local file plot_code = f""" with open('{os.path.join(plot.file_path)}', 'r') as plot_file: - plot_json = json.load(plot_file)\n""" - + plot_json = json.load(plot_file)\n""" + # Add specific code for each visualization tool if plot.plot_type == r.PlotType.PLOTLY: plot_code += "st.plotly_chart(plot_json, use_container_width=True)\n" @@ -420,7 +542,7 @@ def _generate_plot_code(self, plot) -> str: plot_code += """ altair_plot = alt.Chart.from_dict(plot_json) st.vega_lite_chart(json.loads(altair_plot.to_json()), use_container_width=True)\n""" - + elif plot.plot_type == r.PlotType.INTERACTIVE_NETWORK: plot_code = """# Streamlit checkbox for controlling the layout control_layout = st.checkbox('Add panel to control layout', value=True) @@ -428,7 +550,7 @@ def _generate_plot_code(self, plot) -> str: # Load HTML into HTML component for display on Streamlit st.components.v1.html(html_data, height=net_html_height)\n""" return plot_code - + def _generate_dataframe_content(self, dataframe) -> List[str]: """ Generate content for a DataFrame component. @@ -445,29 +567,40 @@ def _generate_dataframe_content(self, dataframe) -> List[str]: """ dataframe_content = [] # Add title - dataframe_content.append(self._format_text(text=dataframe.title, type='header', level=4, color='#2b8cbe')) - + dataframe_content.append( + self._format_text( + text=dataframe.title, type="header", level=4, color="#2b8cbe" + ) + ) + # Mapping of file extensions to read functions read_function_mapping = { r.DataFrameFormat.CSV.value_with_dot: pd.read_csv, r.DataFrameFormat.PARQUET.value_with_dot: pd.read_parquet, r.DataFrameFormat.TXT.value_with_dot: pd.read_table, r.DataFrameFormat.XLS.value_with_dot: pd.read_excel, - r.DataFrameFormat.XLSX.value_with_dot: pd.read_excel + r.DataFrameFormat.XLSX.value_with_dot: pd.read_excel, } try: # Check if the file extension matches any DataFrameFormat value file_extension = os.path.splitext(dataframe.file_path)[1].lower() - if not any(file_extension == fmt.value_with_dot for fmt in r.DataFrameFormat): - self.report.logger.error(f"Unsupported file extension: {file_extension}. 
Supported extensions are: {', '.join(fmt.value for fmt in r.DataFrameFormat)}.") + if not any( + file_extension == fmt.value_with_dot for fmt in r.DataFrameFormat + ): + self.report.logger.error( + f"Unsupported file extension: {file_extension}. Supported extensions are: {', '.join(fmt.value for fmt in r.DataFrameFormat)}." + ) # Load the DataFrame using the correct function read_function = read_function_mapping[file_extension] - dataframe_content.append(f"""df = pd.{read_function.__name__}('{dataframe.file_path}')""") - + dataframe_content.append( + f"""df = pd.{read_function.__name__}('{dataframe.file_path}')""" + ) + # Displays a DataFrame using AgGrid with configurable options. - dataframe_content.append(""" + dataframe_content.append( + """ # Displays a DataFrame using AgGrid with configurable options. grid_builder = GridOptionsBuilder.from_dataframe(df) grid_builder.configure_default_column(editable=True, groupable=True) @@ -486,18 +619,27 @@ def _generate_dataframe_content(self, dataframe) -> List[str]: file_name=f"dataframe_{df_index}.csv", mime='text/csv', key=f"download_button_{df_index}") -df_index += 1""") +df_index += 1""" + ) except Exception as e: - self.report.logger.error(f"Error generating content for DataFrame: {dataframe.title}. Error: {str(e)}") + self.report.logger.error( + f"Error generating content for DataFrame: {dataframe.title}. Error: {str(e)}" + ) raise - + # Add caption if available if dataframe.caption: - dataframe_content.append(self._format_text(text=dataframe.caption, type='caption', text_align="left")) - - self.report.logger.info(f"Successfully generated content for DataFrame: '{dataframe.title}'") + dataframe_content.append( + self._format_text( + text=dataframe.caption, type="caption", text_align="left" + ) + ) + + self.report.logger.info( + f"Successfully generated content for DataFrame: '{dataframe.title}'" + ) return dataframe_content - + def _generate_markdown_content(self, markdown) -> List[str]: """ Generate content for a Markdown component. @@ -515,31 +657,49 @@ def _generate_markdown_content(self, markdown) -> List[str]: markdown_content = [] # Add title - markdown_content.append(self._format_text(text=markdown.title, type='header', level=4, color='#2b8cbe')) + markdown_content.append( + self._format_text( + text=markdown.title, type="header", level=4, color="#2b8cbe" + ) + ) try: # If the file path is a URL, generate code to fetch content via requests - if is_url(markdown.file_path): - markdown_content.append(f""" + if is_url(markdown.file_path): + markdown_content.append( + f""" response = requests.get('{markdown.file_path}') response.raise_for_status() -markdown_content = response.text\n""") - else: #If it's a local file - markdown_content.append(f""" +markdown_content = response.text\n""" + ) + else: # If it's a local file + markdown_content.append( + f""" with open('{os.path.join("..", markdown.file_path)}', 'r') as markdown_file: - markdown_content = markdown_file.read()\n""") + markdown_content = markdown_file.read()\n""" + ) # Code to display md content - markdown_content.append("st.markdown(markdown_content, unsafe_allow_html=True)\n") + markdown_content.append( + "st.markdown(markdown_content, unsafe_allow_html=True)\n" + ) except Exception as e: - self.report.logger.error(f"Error generating content for Markdown: {markdown.title}. Error: {str(e)}") + self.report.logger.error( + f"Error generating content for Markdown: {markdown.title}. 
Error: {str(e)}" + ) raise - + # Add caption if available if markdown.caption: - markdown_content.append(self._format_text(text=markdown.caption, type='caption', text_align="left")) - - self.report.logger.info(f"Successfully generated content for Markdown: '{markdown.title}'") + markdown_content.append( + self._format_text( + text=markdown.caption, type="caption", text_align="left" + ) + ) + + self.report.logger.info( + f"Successfully generated content for Markdown: '{markdown.title}'" + ) return markdown_content - + def _generate_html_content(self, html) -> List[str]: """ Generate content for an HTML component in a Streamlit app. @@ -557,35 +717,49 @@ def _generate_html_content(self, html) -> List[str]: html_content = [] # Add title - html_content.append(self._format_text(text=html.title, type='header', level=4, color='#2b8cbe')) + html_content.append( + self._format_text(text=html.title, type="header", level=4, color="#2b8cbe") + ) try: - if is_url(html.file_path): + if is_url(html.file_path): # If it's a URL, fetch content dynamically - html_content.append(f""" + html_content.append( + f""" response = requests.get('{html.file_path}') response.raise_for_status() -html_content = response.text\n""") +html_content = response.text\n""" + ) else: # If it's a local file - html_content.append(f""" + html_content.append( + f""" with open('{os.path.join("..", html.file_path)}', 'r', encoding='utf-8') as html_file: - html_content = html_file.read()\n""") + html_content = html_file.read()\n""" + ) # Display HTML content using Streamlit - html_content.append("st.components.v1.html(html_content, height=600, scrolling=True)\n") + html_content.append( + "st.components.v1.html(html_content, height=600, scrolling=True)\n" + ) except Exception as e: - self.report.logger.error(f"Error generating content for HTML: {html.title}. Error: {str(e)}") + self.report.logger.error( + f"Error generating content for HTML: {html.title}. Error: {str(e)}" + ) raise # Add caption if available if html.caption: - html_content.append(self._format_text(text=html.caption, type='caption', text_align="left")) + html_content.append( + self._format_text(text=html.caption, type="caption", text_align="left") + ) - self.report.logger.info(f"Successfully generated content for HTML: '{html.title}'") + self.report.logger.info( + f"Successfully generated content for HTML: '{html.title}'" + ) return html_content - + def _generate_apicall_content(self, apicall) -> List[str]: """ Generate content for a Markdown component. @@ -603,21 +777,33 @@ def _generate_apicall_content(self, apicall) -> List[str]: apicall_content = [] # Add tile - apicall_content.append(self._format_text(text=apicall.title, type='header', level=4, color='#2b8cbe')) + apicall_content.append( + self._format_text( + text=apicall.title, type="header", level=4, color="#2b8cbe" + ) + ) try: - apicall_response = apicall.make_api_request(method='GET') + apicall_response = apicall.make_api_request(method="GET") apicall_content.append(f"""st.write({apicall_response})\n""") except Exception as e: - self.report.logger.error(f"Error generating content for APICall: {apicall.title}. Error: {str(e)}") + self.report.logger.error( + f"Error generating content for APICall: {apicall.title}. 
Error: {str(e)}" + ) raise # Add caption if available if apicall.caption: - apicall_content.append(self._format_text(text=apicall.caption, type='caption', text_align="left")) - - self.report.logger.info(f"Successfully generated content for APICall: '{apicall.title}'") + apicall_content.append( + self._format_text( + text=apicall.caption, type="caption", text_align="left" + ) + ) + + self.report.logger.info( + f"Successfully generated content for APICall: '{apicall.title}'" + ) return apicall_content - + def _generate_chatbot_content(self, chatbot) -> List[str]: """ Generate content for a ChatBot component. @@ -635,10 +821,15 @@ def _generate_chatbot_content(self, chatbot) -> List[str]: chatbot_content = [] # Add title - chatbot_content.append(self._format_text(text=chatbot.title, type='header', level=4, color='#2b8cbe')) + chatbot_content.append( + self._format_text( + text=chatbot.title, type="header", level=4, color="#2b8cbe" + ) + ) # Chatbot logic for embedding in the web application - chatbot_content.append(f""" + chatbot_content.append( + f""" def generate_query(messages): response = requests.post( "{chatbot.api_call.api_url}", @@ -693,15 +884,22 @@ def response_generator(msg_content): st.session_state.messages.append(parsed_response) with st.chat_message("assistant"): st.write_stream(response_generator(parsed_response["content"])) - """) + """ + ) # Add caption if available if chatbot.caption: - chatbot_content.append(self._format_text(text=chatbot.caption, type='caption', text_align="left")) + chatbot_content.append( + self._format_text( + text=chatbot.caption, type="caption", text_align="left" + ) + ) - self.report.logger.info(f"Successfully generated content for ChatBot: '{chatbot.title}'") + self.report.logger.info( + f"Successfully generated content for ChatBot: '{chatbot.title}'" + ) return chatbot_content - + def _generate_component_imports(self, component: r.Component) -> List[str]: """ Generate necessary imports for a component of the report. @@ -712,7 +910,7 @@ def _generate_component_imports(self, component: r.Component) -> List[str]: The component for which to generate the required imports. 
The component can be of type: - PLOT - DATAFRAME - + Returns ------- list : List[str] @@ -720,31 +918,38 @@ def _generate_component_imports(self, component: r.Component) -> List[str]: """ # Dictionary to hold the imports for each component type components_imports = { - 'plot': { - r.PlotType.ALTAIR: ['import json', 'import altair as alt', 'import requests'], - r.PlotType.PLOTLY: ['import json', 'import requests'], - r.PlotType.INTERACTIVE_NETWORK: ['import requests'] + "plot": { + r.PlotType.ALTAIR: [ + "import json", + "import altair as alt", + "import requests", + ], + r.PlotType.PLOTLY: ["import json", "import requests"], + r.PlotType.INTERACTIVE_NETWORK: ["import requests"], }, - 'dataframe': ['import pandas as pd', 'from st_aggrid import AgGrid, GridOptionsBuilder'], - 'markdown': ['import requests'], - 'chatbot': ['import time', 'import json', 'import requests'] + "dataframe": [ + "import pandas as pd", + "from st_aggrid import AgGrid, GridOptionsBuilder", + ], + "markdown": ["import requests"], + "chatbot": ["import time", "import json", "import requests"], } component_type = component.component_type - component_imports = ['import streamlit as st'] + component_imports = ["import streamlit as st"] # Add relevant imports based on component type and visualization tool if component_type == r.ComponentType.PLOT: - plot_type = getattr(component, 'plot_type', None) - if plot_type in components_imports['plot']: - component_imports.extend(components_imports['plot'][plot_type]) + plot_type = getattr(component, "plot_type", None) + if plot_type in components_imports["plot"]: + component_imports.extend(components_imports["plot"][plot_type]) elif component_type == r.ComponentType.MARKDOWN: - component_imports.extend(components_imports['markdown']) + component_imports.extend(components_imports["markdown"]) elif component_type == r.ComponentType.CHATBOT: - component_imports.extend(components_imports['chatbot']) + component_imports.extend(components_imports["chatbot"]) elif component_type == r.ComponentType.DATAFRAME: - component_imports.extend(components_imports['dataframe']) - component_imports.append('df_index = 1') + component_imports.extend(components_imports["dataframe"]) + component_imports.append("df_index = 1") # Return the list of import statements - return component_imports \ No newline at end of file + return component_imports diff --git a/src/vuegen/utils.py b/src/vuegen/utils.py index 356cc11..6c85396 100644 --- a/src/vuegen/utils.py +++ b/src/vuegen/utils.py @@ -49,7 +49,10 @@ def check_path(filepath: str) -> bool: # Check if the path exists return os.path.exists(os.path.abspath(filepath)) -def assert_enum_value(enum_class: Type[StrEnum], value: str, logger: logging.Logger) -> StrEnum: + +def assert_enum_value( + enum_class: Type[StrEnum], value: str, logger: logging.Logger +) -> StrEnum: """ Validate that the given value is a valid member of the specified enumeration class. @@ -76,8 +79,13 @@ def assert_enum_value(enum_class: Type[StrEnum], value: str, logger: logging.Log return enum_class[value.upper()] except KeyError: expected_values = ", ".join([str(e.value) for e in enum_class]) - logger.error(f"Invalid value for {enum_class.__name__}: '{value}'. Expected values are: {expected_values}") - raise ValueError(f"Invalid {enum_class.__name__}: {value}. Expected values are: {expected_values}") + logger.error( + f"Invalid value for {enum_class.__name__}: '{value}'. Expected values are: {expected_values}" + ) + raise ValueError( + f"Invalid {enum_class.__name__}: {value}. 
Expected values are: {expected_values}" + ) + def is_url(filepath: str) -> bool: """ @@ -87,12 +95,12 @@ def is_url(filepath: str) -> bool: ---------- filepath : str The filepath to check. - + Returns ------- bool - True if the input path is a valid URL, meaning it contains both a scheme - (e.g., http, https, ftp) and a network location (e.g., example.com). + True if the input path is a valid URL, meaning it contains both a scheme + (e.g., http, https, ftp) and a network location (e.g., example.com). Returns False if either the scheme or the network location is missing or invalid. Raises @@ -107,6 +115,7 @@ def is_url(filepath: str) -> bool: parsed_url = urlparse(filepath) return bool(parsed_url.scheme and parsed_url.netloc) + def is_pyvis_html(filepath: str) -> bool: """ Check if the provided HTML file is a Pyvis network visualization. @@ -131,16 +140,17 @@ def is_pyvis_html(filepath: str) -> bool: # Validate both conditions pyvis_identifier_valid = bool(soup.find("div", {"id": "mynetwork"})) - + # Count top-level elements inside body_children = [tag.name for tag in soup.body.find_all(recursive=False)] - + # A pure Pyvis file should contain only "div" and "script" elements in body_structure_valid = set(body_children) <= {"div", "script"} # Both conditions must be true return pyvis_identifier_valid and body_structure_valid + ## FILE_SYSTEM def create_folder(directory_path: str, is_nested: bool = False) -> bool: """ @@ -178,6 +188,7 @@ def create_folder(directory_path: str, is_nested: bool = False) -> bool: except OSError as e: raise OSError(f"Error creating directory '{directory_path}': {e}") + def get_parser(prog_name: str, others: dict = {}) -> argparse.Namespace: """ Initiates argparse.ArgumentParser() and adds common arguments. @@ -211,35 +222,36 @@ def get_parser(prog_name: str, others: dict = {}) -> argparse.Namespace: parser.add_argument( "-c", "--config", - type = str, - default = None, - help = "Path to the YAML configuration file." + type=str, + default=None, + help="Path to the YAML configuration file.", ) parser.add_argument( "-dir", "--directory", - type = str, - default = None, - help = "Path to the directory from which the YAML config will be inferred." + type=str, + default=None, + help="Path to the directory from which the YAML config will be inferred.", ) parser.add_argument( "-rt", "--report_type", - type = str, - default = 'streamlit', - help = "Type of the report to generate (streamlit, html, pdf, docx, odt, revealjs, pptx, or jupyter)." + type=str, + default="streamlit", + help="Type of the report to generate (streamlit, html, pdf, docx, odt, revealjs, pptx, or jupyter).", ) parser.add_argument( - "-st_autorun", + "-st_autorun", "--streamlit_autorun", - action = "store_true", # Automatically sets True if the flag is passed - default = False, - help = "Automatically run the Streamlit app after report generation." + action="store_true", # Automatically sets True if the flag is passed + default=False, + help="Automatically run the Streamlit app after report generation.", ) # Parse arguments return parser + def fetch_file_stream(file_path: str) -> StringIO: """ Fetches a file-like stream from a given file path or URL. @@ -273,14 +285,19 @@ def fetch_file_stream(file_path: str) -> StringIO: response.raise_for_status() # Raise an exception for HTTP errors return StringIO(response.text) except requests.exceptions.RequestException as e: - raise ValueError(f"Error fetching content from URL: {file_path}. 
Error: {str(e)}") + raise ValueError( + f"Error fetching content from URL: {file_path}. Error: {str(e)}" + ) else: # Handle local file input if not os.path.exists(file_path): - raise FileNotFoundError(f"The file at {file_path} was not found or cannot be accessed.") - with open(file_path, 'r') as file: + raise FileNotFoundError( + f"The file at {file_path} was not found or cannot be accessed." + ) + with open(file_path, "r") as file: return StringIO(file.read()) + ## FILE_CONVERSION def cyjs_to_networkx(file_path: str, name: str = "name", ident: str = "id") -> nx.Graph: """ @@ -312,44 +329,44 @@ def cyjs_to_networkx(file_path: str, name: str = "name", ident: str = "id") -> n """ try: # If file_path is a file-like object (e.g., StringIO), read from it - if hasattr(file_path, 'read'): + if hasattr(file_path, "read"): data = json.load(file_path) else: # Otherwise, assume it's a file path and open the file - with open(file_path, 'r') as json_file: + with open(file_path, "r") as json_file: data = json.load(json_file) if name == ident: raise nx.NetworkXError("name and ident must be different.") - + multigraph = data.get("multigraph", False) directed = data.get("directed", False) - + if multigraph: graph = nx.MultiGraph() else: graph = nx.Graph() - + if directed: graph = graph.to_directed() - + graph.graph = dict(data.get("data", {})) - + # Add nodes with all attributes from the 'data' field of the JSON for d in data["elements"]["nodes"]: node_data = d["data"].copy() node = d["data"].get(ident) # Use 'id' (or other unique identifier) - + if node is None: raise ValueError("Each node must contain an 'id' key.") - + # Optionally include 'name' and 'id' attributes if present if name in d["data"]: node_data[name] = d["data"].get(name) - + graph.add_node(node) graph.nodes[node].update(node_data) - + # Add edges with all attributes from the 'data' field of the JSON for d in data["elements"]["edges"]: edge_data = d["data"].copy() @@ -357,7 +374,7 @@ def cyjs_to_networkx(file_path: str, name: str = "name", ident: str = "id") -> n targ = d["data"].get("target") if sour is None or targ is None: raise ValueError("Each edge must contain 'source' and 'target' keys.") - + if multigraph: key = d["data"].get("key", 0) graph.add_edge(sour, targ, key=key) @@ -366,10 +383,11 @@ def cyjs_to_networkx(file_path: str, name: str = "name", ident: str = "id") -> n graph.add_edge(sour, targ) graph.edges[sour, targ].update(edge_data) return graph - + except KeyError as e: raise ValueError(f"Missing required key in data: {e}") + def pyvishtml_to_networkx(html_file: str) -> nx.Graph: """ Converts a PyVis HTML file to a NetworkX graph. 
@@ -395,52 +413,59 @@ def pyvishtml_to_networkx(html_file: str) -> nx.Graph: html_content = html_file.getvalue() else: # Otherwise, treat it as a file path - with open(html_file, 'r', encoding='utf-8') as f: + with open(html_file, "r", encoding="utf-8") as f: html_content = f.read() - soup = BeautifulSoup(html_content, 'html.parser') + soup = BeautifulSoup(html_content, "html.parser") # Extract the network data from the JavaScript objects - script_tag = soup.find('script', text=lambda x: x and 'nodes = new vis.DataSet' in x) + script_tag = soup.find( + "script", text=lambda x: x and "nodes = new vis.DataSet" in x + ) if not script_tag: raise ValueError("Could not find network data in the provided HTML file.") - + # Parse the nodes and edges script_text = script_tag.string - nodes_json = json.loads(script_text.split('nodes = new vis.DataSet(')[1].split(');')[0]) - edges_json = json.loads(script_text.split('edges = new vis.DataSet(')[1].split(');')[0]) + nodes_json = json.loads( + script_text.split("nodes = new vis.DataSet(")[1].split(");")[0] + ) + edges_json = json.loads( + script_text.split("edges = new vis.DataSet(")[1].split(");")[0] + ) # Create a NetworkX graph graph = nx.Graph() # Add nodes for node in nodes_json: - node_id = node.pop('id', None) + node_id = node.pop("id", None) if node_id is None: raise ValueError("Node is missing an 'id' attribute.") - + graph.add_node(node_id, **node) # Add edges for edge in edges_json: - source = edge.pop('from') - target = edge.pop('to') + source = edge.pop("from") + target = edge.pop("to") graph.add_edge(source, target, **edge) # Relabel nodes to use 'name' as the identifier, or 'id' if 'name' is unavailable mapping = {} for node_id, data in graph.nodes(data=True): - name = data.get('name') + name = data.get("name") if name: mapping[node_id] = name else: # Fallback to the original ID if no 'name' exists mapping[node_id] = node_id - + graph = nx.relabel_nodes(graph, mapping) return graph + ## CONFIG def load_yaml_config(file_path: str) -> dict: """ @@ -468,7 +493,7 @@ def load_yaml_config(file_path: str) -> dict: raise FileNotFoundError(f"The config file at {file_path} was not found.") # Load the YAML configuration file - with open(file_path, 'r') as file: + with open(file_path, "r") as file: try: config = yaml.safe_load(file) except yaml.YAMLError as exc: @@ -476,6 +501,7 @@ def load_yaml_config(file_path: str) -> dict: return config + def write_yaml_config(yaml_data: dict, directory_path: Path) -> Path: """ Writes the generated YAML structure to a file. @@ -494,7 +520,7 @@ def write_yaml_config(yaml_data: dict, directory_path: Path) -> Path: """ assert isinstance(yaml_data, dict), "YAML data must be a dictionary." assert isinstance(directory_path, Path), "directory_path must be a Path object." 
- + # Generate the output YAML file path based on the folder name output_yaml = directory_path / (directory_path.name + "_config.yaml") @@ -509,6 +535,7 @@ def write_yaml_config(yaml_data: dict, directory_path: Path) -> Path: # Return the path to the written file return output_yaml + ## LOGGING def get_basename(fname: None | str = None) -> str: """ @@ -633,7 +660,9 @@ def generate_log_filename(folder: str = "logs", suffix: str = "") -> str: return log_filepath -def init_log(filename: str, display: bool = False, logger_id: str | None = None) -> logging.Logger: +def init_log( + filename: str, display: bool = False, logger_id: str | None = None +) -> logging.Logger: """ - Custom python logger configuration (basicConfig()) with two handlers (for stdout and for file) @@ -705,21 +734,22 @@ def get_logger(log_suffix): """ # Generate log file name log_file = generate_log_filename(suffix=log_suffix) - + # Initialize logger logger = init_log(log_file, display=True) - + # Log the path to the log file logger.info(f"Path to log file: {log_file}") return logger + ## REPORT FORMATTING def generate_footer() -> str: """ Generate an HTML footer for a report. - This function creates a styled HTML footer that includes a link to VueGen + This function creates a styled HTML footer that includes a link to VueGen and the Multiomics Network Analytics Group (MoNA). Returns @@ -727,7 +757,7 @@ def generate_footer() -> str: str A formatted HTML string representing the footer. """ - footer = '''