diff --git a/docs/how_varvamp_works.md b/docs/how_varvamp_works.md index 1547410..0962583 100644 --- a/docs/how_varvamp_works.md +++ b/docs/how_varvamp_works.md @@ -56,6 +56,24 @@ To search for the best amplicon, varVAMP uses a graph based approach. 7. Test amplicons for their [minimal free energy](https://en.wikipedia.org/wiki/Gibbs_free_energy) at their lowest primer temperature with [`seqfold`](https://github.com/Lattice-Automation/seqfold) and filter to avoid secondary structures. Amplicons with large potential deletions (>QAMPLICON_DEL_CUTOFF) will be ignored. Smaller deletions will be accepted. 8. Take the best qPCR schemes of overlapping schemes. + +#### Multi-processing +varVAMP can use multiple cores at some steps in the workflow. If you have performance issues +at the following steps it might be worth increasing the number of cores. + +1. All workflows: +- BLAST search: Each amplicon is searched for off-targets in the BLAST database. +- Primer dimer search against a user-provided list of primers. +- Primer search: Each kmer is evaluated as a potential primer. + +2. Tiled workflow: +- Final primer dimer solve. + +3. qPCR workflow: +- Probe search: Each kmer is evaluated as a potential probe. +- Amplicon search: Each potential amplicon is checked for its parameters. +- Amplicon deltaG calculation: Each amplicon is checked for potential secondary structures. + #### Penalty calculation ```python diff --git a/docs/output.md b/docs/output.md index 571204b..b310ecc 100644 --- a/docs/output.md +++ b/docs/output.md @@ -11,7 +11,7 @@ varVAMP produces multiple main output files: | ALL | per_base_mismatches.pdf | Barplot of the percent mismatches at each nucleotide position of the primer. | | ALL | primers.bed | A bed file with the primer locations. Includes the primer penalty. The lower, the better. | | ALL | varvamp_log.txt | Log file. | -| TILED | unsolvable_primer_dimers.tsv | Only produced if there are primer dimers without replacements. 
Tells which primers form dimers and at which temperature. | +| TILED | unsolvable_primer_dimers.txt | Only produced if there are primer dimers without replacements. Tells which primers form dimers and at which temperature. | | TILED | primers_pool_0/1.fasta | Primer sequences per pool in fasta format. | | SINGLE | primers.fasta | Primer sequences in fasta format. | | TILED/SINGLE | primer_to_amplicon_assignments.tabular | Simple tab separated file, which primers belong together. Useful for bioinformatic workflows that include primer trimming | diff --git a/varvamp/command.py b/varvamp/command.py index cc47fdb..6039b45 100644 --- a/varvamp/command.py +++ b/varvamp/command.py @@ -46,7 +46,7 @@ def get_args(sysargs): QPCR_parser = mode_parser.add_parser( "qpcr", help="design qPCR primers", - usage="varvamp qpcr [optional arguments] " + usage="varvamp qpcr -t [optional arguments] " ) parser.add_argument( "input", @@ -73,7 +73,7 @@ def get_args(sysargs): par.add_argument( "-th", "--threads", - help="number of threads for BLAST search and deltaG calculations", + help="number of threads", metavar="1", type=int, default=1 @@ -85,6 +85,14 @@ def get_args(sysargs): type=str, default="varVAMP" ) + par.add_argument( + "--compatible-primers", + metavar="None", + type=str, + default=None, + help="FASTA primer file with which new primers should not form dimers. Sequences >40 nt are ignored. Can significantly increase runtime." 
+ ) + for par in (SINGLE_parser, TILED_parser): par.add_argument( "-t", @@ -138,9 +146,7 @@ def get_args(sysargs): "-t", "--threshold", required=True, - metavar="0.9", type=float, - default=0.9, help="consensus threshold (0-1) - higher values result in higher specificity at the expense of found primers" ) QPCR_parser.add_argument( @@ -184,9 +190,18 @@ def shared_workflow(args, log_file): """ # start varvamp logging.varvamp_progress(log_file, mode=args.mode) - # read in alignment and preprocess preprocessed_alignment = alignment.preprocess(args.input[0]) + # read in external primer sequences with which new primers should not form dimers + if args.compatible_primers is not None: + compatible_primers = primers.parse_primer_fasta(args.compatible_primers) + if not compatible_primers: + logging.raise_error( + "no valid primers found in the provided primer file.\n", + log_file, + ) + else: + compatible_primers = None # check alignment length and number of gaps and report if its significantly more/less than expected logging.check_alignment_length(preprocessed_alignment, log_file) logging.check_gaped_sequences(preprocessed_alignment, log_file) @@ -276,7 +291,8 @@ def shared_workflow(args, log_file): left_primer_candidates, right_primer_candidates = primers.find_primers( kmers, ambiguous_consensus, - alignment_cleaned + alignment_cleaned, + args.threads ) for primer_type, primer_candidates in [("+", left_primer_candidates), ("-", right_primer_candidates)]: if not primer_candidates: @@ -292,7 +308,22 @@ def shared_workflow(args, log_file): progress_text=f"{len(left_primer_candidates)} fw and {len(right_primer_candidates)} rv potential primers" ) - return alignment_cleaned, majority_consensus, ambiguous_consensus, primer_regions, left_primer_candidates, right_primer_candidates, potential_primer_regions + # filter primers against user-provided list of compatible primers, can use multi-processing + if compatible_primers: + left_primer_candidates = 
primers.filter_non_dimer_candidates( + left_primer_candidates, compatible_primers, args.threads + ) + right_primer_candidates = primers.filter_non_dimer_candidates( + right_primer_candidates, compatible_primers, args.threads + ) + logging.varvamp_progress( + log_file, + progress=0.65, + job="Filtering primers against provided primers.", + progress_text=f"{len(left_primer_candidates)} fw and {len(right_primer_candidates)} rv primers after filtering" + ) + + return alignment_cleaned, majority_consensus, ambiguous_consensus, primer_regions, left_primer_candidates, right_primer_candidates, potential_primer_regions, compatible_primers def single_and_tiled_shared_workflow(args, left_primer_candidates, right_primer_candidates, potential_primer_regions, data_dir, log_file): @@ -323,8 +354,7 @@ def single_and_tiled_shared_workflow(args, left_primer_candidates, right_primer_ ) if not amplicons: logging.raise_error( - "no amplicons found. Increase the max amplicon length or \ - number of ambiguous bases or lower threshold!\n", + "no amplicons found. Increase the max amplicon length or number of ambiguous bases or lower threshold!\n", log_file, exit=True ) @@ -369,7 +399,7 @@ def single_workflow(args, amplicons, log_file): return amplicon_scheme -def tiled_workflow(args, amplicons, left_primer_candidates, right_primer_candidates, all_primers, ambiguous_consensus, log_file, results_dir): +def tiled_workflow(args, amplicons, left_primer_candidates, right_primer_candidates, all_primers, ambiguous_consensus, log_file): """ part of the workflow specific for the tiled mode """ @@ -383,21 +413,9 @@ def tiled_workflow(args, amplicons, left_primer_candidates, right_primer_candida amplicon_graph ) - # check for dimers - dimers_not_solved = scheme.check_and_solve_heterodimers( - amplicon_scheme, - left_primer_candidates, - right_primer_candidates, - all_primers) - if dimers_not_solved: - logging.raise_error( - f"varVAMP found {len(dimers_not_solved)} primer dimers without replacements. 
Check the dimer file and perform the PCR for incomaptible amplicons in a sperate reaction.", - log_file - ) - reporting.write_dimers(results_dir, dimers_not_solved) - # evaluate coverage - # ATTENTION: Genome coverage of the scheme might still change slightly through resolution of primer dimers, but this potential, minor inaccuracy is currently accepted. + # ATTENTION: Genome coverage of the scheme might still change slightly through resolution of primer dimers, + # but this potential, minor inaccuracy is currently accepted. percent_coverage = round(coverage/len(ambiguous_consensus)*100, 2) logging.varvamp_progress( log_file, @@ -414,10 +432,36 @@ def tiled_workflow(args, amplicons, left_primer_candidates, right_primer_candida "\t - relax primer settings (not recommended)\n", log_file ) - return amplicon_scheme + + # check for dimers + dimers_not_solved, n_initial_dimers = scheme.check_and_solve_heterodimers( + amplicon_scheme, + left_primer_candidates, + right_primer_candidates, + all_primers, + args.threads + ) + + # report dimers solve + if n_initial_dimers > 0 and not dimers_not_solved: + logging.varvamp_progress( + log_file, + progress=0.95, + job="Trying to solve primer dimers.", + progress_text=f"all dimers (n={n_initial_dimers}) could be resolved" + ) + elif dimers_not_solved: + logging.varvamp_progress( + log_file, + progress=0.95, + job="Trying to solve primer dimers.", + progress_text=f"{len(dimers_not_solved)}/{n_initial_dimers} dimers could not be resolved" + ) + + return amplicon_scheme, dimers_not_solved -def qpcr_workflow(args, data_dir, alignment_cleaned, ambiguous_consensus, majority_consensus, left_primer_candidates, right_primer_candidates, log_file): +def qpcr_workflow(args, data_dir, alignment_cleaned, ambiguous_consensus, majority_consensus, left_primer_candidates, right_primer_candidates, compatible_primers, log_file): """ part of the workflow specific for the tiled mode """ @@ -428,7 +472,7 @@ def qpcr_workflow(args, data_dir, 
alignment_cleaned, ambiguous_consensus, majori ) if not probe_regions: logging.raise_error( - "no regions that fullfill probe criteria! lower threshold or increase number of ambiguous chars in probe\n", + "no regions that fulfill probe criteria! lower threshold or increase number of ambiguous chars in probe\n", log_file, exit=True ) @@ -440,7 +484,7 @@ def qpcr_workflow(args, data_dir, alignment_cleaned, ambiguous_consensus, majori config.QPROBE_SIZES ) # find potential probes - qpcr_probes = qpcr.get_qpcr_probes(probe_kmers, ambiguous_consensus, alignment_cleaned) + qpcr_probes = qpcr.get_qpcr_probes(probe_kmers, ambiguous_consensus, alignment_cleaned, args.threads) if not qpcr_probes: logging.raise_error( "no qpcr probes found\n", @@ -454,8 +498,21 @@ def qpcr_workflow(args, data_dir, alignment_cleaned, ambiguous_consensus, majori progress_text=f"{len(qpcr_probes)} potential qPCR probes" ) + # filter primers against non-dimer sequences if provided + if compatible_primers: + qpcr_probes = primers.filter_non_dimer_candidates( + qpcr_probes, compatible_primers, args.threads) + logging.varvamp_progress( + log_file, + progress=0.75, + job="Filtering probes against provided primers.", + progress_text=f"{len(qpcr_probes)} potential qPCR probes after filtering" + ) + # find unique amplicons with a low penalty and an internal probe - qpcr_scheme_candidates = qpcr.find_qcr_schemes(qpcr_probes, left_primer_candidates, right_primer_candidates, majority_consensus, ambiguous_consensus) + qpcr_scheme_candidates = qpcr.find_qcr_schemes( + qpcr_probes, left_primer_candidates, right_primer_candidates, majority_consensus, ambiguous_consensus, args.threads + ) if not qpcr_scheme_candidates: logging.raise_error( "no qPCR scheme candidates found. 
lower threshold or increase number of ambiguous chars in primer and/or probe\n", @@ -516,7 +573,7 @@ def main(): blast.check_BLAST_installation(log_file) # mode unspecific part of the workflow - alignment_cleaned, majority_consensus, ambiguous_consensus, primer_regions, left_primer_candidates, right_primer_candidates, potential_primer_regions = shared_workflow(args, log_file) + alignment_cleaned, majority_consensus, ambiguous_consensus, primer_regions, left_primer_candidates, right_primer_candidates, potential_primer_regions, compatible_primers = shared_workflow(args, log_file) # write files that are shared in all modes reporting.write_regions_to_bed(primer_regions, args.name, data_dir) @@ -538,6 +595,7 @@ def main(): # SINGLE/TILED mode if args.mode == "tiled" or args.mode == "single": + dimers_not_solved = None all_primers, amplicons = single_and_tiled_shared_workflow( args, left_primer_candidates, @@ -553,7 +611,7 @@ def main(): log_file ) elif args.mode == "tiled": - amplicon_scheme = tiled_workflow( + amplicon_scheme, dimers_not_solved = tiled_workflow( args, amplicons, left_primer_candidates, @@ -561,7 +619,6 @@ def main(): all_primers, ambiguous_consensus, log_file, - results_dir ) # write files @@ -578,7 +635,8 @@ def main(): ambiguous_consensus, args.name, args.mode, - log_file + log_file, + dimers_not_solved ) reporting.varvamp_plot( results_dir, @@ -600,6 +658,7 @@ def main(): majority_consensus, left_primer_candidates, right_primer_candidates, + compatible_primers, log_file ) diff --git a/varvamp/scripts/default_config.py b/varvamp/scripts/default_config.py index 6c26967..4d867b0 100644 --- a/varvamp/scripts/default_config.py +++ b/varvamp/scripts/default_config.py @@ -9,7 +9,7 @@ 'PCR_DNA_CONC', 'PCR_DNTP_CONC', 'PCR_DV_CONC', 'PCR_MV_CONC', 'PRIMER_3_PENALTY', 'PRIMER_GC_END', 'PRIMER_GC_PENALTY', 'PRIMER_GC_RANGE', 'PRIMER_HAIRPIN', 'PRIMER_MAX_BASE_PENALTY', - 'PRIMER_MAX_DIMER_TMP', 'PRIMER_MAX_DINUC_REPEATS', 'PRIMER_MAX_POLYX', + 
'PRIMER_MAX_DIMER_TMP', 'PRIMER_MAX_DIMER_DELTAG', 'PRIMER_MAX_DINUC_REPEATS', 'PRIMER_MAX_POLYX', 'PRIMER_MIN_3_WITHOUT_AMB', 'PRIMER_PERMUTATION_PENALTY', 'PRIMER_SIZES', 'PRIMER_SIZE_PENALTY', 'PRIMER_TMP', 'PRIMER_TM_PENALTY', @@ -30,7 +30,10 @@ PRIMER_HAIRPIN = 47 # max melting temp for secondary structure PRIMER_GC_END = (1, 3) # min/max GCs in the last 5 bases of the 3' end PRIMER_MIN_3_WITHOUT_AMB = 3 # min len of 3' without ambiguous charaters -PRIMER_MAX_DIMER_TMP = 47 # max melting temp for dimers (homo- or heterodimers) +# primer dimer constraints +PRIMER_MAX_DIMER_TMP = 35 # max melting temp for dimers, lower temperature means more stringent filtering +PRIMER_MAX_DIMER_DELTAG = -9000 # max allowed gibbs free energy for dimer formation, higher values mean more stringent filtering +END_OVERLAP = 5 # maximum allowed nt overlap between ends of primers to be considered a dimer # QPCR parameters # basic probe parameters @@ -42,7 +45,6 @@ QPRIMER_DIFF = 2 # maximal temperature diff of qPCR primers QPROBE_TEMP_DIFF = (5, 10) # min/max temp diff between probe and primers QPROBE_DISTANCE = (4, 15) # min/max distance to the primer on the same strand -END_OVERLAP = 5 # maximum allowed nt overlap between the ends of probe and primer QAMPLICON_LENGTH = (70, 200) # min/max length of the qPCR amplicon QAMPLICON_GC = (40, 60) # GC min/max of the qPCR amplicon QAMPLICON_DEL_CUTOFF = 4 # consider regions of the alignment for deltaG calculation if they have smaller deletions than cutoff diff --git a/varvamp/scripts/logging.py b/varvamp/scripts/logging.py index 0c98460..3006cbb 100644 --- a/varvamp/scripts/logging.py +++ b/varvamp/scripts/logging.py @@ -107,13 +107,12 @@ def raise_arg_errors(args, log_file): "degeneracy. 
Consider reducing.", log_file ) - if args.database is not None: - if args.threads < 1: - raise_error( - "number of threads cannot be smaller than 1.", - log_file, - exit=True - ) + if args.threads < 1: + raise_error( + "number of threads cannot be smaller than 1.", + log_file, + exit=True + ) if args.mode in ("tiled", "single"): if args.opt_length > args.max_length: raise_error( @@ -281,7 +280,7 @@ def confirm_config(args, log_file): # check if all variables exists all_vars = [ - # arg independent TILED, SINGLE mode + # arg independent all modes ( "PRIMER_TMP", "PRIMER_GC_RANGE", @@ -291,6 +290,7 @@ def confirm_config(args, log_file): "PRIMER_MAX_DINUC_REPEATS", "PRIMER_GC_END", "PRIMER_MAX_DIMER_TMP", + "PRIMER_MAX_DIMER_DELTAG", "PRIMER_MIN_3_WITHOUT_AMB", "PCR_MV_CONC", "PCR_DV_CONC", @@ -301,7 +301,8 @@ def confirm_config(args, log_file): "PRIMER_SIZE_PENALTY", "PRIMER_MAX_BASE_PENALTY", "PRIMER_3_PENALTY", - "PRIMER_PERMUTATION_PENALTY" + "PRIMER_PERMUTATION_PENALTY", + "END_OVERLAP" ), # arg independent QPCR mode ( @@ -312,7 +313,6 @@ def confirm_config(args, log_file): "QPRIMER_DIFF", "QPROBE_TEMP_DIFF", "QPROBE_DISTANCE", - "END_OVERLAP", "QAMPLICON_LENGTH", "QAMPLICON_GC", "QAMPLICON_DEL_CUTOFF" @@ -408,7 +408,7 @@ def confirm_config(args, log_file): ("max base penalty", config.PRIMER_MAX_BASE_PENALTY), ("primer permutation penalty", config.PRIMER_PERMUTATION_PENALTY), ("qpcr flanking primer difference", config.QPRIMER_DIFF), - ("probe and primer end overlap", config.END_OVERLAP), + ("end overlap", config.END_OVERLAP), ("qpcr deletion size still considered for deltaG calculation", config.QAMPLICON_DEL_CUTOFF), ("maximum difference between primer and blast db", config.BLAST_MAX_DIFF), ("multiplier of the maximum length for non-specific amplicons", config.BLAST_SIZE_MULTI), @@ -446,15 +446,20 @@ def confirm_config(args, log_file): ) if config.PRIMER_HAIRPIN < 0: raise_error( - "decreasing hairpin melting temp to negative values " - "will influence successful 
primer search!", + "decreasing hairpin melting temp to negative values will influence successful primer search!", + log_file + ) + if config.PRIMER_MAX_DIMER_TMP < 21: + raise_error( + "there is no need to set max dimer melting temp below room temperature.", log_file ) - if config.PRIMER_MAX_DIMER_TMP < 0: + if config.PRIMER_MAX_DIMER_DELTAG > -6000: raise_error( - "there is no need to set max dimer melting temp below 0.", + "primer interactions with deltaG values higher than -6000 are generally considered unproblematic.", log_file ) + if config.PRIMER_MAX_BASE_PENALTY < 8: raise_error( "decreasing the base penalty will filter out more primers.", @@ -595,7 +600,6 @@ def goodbye_message(): messages = [ "Thank you. Come again.", ">Placeholder for your advertisement<", - "Make primers great again!", "Ciao cacao!", "And now lets pray to the PCR gods.", "**bibobibobop** task finished", @@ -611,6 +615,19 @@ def goodbye_message(): "Task failed successfully.", "Never gonna give you up, never gonna let you down.", "Have you tried turning it off and on again?", + "Just try it. PCR is magic!", + "One goat was sacrificed for this primer design to work.", + "You seem trustworthy. Here's a cookie!", + "Owwww yeah, primers done!", + "These primers were designed without AI assistance.", + "Research is fun (if you ignore the pipetting).", + "Balance your primers, balance your life.", + "Keep calm and PCR on.", + "In silico we trust.", + "May the primers be with you.", + "Designing primers like a boss!", + "Primer design completed. Time for a break!", + "Eureka! 
Your primers are ready.", "Look, I am your primer scheme.", "Quod erat demonstrandum.", "Miau?", diff --git a/varvamp/scripts/primers.py b/varvamp/scripts/primers.py index c46e6b1..fbdac56 100644 --- a/varvamp/scripts/primers.py +++ b/varvamp/scripts/primers.py @@ -2,8 +2,15 @@ primer creation and evaluation """ +# BUILTIN +import itertools +import re +import multiprocessing +import functools + # LIBS -from Bio.Seq import Seq +from Bio.Seq import MutableSeq +from Bio import SeqIO import primer3 as p3 # varVAMP @@ -59,6 +66,50 @@ def calc_dimer(seq1, seq2, structure=False): ) +def has_end_overlap(dimer_result): + """ + checks if two oligos overlap at their ends + Example: + xxxxxxxxtagc------- + --------atcgxxxxxxx + """ + if dimer_result.structure_found: + # clean structure + structure = [x[4:] for x in dimer_result.ascii_structure_lines] + # check if we have an overlap that is large enough + overlap = len(structure[1].replace(" ", "")) + if overlap <= config.END_OVERLAP: + return False + # not more than one conseq. internal mismatch + if ' ' in structure[1].lstrip(" "): + return False + # The alignment length of the ACII structure is equal to the first part of the structure + # and the maximum possible alignment length is the cumulative length of both primers (-> no overlap at all) + alignment_length = len(structure[0]) + maximum_alignment_length = len(re.findall("[ATCG]", "".join(structure))) + # this means that for a perfect end overlap the alignment length is equal to: + # len(primer1) + len(primer2) - overlap. 
+ if alignment_length == maximum_alignment_length - overlap: + return True + + return False + + +def is_dimer(seq1, seq2): + """ + check if two sequences dimerize above threshold or are overlapping at their ends + """ + dimer_result = calc_dimer(seq1, seq2, structure=True) + # check both the temperature and the deltaG + if dimer_result.tm > config.PRIMER_MAX_DIMER_TMP or dimer_result.dg < config.PRIMER_MAX_DIMER_DELTAG: + return True + # check for perfect end overlaps (this can result in primer extensions even though the tm/dg are okay) + if has_end_overlap(dimer_result): + return True + + return False + + def calc_max_polyx(seq): """ calculate maximum polyx of a seq @@ -125,7 +176,7 @@ def rev_complement(seq): """ reverse complement a sequence """ - return str(Seq(seq).reverse_complement()) + return str(MutableSeq(seq).reverse_complement(inplace=True)) def calc_permutation_penalty(amb_seq): @@ -261,13 +312,14 @@ def filter_kmer_direction_independent(seq, primer_temps=config.PRIMER_TMP, gc_ra filter kmer for temperature, gc content, poly x, dinucleotide repeats and homodimerization """ + return( (primer_temps[0] <= calc_temp(seq) <= primer_temps[1]) and (gc_range[0] <= calc_gc(seq) <= gc_range[1]) and (calc_max_polyx(seq) <= config.PRIMER_MAX_POLYX) and (calc_max_dinuc_repeats(seq) <= config.PRIMER_MAX_DINUC_REPEATS) and (calc_base_penalty(seq, primer_temps, gc_range, primer_sizes) <= config.PRIMER_MAX_BASE_PENALTY) - and (calc_dimer(seq, seq).tm <= config.PRIMER_MAX_DIMER_TMP) + and not is_dimer(seq, seq) ) @@ -291,51 +343,66 @@ def filter_kmer_direction_dependend(direction, kmer, ambiguous_consensus): ) -def find_primers(kmers, ambiguous_consensus, alignment): +def _process_kmer_batch(ambiguous_consensus, alignment, kmers): """ - filter kmers direction specific and append penalties - --> potential primers + Helper function for multiprocessing: process a batch of kmers. + Returns (left_primers, right_primers) tuples. 
""" - left_primer_candidates = [] - right_primer_candidates = [] + left_primers = [] + right_primers = [] for kmer in kmers: - # filter kmers based on their direction independend stats if not filter_kmer_direction_independent(kmer[0]): continue - # calc base penalty - base_penalty = calc_base_penalty(kmer[0],config.PRIMER_TMP, config.PRIMER_GC_RANGE, config.PRIMER_SIZES) - # calculate per base mismatches - per_base_mismatches = calc_per_base_mismatches( - kmer, - alignment, - ambiguous_consensus - ) - # calculate permutation penealty - permutation_penalty = calc_permutation_penalty( - ambiguous_consensus[kmer[1]:kmer[2]] - ) - # now check direction specific + # calc penalties + base_penalty = calc_base_penalty(kmer[0], config.PRIMER_TMP, config.PRIMER_GC_RANGE, config.PRIMER_SIZES) + per_base_mismatches = calc_per_base_mismatches(kmer, alignment, ambiguous_consensus) + permutation_penalty = calc_permutation_penalty(ambiguous_consensus[kmer[1]:kmer[2]]) + # some filters depend on the direction of each primer for direction in ["+", "-"]: - # check if kmer passes direction filter if not filter_kmer_direction_dependend(direction, kmer, ambiguous_consensus): continue - # calculate the 3' penalty - three_prime_penalty = calc_3_prime_penalty( - direction, - per_base_mismatches - ) - # add all penalties + # calc penalties + three_prime_penalty = calc_3_prime_penalty(direction, per_base_mismatches) primer_penalty = base_penalty + permutation_penalty + three_prime_penalty - # sort into lists + # add to lists depending on their direction if direction == "+": - left_primer_candidates.append( - [kmer[0], kmer[1], kmer[2], primer_penalty, per_base_mismatches] - ) - if direction == "-": - right_primer_candidates.append( - [rev_complement(kmer[0]), kmer[1], kmer[2], primer_penalty, per_base_mismatches] - ) + left_primers.append([kmer[0], kmer[1], kmer[2], primer_penalty, per_base_mismatches]) + else: + right_primers.append([rev_complement(kmer[0]), kmer[1], kmer[2], 
primer_penalty, per_base_mismatches]) + + return left_primers, right_primers + + +def find_primers(kmers, ambiguous_consensus, alignment, num_processes): + """ + Filter kmers direction specific and append penalties --> potential primers. + Uses multiprocessing to process kmers in parallel. + """ + if not kmers: + return [], [] + + # Convert kmers set to list for slicing + kmers = list(kmers) + batch_size = max(1, int(len(kmers)/num_processes)) + + # Split kmers into batches + batches = [kmers[i:i + batch_size] for i in range(0, len(kmers), batch_size)] + callable_f = functools.partial( + _process_kmer_batch, + ambiguous_consensus, alignment + ) + + # Solve dimers in parallel + with multiprocessing.Pool(processes=num_processes) as pool: + results = pool.map(callable_f, batches) + + # Aggregate results + left_primer_candidates = [] + right_primer_candidates = [] + for left_primers, right_primers in results: + left_primer_candidates.extend(left_primers) + right_primer_candidates.extend(right_primers) return left_primer_candidates, right_primer_candidates @@ -350,7 +417,7 @@ def create_primer_dictionary(primer_candidates, direction): for primer in primer_candidates: if direction == "+": direction_name = "LEFT" - elif direction == "-": + else: direction_name = "RIGHT" primer_name = f"{direction_name}_{primer_idx}" primer_dict[primer_name] = primer @@ -412,3 +479,77 @@ def find_best_primers(left_primer_candidates, right_primer_candidates, high_cons # and create a dict return all_primers + + +def get_permutations(seq): + """ + get all permutations of an ambiguous sequence. + """ + splits = [config.AMBIG_NUCS.get(nuc, [nuc]) for nuc in seq] + + return[''.join(p) for p in itertools.product(*splits)] + + +def parse_primer_fasta(fasta_path): + """ + Parse a primer FASTA file and return a list of sequences using BioPython. 
+ """ + + sequences = [] + + for record in SeqIO.parse(fasta_path, "fasta"): + seq = str(record.seq).lower() + # Only include primers up to 40 nucleotides + if len(seq) <= 40: + sequences += get_permutations(seq) + + return list(set(sequences)) # deduplication + + +def check_primer_against_externals(external_sequences, primer): + """ + Worker function to check a single primer against all external sequences. + Returns the primer if it passes, None otherwise. + Handles both list format and dict format (name, data) tuples. + """ + + # Extract sequence based on input format + if isinstance(primer, tuple): + name, data = primer + seq = data[0] + else: + seq = primer[0] + + for ext_seq in external_sequences: + if is_dimer(seq, ext_seq): + return None + + return primer + + +def filter_non_dimer_candidates(primer_candidates, external_sequences, n_processes): + """ + Filter out primer candidates that form dimers with external sequences. + Uses multiprocessing to speed up checks. + """ + is_dict = isinstance(primer_candidates, dict) + + callable_f = functools.partial( + check_primer_against_externals, + external_sequences + ) + + with multiprocessing.Pool(processes=n_processes) as pool: + # Prepare arguments based on input type + # qpcr probes are stored in dictionaries --> result in tuples when unpacked + if is_dict: + results = pool.map(callable_f, primer_candidates.items()) + else: + results = pool.map(callable_f, primer_candidates) + + # Filter and restore original format + if is_dict: + filtered_results = [result for result in results if result is not None] + return {name: data for name, data in filtered_results} + else: + return [primer for primer in results if primer is not None] diff --git a/varvamp/scripts/qpcr.py b/varvamp/scripts/qpcr.py index f2d89e9..17b085f 100644 --- a/varvamp/scripts/qpcr.py +++ b/varvamp/scripts/qpcr.py @@ -7,11 +7,11 @@ import seqfold import itertools import multiprocessing +import functools # varVAMP from varvamp.scripts import config from 
varvamp.scripts import primers -from varvamp.scripts import reporting def choose_probe_direction(seq): @@ -51,35 +51,25 @@ def filter_probe_direction_dependent(seq): ) -def get_qpcr_probes(kmers, ambiguous_consensus, alignment_cleaned): +def _process_kmer_batch_probes(ambiguous_consensus, alignment_cleaned, kmers): """ - find potential qPCR probes + Helper function for multiprocessing: process a batch of kmers for probes. + Returns probe_candidates dictionary. """ probe_candidates = {} probe_idx = 0 for kmer in kmers: - # filter probe for base params - if not primers.filter_kmer_direction_independent(kmer[0], config.QPROBE_TMP, config.QPROBE_GC_RANGE, - config.QPROBE_SIZES): + if not primers.filter_kmer_direction_independent(kmer[0], config.QPROBE_TMP, config.QPROBE_GC_RANGE, config.QPROBE_SIZES): continue - # do not allow ambiguous chars at both ends if ambiguous_ends(ambiguous_consensus[kmer[1]:kmer[2]]): continue - # calc penalties analogous to primer search - base_penalty = primers.calc_base_penalty(kmer[0], config.QPROBE_TMP, config.QPROBE_GC_RANGE, - config.QPROBE_SIZES) - per_base_mismatches = primers.calc_per_base_mismatches( - kmer, - alignment_cleaned, - ambiguous_consensus - ) - permutation_penalty = primers.calc_permutation_penalty( - ambiguous_consensus[kmer[1]:kmer[2]] - ) - # determine the direction with more cytosine or set both if 50 % + + base_penalty = primers.calc_base_penalty(kmer[0], config.QPROBE_TMP, config.QPROBE_GC_RANGE, config.QPROBE_SIZES) + per_base_mismatches = primers.calc_per_base_mismatches(kmer, alignment_cleaned, ambiguous_consensus) + permutation_penalty = primers.calc_permutation_penalty(ambiguous_consensus[kmer[1]:kmer[2]]) direction = choose_probe_direction(kmer[0]) - # create probe dictionary + if "+" in direction: if filter_probe_direction_dependent(kmer[0]): probe_name = f"PROBE_{probe_idx}_LEFT" @@ -96,7 +86,44 @@ def get_qpcr_probes(kmers, ambiguous_consensus, alignment_cleaned): base_penalty + permutation_penalty + 
three_prime_penalty, per_base_mismatches, direction] probe_idx += 1 - # sort by penalty + + return probe_candidates + + +def get_qpcr_probes(kmers, ambiguous_consensus, alignment_cleaned, num_processes): + """ + Find potential qPCR probes using multiprocessing. + """ + + # Convert kmers set to list for batching + kmers = list(kmers) + + # Split kmers into batches + batch_size = max(1, int(len(kmers) / num_processes)) + batches = [kmers[i:i + batch_size] for i in range(0, len(kmers), batch_size)] + + # Prepare arguments for each dimer + callable_f = functools.partial( + _process_kmer_batch_probes, + ambiguous_consensus, alignment_cleaned + ) + with multiprocessing.Pool(processes=num_processes) as pool: + results = pool.map(callable_f, batches) + + # Aggregate results and re-index probe names + probe_candidates = {} + probe_idx = 0 + for batch_probes in results: + if batch_probes is None: + continue + for probe_name, probe_data in batch_probes.items(): + # Extract direction from original probe name + direction = "LEFT" if "LEFT" in probe_name else "RIGHT" + new_probe_name = f"PROBE_{probe_idx}_{direction}" + probe_candidates[new_probe_name] = probe_data + probe_idx += 1 + + # Sort by penalty probe_candidates = dict(sorted(probe_candidates.items(), key=lambda x: x[1][3])) return probe_candidates @@ -139,54 +166,30 @@ def hardfilter_amplicon(majority_consensus, left_primer, right_primer): ) -def check_end_overlap(dimer_result): - """ - checks if two oligos overlap at their ends (pretty rare) - Example: - xxxxxxxxtagc------- - --------atcgxxxxxxx - """ - if dimer_result.structure_found: - # clean structure - structure = [x[4:] for x in dimer_result.ascii_structure_lines] - # calc overlap and the cumulative len of the oligos - overlap = len(structure[1].replace(" ", "")) - nt_count = len(re.findall("[ATCG]", "".join(structure))) - # check for overlaps at the ends and the min overlap (allows for some amount of mismatches) - if overlap > config.END_OVERLAP and nt_count <= 
len(structure[0]) + overlap + 1 and " " not in structure[1].lstrip(" "): - return True - - return False - - -def forms_dimer_or_overhangs(right_primer, left_primer, probe, ambiguous_consensus): +def dimer_in_combinations(right_primer, left_primer, probe, ambiguous_consensus): """ - checks if combinations of primers/probe form dimers or overhangs + checks if primers cause dimers and if combinations of primers/probe including all permutations form dimers """ forms_structure = False # first check if there are dimers between the two flanking primers - if primers.calc_dimer(left_primer[0], right_primer[0]).tm > config.PRIMER_MAX_DIMER_TMP: + if primers.is_dimer(left_primer[0], right_primer[0]): return True # for the probe check all permutations and possible overhangs to ensure # that none of the primers could cause unspecific probe binding. # first get all permutations - probe_per = reporting.get_permutations(ambiguous_consensus[probe[1]:probe[2]]) - left_per = reporting.get_permutations(ambiguous_consensus[left_primer[1]:left_primer[2]]) - right_per = reporting.get_permutations(ambiguous_consensus[right_primer[1]:right_primer[2]]) + probe_per = primers.get_permutations(ambiguous_consensus[probe[1]:probe[2]]) + left_per = primers.get_permutations(ambiguous_consensus[left_primer[1]:left_primer[2]]) + right_per = primers.get_permutations(ambiguous_consensus[right_primer[1]:right_primer[2]]) # then check all permutations for combination in [(probe_per, left_per), (probe_per, right_per)]: - for oligo1 in combination[0]: - for oligo2 in combination[1]: - dimer_result = primers.calc_dimer(oligo1, oligo2, structure=True) - if dimer_result.tm >= config.PRIMER_MAX_DIMER_TMP or check_end_overlap(dimer_result): - forms_structure = True - break - # break all loops because we found an unwanted structure in one of the permutations - # (either dimer formation or a too long overlap at the ends of the primer) - if forms_structure: + for oligo1, oligo2 in itertools.product(*combination): 
+ if primers.is_dimer(oligo1, oligo2): + forms_structure = True break + # also break the outer loop because we found an unwanted structure in one of the permutations + # (either dimer formation or a too long overlap at the ends of the primer) if forms_structure: break @@ -231,7 +234,7 @@ def assess_amplicons(left_subset, right_subset, qpcr_probes, probe, majority_con [config.QPROBE_TEMP_DIFF[0] <= probe_temp - x <= config.QPROBE_TEMP_DIFF[1] for x in primer_temps]): continue # .... all combination of oligos do not form dimers or overhangs. - if forms_dimer_or_overhangs(right_primer, left_primer, qpcr_probes[probe], ambiguous_consensus): + if dimer_in_combinations(right_primer, left_primer, qpcr_probes[probe], ambiguous_consensus): continue # append to list and break as this is the primer combi # with the lowest penalty (primers are sorted by penalty) @@ -245,54 +248,74 @@ return primer_combinations -def find_qcr_schemes(qpcr_probes, left_primer_candidates, right_primer_candidates, majority_consensus, - ambiguous_consensus): +def find_single_qpcr_scheme(left_primer_candidates, right_primer_candidates, qpcr_probes, + majority_consensus, ambiguous_consensus, probe): """ - this finds the final qPCR schemes. it slices for primers flanking a probe and - test all left/right combinations whether they are potential amplicons. as primers - are sorted by penalty, only the very first match is considered as this has the - lowest penalty. however, probes are overlapping and there is a high chance that - left and right primers are found multiple times. to consider only one primer-probe - combination the probes are also sorted by penalty. therefore, if a primer - combination has been found already the optimal probe was already selected and - there is no need to consider this primer probe combination. + Find a qPCR scheme for a single probe.
""" + probe_name, probe_data = probe + + # Generate flanking subsets within the worker process + left_subset = flanking_primer_subset(left_primer_candidates, "+", probe_data) + right_subset = flanking_primer_subset(right_primer_candidates, "-", probe_data) + + if not left_subset or not right_subset: + return probe_name, None + + primer_combination = assess_amplicons( + left_subset, right_subset, qpcr_probes, probe_name, + majority_consensus, ambiguous_consensus + ) + + return probe_name, primer_combination + + +def find_qcr_schemes(qpcr_probes, left_primer_candidates, right_primer_candidates, + majority_consensus, ambiguous_consensus, num_processes): + """ + Find final qPCR schemes using multiprocessing to evaluate probes in parallel. + Probes are sorted by penalty, ensuring optimal probe selection. + """ qpcr_scheme_candidates = [] found_amplicons = [] amplicon_nr = -1 - for probe in qpcr_probes: - left_subset = flanking_primer_subset(left_primer_candidates, "+", qpcr_probes[probe]) - right_subset = flanking_primer_subset(right_primer_candidates, "-", qpcr_probes[probe]) - # consider if there are primers flanking the probe ... - if not left_subset or not right_subset: - continue - primer_combination = assess_amplicons(left_subset, right_subset, qpcr_probes, probe, majority_consensus, - ambiguous_consensus) - # ... a combi has been found, ... 
+ # Prepare arguments for parallel processing - pass full primer lists + batch_size = max(1, int(len(qpcr_probes) / num_processes)) + callable_f = functools.partial( + find_single_qpcr_scheme, + left_primer_candidates, right_primer_candidates, qpcr_probes, majority_consensus, ambiguous_consensus + ) + + # Process probes in parallel + with multiprocessing.Pool(processes=num_processes) as pool: + results = pool.map(callable_f, qpcr_probes.items(), chunksize=batch_size) + + # Aggregate results in original probe order (sorted by penalty) + for probe_name, primer_combination in results: if not primer_combination: continue - # ...and this combi is not already present for a probe with a better penalty. if primer_combination in found_amplicons: continue - # populate the primer dictionary: + amplicon_nr += 1 found_amplicons.append(primer_combination) - qpcr_scheme_candidates.append( - { - "id": f"AMPLICON_{amplicon_nr}", - "penalty": qpcr_probes[probe][3] + primer_combination[0][3] + primer_combination[1][3], - "PROBE": qpcr_probes[probe], - "LEFT": primer_combination[0], - "RIGHT": primer_combination[1] - } - ) - # and again sort by total penalty (left + right + probe) + qpcr_scheme_candidates.append({ + "id": f"AMPLICON_{amplicon_nr}", + "penalty": qpcr_probes[probe_name][3] + primer_combination[0][3] + primer_combination[1][3], + "PROBE": qpcr_probes[probe_name], + "LEFT": primer_combination[0], + "RIGHT": primer_combination[1] + }) + + # Sort by total penalty + qpcr_scheme_candidates.sort(key=lambda x: x["penalty"]) + return qpcr_scheme_candidates -def process_single_amplicon_deltaG(amplicon, majority_consensus): +def process_single_amplicon_deltaG(majority_consensus, amplicon): """ Process a single amplicon to test its deltaG and apply filtering. This function will be called concurrently by multiple threads. 
@@ -310,7 +333,7 @@ def process_single_amplicon_deltaG(amplicon, majority_consensus): return amplicon -def test_amplicon_deltaG_parallel(qpcr_schemes_candidates, majority_consensus, n_to_test, deltaG_cutoff, n_threads): +def test_amplicon_deltaG_parallel(qpcr_schemes_candidates, majority_consensus, n_to_test, deltaG_cutoff, n_processes): """ Test all amplicon deltaGs for the top n hits at the lowest primer temperature and filters if they fall below the cutoff. Multiple processes are used @@ -318,32 +341,33 @@ def test_amplicon_deltaG_parallel(qpcr_schemes_candidates, majority_consensus, n """ final_amplicons = [] - # Create a pool of processes to handle the concurrent processing - with multiprocessing.Pool(processes=n_threads) as pool: - # Create a list of the first n amplicon tuples for processing - # The list is sorted first on whether offset targets were predicted for the amplicon, - # then by penalty. This ensures that amplicons with offset targets are always considered last - amplicons = itertools.islice( - sorted(qpcr_schemes_candidates, key=lambda x: (x.get("offset_targets", False), x["penalty"])), - n_to_test - ) - # process amplicons concurrently - results = pool.starmap(process_single_amplicon_deltaG, [(amp, majority_consensus) for amp in amplicons]) - # Process the results - retained_ranges = [] - for amp in results: - # check if the amplicon overlaps with an amplicon that was previously - # found and had a high enough deltaG - if amp["deltaG"] <= deltaG_cutoff: - continue - amp_range = range(amp["LEFT"][1], amp["RIGHT"][2]) - overlaps_retained = False - for r in retained_ranges: - if amp_range.start < r.stop and r.start < amp_range.stop: - overlaps_retained = True - break - if not overlaps_retained: - final_amplicons.append(amp) - retained_ranges.append(amp_range) + # Create a list of the first n amplicon tuples for processing + # The list is sorted first on whether offset targets were predicted for the amplicon, + # then by penalty. 
This ensures that amplicons with offset targets are always considered last + amplicons = list(sorted(qpcr_schemes_candidates, key=lambda x: (x.get("offset_targets", False), x["penalty"])))[:n_to_test] + # process amplicons concurrently + batch_size = max(1, int(n_to_test / n_processes)) + callable_f = functools.partial( + process_single_amplicon_deltaG, + majority_consensus + ) + with multiprocessing.Pool(processes=n_processes) as pool: + results = pool.map(callable_f, amplicons, chunksize=batch_size) + # Process the results + retained_ranges = [] + for amp in results: + # check if the amplicon overlaps with an amplicon that was previously + # found and had a high enough deltaG + if amp["deltaG"] <= deltaG_cutoff: + continue + amp_range = range(amp["LEFT"][1], amp["RIGHT"][2]) + overlaps_retained = False + for r in retained_ranges: + if amp_range.start < r.stop and r.start < amp_range.stop: + overlaps_retained = True + break + if not overlaps_retained: + final_amplicons.append(amp) + retained_ranges.append(amp_range) return final_amplicons diff --git a/varvamp/scripts/reporting.py b/varvamp/scripts/reporting.py index 296ab5b..bd645cc 100644 --- a/varvamp/scripts/reporting.py +++ b/varvamp/scripts/reporting.py @@ -4,7 +4,6 @@ # BUILT-INS import os import math -import itertools # LIBS import pandas as pd @@ -94,20 +93,6 @@ def write_all_primers(path, scheme_name, all_primers): write_primers_to_bed(outfile, scheme_name, primer, all_primers[direction][primer], round(all_primers[direction][primer][3], 2), direction) -def get_permutations(seq): - """ - get all permutations of an ambiguous sequence. needed to - correctly report the gc and the temperature. 
- """ - groups = itertools.groupby(seq, lambda char: char not in config.AMBIG_NUCS) - splits = [] - for b, group in groups: - if b: - splits.extend([[g] for g in group]) - else: - for nuc in group: - splits.append(config.AMBIG_NUCS[nuc]) - return[''.join(p) for p in itertools.product(*splits)] def calc_mean_stats(permutations): @@ -190,7 +175,7 @@ def write_qpcr_to_files(path, final_schemes, ambiguous_consensus, scheme_name, l else: direction = "+" - permutations = get_permutations(seq) + permutations = primers.get_permutations(seq) gc, temp = calc_mean_stats(permutations) primer_name = f"{amp_name}_{oligo_type}" @@ -224,7 +209,7 @@ def write_qpcr_to_files(path, final_schemes, ambiguous_consensus, scheme_name, l print(f">{primer_name}\n{seq.upper()}", file=fasta) -def write_scheme_to_files(path, amplicon_scheme, ambiguous_consensus, scheme_name, mode, log_file): +def write_scheme_to_files(path, amplicon_scheme, ambiguous_consensus, scheme_name, mode, log_file, primer_dimers=None): """ write all relevant bed files and a tsv file with all primer stats """ @@ -233,6 +218,9 @@ def write_scheme_to_files(path, amplicon_scheme, ambiguous_consensus, scheme_nam amplicon_bed_file = os.path.join(path, "amplicons.bed") tabular_file = os.path.join(path, "primer_to_amplicon_assignment.tabular") + # Map old primer names to new amplicon-based names + name_mapping = {} + # open files to write with open(tsv_file, "w") as tsv, open(amplicon_bed_file, "w") as bed, open(tabular_file, "w") as tabular: # write header for primer tsv @@ -248,11 +236,11 @@ def write_scheme_to_files(path, amplicon_scheme, ambiguous_consensus, scheme_nam if mode == "single": primer_fasta_file = os.path.join(path, "primers.fasta") else: - primer_fasta_file = os.path.join(path, f"primers_pool_{pool+1}.fasta") + primer_fasta_file = os.path.join(path, f"primers_pool_{pool + 1}.fasta") with open(primer_fasta_file, "w") as primer_fasta: for counter, amp in enumerate(amplicon_scheme[pool::len(pools)]): # give a new 
amplicon name - amplicon_index = counter*len(pools) + pool + amplicon_index = counter * len(pools) + pool amp_name = f"{scheme_name}_{amplicon_index}" # get left and right primers and their names amp_length = amp["RIGHT"][2] - amp["LEFT"][1] @@ -266,7 +254,7 @@ def write_scheme_to_files(path, amplicon_scheme, ambiguous_consensus, scheme_nam amplicon_has_off_target = "n.d." # write amplicon bed if mode == "tiled": - bed_score = pool+1 + bed_score = pool + 1 elif mode == "single": bed_score = round(amp["LEFT"][3] + amp["RIGHT"][3], 1) amplicon_bed_records.append( @@ -284,6 +272,10 @@ def write_scheme_to_files(path, amplicon_scheme, ambiguous_consensus, scheme_nam (f"{amp_name}_LEFT", f"{amp_name}_RIGHT") ) ) + # Build name mapping for dimers + name_mapping[amp["LEFT"][-1]] = f"{amp_name}_LEFT" + name_mapping[amp["RIGHT"][-1]] = f"{amp_name}_RIGHT" + # write primer tsv and primer bed for direction, primer in [("+", amp["LEFT"]), ("-", amp["RIGHT"])]: seq = ambiguous_consensus[primer[1]:primer[2]] @@ -295,7 +287,7 @@ def write_scheme_to_files(path, amplicon_scheme, ambiguous_consensus, scheme_nam # write primers to fasta pool file print(f">{primer_name}\n{seq.upper()}", file=primer_fasta) # calc primer parameters for all permutations - permutations = get_permutations(seq) + permutations = primers.get_permutations(seq) gc, temp = calc_mean_stats(permutations) # write tsv file print( @@ -303,7 +295,7 @@ def write_scheme_to_files(path, amplicon_scheme, ambiguous_consensus, scheme_nam amp_length, primer_name, primer[-1], - pool+1, + pool + 1, primer[1] + 1, primer[2], seq.upper(), @@ -321,7 +313,7 @@ def write_scheme_to_files(path, amplicon_scheme, ambiguous_consensus, scheme_nam ( # will need amplicon_index for sorting amplicon_index, - (primer_name, primer, pool+1, direction, seq.upper()) + (primer_name, primer, pool + 1, direction, seq.upper()) ) ) # write amplicon bed with amplicons sorted by start position @@ -348,26 +340,41 @@ def write_scheme_to_files(path, 
amplicon_scheme, ambiguous_consensus, scheme_nam *record[1] ) + # Write dimers with renamed primers + if primer_dimers: + write_dimers(path, primer_dimers, name_mapping) -def write_dimers(path, primer_dimers): + +def write_dimers(path, primer_dimers, name_mapping): """ write dimers for which no replacement was found to file """ - tsv_file = os.path.join(path, "unsolvable_primer_dimers.tsv") - with open(tsv_file, "w") as tsv: - print( - "pool\tprimer_name_1\tprimer_name_2\tdimer melting temp", - file=tsv - ) + file = os.path.join(path, "unsolvable_primer_dimers.txt") + with open(file, "w") as f: for pool, primer1, primer2 in primer_dimers: + dimer_result = primers.calc_dimer(primer1[2][0], primer2[2][0], structure=True) + print( + "pool\tprimer 1\tprimer 2\tdimer melting temp\tdeltaG", + file=f + ) print( pool+1, - primer1[1], - primer2[1], - round(primers.calc_dimer(primer1[2][0], primer2[2][0]).tm, 1), + name_mapping[primer1[1]], + name_mapping[primer2[1]], + round(dimer_result.tm, 1), + dimer_result.dg, sep="\t", - file=tsv + file=f ) + structure = [x[4:] for x in dimer_result.ascii_structure_lines] + print("\nDimer structure:", file=f) + for line in structure: + print( + line, + file=f + ) + print(file=f) + def entropy(chars, states): """ diff --git a/varvamp/scripts/scheme.py b/varvamp/scripts/scheme.py index f066600..4d8e420 100644 --- a/varvamp/scripts/scheme.py +++ b/varvamp/scripts/scheme.py @@ -5,6 +5,8 @@ # BUILT-INS import heapq import math +import multiprocessing +import functools # varVAMP from varvamp.scripts import config, primers @@ -73,7 +75,7 @@ def find_amplicons(all_primers, opt_len, max_len): amplicon_length = right_primer[2] - left_primer[1] if not opt_len <= amplicon_length <= max_len: continue - if primers.calc_dimer(left_primer[0], right_primer[0]).tm > config.PRIMER_MAX_DIMER_TMP: + if primers.is_dimer(left_primer[0], right_primer[0]): continue # calculate length dependend amplicon costs as the cumulative primer # penalty multiplied by the 
e^(fold length of the optimal length). @@ -286,6 +288,7 @@ def find_best_covering_scheme(amplicons, amplicon_graph): # if no previous nodes are found but the single amplicon results in the largest # coverage - return as the best scheme amplicon_path = [best_start_node] + return best_coverage, create_scheme(amplicon_path, amps_by_id) @@ -295,8 +298,15 @@ """ primer_dimers = [] - pools = {amp["pool"] for amp in amplicon_scheme} - for pool in pools: + non_dimers = {amp["pool"]:set() for amp in amplicon_scheme} + # write all primer sequences in the respective pools --> + # these primers should not be disrupted by primer switching + # and primers are only switched later if no primer dimers + # with the existing 'good' scheme are created + for amp in amplicon_scheme: + non_dimers[amp["pool"]].add(amp["LEFT"][0]) + non_dimers[amp["pool"]].add(amp["RIGHT"][0]) + for pool in non_dimers: # test the primer dimers only within the respective pools tested_primers = [] for amp_index, amp in enumerate(amplicon_scheme): @@ -309,13 +319,16 @@ current_seq = current_primer[2][0] for tested in tested_primers: tested_seq = tested[2][0] - if primers.calc_dimer(current_seq, tested_seq).tm <= config.PRIMER_MAX_DIMER_TMP: + if not primers.is_dimer(current_seq, tested_seq): continue primer_dimers.append((current_primer, tested)) + non_dimers[pool].discard(current_seq) + non_dimers[pool].discard(tested_seq) # and remember all tested primers tested_primers.append(current_primer) - return primer_dimers + # report both dimers and non-dimers + return primer_dimers, non_dimers def get_overlapping_primers(dimer, left_primer_candidates, right_primer_candidates): @@ -329,13 +342,16 @@ # test each primer in dimer for amp_index, primer_name, primer in dimer: overlapping_primers_temp = [] - thirds_len = int((primer[2] - primer[1]) / 3)
- # get the middle third of the primer (here are the previously excluded primers) - overlap_set = set(range(primer[1] + thirds_len, primer[2] - thirds_len)) - # check in which list to look for them + # as switching could violate overlap criteria, + # only consider primers that overlap in the left half (LEFT primers) + # or right half (RIGHT primers) respectively, however this can result in slightly + # longer amplicons than allowed. + half_length = int((primer[2] - primer[1]) / 2) if "RIGHT" in primer_name: + overlap_set = set(range(primer[1] + half_length, primer[2])) primers_to_test = right_primer_candidates else: + overlap_set = set(range(primer[1], primer[1] + half_length)) primers_to_test = left_primer_candidates # and check this list for all primers that overlap for potential_new in primers_to_test: @@ -349,40 +365,60 @@ def get_overlapping_primers(dimer, left_primer_candidates, right_primer_candidat return overlapping_primers -def test_overlaps_for_dimers(overlapping_primers): +def test_overlaps_for_dimers(overlapping_primers, non_dimers): """ - test the overlapping primers for dimers. return new primers. + test all possible overlapping primers against each other for dimers + and return the first pair that doesn't form a dimer with each other + and with all non-dimer forming primers in the pool. """ for first_overlap in overlapping_primers[0]: + if any(primers.is_dimer(seq, first_overlap[2][0]) for seq in non_dimers): + continue for second_overlap in overlapping_primers[1]: - # return the first match. primers are sorted by penalty. 
- # first pair that makes it has the lowest penalty - if primers.calc_dimer(first_overlap[2][0], second_overlap[2][0]).tm <= config.PRIMER_MAX_DIMER_TMP: + if any(primers.is_dimer(seq, second_overlap[2][0]) for seq in non_dimers): + continue + if not primers.is_dimer(first_overlap[2][0], second_overlap[2][0]): return [first_overlap, second_overlap] -def check_and_solve_heterodimers(amplicon_scheme, left_primer_candidates, right_primer_candidates, all_primers): +def _solve_single_dimer(amplicon_scheme, left_primer_candidates, right_primer_candidates, non_dimers_all_pools, dimer): + """ + Helper function for multiprocessing: solve a single dimer independently. + Returns (amp_index, primer_name, new_primer) tuples or empty list if no solution. + """ + pool = amplicon_scheme[dimer[0][0]]["pool"] + non_dimers = non_dimers_all_pools[pool] + + overlapping_primers = get_overlapping_primers(dimer, left_primer_candidates, right_primer_candidates) + new_primers = test_overlaps_for_dimers(overlapping_primers, non_dimers) + + return new_primers if new_primers else [] + + +def check_and_solve_heterodimers(amplicon_scheme, left_primer_candidates, right_primer_candidates, all_primers, num_processes): """ check scheme for heterodimers, try to find new primers that overlap and replace the existing ones. - this can lead to new primer dimers. therefore the scheme - is checked a second time. if there are still primer dimers - present the non-solvable dimers are returned + Uses multiprocessing to solve dimers in parallel. """ + primer_dimers, non_dimers_all_pools = test_scheme_for_dimers(amplicon_scheme) + n_initial_dimers = len(primer_dimers) - primer_dimers = test_scheme_for_dimers(amplicon_scheme) + if not primer_dimers: + return [], 0 - if primer_dimers: - print(f"varVAMP found {len(primer_dimers)} dimer pairs in scheme ... 
trying to find replacements") - else: - return [] - - for dimer in primer_dimers: - # get overlapping primers that have not been considered - overlapping_primers = get_overlapping_primers(dimer, left_primer_candidates, right_primer_candidates) - # test all possible primers against each other for dimers - new_primers = test_overlaps_for_dimers(overlapping_primers) - # now change these primers in the scheme + # Prepare arguments for each dimer + callable_f = functools.partial( + _solve_single_dimer, + amplicon_scheme, left_primer_candidates, right_primer_candidates, non_dimers_all_pools + ) + + # Solve dimers in parallel + with multiprocessing.Pool(processes=num_processes) as pool: + results = pool.map(callable_f, primer_dimers) + + # Apply all solutions to the scheme + for new_primers in results: if new_primers: for amp_index, primer_name, primer in new_primers: # overwrite in final scheme @@ -398,12 +434,13 @@ def check_and_solve_heterodimers(amplicon_scheme, left_primer_candidates, right_ # and in all primers all_primers[strand][primer_name] = primer # get remaining dimers in the revised scheme and add pool identifier for reporting + remaining_primer_dimers, _ = test_scheme_for_dimers(amplicon_scheme) primer_dimers = [ (amplicon_scheme[primer1[0]]["pool"], primer1, primer2) - for primer1, primer2 in test_scheme_for_dimers(amplicon_scheme) + for primer1, primer2 in remaining_primer_dimers ] - return primer_dimers + return primer_dimers, n_initial_dimers def find_single_amplicons(amplicons, n):