|
| 1 | +#!/usr/bin/env python2 |
| 2 | +# ENCODE_map 0.0.1 |
| 3 | + |
| 4 | +import os |
| 5 | +import subprocess |
| 6 | +import shlex |
| 7 | +from multiprocessing import cpu_count |
| 8 | +import logging |
| 9 | +import sys |
| 10 | + |
| 11 | + |
| 12 | +logger = logging.getLogger(__name__) |
| 13 | +logger.propagate = False |
| 14 | +logger.setLevel(logging.INFO) |
| 15 | + |
| 16 | +BWA_PATH = "/image_software/bwa_0_7_10/bwa/bwa" |
| 17 | +TRIMMOMATIC_PATH = "/image_software/Trimmomatic-0.36/trimmomatic-0.36.jar" |
| 18 | + |
| 19 | +# the order of this list is important. |
| 20 | +# strip_extensions strips from the right inward, so |
| 21 | +# the expected right-most extensions should appear first (like .gz) |
| 22 | +STRIP_EXTENSIONS = ['.gz', '.fq', '.fastq', '.fa', '.fasta'] |
| 23 | + |
| 24 | + |
| 25 | +def strip_extensions(filename, extensions): |
| 26 | + basename = filename.split('/')[-1] |
| 27 | + for extension in extensions: |
| 28 | + basename = basename.rpartition(extension)[0] or basename |
| 29 | + return basename |
| 30 | + |
| 31 | + |
| 32 | +def resolve_reference(reference_tar_filename, reference_dirname): |
| 33 | + if reference_tar_filename.endswith('.gz') or reference_tar_filename.endswith('.tgz'): |
| 34 | + tar_command = \ |
| 35 | + 'tar -xzv --no-same-owner --no-same-permissions -C %s -f %s' \ |
| 36 | + % (reference_dirname, reference_tar_filename) |
| 37 | + else: |
| 38 | + tar_command = \ |
| 39 | + 'tar -xv --no-same-owner --no-same-permissions -C %s -f %s' \ |
| 40 | + % (reference_dirname, reference_tar_filename) |
| 41 | + |
| 42 | + logger.info("Unpacking %s with %s" % (reference_tar_filename, tar_command)) |
| 43 | + |
| 44 | + print(subprocess.check_output(shlex.split(tar_command))) |
| 45 | + |
| 46 | + # assume the reference file is the only .fa or .fna file |
| 47 | + filename = next((f for f in os.listdir(reference_dirname) if f.endswith('.fa') or f.endswith('.fna') or f.endswith('.fa.gz') or f.endswith('.fna.gz')), None) |
| 48 | + |
| 49 | + return '/'.join([reference_dirname, filename]) |
| 50 | + |
| 51 | + |
| 52 | +def crop(reads1_file, reads2_file, crop_length, debug): |
| 53 | + |
| 54 | + if debug: |
| 55 | + logger.setLevel(logging.DEBUG) |
| 56 | + else: |
| 57 | + logger.setLevel(logging.INFO) |
| 58 | + |
| 59 | + logger.setLevel(logging.INFO) |
| 60 | + if crop_length == 'native': |
| 61 | + local_copy_reads1 = reads1_file.split('/')[-1] |
| 62 | + subprocess.check_output(shlex.split('cp %s %s' % reads1_file, local_copy_reads1 )) |
| 63 | + local_copy_reads2 = None |
| 64 | + if reads2_file: |
| 65 | + local_copy_reads2 = reads2_file.split('/')[-1] |
| 66 | + subprocess.check_output(shlex.split('cp %s %s' % reads2_file, local_copy_reads2 )) |
| 67 | + output = dict(zip( |
| 68 | + ["cropped_reads1", "cropped_reads2"], [local_copy_reads1, local_copy_reads2])) |
| 69 | + else: |
| 70 | + reads1_filename = reads1_file |
| 71 | + reads1_basename = strip_extensions(reads1_filename, STRIP_EXTENSIONS) |
| 72 | + if reads2_file: |
| 73 | + end_string = "PE" |
| 74 | + reads2_filename = reads2_file |
| 75 | + reads2_basename = \ |
| 76 | + strip_extensions(reads2_filename, STRIP_EXTENSIONS) |
| 77 | + output_fwd_paired_filename = reads1_basename + '-crop-paired.fq.gz' |
| 78 | + output_fwd_unpaired_filename = \ |
| 79 | + reads1_basename + '-crop-unpaired.fq.gz' |
| 80 | + output_rev_paired_filename = reads2_basename + '-crop-paired.fq.gz' |
| 81 | + output_rev_unpaired_filename = \ |
| 82 | + reads2_basename + '-crop-unpaired.fq.gz' |
| 83 | + SE_output_filename = None |
| 84 | + else: |
| 85 | + end_string = "SE" |
| 86 | + reads2_filename = None |
| 87 | + reads2_basename = None |
| 88 | + output_fwd_paired_filename = None |
| 89 | + output_fwd_unpaired_filename = None |
| 90 | + output_rev_paired_filename = None |
| 91 | + output_rev_unpaired_filename = None |
| 92 | + SE_output_filename = reads1_basename + "-crop.fq.gz" |
| 93 | + |
| 94 | + crop_command = ' '.join([s for s in [ |
| 95 | + 'java -jar', |
| 96 | + TRIMMOMATIC_PATH, |
| 97 | + end_string, |
| 98 | + '-threads %d' % (cpu_count()), |
| 99 | + reads1_filename, |
| 100 | + reads2_filename, |
| 101 | + SE_output_filename, |
| 102 | + output_fwd_paired_filename, |
| 103 | + output_fwd_unpaired_filename, |
| 104 | + output_rev_paired_filename, |
| 105 | + output_rev_unpaired_filename, |
| 106 | + 'MINLEN:%s' % (crop_length), |
| 107 | + 'CROP:%s' % (crop_length)] |
| 108 | + if s]) |
| 109 | + |
| 110 | + logger.info("Cropping with: %s" % (crop_command)) |
| 111 | + print(subprocess.check_output(shlex.split(crop_command))) |
| 112 | + print(subprocess.check_output(shlex.split('ls -l'))) |
| 113 | + |
| 114 | + if SE_output_filename: |
| 115 | + SE_output = SE_output_filename |
| 116 | + cropped_reads = [SE_output] |
| 117 | + else: |
| 118 | + output_fwd_paired = output_fwd_paired_filename |
| 119 | + output_rev_paired = output_rev_paired_filename |
| 120 | + cropped_reads = [ |
| 121 | + output_fwd_paired, |
| 122 | + output_rev_paired] |
| 123 | + |
| 124 | + output = dict(zip(["cropped_reads1", "cropped_reads2"], cropped_reads)) |
| 125 | + |
| 126 | + logger.info("returning from crop with output %s" % (output)) |
| 127 | + return output |
| 128 | + |
| 129 | + |
| 130 | +def process(reads_file, reference_tar, bwa_aln_params, debug): |
| 131 | + # reads_file, reference_tar should be links to file objects. |
| 132 | + # reference_tar should be a tar of files generated by bwa index and |
| 133 | + # the tar should be uncompressed to avoid repeating the decompression. |
| 134 | + |
| 135 | + if debug: |
| 136 | + logger.setLevel(logging.DEBUG) |
| 137 | + else: |
| 138 | + logger.setLevel(logging.INFO) |
| 139 | + |
| 140 | + bwa = BWA_PATH |
| 141 | + logger.info("In process with bwa %s" % (bwa)) |
| 142 | + |
| 143 | + # Generate filename strings and download the files to the local filesystem |
| 144 | + reads_filename = reads_file |
| 145 | + reads_basename = strip_extensions(reads_filename, STRIP_EXTENSIONS) |
| 146 | + |
| 147 | + reference_tar_filename = reference_tar |
| 148 | + |
| 149 | + reference_dirname = '.' |
| 150 | + |
| 151 | + reference_filename = \ |
| 152 | + resolve_reference(reference_tar_filename, reference_dirname) |
| 153 | + |
| 154 | + logger.info("Using reference file: %s" % (reference_filename)) |
| 155 | + |
| 156 | + print(subprocess.check_output('ls -l', shell=True)) |
| 157 | + |
| 158 | + # generate the suffix array index file |
| 159 | + sai_filename = '%s.sai' % (reads_basename) |
| 160 | + with open(sai_filename, 'w') as sai_file: |
| 161 | + # Build the bwa command and call bwa |
| 162 | + bwa_command = "%s aln %s -t %d %s %s" \ |
| 163 | + % (bwa, bwa_aln_params, cpu_count(), |
| 164 | + reference_filename, reads_filename) |
| 165 | + logger.info("Running bwa with %s" % (bwa_command)) |
| 166 | + |
| 167 | + subprocess.check_call(shlex.split(bwa_command), stdout=sai_file) |
| 168 | + |
| 169 | + print(subprocess.check_output('ls -l', shell=True)) |
| 170 | + |
| 171 | + # Upload the output to the DNAnexus project |
| 172 | + logger.info("Uploading suffix array %s" % (sai_filename)) |
| 173 | + sai_dxfile = sai_filename |
| 174 | + output = {"suffix_array_index": sai_dxfile} |
| 175 | + logger.info("Returning from process with %s" % (output)) |
| 176 | + return output |
| 177 | + |
| 178 | + |
| 179 | +# always only read1, because each end is mapped independently |
| 180 | +# probbaly should update code accordingly |
| 181 | +def main(crop_length, reference_tar, |
| 182 | + bwa_aln_params, samtools_version, debug, reads1, reads2): |
| 183 | + # Main entry-point. Parameter defaults assumed to come from dxapp.json. |
| 184 | + # reads1, reference_tar, reads2 are links to DNAnexus files or None |
| 185 | + |
| 186 | + # create a file handler |
| 187 | + handler = logging.FileHandler('mapping.log') |
| 188 | + |
| 189 | + if debug: |
| 190 | + handler.setLevel(logging.DEBUG) |
| 191 | + else: |
| 192 | + handler.setLevel(logging.INFO) |
| 193 | + logger.addHandler(handler) |
| 194 | + |
| 195 | + # This spawns only one or two subjobs for single- or paired-end, |
| 196 | + # respectively. It could also download the files, chunk the reads, |
| 197 | + # and spawn multiple subjobs. |
| 198 | + |
| 199 | + # Files are downloaded later by subjobs into their own filesystems |
| 200 | + # and uploaded to the project. |
| 201 | + |
| 202 | + # Initialize file handlers for input files. |
| 203 | + |
| 204 | + paired_end = reads2 is not None |
| 205 | + |
| 206 | + if crop_length == 'native': |
| 207 | + crop_subjob = None |
| 208 | + local_copy_reads1 = reads1.split('/')[-1] |
| 209 | + subprocess.check_output(shlex.split('cp %s %s' % (reads1, local_copy_reads1))) |
| 210 | + local_copy_reads2 = None |
| 211 | + if paired_end: |
| 212 | + local_copy_reads2 = reads2.split('/')[-1] |
| 213 | + subprocess.check_output(shlex.split('cp %s %s' % (reads2, local_copy_reads2))) |
| 214 | + unmapped_reads = [local_copy_reads1, local_copy_reads2] |
| 215 | + else: |
| 216 | + crop_subjob_input = { |
| 217 | + "reads1_file": reads1, |
| 218 | + "reads2_file": reads2, |
| 219 | + "crop_length": crop_length, |
| 220 | + "debug": debug |
| 221 | + } |
| 222 | + logger.info("Crop job input: %s" % (crop_subjob_input)) |
| 223 | + |
| 224 | + crop_subjob = crop(reads1, reads2, crop_length, debug) |
| 225 | + |
| 226 | + unmapped_reads = [crop_subjob.get("cropped_reads1")] |
| 227 | + if paired_end: |
| 228 | + unmapped_reads.append(crop_subjob.get("cropped_reads2")) |
| 229 | + else: |
| 230 | + unmapped_reads.append(None) |
| 231 | + |
| 232 | + unmapped_reads = [r for r in unmapped_reads if r] |
| 233 | + |
| 234 | + for reads in unmapped_reads: |
| 235 | + mapping_subjob_input = { |
| 236 | + "reads_file": reads, |
| 237 | + "reference_tar": reference_tar, |
| 238 | + "bwa_aln_params": bwa_aln_params, |
| 239 | + "debug": debug |
| 240 | + } |
| 241 | + logger.info("Mapping job input: %s" % (mapping_subjob_input)) |
| 242 | + |
| 243 | + process(reads, reference_tar, bwa_aln_params, debug) |
| 244 | + |
| 245 | + output = { |
| 246 | + "crop_length": crop_length, |
| 247 | + "paired_end": paired_end, |
| 248 | + } |
| 249 | + |
| 250 | + logger.info("Exiting mapping with output: %s" % (output)) |
| 251 | + return output |
| 252 | + |
| 253 | +if len(sys.argv) == 4: |
| 254 | + main(sys.argv[2], sys.argv[1], "-q 5 -l 32 -k 2", "1.0", False, sys.argv[3], None) |
| 255 | +else: |
| 256 | + main(sys.argv[2], sys.argv[1], "-q 5 -l 32 -k 2", "1.0", False, sys.argv[3], sys.argv[4]) |
0 commit comments