diff --git a/config_template.yml b/config_template.yml index 38383f6..6b65705 100644 --- a/config_template.yml +++ b/config_template.yml @@ -28,6 +28,8 @@ normalize: features: infer mad_robustize_fudge_factor: 0 image_features: true + subgroups: false + subgroup_col: min_cells: 1 normalize_negcon: perform: true @@ -35,6 +37,8 @@ normalize_negcon: features: infer mad_robustize_fudge_factor: 0 image_features: true + subgroups: false + subgroup_col: min_cells: 1 feature_select: perform: true @@ -47,6 +51,7 @@ feature_select: - correlation_threshold - drop_na_columns - blocklist + subgroups: false min_cells: 1 feature_select_negcon: perform: true @@ -59,6 +64,7 @@ feature_select_negcon: - correlation_threshold - drop_na_columns - blocklist + subgroups: false min_cells: 1 quality_control: perform: true diff --git a/profiles/profile.py b/profiles/profile.py index 21c2d2a..0e3d4a8 100644 --- a/profiles/profile.py +++ b/profiles/profile.py @@ -18,6 +18,7 @@ from pycytominer.cyto_utils.cells import SingleCells from pycytominer.cyto_utils import ( get_default_compartments, + output, write_gct, ) from pycytominer import ( @@ -187,7 +188,28 @@ def pipeline_normalize(self, batch, plate, steps, samples, suffix=None): normalization_features = cyto_utils.infer_cp_features( pd.read_csv(annotate_output_file), compartments=self.compartments ) - + if "subgroups" in normalize_steps.keys() and normalize_steps["subgroups"]: + profile_df = pd.read_csv(annotate_output_file) + normed_df = ( + profile_df + .groupby(normalize_steps["subgroup_col"], group_keys=False) + .apply( + lambda x:normalize( + profiles=x, + features=normalization_features, + image_features=image_features, + samples=samples, + method=normalization_method, + float_format=self.pipeline_options["float_format"], + mad_robustize_epsilon=fudge_factor, + ) + ) + ) + output( + normed_df, + output_filename=pathlib.PurePath(output_dir, f"{plate}_subgroup_normalized.csv.gz"), + 
compression_options=self.pipeline_options["compression"] + ) normalize( profiles=annotate_output_file, features=normalization_features, @@ -211,9 +233,11 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1): image_features = feature_select_steps["image_features"] all_plates_df = pd.DataFrame() + sub_all_plates_df = pd.DataFrame() for batch in self.profile_config: batch_df = pd.DataFrame() + sub_batch_df = pd.DataFrame() for plate in self.profile_config[batch]: output_dir = pathlib.PurePath(".", pipeline_output, batch, plate) if suffix: @@ -222,7 +246,14 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1): ) feature_select_output_file_plate = pathlib.PurePath( output_dir, - f"{plate}_normalized_feature_select_{suffix}_plate.csv.gz", + f"{plate}_normalized_feature_select_{suffix}_plate.csv.gz", + ) + subgroup_normalize_output_file = pathlib.PurePath( + output_dir, f"{plate}_subgroup_normalized.csv.gz" + ) + subgroup_feature_select_output_file_plate = pathlib.PurePath( + output_dir, + f"{plate}_subgroup_normalized_feature_select_{suffix}_plate.csv.gz", + ) else: normalize_output_file = pathlib.PurePath( @@ -231,6 +262,12 @@ feature_select_output_file_plate = pathlib.PurePath( output_dir, f"{plate}_normalized_feature_select_plate.csv.gz" ) + subgroup_normalize_output_file = pathlib.PurePath( + output_dir, f"{plate}_subgroup_normalized.csv.gz" + ) + subgroup_feature_select_output_file_plate = pathlib.PurePath( + output_dir, f"{plate}_subgroup_normalized_feature_select_plate.csv.gz" + ) if feature_select_features == "infer" and self.noncanonical: feature_select_features = cyto_utils.infer_cp_features( pd.read_csv(normalize_output_file), @@ -242,6 +279,12 @@ .assign(Metadata_batch=batch) .astype({'Metadata_Plate': str}) ) + if "subgroups" in feature_select_steps.keys() and 
feature_select_steps["subgroups"]: + sub_df = ( + pd.read_csv(subgroup_normalize_output_file) + .assign(Metadata_batch=batch) + .astype({'Metadata_Plate': str}) + ) if level == "plate": df = df.drop(columns=["Metadata_batch"]) @@ -259,11 +302,25 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1): float_format=self.pipeline_options["float_format"], samples=fs_samples, ) + if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]: + sub_df = sub_df.drop(columns=["Metadata_batch"]) + feature_select( + profiles=sub_df, + features=feature_select_features, + image_features=image_features, + operation=feature_select_operations, + output_file=subgroup_feature_select_output_file_plate, + compression_options=self.pipeline_options["compression"], + float_format=self.pipeline_options["float_format"], + ) elif level == "batch": batch_df = concat_dataframes(batch_df, df, image_features) + if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]: + sub_batch_df = concat_dataframes(sub_batch_df, sub_df, image_features) elif level == "all": all_plates_df = concat_dataframes(all_plates_df, df, image_features) - + if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]: + sub_all_plates_df = concat_dataframes(sub_all_plates_df, sub_df, image_features) if level == "batch": if min_cells == 1: fs_samples = "all" @@ -276,6 +333,13 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1): operation=feature_select_operations, samples=fs_samples, ) + if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]: + sub_fs_df = feature_select( + profiles=sub_batch_df, + features=feature_select_features, + image_features=image_features, + operation=feature_select_operations, + ) for plate in self.profile_config[batch]: output_dir = pathlib.PurePath(".", pipeline_output, batch, plate) if suffix: @@ -283,25 +347,41 @@ def pipeline_feature_select(self, steps, 
suffix=None,min_cells=1): output_dir, f"{plate}_normalized_feature_select_{suffix}_batch.csv.gz", ) + sub_feature_select_output_file_batch = pathlib.PurePath( + output_dir, + f"{plate}_subgroup_normalized_feature_select_{suffix}_batch.csv.gz", + ) else: feature_select_output_file_batch = pathlib.PurePath( output_dir, f"{plate}_normalized_feature_select_batch.csv.gz", ) + sub_feature_select_output_file_batch = pathlib.PurePath( + output_dir, + f"{plate}_subgroup_normalized_feature_select_batch.csv.gz", + ) if feature_select_features == "infer" and self.noncanonical: feature_select_features = cyto_utils.infer_cp_features( batch_df, compartments=self.compartments ) df = fs_df.query("Metadata_Plate==@plate").reset_index(drop=True) - df = df.drop(columns=["Metadata_batch"]) - + df = df.drop(columns=["Metadata_batch"]) cyto_utils.output( output_filename=feature_select_output_file_batch, df=df, compression_options=self.pipeline_options["compression"], float_format=self.pipeline_options["float_format"], ) + if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]: + sub_df = sub_fs_df.query("Metadata_Plate==@plate").reset_index(drop=True) + sub_df = sub_df.drop(columns=["Metadata_batch"]) + cyto_utils.output( + output_filename=sub_feature_select_output_file_batch, + df=sub_df, + compression_options=self.pipeline_options["compression"], + float_format=self.pipeline_options["float_format"], + ) if gct: create_gct_directories(batch) @@ -318,6 +398,18 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1): batch, f"{batch}_normalized_feature_select_{suffix}_batch.gct", ) + sub_stacked_file = pathlib.PurePath( + ".", + "gct", + batch, + f"{batch}_subgroup_normalized_feature_select_{suffix}_batch.csv.gz", + ) + sub_gct_file = pathlib.PurePath( + ".", + "gct", + batch, + f"{batch}_subgroup_normalized_feature_select_{suffix}_batch.gct", + ) else: stacked_file = pathlib.PurePath( ".", @@ -331,6 +423,18 @@ def pipeline_feature_select(self, 
steps, suffix=None,min_cells=1): batch, f"{batch}_normalized_feature_select_batch.gct", ) + sub_stacked_file = pathlib.PurePath( + ".", + "gct", + batch, + f"{batch}_subgroup_normalized_feature_select_batch.csv.gz", + ) + sub_gct_file = pathlib.PurePath( + ".", + "gct", + batch, + f"{batch}_subgroup_normalized_feature_select_batch.gct", + ) cyto_utils.output( output_filename=stacked_file, df=fs_df, @@ -338,6 +442,14 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1): float_format=self.pipeline_options["float_format"], ) write_gct(profiles=fs_df, output_file=gct_file) + if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]: + cyto_utils.output( + output_filename=sub_stacked_file, + df=sub_fs_df, + compression_options=self.pipeline_options["compression"], + float_format=self.pipeline_options["float_format"], + ) + write_gct(profiles=sub_fs_df, output_file=sub_gct_file) if level == "all": if min_cells == 1: @@ -351,10 +463,21 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1): operation=feature_select_operations, samples=fs_samples, ) + if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]: + sub_fs_df = feature_select( + profiles=sub_all_plates_df, + features=feature_select_features, + image_features=image_features, + operation=feature_select_operations, + ) for batch in self.profile_config: fs_batch_df = fs_df.loc[fs_df.Metadata_batch == batch].reset_index( drop=True ) + if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]: + sub_fs_batch_df = sub_fs_df.loc[sub_fs_df.Metadata_batch == batch].reset_index( + drop=True + ) for plate in self.profile_config[batch]: output_dir = pathlib.PurePath(".", pipeline_output, batch, plate) if suffix: @@ -362,10 +485,17 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1): output_dir, f"{plate}_normalized_feature_select_{suffix}_all.csv.gz", ) + sub_feature_select_output_file_all = 
pathlib.PurePath( + output_dir, + f"{plate}_subgroup_normalized_feature_select_{suffix}_all.csv.gz", + ) else: feature_select_output_file_all = pathlib.PurePath( output_dir, f"{plate}_normalized_feature_select_all.csv.gz" ) + sub_feature_select_output_file_all = pathlib.PurePath( + output_dir, f"{plate}_subgroup_normalized_feature_select_all.csv.gz" + ) if feature_select_features == "infer" and self.noncanonical: feature_select_features = cyto_utils.infer_cp_features( all_plates_df, compartments=self.compartments @@ -383,6 +513,17 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1): compression_options=self.pipeline_options["compression"], float_format=self.pipeline_options["float_format"], ) + if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]: + sub_df = sub_fs_batch_df.query("Metadata_Plate==@plate").reset_index( + drop=True + ) + sub_df = sub_df.drop(columns=["Metadata_batch"]) + cyto_utils.output( + output_filename=sub_feature_select_output_file_all, + df=sub_df, + compression_options=self.pipeline_options["compression"], + float_format=self.pipeline_options["float_format"], + ) if gct: create_gct_directories(batch) @@ -399,6 +540,18 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1): batch, f"{batch}_normalized_feature_select_{suffix}_all.gct", ) + sub_stacked_file = pathlib.PurePath( + ".", + "gct", + batch, + f"{batch}_subgroup_normalized_feature_select_{suffix}_all.csv.gz", + ) + sub_gct_file = pathlib.PurePath( + ".", + "gct", + batch, + f"{batch}_subgroup_normalized_feature_select_{suffix}_all.gct", + ) else: stacked_file = pathlib.PurePath( ".", @@ -412,6 +565,18 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1): batch, f"{batch}_normalized_feature_select_all.gct", ) + sub_stacked_file = pathlib.PurePath( + ".", + "gct", + batch, + f"{batch}_subgroup_normalized_feature_select_all.csv.gz", + ) + sub_gct_file = pathlib.PurePath( + ".", + "gct", + batch, + 
f"{batch}_subgroup_normalized_feature_select_all.gct", + ) cyto_utils.output( output_filename=stacked_file, df=fs_batch_df, @@ -419,7 +584,14 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1): float_format=self.pipeline_options["float_format"], ) write_gct(profiles=fs_batch_df, output_file=gct_file) - + if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]: + cyto_utils.output( + output_filename=sub_stacked_file, + df=sub_fs_batch_df, + compression_options=self.pipeline_options["compression"], + float_format=self.pipeline_options["float_format"], + ) + write_gct(profiles=sub_fs_batch_df, output_file=sub_gct_file) def pipeline_quality_control(self, operations): pipeline_output = self.pipeline["output_dir"]