Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions custom_files/pipeline_project_map.json

This file was deleted.

9 changes: 9 additions & 0 deletions custom_files/pipeline_project_map_specifications.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
epi2me-human-variation:
nf-long-reads:
is_inputType_path: False
epi2me-somatic-variation:
mopopgen-support:
is_inputType_path: True
nfcore-rnaseq:
nf-tp53:
is_inputType_path: True
46 changes: 33 additions & 13 deletions pages/run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import tabs.tab_command as tt
from pipeline_project_map import map_pipeline_project
from shared.sessionstate import retrieve_all_from_ss, save_in_ss, ss_set
from shared.sessionstate import retrieve_all_from_ss, save_in_ss, ss_get, ss_set
from shared.visual import header


Expand All @@ -29,35 +29,53 @@ def reset_button_state():
password = ss_values["password"]
PROJECT = ss_values["PROJECT"]
JOB_ID = ss_values["JOB_ID"]
WORKDIR = ss_values["WORK_DIR"]
WORK_DIR = ss_values["WORK_DIR"]
OUTPUT_DIR = ss_values["OUTPUT_DIR"]
run_pipeline_clicked = ss_values["run_pipeline_clicked"]
button_clicked = ss_values["button_clicked"]
custom_sample_list = ss_values["custom_sample_list"]  # only available if custom sample is selected

samples = ["all", "demo"] # , "customised"]
samples = ["all", "demo", "customised"] # , "test"]

# Create the selectbox and update session state
options = ["select"] + list(map_pipeline_project.keys())
index = options.index(PIPELINE)
PIPELINE = st.selectbox("Select a pipeline", options=options, index=index) # , key="PIPELINE")
pipeline_options = ["select"] + list(map_pipeline_project.keys())
index = pipeline_options.index(PIPELINE)
PIPELINE = st.selectbox("Select a pipeline", options=pipeline_options, index=index) # , key="PIPELINE")
# adding "select" as the first and default choice
if PIPELINE != "select":
project_options = list(map_pipeline_project[PIPELINE].keys())

PROJECT = st.selectbox(
"Select your project",
options=map_pipeline_project[PIPELINE],
index=map_pipeline_project[PIPELINE].index(PIPELINE) if PIPELINE in map_pipeline_project[PIPELINE] else 0,
options=project_options,
index=project_options.index(PIPELINE) if PIPELINE in project_options else 0,
on_change=reset_button_state,
)

# samples here depend on the pipeline: human variation requires bam files, rnaseq requires a samplesheet
SAMPLE = st.selectbox(
"Select your samples",
options=samples,
index=samples.index(SAMPLE) if SAMPLE in samples else 0,
on_change=reset_button_state,
)

WORK_DIR = st.text_input("Working directory", value=SCRATCH)
OUTPUT_DIR = st.text_input("Output directory", value=SCRATCH)
# If "customised" is selected, show additional input
if SAMPLE == "customised":
is_input_type_path = map_pipeline_project[PIPELINE][PROJECT]["is_inputType_path"]
msg = "Enter your sample names (comma-separated)"
if is_input_type_path:
msg = "Enter path to samplesheet"
custom_samples = st.text_input(msg, key="custom_samples", value=",".join(custom_sample_list))

# Optionally process the input into a list
if custom_samples:
custom_sample_list = [s.strip() for s in custom_samples.split(",") if s.strip()]

st.write("Your custom samples:", custom_sample_list)
ss_set("custom_sample_list", custom_sample_list)

WORK_DIR = st.text_input("Working directory", value=WORK_DIR or SCRATCH)
OUTPUT_DIR = st.text_input("Output directory", value=OUTPUT_DIR or SCRATCH)

# passing inputs between tabs
if OK:
Expand All @@ -69,6 +87,7 @@ def reset_button_state():
selected_samples=SAMPLE,
work_dir=WORK_DIR,
output_dir=OUTPUT_DIR,
custom_sample_list=custom_sample_list,
)
save_in_ss(
{
Expand All @@ -84,11 +103,12 @@ def reset_button_state():
"SAMPLE": SAMPLE,
"PIPELINE": PIPELINE,
"PROJECT": PROJECT,
"JOB_ID": JOB_ID,
"WORKDIR": WORK_DIR,
# "JOB_ID": JOB_ID,
"WORK_DIR": WORK_DIR,
"OUTPUT_DIR": OUTPUT_DIR,
"run_pipeline_clicked": run_pipeline_clicked,
"button_clicked": button_clicked,
"custom_sample_list": custom_sample_list,
}
)
else:
Expand Down
13 changes: 4 additions & 9 deletions pipeline_project_map.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
import json
import yaml

file_path = "custom_files/pipeline_project_map_specifications.yaml"

def load_json_to_dict(file_path):
with open(file_path, "r") as file:
data = json.load(file)
return data


file_path = "custom_files/pipeline_project_map.json"
map_pipeline_project = load_json_to_dict(file_path)
with open(file_path, "r") as f:
map_pipeline_project = yaml.safe_load(f)
14 changes: 11 additions & 3 deletions shared/command_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ def get_path_to_script(selected_pipeline, selected_project, selected="all"):
script_mapping = {
"all": "launch_samples.sh",
"demo": "launch_demo.sh",
"single": "launch_sample_analysis.sh", # not yet supported
"customised": "launch_samples.sh",
}

if selected in script_mapping:
return os.path.join(base_path, script_mapping[selected])

raise ValueError(f"Invalid selection '{selected}'. Only 'all' and 'demo' are supported.")
raise ValueError(f"Invalid selection '{selected}'. Only 'customised' and 'demo' are supported.")


# launch command based on the project
Expand All @@ -27,6 +27,7 @@ def pipe_cmd(
selected_samples="all",
work_dir="work",
output_dir="output",
custom_sample_list=[],
):
def get_pipeline_command():
"""Generate the pipeline execution command based on the sample selection."""
Expand All @@ -42,7 +43,7 @@ def get_pipeline_command():

if selected_samples == "demo":
cmd_pipeline += f"sbatch -o {log_out} -e {log_err} {path_to_script} {work_dir} {output_dir}"
elif selected_samples == "all":
elif selected_samples == "all":  # this no longer makes sense, since we now have to specify the sample name index
cmd_pipeline += f"sbatch -o {log_out} -e {log_err} {path_to_script} --work-dir {work_dir} --outdir {output_dir}"
##./your_script.sh --env "/my/custom/env" --work-dir "my_work" --outdir "my_output" --config "my_config" --params "params.json"
# Usage:
Expand All @@ -52,7 +53,14 @@ def get_pipeline_command():
# --env "/data/rds/DIT/SCICOM/SCRSE/shared/conda/nextflow_env" \
# --params "/data/params/parameters.json" \
# --config "custom_config.config"
elif selected_samples == "customised":
if not len(custom_sample_list):
print("custom_sample_list cannot be empty")

tab_separated_string = "\t".join(custom_sample_list)
cmd_pipeline += f"sbatch -o {log_out} -e {log_err} {path_to_script} --work-dir {work_dir} --outdir {output_dir} --samples {tab_separated_string}"
# elif selected_samples == "test":
# cmd_pipeline += f"sbatch -o {log_out} -e {log_err} /data/scratch/DCO/DIGOPS/SCIENCOM/msarkis/NF-project-configurations/test.sh --work-dir {work_dir} --outdir {output_dir}"
return cmd_pipeline.strip()

# Command mappings
Expand Down
1 change: 1 addition & 0 deletions shared/ss_defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ keys_defaults:
group_selection: "Select an option"
WORK_DIR: None
OUTPUT_DIR: None
custom_sample_list: []
135 changes: 62 additions & 73 deletions tabs/tab_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,113 +4,102 @@

import shared.command_helper as cmd_hlp
import shared.helpers as hlp
from shared.sessionstate import retrieve_all_from_ss, ss_get, ss_set

# pull saved values if set, otherwise set to defaults
ss_values = retrieve_all_from_ss()
OK = ss_values["OK"]
MY_SSH = ss_values["MY_SSH"]
username = ss_values["user_name"]
server = ss_values["server"]
GROUP = ss_values["GROUP"]
GROUPS = ss_values["GROUPS"]
SCRATCH = ss_values["SCRATCH"]
RDS = ss_values["RDS"]
SAMPLE = ss_values["SAMPLE"]
PIPELINE = ss_values["PIPELINE"]
password = ss_values["password"]
PROJECT = ss_values["PROJECT"]
JOB_ID = ss_values["JOB_ID"]
WORKDIR = ss_values["WORK_DIR"]
OUTPUT_DIR = ss_values["OUTPUT_DIR"]
run_pipeline_clicked = ss_values["run_pipeline_clicked"]

def tab(
username,
MY_SSH,
selected_pipeline,
selected_project,
selected_samples="all",
work_dir="work",
output_dir="output",
custom_sample_list=[],
):
# --- Initialize session state ---
st.session_state.setdefault("username", username)
st.session_state.setdefault("JOB_ID", "17379785")
st.session_state.setdefault("run_pipeline_clicked", False)

def display_log(title, log_path, output_container):
"""function to display log content."""
try:
with hlp.st_capture(output_container.code):
print(f"{title} log:", log_path)
log_content = MY_SSH.read_file(log_path)
print(log_content)
except Exception as e:
st.error(f"Failed to read {title.lower()} log: {e}")


def tab(username, MY_SSH, selected_pipeline, selected_project, selected_samples="all", work_dir="work", output_dir="output"):
JOB_ID = ss_values["JOB_ID"]

cols = st.columns([1, 1, 1])
# --- Display username input ---
cols = st.columns([1])
with cols[0]:
username = st.text_input(
"Username(s):",
username,
key="username-mod",
help="Enter your username e.g. ralcraft",
)
st.session_state["username"] = st.text_input("Username(s):", st.session_state["username"])

# --- Log display helper ---
def display_log(title, log_path, output_container):
try:
log_file = MY_SSH.read_file(log_path)
log_content = log_file.read() if hasattr(log_file, "read") else str(log_file)
output_container.code(log_content, language="bash")
except Exception as e:
st.error(f"❌ Failed to read {title.lower()} log: {e}")

# --- Run pipeline logic ---
def run_nextflow():
cmd_pipeline = cmd_hlp.pipe_cmd(
username,
st.session_state["username"],
selected_pipeline,
selected_project,
cmd_num=0,
selected_samples=selected_samples,
output_dir=output_dir,
work_dir=work_dir,
custom_sample_list=custom_sample_list,
)
st.code(cmd_pipeline)
_dict = MY_SSH.run_cmd(cmd_pipeline)
# process output to get job id
match = re.search(r"Submitted batch job (\d+)", _dict["output"])
JOB_ID = match.group(1) if match else None
st.success(f"✅ Job ID: {JOB_ID}")
return JOB_ID
# to do, we need to wait for an asynchronous answer regarding slurm?
result = MY_SSH.run_cmd(cmd_pipeline)
match = re.search(r"Submitted batch job (\d+)", result["output"])
job_id = match.group(1) if match else None
st.session_state["JOB_ID"] = job_id
st.success(f"✅ Job submitted. ID: {job_id}")
return job_id

# --- Tabs ---
tabP, tabL, tabQ = st.tabs(["Run pipeline", "Check logs", "Check queues"])

# --- Pipeline tab ---
with tabP:
if st.button("Run the selected pipeline"):
st.session_state["run_pipeline_clicked"] = True
with st.spinner("Submitting pipeline..."):
try:
run_nextflow()
except Exception as e:
st.error(f"Pipeline error: {e}")

if st.session_state["JOB_ID"]:
st.success(f"Running Job ID: {st.session_state['JOB_ID']}")

# --- Logs tab ---
with tabL:
if st.button("Get Logs"):
if JOB_ID == "":
# st.write("📦 session_state:", dict(st.session_state))
job_id = st.session_state.get("JOB_ID")
st.write("📌 Accessed JOB_ID:", job_id) # DEBUG
if not job_id:
st.error("No job was launched yet")
else:
log_out = f"{work_dir}/logs/{JOB_ID}.out"
log_err = f"{work_dir}/logs/{JOB_ID}.err"
log_out = f"{work_dir}/logs/{job_id}.out"
log_err = f"{work_dir}/logs/{job_id}.err"
tO, tE = st.tabs(["Output", "Error"])
outputO, outputE = tO.empty(), tE.empty()
with st.spinner("Fetching...", show_time=True):
with st.spinner("Fetching logs..."):
display_log("Output", log_out, outputO)
display_log("Error", log_err, outputE)

# --- Queues tab ---
with tabQ:
if st.button("Check slurm queues"):
output = st.empty()
with st.spinner("Checking...", show_time=True):
with st.spinner("Checking queue..."):
with hlp.st_capture(output.code):
cmd_pipeline = cmd_hlp.pipe_cmd(username, cmd_num=1)
print("Executing command\n", cmd_pipeline)
cmd_pipeline = cmd_hlp.pipe_cmd(st.session_state["username"], cmd_num=1)
try:
results = MY_SSH.run_cmd(cmd_pipeline)
if results["err"] != None:
if results["err"] is not None:
st.error(results["err"])
else:
print("------------------------------")
print(results["output"])
except Exception as e:
st.error(f"Error {e}")
with tabP:
# disable button once the user clicks a first time. by default it gets disabled after calling the callback
clicked = st.button(f"Run the selected nextflow pipeline for {username}", disabled=ss_get("run_pipeline_clicked"))
JOB_ID = ss_get("JOB_ID")
if JOB_ID is not None:
st.success(f"Running Job ID: {JOB_ID}")
if clicked:
ss_set("run_pipeline_clicked", True)
output = st.empty()
with st.spinner("Starting...", show_time=True):
with hlp.st_capture(output.code):
try:
JOB_ID = run_nextflow()
ss_set("JOB_ID", JOB_ID)
except Exception as e:
st.error(f"Error {e}")
3 changes: 2 additions & 1 deletion tabs/tab_logon.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ def create_radio(GROUPS):
OK, MY_SSH, msg, GROUPS = handle_login(server, sftp_server, username, password)
if OK:
st.success(msg)
create_radio(GROUPS)
else:
st.error(msg)

Expand All @@ -98,6 +97,8 @@ def create_radio(GROUPS):
"GROUP": GROUP,
"GROUPS": GROUPS,
"SCRATCH": SCRATCH,
"WORK_DIR": SCRATCH,
"OUTPUT_DIR": SCRATCH, # set default to scratch
"RDS": RDS,
"SAMPLE": SAMPLE,
"PIPELINE": PIPELINE,
Expand Down