6 changes: 6 additions & 0 deletions config_template.yml
@@ -28,13 +28,17 @@ normalize:
features: infer
mad_robustize_fudge_factor: 0
image_features: true
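# Optional: set subgroups to true to normalize each value of subgroup_col
# separately; min_cells of 1 applies no minimum-cell sample filter.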
subgroups: false
subgroup_col: <Column to subdivide on>
min_cells: 1
normalize_negcon:
perform: true
method: mad_robustize
features: infer
mad_robustize_fudge_factor: 0
image_features: true
subgroups: false
subgroup_col: <Column to subdivide on>
min_cells: 1
feature_select:
perform: true
@@ -47,6 +51,7 @@ feature_select:
- correlation_threshold
- drop_na_columns
- blocklist
subgroups: false
min_cells: 1
feature_select_negcon:
perform: true
@@ -59,6 +64,7 @@ feature_select_negcon:
- correlation_threshold
- drop_na_columns
- blocklist
subgroups: false
min_cells: 1
quality_control:
perform: true
184 changes: 178 additions & 6 deletions profiles/profile.py
@@ -18,6 +18,7 @@
from pycytominer.cyto_utils.cells import SingleCells
from pycytominer.cyto_utils import (
get_default_compartments,
output,
write_gct,
)
from pycytominer import (
@@ -187,7 +188,28 @@ def pipeline_normalize(self, batch, plate, steps, samples, suffix=None):
normalization_features = cyto_utils.infer_cp_features(
pd.read_csv(annotate_output_file), compartments=self.compartments
)

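# When subgroup normalization is requested, split the annotated profiles
# on subgroup_col and normalize each group independently.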
if "subgroups" in normalize_steps.keys() and normalize_steps["subgroups"]:
profile_df = pd.read_csv(annotate_output_file)
normed_df = (
profile_df
.groupby(normalize_steps["subgroup_col"], group_keys=False)
.apply(
lambda x: normalize(
profiles=x,
features=normalization_features,
image_features=image_features,
samples=samples,
method=normalization_method,
float_format=self.pipeline_options["float_format"],
mad_robustize_epsilon=fudge_factor,
)
)
)
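# Write the subgroup-normalized profiles to their own output file.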
subgroup_suffix = f"_{suffix}" if suffix else ""
output(
normed_df,
output_filename=pathlib.PurePath(
output_dir, f"{plate}_subgroup_normalized{subgroup_suffix}.csv.gz"
),
compression_options=self.pipeline_options["compression"],
)
normalize(
profiles=annotate_output_file,
features=normalization_features,
@@ -211,9 +233,11 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1):
image_features = feature_select_steps["image_features"]

all_plates_df = pd.DataFrame()
sub_all_plates_df = pd.DataFrame()

for batch in self.profile_config:
batch_df = pd.DataFrame()
sub_batch_df = pd.DataFrame()
for plate in self.profile_config[batch]:
output_dir = pathlib.PurePath(".", pipeline_output, batch, plate)
if suffix:
@@ -222,7 +246,14 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1):
)
feature_select_output_file_plate = pathlib.PurePath(
output_dir,
f"{plate}_normalized_feature_select_{suffix}_plate.csv.gz",
f"{plate}_subgroup_normalized_feature_select_{suffix}_plate.csv.gz",
)
subgroup_normalize_output_file = pathlib.PurePath(
output_dir, f"{plate}_subgroup_normalized_{suffix}.csv.gz"
)
subgroup_feature_select_output_file_plate = pathlib.PurePath(
output_dir,
f"{plate}_subgroup_normalized_feature_select_{suffix}_plate.csv.gz",
)
else:
normalize_output_file = pathlib.PurePath(
@@ -231,6 +262,12 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1):
feature_select_output_file_plate = pathlib.PurePath(
output_dir, f"{plate}_normalized_feature_select_plate.csv.gz"
)
subgroup_normalize_output_file = pathlib.PurePath(
output_dir, f"{plate}_subgroup_normalized.csv.gz"
)
subgroup_feature_select_output_file_plate = pathlib.PurePath(
output_dir, f"{plate}_subgroup_normalized_feature_select_plate.csv.gz"
)
if feature_select_features == "infer" and self.noncanonical:
feature_select_features = cyto_utils.infer_cp_features(
pd.read_csv(normalize_output_file),
@@ -242,6 +279,12 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1):
.assign(Metadata_batch=batch)
.astype({'Metadata_Plate': str})
)
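# Also load this plate's subgroup-normalized profiles when enabled.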
if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]:
sub_df = (
pd.read_csv(subgroup_normalize_output_file)
.assign(Metadata_batch=batch)
.astype({'Metadata_Plate': str})
)

if level == "plate":
df = df.drop(columns=["Metadata_batch"])
@@ -259,11 +302,25 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1):
float_format=self.pipeline_options["float_format"],
samples=fs_samples,
)
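# Repeat plate-level feature selection on the subgroup-normalized profiles.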
if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]:
sub_df = sub_df.drop(columns=["Metadata_batch"])
feature_select(
profiles=sub_df,
features=feature_select_features,
image_features=image_features,
operation=feature_select_operations,
output_file=subgroup_feature_select_output_file_plate,
compression_options=self.pipeline_options["compression"],
float_format=self.pipeline_options["float_format"],
samples=fs_samples,
)
elif level == "batch":
batch_df = concat_dataframes(batch_df, df, image_features)
if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]:
sub_batch_df = concat_dataframes(sub_batch_df, sub_df, image_features)
elif level == "all":
all_plates_df = concat_dataframes(all_plates_df, df, image_features)

if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]:
sub_all_plates_df = concat_dataframes(sub_all_plates_df, sub_df, image_features)
if level == "batch":
if min_cells == 1:
fs_samples = "all"
@@ -276,32 +333,55 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1):
operation=feature_select_operations,
samples=fs_samples,
)
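# Batch-level feature selection on the stacked subgroup-normalized plates.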
if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]:
sub_fs_df = feature_select(
profiles=sub_batch_df,
features=feature_select_features,
image_features=image_features,
operation=feature_select_operations,
samples=fs_samples,
)
for plate in self.profile_config[batch]:
output_dir = pathlib.PurePath(".", pipeline_output, batch, plate)
if suffix:
feature_select_output_file_batch = pathlib.PurePath(
output_dir,
f"{plate}_normalized_feature_select_{suffix}_batch.csv.gz",
)
sub_feature_select_output_file_batch = pathlib.PurePath(
output_dir,
f"{plate}_subgroup_normalized_feature_select_{suffix}_batch.csv.gz",
)
else:
feature_select_output_file_batch = pathlib.PurePath(
output_dir,
f"{plate}_normalized_feature_select_batch.csv.gz",
)
sub_feature_select_output_file_batch = pathlib.PurePath(
output_dir,
f"{plate}_subgroup_normalized_feature_select_batch.csv.gz",
)
if feature_select_features == "infer" and self.noncanonical:
feature_select_features = cyto_utils.infer_cp_features(
batch_df, compartments=self.compartments
)

df = fs_df.query("Metadata_Plate==@plate").reset_index(drop=True)
df = df.drop(columns=["Metadata_batch"])
cyto_utils.output(
output_filename=feature_select_output_file_batch,
df=df,
compression_options=self.pipeline_options["compression"],
float_format=self.pipeline_options["float_format"],
)
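# Write this plate's slice of the batch-level subgroup feature selection.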
if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]:
sub_df = sub_fs_df.query("Metadata_Plate==@plate").reset_index(drop=True)
sub_df = sub_df.drop(columns=["Metadata_batch"])
cyto_utils.output(
output_filename=sub_feature_select_output_file_batch,
df=sub_df,
compression_options=self.pipeline_options["compression"],
float_format=self.pipeline_options["float_format"],
)

if gct:
create_gct_directories(batch)
@@ -318,6 +398,18 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1):
batch,
f"{batch}_normalized_feature_select_{suffix}_batch.gct",
)
sub_stacked_file = pathlib.PurePath(
".",
"gct",
batch,
f"{batch}_subgroup_normalized_feature_select_{suffix}_batch.csv.gz",
)
sub_gct_file = pathlib.PurePath(
".",
"gct",
batch,
f"{batch}_subgroup_normalized_feature_select_{suffix}_batch.gct",
)
else:
stacked_file = pathlib.PurePath(
".",
@@ -331,13 +423,33 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1):
batch,
f"{batch}_normalized_feature_select_batch.gct",
)
sub_stacked_file = pathlib.PurePath(
".",
"gct",
batch,
f"{batch}_subgroup_normalized_feature_select_batch.csv.gz",
)
sub_gct_file = pathlib.PurePath(
".",
"gct",
batch,
f"{batch}_subgroup_normalized_feature_select_batch.gct",
)
cyto_utils.output(
output_filename=stacked_file,
df=fs_df,
compression_options=self.pipeline_options["compression"],
float_format=self.pipeline_options["float_format"],
)
write_gct(profiles=fs_df, output_file=gct_file)
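# Mirror the stacked CSV and GCT outputs for the subgroup-normalized results.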
if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]:
cyto_utils.output(
output_filename=sub_stacked_file,
df=sub_fs_df,
compression_options=self.pipeline_options["compression"],
float_format=self.pipeline_options["float_format"],
)
write_gct(profiles=sub_fs_df, output_file=sub_gct_file)

if level == "all":
if min_cells == 1:
@@ -351,21 +463,39 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1):
operation=feature_select_operations,
samples=fs_samples,
)
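# Feature selection across all plates on the subgroup-normalized profiles.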
if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]:
sub_fs_df = feature_select(
profiles=sub_all_plates_df,
features=feature_select_features,
image_features=image_features,
operation=feature_select_operations,
samples=fs_samples,
)
for batch in self.profile_config:
fs_batch_df = fs_df.loc[fs_df.Metadata_batch == batch].reset_index(
drop=True
)
if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]:
sub_fs_batch_df = sub_fs_df.loc[sub_fs_df.Metadata_batch == batch].reset_index(
drop=True
)
for plate in self.profile_config[batch]:
output_dir = pathlib.PurePath(".", pipeline_output, batch, plate)
if suffix:
feature_select_output_file_all = pathlib.PurePath(
output_dir,
f"{plate}_normalized_feature_select_{suffix}_all.csv.gz",
)
sub_feature_select_output_file_all = pathlib.PurePath(
output_dir,
f"{plate}_subgroup_normalized_feature_select_{suffix}_all.csv.gz",
)
else:
feature_select_output_file_all = pathlib.PurePath(
output_dir, f"{plate}_normalized_feature_select_all.csv.gz"
)
sub_feature_select_output_file_all = pathlib.PurePath(
output_dir, f"{plate}_subgroup_normalized_feature_select_all.csv.gz"
)
if feature_select_features == "infer" and self.noncanonical:
feature_select_features = cyto_utils.infer_cp_features(
all_plates_df, compartments=self.compartments
@@ -383,6 +513,17 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1):
compression_options=self.pipeline_options["compression"],
float_format=self.pipeline_options["float_format"],
)
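# Write this plate's slice of the all-plates subgroup feature selection.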
if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]:
sub_df = sub_fs_batch_df.query("Metadata_Plate==@plate").reset_index(
drop=True
)
sub_df = sub_df.drop(columns=["Metadata_batch"])
cyto_utils.output(
output_filename=sub_feature_select_output_file_all,
df=sub_df,
compression_options=self.pipeline_options["compression"],
float_format=self.pipeline_options["float_format"],
)

if gct:
create_gct_directories(batch)
@@ -399,6 +540,18 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1):
batch,
f"{batch}_normalized_feature_select_{suffix}_all.gct",
)
sub_stacked_file = pathlib.PurePath(
".",
"gct",
batch,
f"{batch}_subgroup_normalized_feature_select_{suffix}_all.csv.gz",
)
sub_gct_file = pathlib.PurePath(
".",
"gct",
batch,
f"{batch}_subgroup_normalized_feature_select_{suffix}_all.gct",
)
else:
stacked_file = pathlib.PurePath(
".",
@@ -412,14 +565,33 @@ def pipeline_feature_select(self, steps, suffix=None,min_cells=1):
batch,
f"{batch}_normalized_feature_select_all.gct",
)
sub_stacked_file = pathlib.PurePath(
".",
"gct",
batch,
f"{batch}_subgroup_normalized_feature_select_all.csv.gz",
)
sub_gct_file = pathlib.PurePath(
".",
"gct",
batch,
f"{batch}_subgroup_normalized_feature_select_all.gct",
)
cyto_utils.output(
output_filename=stacked_file,
df=fs_batch_df,
compression_options=self.pipeline_options["compression"],
float_format=self.pipeline_options["float_format"],
)
write_gct(profiles=fs_batch_df, output_file=gct_file)

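# Stacked CSV and GCT outputs for the subgroup results at the all-plates level.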
if "subgroups" in feature_select_steps.keys() and feature_select_steps["subgroups"]:
cyto_utils.output(
output_filename=sub_stacked_file,
df=sub_fs_batch_df,
compression_options=self.pipeline_options["compression"],
float_format=self.pipeline_options["float_format"],
)
write_gct(profiles=sub_fs_batch_df, output_file=sub_gct_file)
def pipeline_quality_control(self, operations):
pipeline_output = self.pipeline["output_dir"]
