Skip to content

Commit 45de197

Browse files
committed
make convertVCF2MutationComplex faster, 1KGP save 40% time
by splitting files first. IO might be a limit
1 parent 0dba01c commit 45de197

File tree

1 file changed

+210
-135
lines changed

1 file changed

+210
-135
lines changed

src/vcf2mutation.py

Lines changed: 210 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import threading # For the writer thread
1212
import sqlite3
1313
import numpy as np
14+
import sys
15+
import shutil
1416

1517
try:
1618
import tqdm
@@ -538,6 +540,139 @@ def writer_job(q, file_handle):
538540
print("Writer thread finished.") # Debug print
539541

540542

543+
544+
def get_header_sample_col(file_vcf,individual_input="ALL_SAMPLES"):
545+
'''
546+
return the header line based on files_vcf and individual_input
547+
file_vcf should be a vcf file
548+
return header line and individual_col
549+
'''
550+
vcf_file_path = file_vcf
551+
try:
552+
fo = openFile(vcf_file_path)
553+
except FileNotFoundError:
554+
print(f"Warning: Could not open file {vcf_file_path}. Skipping.")
555+
return '', [], None
556+
except Exception as e:
557+
print(f"Warning: Error opening file {vcf_file_path}: {e}. Skipping.")
558+
return '', [], None
559+
560+
# Read header lines to find samples
561+
header_line = None
562+
for line in fo:
563+
if line.startswith('##'):
564+
continue
565+
if line.startswith('#CHROM'):
566+
header_line = line
567+
break
568+
else: # Unexpected line before #CHROM header
569+
print(f"Warning: Unexpected line format before #CHROM header in {vcf_file_path}. Attempting to continue.")
570+
header_line = line # Try processing it as header anyway? Risky.
571+
break
572+
573+
if header_line is None:
574+
print(f"Warning: #CHROM header line not found in {vcf_file_path}. Skipping.")
575+
fo.close()
576+
return '', [], None
577+
578+
columns = header_line.strip().split('\t')
579+
if len(columns) < 9:
580+
print(f"Warning: #CHROM header line in {vcf_file_path} has fewer than 9 columns. Skipping.")
581+
fo.close()
582+
return '', [], None
583+
584+
585+
# Determine individuals and column indices
586+
individual = []
587+
individual_col = []
588+
all_samples = columns[9:]
589+
590+
if individual_input == 'ALL_SAMPLES':
591+
individual = all_samples
592+
elif individual_input == 'ALL_VARIANTS':
593+
individual = [] # No sample columns needed
594+
elif individual_input is None:
595+
if all_samples:
596+
individual = [all_samples[0]] # Default to first sample
597+
print(f"No sample specified, using first sample: {individual[0]}")
598+
else:
599+
print(f"Warning: No sample specified and no samples found in header of {vcf_file_path}. Processing as ALL_VARIANTS.")
600+
individual = [] # Fallback to no samples
601+
else:
602+
requested_individuals = list(dict.fromkeys([i for i in individual_input.split(',') if i]))
603+
individual = [s for s in requested_individuals if s in all_samples]
604+
missing = [s for s in requested_individuals if s not in all_samples]
605+
if missing:
606+
print(f"Warning: Requested samples not found in {vcf_file_path}: {', '.join(missing)}")
607+
if not individual:
608+
print(f"Warning: None of the requested samples found in {vcf_file_path}. Processing as ALL_VARIANTS.")
609+
610+
611+
# Get column indices for the selected individuals
612+
individual_col = [columns.index(indi) for indi in individual]
613+
614+
615+
# Define output column names (only needs to be done once if consistent across files)
616+
current_column_for_samples = []
617+
for indi_name in individual: # Use determined 'individual' list
618+
current_column_for_samples.append(indi_name + '__1')
619+
current_column_for_samples.append(indi_name + '__2')
620+
621+
output_column_names = ['chr', 'pos', 'ref', 'alt'] + current_column_for_samples
622+
header_string = '\t'.join(output_column_names) + '\n'
623+
624+
return header_string, individual_col, fo
625+
626+
def split_large_vcf(file_vcf, output_prefix=None, group_size = 10000):
627+
'''
628+
'''
629+
header_string, individual_col, fo = get_header_sample_col(file_vcf)
630+
if not header_string:
631+
return []
632+
if output_prefix is None:
633+
output_prefix = file_vcf + '__split__'
634+
635+
individual_col_str = '\t'.join([str(i) for i in individual_col]) + '\n'
636+
637+
# Read in data
638+
ls_files = []
639+
for i, chunk in enumerate(grouper_it(group_size, fo, 1)):
640+
file_output = f'{output_prefix}.{i}.vcf'
641+
ls_files.append(file_output)
642+
fo = open(file_output, 'w')
643+
fo.write(header_string)
644+
fo.write(individual_col_str)
645+
fo.write(''.join(chunk[0]))
646+
fo.close()
647+
return ls_files
648+
649+
def convertVCF2MutationComplexHelper(file_vcf, file_output, header_string, individual_col, filter_PASS = True, chromosome_only = True, info_field = None, info_field_thres=None):
650+
'''file_vcf is output from split_large_vcf
651+
'''
652+
fo = open(file_vcf,'r')
653+
header_string = fo.readline()
654+
individual_col = [int(i) for i in fo.readline().strip().split('\t')]
655+
656+
if info_field:
657+
try:
658+
info_field_thres = float(info_field_thres)
659+
except ValueError:
660+
print(f"Error: --info_field_thres ('{info_field_thres}') must be a number.")
661+
return None # Or raise error
662+
663+
fout = open(file_output,'w')
664+
fout.write(header_string)
665+
for line in fo:
666+
new_line = processOneLineOfVCF(line, individual_col, chromosome_only, filter_PASS, info_field, info_field_thres, CHROMOSOMES, file_vcf)
667+
fout.write(new_line)
668+
fout.close()
669+
fo.close()
670+
return file_output
671+
672+
def convertVCF2MutationComplexHelper_wrapper(args):
673+
file_vcf_split, file_vcf_split_processed,header_string, individual_col, filter_PASS,chromosome_only,info_field,info_field_thres = args
674+
return convertVCF2MutationComplexHelper(file_vcf_split, file_vcf_split_processed,header_string, individual_col, filter_PASS,chromosome_only,info_field,info_field_thres)
675+
541676
def convertVCF2MutationComplex(file_vcf, outprefix = None, individual_input="ALL_SAMPLES", filter_PASS = True, chromosome_only = True, info_field = None, info_field_thres=None, threads = 1):
542677
'''convert vcf file to tsv file. with columns: chr, pos, ref, sample1__1, sample1__2, sample2__1, sample2__2, ..., sampleN__1, sampleN__2 columns
543678
If individual is None, use the first sample in the vcf file.
@@ -570,150 +705,90 @@ def convertVCF2MutationComplex(file_vcf, outprefix = None, individual_input="ALL
570705
# file_output = file_vcf + '.tsv'
571706
else:
572707
file_output = outprefix + '.tsv'
573-
708+
709+
# check if file_output is already finished
710+
file_output_done = file_output + '.done'
711+
if os.path.exists(file_output_done):
712+
print(f"Output file '{file_output}' already exists. Skipping.")
713+
return open(file_output_done, 'r').read().strip().split()
714+
574715
if info_field:
575716
try:
576717
info_field_thres = float(info_field_thres)
577718
except ValueError:
578719
print(f"Error: --info_field_thres ('{info_field_thres}') must be a number.")
579720
return [] # Or raise error
580721

581-
# --- Setup Writer Thread ---
582722
fout = open(file_output, 'w')
583-
write_queue = queue.Queue(maxsize=threads * 2) # Buffer size heuristic
584-
writer_thread = threading.Thread(target=writer_job, args=(write_queue, fout))
585-
writer_thread.start()
586-
587-
588-
589-
590-
write_header = True
591-
column_for_samples = [] # Keep track of sample columns generated
592-
593-
try:
594-
for file_vcf in files_vcf:
595-
vcf_file_path = file_vcf
596-
print('start converting vcf file:', file_vcf)
597-
try:
598-
fo = openFile(vcf_file_path)
599-
except FileNotFoundError:
600-
print(f"Warning: Could not open file {vcf_file_path}. Skipping.")
601-
continue
602-
except Exception as e:
603-
print(f"Warning: Error opening file {vcf_file_path}: {e}. Skipping.")
604-
continue
605-
606-
# Read header lines to find samples
607-
header_line = None
608-
for line in fo:
609-
if line.startswith('##'):
610-
continue
611-
if line.startswith('#CHROM'):
612-
header_line = line
613-
break
614-
else: # Unexpected line before #CHROM header
615-
print(f"Warning: Unexpected line format before #CHROM header in {vcf_file_path}. Attempting to continue.")
616-
header_line = line # Try processing it as header anyway? Risky.
617-
break
618-
619-
if header_line is None:
620-
print(f"Warning: #CHROM header line not found in {vcf_file_path}. Skipping.")
621-
fo.close()
622-
continue
623-
624-
columns = header_line.strip().split('\t')
625-
if len(columns) < 9:
626-
print(f"Warning: #CHROM header line in {vcf_file_path} has fewer than 9 columns. Skipping.")
627-
fo.close()
628-
continue
629-
630-
631-
# Determine individuals and column indices
632-
individual = []
633-
individual_col = []
634-
all_samples = columns[9:]
635-
636-
if individual_input == 'ALL_SAMPLES':
637-
individual = all_samples
638-
elif individual_input == 'ALL_VARIANTS':
639-
individual = [] # No sample columns needed
640-
elif individual_input is None:
641-
if all_samples:
642-
individual = [all_samples[0]] # Default to first sample
643-
print(f"No sample specified, using first sample: {individual[0]}")
644-
else:
645-
print(f"Warning: No sample specified and no samples found in header of {vcf_file_path}. Processing as ALL_VARIANTS.")
646-
individual = [] # Fallback to no samples
723+
dc_header_string_and_other_info = {file_vcf:get_header_sample_col(file_vcf) for file_vcf in files_vcf}
724+
dc_header_string_and_other_info = {k:v for k,v in dc_header_string_and_other_info.items() if v[0]}
725+
if len(set([i[0] for i in dc_header_string_and_other_info.values()])) != 1:
726+
print(f"Error: Different header strings found in {files_vcf}.")
727+
return []
728+
header_string = list(dc_header_string_and_other_info.values())[0][0]
729+
column_for_samples = header_string.strip().split()[4:]
730+
fout.write(header_string)
731+
732+
total_vcf_size = sum([os.path.getsize(file_vcf) for file_vcf in files_vcf])
733+
734+
if threads == 1 or total_vcf_size < 10*1024*1024:
735+
for file_vcf in dc_header_string_and_other_info:
736+
header_string, individual_col, fo = dc_header_string_and_other_info[file_vcf]
737+
if tqdm:
738+
to_iter = tqdm.tqdm(fo)
647739
else:
648-
requested_individuals = list(dict.fromkeys([i for i in individual_input.split(',') if i]))
649-
individual = [s for s in requested_individuals if s in all_samples]
650-
missing = [s for s in requested_individuals if s not in all_samples]
651-
if missing:
652-
print(f"Warning: Requested samples not found in {vcf_file_path}: {', '.join(missing)}")
653-
if not individual:
654-
print(f"Warning: None of the requested samples found in {vcf_file_path}. Processing as ALL_VARIANTS.")
655-
656-
657-
# Get column indices for the selected individuals
658-
individual_col = [columns.index(indi) for indi in individual]
659-
660-
661-
# Define output column names (only needs to be done once if consistent across files)
662-
if write_header:
663-
current_column_for_samples = []
664-
for indi_name in individual: # Use determined 'individual' list
665-
current_column_for_samples.append(indi_name + '__1')
666-
current_column_for_samples.append(indi_name + '__2')
667-
668-
# Check if sample columns changed from previous file (shouldn't if using ALL_SAMPLES)
669-
if column_for_samples and column_for_samples != current_column_for_samples:
670-
print("Warning: Sample columns differ between input VCF files. Header might be inconsistent.")
671-
# Decide how to handle: error out, use first header, use union?
672-
# For now, we'll use the first set determined.
673-
674-
if not column_for_samples: # If first file or consistent
675-
column_for_samples = current_column_for_samples
676-
677-
output_column_names = ['chr', 'pos', 'ref', 'alt'] + column_for_samples
678-
header_string = '\t'.join(output_column_names) + '\n'
679-
write_queue.put(header_string) # Send header to writer thread
680-
write_header = False # Prevent writing header again
681-
682-
# print(threads)
683-
# use single thread is thread set to 1 or file_vcf smaller than 10MB
684-
if threads == 1 or os.path.getsize(file_vcf) < 10*1024*1024:
685-
for line in fo:
686-
new_line = processOneLineOfVCF(line, individual_col, chromosome_only, filter_PASS, info_field, info_field_thres, CHROMOSOMES, file_vcf)
687-
if new_line:
688-
write_queue.put(new_line)
689-
else:
690-
pool = Pool(threads)
691-
if tqdm:
692-
to_iter_value = tqdm.tqdm(grouper_it(threads*100, fo, threads))
693-
else:
694-
to_iter_value = grouper_it(threads*100, fo, threads)
695-
for ls_lines in to_iter_value:
696-
new_lines = pool.starmap(processManyLineOfVCF, [(lines, individual_col, chromosome_only, filter_PASS, info_field, info_field_thres, CHROMOSOMES, file_vcf) for lines in ls_lines])
697-
# Put results into the write queue
698-
for result_chunk in new_lines:
699-
if result_chunk: # Only queue if non-empty
700-
write_queue.put(result_chunk)
701-
# fout.write(''.join(new_lines))
702-
pool.close()
703-
pool.join()
704-
705-
print('finished converting vcf file:', file_vcf)
706-
finally:
707-
# --- Signal writer thread to finish ---
708-
write_queue.put(None) # Send sentinel
709-
print("Waiting for writer thread to finish...")
710-
write_queue.join() # Wait for queue to be empty
711-
writer_thread.join() # Wait for thread to terminate
712-
fout.close() # Close the output file
713-
print("Writer thread finished. Output file closed.")
714-
715-
716-
# fout.close()
740+
to_iter = fo
741+
for line in to_iter:
742+
new_line = processOneLineOfVCF(line, individual_col, chromosome_only, filter_PASS, info_field, info_field_thres, CHROMOSOMES, file_vcf)
743+
if new_line:
744+
fout.write(new_line)
745+
print('finish converting vcf file:', file_vcf)
746+
fo.close()
747+
748+
return column_for_samples
749+
750+
# use multiple threads
751+
print('use multiple threads to convert the vcf files by splitting the vcf files into smaller files')
752+
folder_temp = file_output + '_temp'
753+
folder_temp_split = os.path.join(folder_temp, 'vcf_split')
754+
755+
if not os.path.exists(folder_temp_split):
756+
os.makedirs(folder_temp_split)
757+
pool = Pool(threads)
758+
params = []
759+
for file_vcf in dc_header_string_and_other_info:
760+
params.append([file_vcf, os.path.join(folder_temp_split,os.path.basename(file_vcf)), 10000])
761+
files_vcf_split = pool.starmap(split_large_vcf, params, chunksize=1)
762+
pool.close()
763+
pool.join()
764+
print('vcf files were splitted to {} files'.format(len([i for j in files_vcf_split for i in j])))
765+
766+
pool = Pool(threads)
767+
params = []
768+
for file_vcf, files_vcf_split_batch in zip(dc_header_string_and_other_info.keys(), files_vcf_split):
769+
header_string, individual_col, fo = dc_header_string_and_other_info[file_vcf]
770+
for file_vcf_split in files_vcf_split_batch:
771+
file_vcf_split_processed = os.path.join(folder_temp, os.path.basename(file_vcf_split))+'processed.tsv'
772+
params.append([file_vcf_split, file_vcf_split_processed,header_string, individual_col, filter_PASS,chromosome_only,info_field,info_field_thres])
773+
774+
imap_files_vcf_split_processed = pool.imap(convertVCF2MutationComplexHelper_wrapper, params, chunksize=1)
775+
if tqdm:
776+
files_vcf_split_processed = list(tqdm.tqdm(imap_files_vcf_split_processed, total=len(params), desc=f"vcf converting to tsv"))
777+
else:
778+
files_vcf_split_processed = list(imap_files_vcf_split_processed)
779+
# files_vcf_split_processed = pool.starmap(convertVCF2MutationComplexHelper, params, chunksize=1)
780+
pool.close()
781+
pool.join()
782+
783+
shutil.rmtree(folder_temp_split)
784+
for file_vcf_split_processed in files_vcf_split_processed:
785+
fo = open(file_vcf_split_processed,'r')
786+
fo.readline()
787+
fout.write(fo.read())
788+
789+
shutil.rmtree(folder_temp)
790+
fout.close()
791+
open(file_output_done, 'w').write('\n'.join(column_for_samples))
717792
return column_for_samples
718793

719794
description = '''convert extract mutation information from vcf file

0 commit comments

Comments
 (0)