@@ -33,6 +33,7 @@ import groovyx.gpars.GParsPool
3333
3434import java.util.concurrent.LinkedBlockingQueue
3535import java.util.concurrent.atomic.AtomicInteger
36+ import java.util.concurrent.atomic.AtomicLong
3637
3738import static com.milaboratory.migec.Util.BLANK_PATH
3839
@@ -193,7 +194,7 @@ while ((header1 = reader1.readLine()) != null) {
193194 reader2. readLine()
194195 reader2. readLine()
195196
196- if (header2 == null ){
197+ if (header2 == null ) {
197198 println " [ERROR] R1 file has more reads than R2"
198199 System . exit(1 )
199200 }
@@ -219,7 +220,7 @@ while ((header1 = reader1.readLine()) != null) {
219220if (paired) {
220221 header2 = reader2. readLine()
221222
222- if (header2 != null ){
223+ if (header2 != null ) {
223224 println " [ERROR] R2 file has more reads than R1"
224225 System . exit(1 )
225226 }
@@ -239,7 +240,10 @@ def detailsWriter1 = alignmentDetails ? Util.getWriter(outputDir + "/" + outputF
239240 detailsWriter2 = bothReads && alignmentDetails ? Util . getWriter(outputDir + " /" + outputFileNameNoExt2 + " .asm" , compressed) : null
240241
241242println " [${ new Date()} $scriptName ] Starting assembly.."
242- def writeThread = new Thread ({ // Writing thread, listening to queue
243+
244+ // Writing thread, listening to queue
245+
246+ def writeThread = new Thread ({
243247 String [] result
244248 while (true ) {
245249 result = writeQueue. take()
@@ -272,16 +276,37 @@ def writeThread = new Thread({ // Writing thread, listening to queue
272276} as Runnable )
273277writeThread. start()
274278
279+ // / Counters
280+
275281def nMigs = new AtomicInteger (),
276- nReadsInMigs = new AtomicInteger (), nDroppedReads = new AtomicInteger (),
277282 nCollisions = new AtomicInteger ()
278- def nGoodMigs = new AtomicInteger [3 ], nReadsInGoodMigs = new AtomicInteger [3 ]
283+
284+ def nReadsInMigs = new AtomicLong (), nDroppedReads = new AtomicLong ()
285+
286+ def nGoodMigs = new AtomicInteger [3 ]
279287nGoodMigs[0 ] = new AtomicInteger ()
280288nGoodMigs[1 ] = new AtomicInteger ()
281289nGoodMigs[2 ] = new AtomicInteger ()
282- nReadsInGoodMigs[0 ] = new AtomicInteger ()
283- nReadsInGoodMigs[1 ] = new AtomicInteger ()
284- nReadsInGoodMigs[2 ] = new AtomicInteger ()
290+ def nReadsInGoodMigs = new AtomicLong [3 ]
291+ nReadsInGoodMigs[0 ] = new AtomicLong ()
292+ nReadsInGoodMigs[1 ] = new AtomicLong ()
293+ nReadsInGoodMigs[2 ] = new AtomicLong ()
294+
295+ def nCollisionMigs = new AtomicInteger [2 ]
296+ nCollisionMigs[0 ] = new AtomicInteger ()
297+ nCollisionMigs[1 ] = new AtomicInteger ()
298+ def nCollisionReads = new AtomicLong [2 ]
299+ nCollisionReads[0 ] = new AtomicLong ()
300+ nCollisionReads[1 ] = new AtomicLong ()
301+
302+ def nOverseqMigs = new AtomicInteger [2 ]
303+ nOverseqMigs[0 ] = new AtomicInteger ()
304+ nOverseqMigs[1 ] = new AtomicInteger ()
305+ def nOverseqReads = new AtomicLong [2 ]
306+ nOverseqReads[0 ] = new AtomicLong ()
307+ nOverseqReads[1 ] = new AtomicLong ()
308+
309+ // / Aux functions
285310
286311def getCoreSeq = { String seq , int offset ->
287312 int mid = seq. length() / 2
@@ -292,6 +317,8 @@ def sum = { Collection c ->
292317 c. size() > 0 ? (int ) c. sum() : 0i
293318}
294319
320+ // Process in parallel
321+
295322GParsPool . withPool THREADS , {
296323 migData[0 ]. eachWithIndexParallel { migEntry , gInd ->
297324 String umi = migEntry. key
@@ -465,7 +492,7 @@ GParsPool.withPool THREADS, {
465492 consensus. append(Util . code2nt(mostFreqLetter))
466493 qual. append(Util . symbolFromQual(Math . max(2 , (int ) ((maxLetterFreq / count - 0.25) / 0.75 * 40.0 ))))
467494 }
468- assembledReads[ind] = " @MIG.${ gInd} R${ ind+ 1} UMI:$umi :$count \n ${ consensus.toString()} \n +\n ${ qual.toString()} " . toString()
495+ assembledReads[ind] = " @MIG.${ gInd} R${ ind + 1} UMI:$umi :$count \n ${ consensus.toString()} \n +\n ${ qual.toString()} " . toString()
469496
470497 if (alignmentDetails)
471498 assembledReads[ind + 2 ] = detailInfo
@@ -474,8 +501,23 @@ GParsPool.withPool THREADS, {
474501 nReadsInGoodMigsCurrent[ind] = nReadsInGoodMigs[ind]. addAndGet(count)
475502 }
476503 }
504+ } else {
505+ if (! noCollision) {
506+ counts. eachWithIndex { int count , int i ->
507+ nCollisionMigs[i]. incrementAndGet()
508+ nCollisionReads[i]. addAndGet(count)
509+ }
510+ } else {
511+ counts. eachWithIndex { int count , int i ->
512+ if (count < minMigSize) {
513+ nOverseqMigs[i]. incrementAndGet()
514+ nOverseqReads[i]. addAndGet(count)
515+ }
516+ }
517+ }
477518 }
478519
520+ // Send to writer
479521 if ((! assemblyIndices[0 ] || assembledReads[0 ] != null ) &&
480522 (! assemblyIndices[1 ] || assembledReads[0 ] != null ) &&
481523 (! (assemblyIndices[0 ] && assemblyIndices[1 ]) || assembledReads[1 ] != null )) {
@@ -484,7 +526,7 @@ GParsPool.withPool THREADS, {
484526 nReadsInGoodMigsCurrent[2 ] = nReadsInGoodMigs[2 ]. addAndGet(avgCount)
485527 }
486528
487-
529+ // Report
488530 if (nMigsCurrent % 10000 == 0 )
489531 println " [${ new Date()} $scriptName ] Processed $nMigsCurrent MIGs, $nReadsInMigsCurrent reads total, " +
490532 " $nCollisionsCurrent collisions detected, assembled so far: " +
@@ -517,8 +559,16 @@ def logLine = [assemblyIndices[0] ? new File(inputFileName1).absolutePath : BLAN
517559
518560 nGoodMigs[0 ]. get(), nGoodMigs[1 ]. get(), nGoodMigs[2 ]. get(), nMigs. get(),
519561
520- nReadsInGoodMigs[0 ]. get(), nReadsInGoodMigs[1 ]. get(), nReadsInGoodMigs[2 ]. get(), nReadsInMigs. get(),
521- nDroppedReads. get()]. join(" \t " )
562+ nReadsInGoodMigs[0 ]. get(), nReadsInGoodMigs[1 ]. get(), nReadsInGoodMigs[2 ]. get(),
563+
564+ nReadsInMigs. get(), nDroppedReads. get(),
565+
566+ nOverseqMigs[0 ]. get(), nOverseqMigs[1 ]. get(),
567+ nOverseqReads[0 ]. get(), nOverseqReads[1 ]. get(),
568+
569+ nCollisionMigs[0 ]. get(), nCollisionMigs[1 ]. get(),
570+ nCollisionReads[0 ]. get(), nCollisionReads[1 ]. get()
571+ ]. join(" \t " )
522572
523573
524574if (logFileName) {
0 commit comments