
Commit c7171f2

Added 3x retry to build_all.py for each step that fails. Attempting to stream HCMI data instead of holding it in storage
1 parent 8018f8b commit c7171f2


2 files changed: +166 -110 lines


build/build_all.py

Lines changed: 27 additions & 9 deletions
@@ -59,24 +59,42 @@ def run_docker_cmd(cmd_arr,filename):
     '''
     Essentially a wrapper for 'docker run'. Also provides output.
     '''
+    retries=3
     print('running...'+filename)
     env = os.environ.copy()
     if 'SYNAPSE_AUTH_TOKEN' not in env.keys():
         print('You need to set the SYNAPSE_AUTH_TOKEN to acess the MPNST, beatAML, bladderpdo, pancpdo, liverpdo, or sarcpdo datasets')
         docker_run = ['docker','run','--rm','-v',env['PWD']+'/local/:/tmp/','--platform=linux/amd64']
     else:
         docker_run = ['docker','run','--rm','-v',env['PWD']+'/local/:/tmp/','-e','SYNAPSE_AUTH_TOKEN='+env['SYNAPSE_AUTH_TOKEN'],'--platform=linux/amd64']
-
-
     cmd = docker_run+cmd_arr
     print(cmd)
-    # res = subprocess.run(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
-    res = subprocess.run(cmd, stdout=sys.stdout, stderr=sys.stderr)
-    if res.returncode !=0:
-        print(res.stderr)
-        exit(filename+' file failed')
-    else:
-        print(filename+' retrieved')
+
+    attempt = 1
+    while attempt <= retries:
+        print(f"[{filename}] Attempt {attempt}/{retries}: {' '.join(cmd)}")
+        res = subprocess.run(cmd, stdout=sys.stdout, stderr=sys.stderr)
+        if res.returncode == 0:
+            print(f"[{filename}] succeeded on attempt {attempt}.")
+            return
+        else:
+            print(f"[{filename}] failed (exit {res.returncode}).")
+            if attempt < retries:
+                print("Retrying...")
+                print(cmd)
+        attempt += 1
+    raise RuntimeError(f"{filename} failed after {retries} attempts")
+
+
+    # cmd = docker_run+cmd_arr
+    # print(cmd)
+    # # res = subprocess.run(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
+    # res = subprocess.run(cmd, stdout=sys.stdout, stderr=sys.stderr)
+    # if res.returncode !=0:
+    #     print(res.stderr)
+    #     exit(filename+' file failed')
+    # else:
+    #     print(filename+' retrieved')
 
 
 def process_docker(datasets):

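With this change, run_docker_cmd no longer exits the interpreter when a step fails: it retries up to three times and then raises RuntimeError, so the caller decides whether the whole build stops. A minimal sketch of how a caller might use that, assuming a hypothetical list of (cmd_arr, filename) steps (process_docker appears in the hunk context above, but its body is not part of this diff):

    # Hypothetical caller: run each build step, let run_docker_cmd retry
    # internally, and collect the names of steps that still failed.
    steps = []   # e.g. [(cmd_arr, 'omics'), ...]; contents are illustrative
    failed = []
    for cmd_arr, name in steps:
        try:
            run_docker_cmd(cmd_arr, name)
        except RuntimeError as err:
            print(err)
            failed.append(name)
    if failed:
        raise SystemExit('Build failed for: ' + ', '.join(failed))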
build/hcmi/02-getHCMIData.py

Lines changed: 139 additions & 101 deletions
@@ -36,30 +36,6 @@ def download_tool(url):
     # Download the file
     print("Downloading tool...")
     filename = wget.download(url)
-    ##commented due to merge conflict
-    # files_before = os.listdir()
-    # # shutil.unpack_archive(filename)
-    # ##there are two files to unpack
-    # print('Unpacking platform-specific path')
-    # shutil.unpack_archive(os.path.basename(url))
-    # #This is just set for AWS to debug. This will have to be mapped to OS. They changed their file structure. This should be updated.
-    # print('Unpacking secondary zip')
-    # fnames={
-    #     'Darwin':"gdc-client_2.3_OSX_x64.zip",
-    #     'Linux':"gdc-client_2.3_Ubuntu_x64.zip",
-    #     'Windows':"gdc-client_2.3_Windows_x64.zip"
-    # }
-    # shutil.unpack_archive(fnames[platform.system()])
-    # #This is just set for AWS to debug. This will have to be mapped to OS. They changed their file structure. This should be updated.
-    # shutil.unpack_archive("gdc-client_2.3_Ubuntu_x64.zip")
-    # if not os.path.exists('gdc-client'):
-    #     raise FileNotFoundError("gdc-client executable not found after extraction.")
-    # # Ensure 'gdc-client' is executable
-    # st = os.stat('gdc-client')
-    # os.chmod('gdc-client', st.st_mode | stat.S_IEXEC)
-    # # Return the path to the executable
-    # return './gdc-client'
-
 
     # First extraction
     print(f"\nExtracting {filename}...")
@@ -242,7 +218,7 @@ def _verify_md5(file_id, expected_md5, expected_filename):
 
     # Initial download attempt
     print("Starting secondary download...")
-    subprocess.run(['./gdc-client', 'download', '-d', manifest_loc, '-m', 'new_manifest.txt'])
+    subprocess.run(['./gdc-client', 'download', '-d', manifest_loc, '-m', 'new_manifest.txt'],stdout=subprocess.DEVNULL)
     print("Secondary download complete.")
 
     # Check for missing or corrupt files and retry if necessary
@@ -291,7 +267,7 @@ def _verify_md5(file_id, expected_md5, expected_filename):
 
         # Retry download
         print(f"Starting retry {retries} download...")
-        subprocess.run(['./gdc-client', 'download', '-d', manifest_loc, '-m', 'retry_manifest.txt'])
+        subprocess.run(['./gdc-client', 'download', '-d', manifest_loc, '-m', 'retry_manifest.txt'],stdout=subprocess.DEVNULL)
         print(f"Retry {retries} complete.")
 
     if missing_or_corrupt_ids:
@@ -308,68 +284,139 @@ def _verify_md5(file_id, expected_md5, expected_filename):
     return metadata
 
 
-def get_clean_files(data_type):
-    """
-    Extract clean files of a specified data type from manifest folders.
+# def get_clean_files(data_type):
+#     """
+#     Extract clean files of a specified data type from manifest folders.
 
-    Given a specific data type, this function looks through manifest folders to find
-    matching files and process them accordingly.
+#     Given a specific data type, this function looks through manifest folders to find
+#     matching files and process them accordingly.
 
-    Parameters
-    ----------
-    data_type : string
-        The type of data being processed, e.g., "transcriptomics", "copy_number", or "mutations".
+#     Parameters
+#     ----------
+#     data_type : string
+#         The type of data being processed, e.g., "transcriptomics", "copy_number", or "mutations".
 
-    Returns
-    -------
-    list of pl.DataFrame
-        A list of polars dataframes containing cleaned data extracted from the manifest folders.
-    """
+#     Returns
+#     -------
+#     list of pl.DataFrame
+#         A list of polars dataframes containing cleaned data extracted from the manifest folders.
+#     """
 
 
-    data_suffixes = {
+#     data_suffixes = {
+#         "transcriptomics": "rna_seq.augmented_star_gene_counts.tsv",
+#         "copy_number": "copy_number_variation.tsv",
+#         "mutations": "ensemble_masked.maf.gz"
+#     }
+
+#     suffix = data_suffixes.get(data_type)
+#     manifest = 'full_manifest_files'
+#     manifest_folders = [folder for folder in os.listdir(manifest) if folder != '.DS_Store']
+#     all_dataframes = []
+
+#     for folder_name in manifest_folders:
+#         folder_path = os.path.join(manifest, folder_name)
+#         folder_files = os.listdir(folder_path)
+
+#         sample_filenames = [x for x in folder_files if suffix in x and '.ipynb_checkpoints' not in x]
+
+#         for sample in sample_filenames:
+#             filepath = os.path.join(manifest, folder_name, sample)
+#             #gzipped data is mutation data
+#             if ".gz" in filepath:
+#                 with gzip.open(filepath, 'rt') as f:
+#                     # Read into pandas DataFrame then convert. This is the only time pandas is used.
+#                     dataframe_pd = pd.read_csv(f, sep='\t', skiprows=7,low_memory=False)
+#                     dataframe = pl.DataFrame(dataframe_pd)
+#             else:
+#                 if data_type == "transcriptomics":
+#                     dataframe = pl.read_csv(filepath, separator='\t',skip_rows=1)
+#                 else:
+#                     dataframe = pl.read_csv(filepath, separator='\t')
+
+#             dataframe = dataframe.with_columns(pl.lit(folder_name).alias('file_id'))
+
+#             if data_type == "transcriptomics":
+#                 dataframe = dataframe[4:]
+#                 if 'tpm_unstranded' in dataframe.columns:
+#                     new_columns = ['gene_id', 'gene_name', 'gene_type', 'tpm_unstranded', 'file_id']
+#                     dataframe = dataframe.select(new_columns)
+#                     dataframe = dataframe.filter(dataframe['gene_type'] == 'protein_coding')
+
+#             all_dataframes.append(dataframe)
+
+#     return all_dataframes
+
+
+def stream_clean_files(data_type: str, manifest_dir: str, out_path: str):
+    """
+    Read each sample file of the given data_type from manifest_dir,
+    apply filtering/transformation, and append to out_path in CSV,
+    so you never hold all samples in RAM at once.
+    """
+    suffix_map = {
         "transcriptomics": "rna_seq.augmented_star_gene_counts.tsv",
-        "copy_number": "copy_number_variation.tsv",
-        "mutations": "ensemble_masked.maf.gz"
+        "copy_number": "copy_number_variation.tsv",
+        "mutations": "ensemble_masked.maf.gz",
     }
-
-    suffix = data_suffixes.get(data_type)
-    manifest = 'full_manifest_files'
-    manifest_folders = [folder for folder in os.listdir(manifest) if folder != '.DS_Store']
-    all_dataframes = []
-
-    for folder_name in manifest_folders:
-        folder_path = os.path.join(manifest, folder_name)
-        folder_files = os.listdir(folder_path)
-
-        sample_filenames = [x for x in folder_files if suffix in x and '.ipynb_checkpoints' not in x]
-
-        for sample in sample_filenames:
-            filepath = os.path.join(manifest, folder_name, sample)
-            #gzipped data is mutation data
-            if ".gz" in filepath:
-                with gzip.open(filepath, 'rt') as f:
-                    # Read into pandas DataFrame then convert. This is the only time pandas is used.
-                    dataframe_pd = pd.read_csv(f, sep='\t', skiprows=7,low_memory=False)
-                    dataframe = pl.DataFrame(dataframe_pd)
+    suffix = suffix_map[data_type]
+    header_written = False
+
+    # If the output file already exists, remove it so we start fresh
+    if os.path.exists(out_path):
+        os.remove(out_path)
+
+    # Iterate over each sample folder
+    for folder_name in os.listdir(manifest_dir):
+        folder_path = os.path.join(manifest_dir, folder_name)
+        if not os.path.isdir(folder_path):
+            continue
+
+        # Look for the right file suffix in this folder
+        for fname in os.listdir(folder_path):
+            if suffix not in fname or fname.startswith('.'):
+                continue
+            fpath = os.path.join(folder_path, fname)
+
+            # Load the file (gzipped for mutations, plain TSV otherwise)
+            if fpath.endswith('.gz'):
+                with gzip.open(fpath, 'rt') as f:
+                    df = pl.read_csv(f, separator='\t', skip_rows=7)
             else:
-                if data_type == "transcriptomics":
-                    dataframe = pl.read_csv(filepath, separator='\t',skip_rows=1)
-                else:
-                    dataframe = pl.read_csv(filepath, separator='\t')
+                skip = 1 if data_type == "transcriptomics" else 0
+                df = pl.read_csv(fpath, separator='\t', skip_rows=skip)
 
-            dataframe = dataframe.with_columns(pl.lit(folder_name).alias('file_id'))
+            # Trim off the header rows for transcriptomics
+            if data_type == "transcriptomics":
+                df = df[4:]
 
+            # Apply per-type filters and rename
             if data_type == "transcriptomics":
-                dataframe = dataframe[4:]
-                if 'tpm_unstranded' in dataframe.columns:
-                    new_columns = ['gene_id', 'gene_name', 'gene_type', 'tpm_unstranded', 'file_id']
-                    dataframe = dataframe.select(new_columns)
-                    dataframe = dataframe.filter(dataframe['gene_type'] == 'protein_coding')
+                df = (
+                    df
+                    .filter(pl.col("gene_type") == "protein_coding")
+                    .select(["gene_id", "gene_name", "tpm_unstranded"])
+                    .rename({"tpm_unstranded": "transcriptomics"})
+                )
+
+            # Add identifying columns
+            df = df.with_columns([
+                pl.lit(folder_name).alias("file_id"),
+                pl.lit("GDC").alias("source"),
+                pl.lit("HCMI").alias("study"),
+            ])
+
+            # Append to disk
+            mode = 'a' if header_written else 'w'
+            with open(out_path, mode) as f:
+                df.write_csv(f, has_header=not header_written)
+
+            header_written = True
+
+            # Free memory immediately
+            del df
+            gc.collect()
+
 
-            all_dataframes.append(dataframe)
-
-    return all_dataframes
 
 def map_and_combine(dataframe_list, data_type, metadata, entrez_map_file):
     """
@@ -462,27 +509,6 @@ def map_and_combine(dataframe_list, data_type, metadata, entrez_map_file):
 
     return final_dataframe
 
-def retrieve_figshare_data(url):
-    """
-    Download data from a given Figshare URL.
-
-    Parameters
-    ----------
-    url : string
-        The Figshare URL to download data from.
-
-    Returns
-    -------
-    string
-        Name of the downloaded file.
-    """
-
-    files_0 = os.listdir()
-    wget.download(url)
-    files_1 = os.listdir()
-    new_file = str(next(iter(set(files_1) - set(files_0))))
-    return new_file
-
 def copy_num(arr):
     """
     Determine copy number variations for a given array of values.
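The new stream_clean_files added above writes each cleaned sample straight to one CSV on disk instead of returning a list of dataframes, and the main() hunk below re-reads that file lazily. A small usage sketch under the same assumptions as the diff (the manifest folder name and /tmp path mirror the defaults used in main(); they are not new values):

    # Write all transcriptomics samples to one CSV, one sample at a time.
    out_csv = '/tmp/hcmi_transcriptomics_cleaned.csv'
    stream_clean_files('transcriptomics', 'full_manifest_files', out_csv)

    # scan_csv builds a lazy query; collect(streaming=True) executes it in
    # batches, so the full table only materializes once, right before mapping.
    df_clean = pl.scan_csv(out_csv).collect(streaming=True)
    print(df_clean.shape)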
@@ -707,12 +733,24 @@ def main():
     metadata = use_gdc_tool(args.manifest, args.type, download_data=download_option)
     # Extract data files
     print("Running 'get_clean_files' function")
-    data_files = get_clean_files(args.type)
+    # data_files = get_clean_files(args.type)
+
+    intermediate_csv = f"/tmp/hcmi_{args.type}_cleaned.csv"
+    print(f"Streaming cleaned files for {args.type}{intermediate_csv}")
+    stream_clean_files(
+        args.type,
+        args.manifestfolder or "full_manifest_files",
+        intermediate_csv
+    )
+
+    # Load that cleaned CSV lazily, then collect into one DataFrame for mapping
+    print("Loading cleaned data for mapping …")
+    df_clean = pl.scan_csv(intermediate_csv).collect(streaming=True)
+    data_files = [df_clean]
+
 
     # Retrieve figshare gene data for entrez map
-    print("Running 'retrieve_figshare_data' function")
-    gene_url = "https://figshare.com/ndownloader/files/40576109?private_link=525f7777039f4610ef47"
-    entrez_map_file = args.genes #retrieve_figshare_data(gene_url)
+    entrez_map_file = args.genes
     gc.collect()
 
     # Combine the data
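Note that the call above reads args.manifestfolder, which is not defined anywhere in this diff; for the streaming path to work, the script's argument parser presumably needs a matching option. A hedged sketch of what that addition could look like (the flag name, default, and help text are assumptions, not part of this commit):

    # Assumed argparse addition; the real script may already define this
    # option or use a different flag name.
    parser.add_argument('--manifestfolder', default=None,
                        help='Folder of per-sample GDC downloads; falls back '
                             'to "full_manifest_files" when omitted.')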
