diff --git a/conf/modules.config b/conf/modules.config
index beae2c2f..e78ccaa5 100644
--- a/conf/modules.config
+++ b/conf/modules.config
@@ -528,13 +528,14 @@ process {
         ]
     }
 
-    withName: SCANPY_RANKGENESGROUPS {
-        ext.prefix = { meta.id + '_characteristic_genes' }
+    withName: "RANKGENESGROUPS_.*" {
         publishDir = [
-            path: { "${params.outdir}/per_group/${meta.id}/characteristic_genes" },
+            path: { "${params.outdir}/differential_expression" },
             mode: params.publish_dir_mode,
-            saveAs: { filename -> filename.endsWith(".png") || (params.save_intermediates && !filename.equals('versions.yml')) ? filename : null },
+            saveAs: { filename -> filename.equals('versions.yml') ? null : filename },
         ]
+        ext.sample_group_col = params.sample_group_col // set to null to skip sample group comparisons
+        ext.method = params.rankgenesgroups_method
     }
 
     // Finalize
diff --git a/modules/local/adata/merge/main.nf b/modules/local/adata/merge/main.nf
index 4e8d31c5..093e81f1 100644
--- a/modules/local/adata/merge/main.nf
+++ b/modules/local/adata/merge/main.nf
@@ -16,6 +16,7 @@ process ADATA_MERGE {
     tuple val(meta), path("*_outer.h5ad")    , emit: outer
     tuple val(meta), path("*_inner.h5ad")    , emit: inner
     tuple val(meta), path("*_integrate.h5ad"), emit: integrate
+    path("celltype_predictions/*.csv")       , emit: celltypes
     path "gene_intersection.pkl"             , emit: intersect_genes
     path "versions.yml"                      , emit: versions
diff --git a/modules/local/adata/merge/templates/merge.py b/modules/local/adata/merge/templates/merge.py
index 153f0c15..ea3a8aaa 100644
--- a/modules/local/adata/merge/templates/merge.py
+++ b/modules/local/adata/merge/templates/merge.py
@@ -73,6 +73,20 @@ def get_columns(adata):
 adata_outer.write("${prefix}_outer.h5ad")
 adata_inner.write("${prefix}_inner.h5ad")
 
+# we write the cell type predictions to csv files
+os.makedirs("celltype_predictions", exist_ok=True)
+for col in adata_outer.obs.columns:
+    if col.startswith("celltypes__"):
+        # split the column name into its prefix, tool name and model name
+        tool_name = col.split("__")[1]
+        model_name = "__".join(col.split("__")[2:])
+        adata_outer.obs[col].to_csv(f"celltype_predictions/{tool_name}_{model_name}.csv")
+
+# we have one more annotation column: the label column
+# if it contains more than one unique value, we write it to a csv file as well
+if adata_outer.obs["label"].nunique() > 1:
+    adata_outer.obs["label"].to_csv("celltype_predictions/label.csv")
+
 if base_path:
     adata_integrate = adata_inner[~adata_inner.obs.index.isin(adata_base.obs.index)]
diff --git a/modules/local/celltypes/celltypist/templates/celltypist.py b/modules/local/celltypes/celltypist/templates/celltypist.py
index 90147ee3..2e3df595 100644
--- a/modules/local/celltypes/celltypist/templates/celltypist.py
+++ b/modules/local/celltypes/celltypist/templates/celltypist.py
@@ -66,12 +66,20 @@ def format_yaml_like(data: dict, indent: int = 0) -> str:
         adata.obs.index, ["predicted_labels", "conf_score"]
     ]
-    df_celltypist.columns = [f"celltypist:{model_name}", f"celltypist:{model_name}:conf"]
+    df_celltypist.columns = [f"celltypes__celltypist__{model_name}", f"celltypist__{model_name}__conf"]
     df_list.append(df_celltypist)
 
 df_celltypist = pd.concat(df_list, axis=1)
 df_celltypist.to_pickle("${prefix}.pkl")
 
+# write the cell type columns (those starting with celltypes__celltypist__) to a csv file
+df_celltypist.loc[:, df_celltypist.columns.str.startswith("celltypes__celltypist__")].to_csv(f"{prefix}_predictions.csv")
+
+# write the confidence scores to a csv file
+df_celltypist.loc[:, df_celltypist.columns.str.startswith("celltypist__")].to_csv(f"{prefix}_predictions_conf.csv")
+
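+# As an illustration (hypothetical model name): for a model "Immune_All_Low",
+# the frame now holds "celltypes__celltypist__Immune_All_Low" (predicted labels)
+# and "celltypist__Immune_All_Low__conf" (confidence scores); only the label
+# columns are kept below and appended to adata.obs.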
+df_celltypist = df_celltypist.loc[:, df_celltypist.columns.str.startswith("celltypes__celltypist__")]
+
 adata.obs = pd.concat([adata.obs, df_celltypist], axis=1)
 adata.write_h5ad(f"{prefix}.h5ad")
diff --git a/modules/local/celltypes/singler/main.nf b/modules/local/celltypes/singler/main.nf
index 370256bc..f39a752a 100644
--- a/modules/local/celltypes/singler/main.nf
+++ b/modules/local/celltypes/singler/main.nf
@@ -11,10 +11,11 @@ process CELLTYPES_SINGLER {
     output:
     //tuple val(meta), path("*.h5ad"), emit: h5ad
-    tuple val(meta), path("*.csv")             , emit: obs
-    tuple val(meta), path("*_distribution.pdf"), emit: distribution
-    tuple val(meta), path("*_heatmap.pdf")     , emit: heatmap
-    path "versions.yml"                        , emit: versions
+    tuple val(meta), path("*_predictions.csv")     , emit: obs
+    tuple val(meta), path("*_predictions_conf.csv"), emit: predictions_conf
+    tuple val(meta), path("*_distribution.pdf")    , emit: distribution
+    tuple val(meta), path("*_heatmap.pdf")         , emit: heatmap
+    path "versions.yml"                            , emit: versions
 
     when:
     task.ext.when == null || task.ext.when
diff --git a/modules/local/celltypes/singler/templates/singleR.R b/modules/local/celltypes/singler/templates/singleR.R
index 449f993d..d8fb46e9 100644
--- a/modules/local/celltypes/singler/templates/singleR.R
+++ b/modules/local/celltypes/singler/templates/singleR.R
@@ -62,7 +62,7 @@ for (ref_idx in seq_along(references)) {
         reflabel %in% colnames(colData(reference))
     )
     predictions <- SingleR(
-        test = assay(sce, 'counts'),
+        test = assay(sce, 'decontXcounts'),
         ref = reference,
         labels = colData(reference)[[reflabel]]
     )
@@ -101,10 +101,13 @@ for (ref_idx in seq_along(references)) {
         height = 12
     )
 
+    # change column names
+    label_col <- which(colnames(predictions) == "pruned.labels")
     colnames(predictions) <- paste0(
-        colnames(predictions), "_", prefix, "_", ref_name
+        "singler__", ref_name, "__", colnames(predictions)
    )
-    prediction_results[[ref]] <- predictions
+    colnames(predictions)[label_col] <- paste0("celltypes__singler__", ref_name)
+    prediction_results[[ref_name]] <- predictions
 }
 
 prediction_nrows <- lapply(prediction_results, nrow)
@@ -118,14 +121,27 @@ stopifnot(
 # This is predicated in the assumption that all prediction data frames have exactly
 # the same rows ... see the stopifnot clause above
+# Remove names from the list to prevent them from being added as column prefixes;
+# we handled name collisions in the previous loop explicitly
+names(prediction_results) <- NULL
 predictions <- do.call(cbind, prediction_results)
 
+# we write the actual cell type columns to a csv file
 write.csv(
-    predictions,
+    predictions[, grepl("celltypes__singler__", colnames(predictions))],
     file = paste0(prefix, "_predictions.csv"),
     row.names = TRUE
 )
 
+# write all confidence scores to a csv file
+write.csv(
+    predictions[, !grepl("celltypes__singler__", colnames(predictions))],
+    file = paste0(prefix, "_predictions_conf.csv"),
+    row.names = TRUE
+)
+
 # Capturing version information, as before
 versions <- list(
     "${task.process}" = list(
diff --git a/modules/local/scanpy/leiden/main.nf b/modules/local/scanpy/leiden/main.nf
index 358c188a..d516fff9 100644
--- a/modules/local/scanpy/leiden/main.nf
+++ b/modules/local/scanpy/leiden/main.nf
@@ -15,6 +15,7 @@ process SCANPY_LEIDEN {
     output:
     tuple val(meta), path("${prefix}.h5ad"), emit: h5ad
+    tuple val(meta), path("${prefix}.csv"), emit: clusters
     path "${prefix}.pkl", emit: obs
     path "${prefix}.png", emit: plots, optional: true
     path "${prefix}_mqc.json", emit: multiqc_files, optional: true
diff --git a/modules/local/scanpy/leiden/templates/leiden.py b/modules/local/scanpy/leiden/templates/leiden.py
index d285d7e3..0bd01675 100644
--- a/modules/local/scanpy/leiden/templates/leiden.py
+++ b/modules/local/scanpy/leiden/templates/leiden.py
@@ -28,6 +28,7 @@ sc.tl.leiden(adata, **kwargs)
 
 adata.obs[[key_added]].to_pickle(f"{prefix}.pkl")
+adata.obs[[key_added]].to_csv(f"{prefix}.csv")
 adata.write_h5ad(f"{prefix}.h5ad")
 
 if "${plot_umap}" == "true":
diff --git a/modules/local/scanpy/rankgenesgroups/main.nf b/modules/local/scanpy/rankgenesgroups/main.nf
index 9bc04d57..251e63b0 100644
--- a/modules/local/scanpy/rankgenesgroups/main.nf
+++ b/modules/local/scanpy/rankgenesgroups/main.nf
@@ -8,9 +8,10 @@ process SCANPY_RANKGENESGROUPS {
         : 'community.wave.seqera.io/library/pyyaml_scanpy:3c9e9f631f45553d'}"
 
     input:
-    tuple val(meta), path(h5ad)
+    tuple val(meta), path(h5ad), path(cluster_csv)
 
     output:
+    tuple val(meta), path(prefix), emit: outdir
     tuple val(meta), path("*.h5ad"), emit: h5ad, optional: true
     path "*.pkl", emit: uns, optional: true
     path "*.png", emit: plots, optional: true
@@ -21,7 +22,8 @@ process SCANPY_RANKGENESGROUPS {
     task.ext.when == null || task.ext.when
 
     script:
-    obs_key = meta.obs_key ?: "leiden"
-    prefix = task.ext.prefix ?: "${meta.id}"
+    prefix = task.ext.prefix ?: cluster_csv.baseName
+    sample_group_col = task.ext.sample_group_col ?: null
+    method = task.ext.method ?: 'wilcoxon'
     template('rank_genes_groups.py')
 }
diff --git a/modules/local/scanpy/rankgenesgroups/templates/rank_genes_groups.py b/modules/local/scanpy/rankgenesgroups/templates/rank_genes_groups.py
index 5117b488..35ea8fea 100644
--- a/modules/local/scanpy/rankgenesgroups/templates/rank_genes_groups.py
+++ b/modules/local/scanpy/rankgenesgroups/templates/rank_genes_groups.py
@@ -1,10 +1,23 @@
 #!/usr/bin/env python3
 
+# In this script, we do the following for each cell group column:
+# 1. for each cell group recorded in the column, run per-group DE
+#    vs each other group and vs rest
+# 2. if sample groups are provided, subset to the cells in each cell group
+#    and run per-sample-group DE vs each other sample group and vs rest
+# 3. if sample groups are provided, subset to the cells in each sample group
+#    and compare cell groups within that subset (for this cell_group_col)
+
 import os
-import json
 import platform
-import base64
-import pickle
+import re
+from pathlib import Path
+import warnings
+import json
+from typing import Tuple
+
+warnings.filterwarnings("ignore", category=RuntimeWarning)
 
 os.environ["NUMBA_CACHE_DIR"] = "./tmp/numba"
 os.environ["MPLCONFIGDIR"] = "./tmp/matplotlib"
@@ -15,60 +28,243 @@ import yaml
 
 from threadpoolctl import threadpool_limits
 
+
 threadpool_limits(int("${task.cpus}"))
 sc.settings.n_jobs = int("${task.cpus}")
 
-adata = sc.read_h5ad("${h5ad}")
-prefix = "${prefix}"
-kwargs = {
-    "groupby": "${obs_key}",
-    "pts": True
-}
 
+def sanitize_filename(filename: str) -> str:
+    """
+    Replaces invalid characters in a filename with underscores.
+    """
+    # Build the pattern programmatically to avoid relying on backslash escapes
+    # that could be altered by Nextflow template processing.
+    invalid_specials = re.escape('<>:"/' + chr(92) + '|?*')
+    control_chars = ''.join(chr(c) for c in range(0x00, 0x20))
+    invalid_chars_pattern = f'[{invalid_specials}{control_chars}]'
+    sanitized_filename = re.sub(invalid_chars_pattern, "_", str(filename))
+    sanitized_filename = sanitized_filename.strip(" .")
+    if not sanitized_filename:
+        sanitized_filename = "untitled"
+    return sanitized_filename
+
+
+def ensure_categorical_str(adata_obj: sc.AnnData, column: str) -> None:
+    """
+    Ensures the specified column in the AnnData object is a categorical string.
+    """
+    adata_obj.obs[column] = adata_obj.obs[column].astype(str).astype("category")
+
+
+def valid_groups(adata_obj: sc.AnnData, column: str, min_cells: int = 3) -> list[str]:
+    """
+    Returns the groups in the specified column of the AnnData object that have
+    at least min_cells cells.
+    """
+    vc = adata_obj.obs[column].astype(str).value_counts()
+    return vc[vc >= min_cells].index.astype("str").tolist()
+
+
+# (No JSON loading/saving helpers; we keep results in memory and write a single
+# combined JSON file at the end of the run.)
 
-if adata.obs["${obs_key}"].value_counts().size > 1:
-    sc.pp.log1p(adata)
-    sc.tl.rank_genes_groups(adata, **kwargs)
-    rgg_dict = adata.uns["rank_genes_groups"]
 
+def run_and_save_de(adata_obj: sc.AnnData, groupby: str, group: str, reference, out_dir: Path, method: str) -> Tuple[Path, Path]:
+    """
+    Runs differential expression analysis for the specified group and reference
+    group, and saves the results to the specified output directory.
+ """ + out_dir.mkdir(parents=True, exist_ok=True) + # Run differential expression analysis + sc.tl.rank_genes_groups( + adata_obj, + use_raw=False, + groupby=groupby, + groups=[group], + reference=reference, + pts=True, + method=method, + ) + # Get the results of the differential expression analysis + rgg_df = sc.get.rank_genes_groups_df(adata_obj, group=None) + # Create a standardized filename for the output files + ref_name = reference if isinstance(reference, str) else str(reference) + basename = f"{sanitize_filename(group)}_vs_{sanitize_filename(ref_name)}.csv" + csv_path = out_dir / f"{basename}.csv" + png_path = out_dir / f"{basename}.png" + rgg_df.to_csv(csv_path, index=False) + sc.pl.rank_genes_groups(adata_obj, show=False) + plt.savefig(png_path) + plt.close() - pickle.dump(rgg_dict, open(f"{prefix}.pkl", "wb")) - adata.write_h5ad(f"{prefix}.h5ad") + return (csv_path, png_path) - # Plot - sc.pl.rank_genes_groups(adata, show=False) - path = f"{prefix}.png" - plt.savefig(path) +adata = sc.read_h5ad("${h5ad}") +sample_group_col = "${sample_group_col}" +method = "${method}" + +cell_groups_csv = "${cluster_csv}" +cell_groups_df = pd.read_csv(cell_groups_csv, index_col=0) +outdir = Path("${prefix}") +outdir.mkdir(parents=True, exist_ok=True) - # MultiQC - with open(path, "rb") as f_plot, open("${prefix}_mqc.json", "w") as f_json: - image_string = base64.b64encode(f_plot.read()).decode("utf-8") - image_html = f'
' +# Align order +cell_groups_df = cell_groups_df[cell_groups_df.index.isin(adata.obs.index)] +adata = adata[adata.obs.index.isin(cell_groups_df.index)] +cell_groups_df = cell_groups_df.reindex(adata.obs.index) +cell_group_cols = list(cell_groups_df.columns) - custom_json = { - "id": "${prefix}", - "parent_id": "${meta.integration}", - "parent_name": "${meta.integration}", - "parent_description": "Results of the ${meta.integration} integration.", +# ensure there are cells left +if adata.n_obs == 0 or cell_groups_df.shape[0] == 0: + raise ValueError(f"No cells left after aligning adata and cluster_csv") - "section_name": "${meta.id} characteristic genes", - "plot_type": "image", - "data": image_html, - } +# Add grouping columns to adata.obs +for cell_group_col in cell_group_cols: + if cell_group_col not in adata.obs.columns: + adata.obs[cell_group_col] = cell_groups_df[cell_group_col].astype("str").astype("category") - json.dump(custom_json, f_json) +has_sample_groups = sample_group_col not in (None, "", "null") +if has_sample_groups: + if sample_group_col not in adata.obs.columns: + raise ValueError(f"sample_group_col '{sample_group_col}' not found in adata.obs") + # if all cells have the same sample group, we skip the sample group comparisons + if adata.obs[sample_group_col].nunique() == 1: + has_sample_groups = False + print(f"All cells are in the same sample group.") else: - print("Skipping rank_genes_groups computation as the group has less than 2 unique values.") + print(f"No sample group column provided.") +if not has_sample_groups: + print(f"Skipping sample group comparisons.") + +# Maintain three in-memory dictionaries for results; we will emit a single JSON +# file that contains them all at the beginning and end of the run. +results_all_cells: dict = {c: {} for c in cell_group_cols} +# only initialize the sample group within cell group dictionary if sample groups are provided +results_sample_within_cell = ({c: {} for c in cell_group_cols} if has_sample_groups else None) +results_cell_within_sample = ({c: {} for c in cell_group_cols} if has_sample_groups else None) + +for cell_group_col in cell_group_cols: + print(f"Differential analysis for cell group column: {cell_group_col}") + ensure_categorical_str(adata, cell_group_col) + col_outdir = outdir / sanitize_filename(cell_group_col) + + # keep only the groups that have at least 3 cells + groups = valid_groups(adata, cell_group_col, min_cells=3) + # if there are fewer than 2 groups with at least 3 cells, we skip the differential analysis + if len(groups) < 2: + print(f"\t- Skipping {cell_group_col}: fewer than 2 groups with ≥3 cells") + continue + + for group in groups: + # ------------------------------------------------------------ + # 1) For each cell group column: per-group DE vs each other group and vs rest + # ------------------------------------------------------------ + print(f"\t- Processing cell group '{group}'") + group_outdir = col_outdir / "cell_groups" / sanitize_filename(group) + + # Pairwise comparisons + for other in [g for g in groups if g != group]: + print(f"\t\t- {group} vs {other}") + csv_path, png_path = run_and_save_de(adata, cell_group_col, group, other, group_outdir, method) + results_all_cells.setdefault(cell_group_col, {}).setdefault(group, {})[other] = { + "csv_path": str(csv_path), + "png_path": str(png_path), + } + # Versus rest (only meaningful if >2 groups) + if len(groups) > 2: + other = "rest" + print(f"\t\t- {group} vs {other}") + csv_path, png_path = run_and_save_de(adata, cell_group_col, group, 
+            results_all_cells.setdefault(cell_group_col, {}).setdefault(group, {})[other] = {
+                "csv_path": str(csv_path),
+                "png_path": str(png_path),
+            }
 
-# Versions
+        # ------------------------------------------------------------
+        # 2) Within each cell group: per-sample-group DE vs each other sample group and vs rest
+        # ------------------------------------------------------------
+        if has_sample_groups:
+            subset = adata[adata.obs[cell_group_col].astype(str) == group].copy()
+            ensure_categorical_str(subset, cell_group_col)
+            ensure_categorical_str(subset, sample_group_col)
+
+            sample_groups = valid_groups(subset, sample_group_col, min_cells=3)
+            if len(sample_groups) < 2:
+                print("\t\t- Skipping within-group sample comparisons: fewer than 2 sample groups with ≥3 cells")
+                continue
+            for sg in sample_groups:
+                print(f"\t\t- Sample group '{sg}' within cell group '{group}'")
+                sg_outdir = group_outdir / sanitize_filename(sg)
+                for other in [x for x in sample_groups if x != sg]:
+                    print(f"\t\t\t- {sg} vs {other}")
+                    csv_path, png_path = run_and_save_de(subset, sample_group_col, sg, other, sg_outdir, method)
+                    results_sample_within_cell.setdefault(cell_group_col, {}).setdefault(group, {}).setdefault(sg, {})[other] = {
+                        "csv_path": str(csv_path),
+                        "png_path": str(png_path),
+                    }
+                if len(sample_groups) > 2:
+                    other = "rest"
+                    print(f"\t\t\t- {sg} vs {other}")
+                    csv_path, png_path = run_and_save_de(subset, sample_group_col, sg, other, sg_outdir, method)
+                    results_sample_within_cell.setdefault(cell_group_col, {}).setdefault(group, {}).setdefault(sg, {})[other] = {
+                        "csv_path": str(csv_path),
+                        "png_path": str(png_path),
+                    }
+
+    # ------------------------------------------------------------
+    # 3) For each sample group: subset, then compare cell groups within that subset (for this cell_group_col)
+    # ------------------------------------------------------------
+    if has_sample_groups:
+        all_sample_groups = valid_groups(adata, sample_group_col, min_cells=3)
+        for sg in all_sample_groups:
+            print(f"\t- Processing sample group subset for column '{cell_group_col}': {sg}")
+            subset = adata[adata.obs[sample_group_col].astype(str) == sg].copy()
+            ensure_categorical_str(subset, sample_group_col)
+            ensure_categorical_str(subset, cell_group_col)
+            cg_sample_dir = col_outdir / "sample_groups" / sanitize_filename(sg)
+            groups_in_subset = valid_groups(subset, cell_group_col, min_cells=3)
+            if len(groups_in_subset) < 2:
+                print(f"\t\t- Skipping {cell_group_col} in sample '{sg}': fewer than 2 groups with ≥3 cells")
+                continue
+            for g in groups_in_subset:
+                g_dir = cg_sample_dir / sanitize_filename(g)
+                for other in [x for x in groups_in_subset if x != g]:
+                    print(f"\t\t\t- {g} vs {other} within sample '{sg}'")
+                    csv_path, png_path = run_and_save_de(subset, cell_group_col, g, other, g_dir, method)
+                    results_cell_within_sample.setdefault(cell_group_col, {}).setdefault(sg, {}).setdefault(g, {})[other] = {
+                        "csv_path": str(csv_path),
+                        "png_path": str(png_path),
+                    }
+                if len(groups_in_subset) > 2:
+                    print(f"\t\t\t- {g} vs rest within sample '{sg}'")
+                    csv_path, png_path = run_and_save_de(subset, cell_group_col, g, "rest", g_dir, method)
+                    results_cell_within_sample.setdefault(cell_group_col, {}).setdefault(sg, {}).setdefault(g, {})["rest"] = {
+                        "csv_path": str(csv_path),
+                        "png_path": str(png_path),
+                    }
+
+# Write the combined results to a single JSON file
+combined_results = {
+    "cell_groups_all_cells": results_all_cells,
"sample_groups_within_each_cell_group": results_sample_within_cell, + "cell_groups_within_each_sample_group": results_cell_within_sample, +} +with open(outdir / "differential_expression_results.json", "w") as f: + json.dump(combined_results, f, indent=2) versions = { "${task.process}": { "python": platform.python_version(), "scanpy": sc.__version__, - "pandas": pd.__version__ - } + "pandas": pd.__version__, + }, } with open("versions.yml", "w") as f: yaml.dump(versions, f) + +# Write final combined JSON snapshot at end of script +final_combined_results = { + "cell_group_all_cells": results_all_cells, + "sample_group_within_cell_group": results_sample_within_cell, + "cell_group_within_sample_group": results_cell_within_sample, +} diff --git a/nextflow.config b/nextflow.config index 8a01e97f..8874bb27 100644 --- a/nextflow.config +++ b/nextflow.config @@ -50,6 +50,10 @@ params { celltypist_model = '' celldex_reference = '' + // Differential expression options + rankgenesgroups_method = 'wilcoxon' + sample_group_col = null + // Pipeline options qc_only = false skip_liana = false diff --git a/nextflow_schema.json b/nextflow_schema.json index ee3c0332..ba02790e 100644 --- a/nextflow_schema.json +++ b/nextflow_schema.json @@ -221,6 +221,28 @@ } } }, + + "differential_expression_options": { + "title": "Tool options", + "type": "object", + "fa_icon": "fas fa-tools", + "description": "Options for various tools used in the pipeline.", + "properties": { + "sample_group_col": { + "type": "string", + "description": "The column in the AnnData object that contains the sample group information", + "help_text": "The column in the AnnData object that contains the sample group information", + "pattern": "^([a-zA-Z0-9_]*(,[a-zA-Z0-9_]*)*)?$" + }, + "rankgenesgroups_method": { + "type": "string", + "default": "wilcoxon", + "description": "Method to use for the rank_genes_groups step", + "help_text": "Method to use for the rank_genes_groups step. 
+                    "enum": ["logreg", "t-test", "wilcoxon", "t-test_overestim_var"]
+                }
+            }
+        },
         "tool_options": {
             "title": "Tool options",
             "type": "object",
@@ -540,6 +562,9 @@
         {
             "$ref": "#/$defs/clustering_options"
         },
+        {
+            "$ref": "#/$defs/differential_expression_options"
+        },
         {
             "$ref": "#/$defs/tool_options"
         },
diff --git a/subworkflows/local/cluster/main.nf b/subworkflows/local/cluster/main.nf
index debea2ed..6d7dbeae 100644
--- a/subworkflows/local/cluster/main.nf
+++ b/subworkflows/local/cluster/main.nf
@@ -19,6 +19,7 @@ workflow CLUSTER {
     ch_obsm          = channel.empty()
     ch_multiqc_files = channel.empty()
     ch_h5ad          = channel.empty()
+    ch_clusters      = channel.empty()
 
     if (global) {
         ch_h5ad = ch_h5ad.mix(ch_input.map { meta, h5ad -> [meta + [subset: "global"], h5ad] })
@@ -71,7 +72,7 @@ workflow CLUSTER {
     ch_obs = ch_obs.mix(LEIDEN.out.obs)
     ch_h5ad_clustering = LEIDEN.out.h5ad
     ch_multiqc_files = ch_multiqc_files.mix(LEIDEN.out.multiqc_files)
-
+    ch_clusters = ch_clusters.mix(LEIDEN.out.clusters)
     ch_entropy = LEIDEN.out.h5ad.multiMap { meta, h5ad ->
         h5ad: [meta, h5ad]
         group_col: meta.id + "_leiden"
@@ -87,6 +88,7 @@ workflow CLUSTER {
     obsm            = ch_obsm             // channel: [ pkl ]
     h5ad_neighbors  = ch_h5ad_neighbours  // channel: [ integration, h5ad ]
     h5ad_clustering = ch_h5ad_clustering  // channel: [ integration, h5ad ]
+    clusters        = ch_clusters         // channel: [ meta, csv ]
    multiqc_files   = ch_multiqc_files    // channel: [ json ]
    versions        = ch_versions         // channel: [ versions.yml ]
 }
diff --git a/subworkflows/local/combine.nf b/subworkflows/local/combine.nf
index 2346f36c..1ed1f3b3 100644
--- a/subworkflows/local/combine.nf
+++ b/subworkflows/local/combine.nf
@@ -14,6 +14,7 @@ workflow COMBINE {
     ch_obs       = channel.empty()
     ch_var       = channel.empty()
     ch_obsm      = channel.empty()
+    ch_celltypes = channel.empty()
 
     ADATA_MERGE(
         ch_h5ad.map { _meta, h5ad -> [[id: "merged"], h5ad] }.groupTuple(),
@@ -23,6 +24,7 @@ workflow COMBINE {
     ch_outer = ADATA_MERGE.out.outer
     ch_inner = ADATA_MERGE.out.inner
     ch_versions = ch_versions.mix(ADATA_MERGE.out.versions)
+    ch_celltypes = ADATA_MERGE.out.celltypes
 
     INTEGRATE(
         ADATA_MERGE.out.integrate,
@@ -64,6 +66,7 @@ workflow COMBINE {
     emit:
     h5ad         = ch_outer        // channel: [ merged, h5ad ]
     h5ad_inner   = ch_inner        // channel: [ merged, h5ad ]
+    celltypes    = ch_celltypes    // channel: [ csv ]
     integrations = ch_integrations // channel: [ integration, h5ad ]
     var          = ch_var          // channel: [ pkl ]
     obs          = ch_obs          // channel: [ pkl ]
diff --git a/subworkflows/local/differential_expression.nf b/subworkflows/local/differential_expression.nf
new file mode 100644
index 00000000..ac622ff0
--- /dev/null
+++ b/subworkflows/local/differential_expression.nf
@@ -0,0 +1,41 @@
+include { SCANPY_RANKGENESGROUPS as RANKGENESGROUPS_CELLTYPES } from '../../modules/local/scanpy/rankgenesgroups'
+include { SCANPY_RANKGENESGROUPS as RANKGENESGROUPS_CLUSTERS  } from '../../modules/local/scanpy/rankgenesgroups'
+
+
+workflow DIFFERENTIAL_EXPRESSION {
+    take:
+    ch_h5ad      // channel: [ meta, h5ad ]
+    ch_celltypes // channel: [ celltype ]
+    ch_clusters  // channel: [ meta, cluster ]
+
+    main:
+    ch_versions      = channel.empty()
+    ch_uns           = channel.empty()
+    ch_multiqc_files = channel.empty()
+    ch_outdirs       = channel.empty()
+
+    // pair every integrated h5ad with every cell type prediction table
+    ch_input_celltypes = ch_h5ad.combine(ch_celltypes.flatten())
+    RANKGENESGROUPS_CELLTYPES(ch_input_celltypes)
+    ch_outdirs = ch_outdirs.mix(RANKGENESGROUPS_CELLTYPES.out.outdir)
+    ch_versions = ch_versions.mix(RANKGENESGROUPS_CELLTYPES.out.versions)
+    ch_uns = ch_uns.mix(RANKGENESGROUPS_CELLTYPES.out.uns)
+    ch_multiqc_files = ch_multiqc_files.mix(RANKGENESGROUPS_CELLTYPES.out.multiqc_files)
+
+    ch_input_clusters = ch_h5ad
+        .map { meta, h5ad -> [meta.integration, h5ad] }
+        .combine(ch_clusters.map { meta, cluster -> [meta.integration, cluster] }, by: 0)
+        .map { id, h5ad, cluster -> [[id: id], h5ad, cluster] }
+
+    RANKGENESGROUPS_CLUSTERS(ch_input_clusters)
+    ch_outdirs = ch_outdirs.mix(RANKGENESGROUPS_CLUSTERS.out.outdir)
+    ch_versions = ch_versions.mix(RANKGENESGROUPS_CLUSTERS.out.versions)
+    ch_uns = ch_uns.mix(RANKGENESGROUPS_CLUSTERS.out.uns)
+    ch_multiqc_files = ch_multiqc_files.mix(RANKGENESGROUPS_CLUSTERS.out.multiqc_files)
+
+    emit:
+    outdirs       = ch_outdirs       // channel: [ outdir ]
+    uns           = ch_uns           // channel: [ pkl ]
+    multiqc_files = ch_multiqc_files // channel: [ json ]
+    versions      = ch_versions      // channel: [ versions.yml ]
+}
diff --git a/subworkflows/local/per_group.nf b/subworkflows/local/per_group.nf
index 940cf801..2a7ae494 100644
--- a/subworkflows/local/per_group.nf
+++ b/subworkflows/local/per_group.nf
@@ -28,13 +28,6 @@ workflow PER_GROUP {
         ch_uns = ch_uns.mix(LIANA_RANKAGGREGATE.out.uns)
     }
 
-    if (!params.skip_rankgenesgroups) {
-        SCANPY_RANKGENESGROUPS(ch_no_neighbors)
-        ch_versions = ch_versions.mix(SCANPY_RANKGENESGROUPS.out.versions)
-        ch_uns = ch_uns.mix(SCANPY_RANKGENESGROUPS.out.uns)
-        ch_multiqc_files = ch_multiqc_files.mix(SCANPY_RANKGENESGROUPS.out.multiqc_files)
-    }
-
     emit:
     uns           = ch_uns           // channel: [ pkl ]
     multiqc_files = ch_multiqc_files // channel: [ json ]
diff --git a/workflows/scdownstream.nf b/workflows/scdownstream.nf
index 0b2221cd..f9e84b39 100644
--- a/workflows/scdownstream.nf
+++ b/workflows/scdownstream.nf
@@ -19,6 +19,7 @@ include { paramsSummaryMap } from 'plugin/nf-schema'
 include { paramsSummaryMultiqc } from '../subworkflows/nf-core/utils_nfcore_pipeline'
 include { softwareVersionsToYAML } from '../subworkflows/nf-core/utils_nfcore_pipeline'
 include { methodsDescriptionText } from '../subworkflows/local/utils_nfcore_scdownstream_pipeline'
+include { DIFFERENTIAL_EXPRESSION } from '../subworkflows/local/differential_expression'
 
 /*
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -42,6 +43,8 @@ workflow SCDOWNSTREAM {
     ch_uns           = channel.empty()
     ch_layers        = channel.empty()
     ch_multiqc_files = channel.empty()
+    ch_clusters      = channel.empty()
+    ch_celltypes     = channel.empty()
 
     if (params.input) {
         ch_obs_per_sample = channel.empty()
@@ -98,6 +101,7 @@ workflow SCDOWNSTREAM {
         ch_obsm = ch_obsm.mix(COMBINE.out.obsm)
         ch_integrations = ch_integrations.mix(COMBINE.out.integrations)
         ch_finalization_base = COMBINE.out.h5ad
+        ch_celltypes = ch_celltypes.mix(COMBINE.out.celltypes)
 
         ch_label_grouping = COMBINE.out.h5ad_inner
         grouping_col = "label"
@@ -134,6 +138,7 @@ workflow SCDOWNSTREAM {
         ch_obs = ch_obs.mix(CLUSTER.out.obs)
         ch_obsm = ch_obsm.mix(CLUSTER.out.obsm)
         ch_multiqc_files = ch_multiqc_files.mix(CLUSTER.out.multiqc_files)
+        ch_clusters = ch_clusters.mix(CLUSTER.out.clusters)
 
         if (params.pseudobulk) {
             PSEUDOBULKING(
@@ -154,6 +159,18 @@ workflow SCDOWNSTREAM {
         ch_uns = ch_uns.mix(PER_GROUP.out.uns)
         ch_multiqc_files = ch_multiqc_files.mix(PER_GROUP.out.multiqc_files)
 
+        if (!params.skip_rankgenesgroups) {
+            DIFFERENTIAL_EXPRESSION(
+                ch_integrations,
+                ch_celltypes,
+                ch_clusters,
+            )
+            ch_versions = ch_versions.mix(DIFFERENTIAL_EXPRESSION.out.versions)
+            ch_uns = ch_uns.mix(DIFFERENTIAL_EXPRESSION.out.uns)
+            ch_multiqc_files = ch_multiqc_files.mix(DIFFERENTIAL_EXPRESSION.out.multiqc_files)
+        }
+
         FINALIZE(ch_finalization_base, ch_obs, ch_var, ch_obsm, ch_obsp, ch_uns, ch_layers)
         ch_versions = ch_versions.mix(FINALIZE.out.versions)
     }
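Example usage of the new options (a sketch, not part of the diff; assumes the merged AnnData has a "condition" column in .obs to serve as the sample group):

    // e.g. in a custom config passed via -c, or the equivalent -params-file entries
    params {
        sample_group_col       = 'condition'
        rankgenesgroups_method = 'wilcoxon'
    }

With sample_group_col left at null, only the per-cell-group comparisons (step 1 of the template) are run.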