|
1 | 1 | #include "pescanner.h" |
2 | | -#include "fastqreader.h" |
3 | | -#include <iostream> |
4 | | -#include "htmlreporter.h" |
5 | | -#include "jsonreporter.h" |
6 | | -#include "multihtmlreporter.h" |
7 | | -#include <unistd.h> |
8 | | -#include <functional> |
9 | | -#include <thread> |
10 | | -#include <memory.h> |
11 | | -#include "util.h" |
12 | | -#include "globalsettings.h" |
13 | | -#include "mutscan.h" |
14 | 2 |
|
15 | | -PairEndScanner::PairEndScanner(string mutationFile, string refFile, string read1File, string read2File, string html, string json, int threadNum){ |
16 | | - mRead1File = read1File; |
17 | | - mRead2File = read2File; |
18 | | - mMutationFile = mutationFile; |
19 | | - mRefFile = refFile; |
20 | | - mHtmlFile = html; |
21 | | - mJsonFile = json; |
22 | | - mProduceFinished = false; |
23 | | - mThreadNum = threadNum; |
24 | | - mRollingHash = NULL; |
| 3 | +PairEndScanner::PairEndScanner(string mutationFile, string refFile, string read1File, string read2File, string html, string json, int threadNum) |
| 4 | + : Scanner(mutationFile, refFile, read1File, read2File, html, json, threadNum) { |
25 | 5 | } |
26 | 6 |
|
27 | | -PairEndScanner::~PairEndScanner() { |
28 | | - if(mRollingHash){ |
29 | | - delete mRollingHash; |
30 | | - mRollingHash = NULL; |
31 | | - } |
32 | | - for(size_t i=0; i<mReadToDelete.size(); i++) { |
33 | | - delete mReadToDelete[i]; |
34 | | - mReadToDelete[i] = NULL; |
35 | | - } |
36 | | - for(size_t i=0; i<mBufToDelete.size(); i++) { |
37 | | - delete mBufToDelete[i]; |
38 | | - mBufToDelete[i] = NULL; |
39 | | - } |
40 | | -} |
41 | | - |
42 | | - |
43 | | -bool PairEndScanner::scan(){ |
44 | | - |
45 | | - if(mMutationFile!=""){ |
46 | | - if(ends_with(mMutationFile, ".vcf") || ends_with(mMutationFile, ".VCF") || ends_with(mMutationFile, ".Vcf")) |
47 | | - mutationList = Mutation::parseVcf(mMutationFile, mRefFile); |
48 | | - else |
49 | | - mutationList = Mutation::parseCsv(mMutationFile); |
50 | | - } |
51 | | - else |
52 | | - mutationList = Mutation::parseBuiltIn(); |
53 | | - |
54 | | - if(GlobalSettings::verbose) |
55 | | - cerr << "Scanning "<< mutationList.size() << " mutations..."<<endl; |
56 | | - |
57 | | - if(GlobalSettings::simplifiedModeToEvaluate) |
58 | | - MutScan::evaluateSimplifiedMode(mRead1File, mRead2File, mutationList.size()); |
59 | | - |
60 | | - if(!GlobalSettings::legacyMode && mRollingHash==NULL){ |
61 | | - mRollingHash = new RollingHash(); |
62 | | - mRollingHash->initMutations(mutationList); |
63 | | - } |
64 | | - |
65 | | - mutationMatches = new vector<Match*>[mutationList.size()]; |
66 | | - for(size_t i=0;i<mutationList.size();i++){ |
67 | | - mutationMatches[i] = vector<Match*>(); |
68 | | - } |
69 | | - |
70 | | - initPackRepository(); |
71 | | - std::thread producer(std::bind(&PairEndScanner::producerTask, this)); |
72 | | - |
73 | | - std::thread** threads = new thread*[mThreadNum]; |
74 | | - for(int t=0; t<mThreadNum; t++){ |
75 | | - threads[t] = new std::thread(std::bind(&PairEndScanner::consumerTask, this)); |
76 | | - } |
77 | | - |
78 | | - producer.join(); |
79 | | - for(int t=0; t<mThreadNum; t++){ |
80 | | - threads[t]->join(); |
81 | | - } |
82 | | - |
83 | | - for(int t=0; t<mThreadNum; t++){ |
84 | | - delete threads[t]; |
85 | | - threads[t] = NULL; |
86 | | - } |
87 | | - |
88 | | - // sort the matches to make the pileup more clear |
89 | | - for(size_t i=0;i<mutationList.size();i++){ |
90 | | - sort(mutationMatches[i].begin(), mutationMatches[i].end(), Match::greater); |
91 | | - } |
92 | | - |
93 | | - textReport(mutationList, mutationMatches); |
94 | | - htmlReport(mutationList, mutationMatches); |
95 | | - jsonReport(mutationList, mutationMatches); |
| 7 | +PairEndScanner::~PairEndScanner() {} |
96 | 8 |
|
97 | | - // free memory |
98 | | - for(size_t i=0;i<mutationList.size();i++){ |
99 | | - mutationMatches[i].clear(); |
100 | | - } |
101 | | - return true; |
102 | | -} |
103 | | - |
104 | | -void PairEndScanner::pushMatch(int i, Match* m, bool needStoreReadToDelete){ |
105 | | - std::unique_lock<std::mutex> lock(mMutationMtx); |
106 | | - mutationMatches[i].push_back(m); |
107 | | - if(needStoreReadToDelete) { |
108 | | - if(GlobalSettings::simplifiedMode) |
109 | | - mBufToDelete.push_back(m->getSequence()); |
110 | | - else |
111 | | - mReadToDelete.push_back(m->getRead()); |
112 | | - } |
113 | | - lock.unlock(); |
114 | | -} |
115 | | - |
116 | | -bool PairEndScanner::scanPairEnd(ReadPairPack* pack){ |
| 9 | +bool PairEndScanner::scanNextEnd(ReadPack<ReadPair>* pack){ |
117 | 10 | bool simplified = GlobalSettings::simplifiedMode; |
118 | 11 |
|
119 | 12 | for(int p=0;p<pack->count;p++){ |
@@ -155,242 +48,3 @@ bool PairEndScanner::scanPairEnd(ReadPairPack* pack){ |
155 | 48 |
|
156 | 49 | return true; |
157 | 50 | } |
158 | | - |
159 | | -bool PairEndScanner::scanRead(Read* r, ReadPair* originalPair, bool reversed) { |
160 | | - // just copy the sequence buffer |
161 | | - char* simplifiedBuf = NULL; |
162 | | - if(GlobalSettings::simplifiedMode) { |
163 | | - simplifiedBuf = r->to2bit(); |
164 | | - if(simplifiedBuf == NULL) |
165 | | - return false; |
166 | | - } |
167 | | - |
168 | | - bool matched = false; |
169 | | - if(!GlobalSettings::legacyMode){ |
170 | | - map<int, int> targets = mRollingHash->hitTargets(r->mSeq.mStr); |
171 | | - map<int, int>::iterator iter; |
172 | | - for(iter=targets.begin(); iter!=targets.end(); iter++) { |
173 | | - int t = iter->first; |
174 | | - int count = iter->second; |
175 | | - if(count==0) |
176 | | - continue; |
177 | | - Match* match = mutationList[t].searchInRead(r, simplifiedBuf); |
178 | | - if(match) { |
179 | | - if(!GlobalSettings::simplifiedMode) |
180 | | - match->addOriginalPair(originalPair); |
181 | | - match->setReversed(reversed); |
182 | | - pushMatch(t, match, !matched); |
183 | | - matched = true; |
184 | | - } |
185 | | - } |
186 | | - } else { |
187 | | - for(size_t i=0;i<mutationList.size();i++){ |
188 | | - Match* match = mutationList[i].searchInRead(r, simplifiedBuf); |
189 | | - if(match) { |
190 | | - if(!GlobalSettings::simplifiedMode) |
191 | | - match->addOriginalPair(originalPair); |
192 | | - match->setReversed(reversed); |
193 | | - pushMatch(i, match, !matched); |
194 | | - matched = true; |
195 | | - } |
196 | | - } |
197 | | - } |
198 | | - if(!matched && simplifiedBuf!=NULL) { |
199 | | - delete simplifiedBuf; |
200 | | - simplifiedBuf = NULL; |
201 | | - } |
202 | | - return matched; |
203 | | -} |
204 | | - |
205 | | -void PairEndScanner::initPackRepository() { |
206 | | - mRepo.packBuffer = new ReadPairPack*[PACK_NUM_LIMIT]; |
207 | | - memset(mRepo.packBuffer, 0, sizeof(ReadPairPack*)*PACK_NUM_LIMIT); |
208 | | - mRepo.writePos = 0; |
209 | | - mRepo.readPos = 0; |
210 | | - mRepo.readCounter = 0; |
211 | | - |
212 | | -} |
213 | | - |
214 | | -void PairEndScanner::destroyPackRepository() { |
215 | | - delete mRepo.packBuffer; |
216 | | - mRepo.packBuffer = NULL; |
217 | | -} |
218 | | - |
219 | | -void PairEndScanner::producePack(ReadPairPack* pack){ |
220 | | - std::unique_lock<std::mutex> lock(mRepo.mtx); |
221 | | - while(((mRepo.writePos + 1) % PACK_NUM_LIMIT) |
222 | | - == mRepo.readPos) { |
223 | | - mRepo.repoNotFull.wait(lock); |
224 | | - } |
225 | | - |
226 | | - mRepo.packBuffer[mRepo.writePos] = pack; |
227 | | - mRepo.writePos++; |
228 | | - |
229 | | - if (mRepo.writePos == PACK_NUM_LIMIT) |
230 | | - mRepo.writePos = 0; |
231 | | - |
232 | | - mRepo.repoNotEmpty.notify_all(); |
233 | | - lock.unlock(); |
234 | | -} |
235 | | - |
236 | | -void PairEndScanner::consumePack(){ |
237 | | - ReadPairPack* data; |
238 | | - std::unique_lock<std::mutex> lock(mRepo.mtx); |
239 | | - // read buffer is empty, just wait here. |
240 | | - while(mRepo.writePos % PACK_NUM_LIMIT == mRepo.readPos % PACK_NUM_LIMIT) { |
241 | | - if(mProduceFinished){ |
242 | | - lock.unlock(); |
243 | | - return; |
244 | | - } |
245 | | - mRepo.repoNotEmpty.wait(lock); |
246 | | - } |
247 | | - |
248 | | - data = mRepo.packBuffer[mRepo.readPos]; |
249 | | - mRepo.readPos++; |
250 | | - |
251 | | - if (mRepo.readPos >= PACK_NUM_LIMIT) |
252 | | - mRepo.readPos = 0; |
253 | | - |
254 | | - lock.unlock(); |
255 | | - mRepo.repoNotFull.notify_all(); |
256 | | - |
257 | | - scanPairEnd(data); |
258 | | -} |
259 | | - |
260 | | -void PairEndScanner::producerTask() |
261 | | -{ |
262 | | - int slept = 0; |
263 | | - long total = 0; |
264 | | - ReadPair** data = new ReadPair*[PACK_SIZE]; |
265 | | - memset(data, 0, sizeof(ReadPair*)*PACK_SIZE); |
266 | | - FastqReaderPair reader(mRead1File, mRead2File); |
267 | | - int count=0; |
268 | | - while(true){ |
269 | | - ReadPair* read = reader.read(); |
270 | | - if(!read){ |
271 | | - // the last pack |
272 | | - ReadPairPack* pack = new ReadPairPack; |
273 | | - pack->data = data; |
274 | | - pack->count = count; |
275 | | - producePack(pack); |
276 | | - data = NULL; |
277 | | - if(GlobalSettings::verbose) { |
278 | | - cerr << "Loaded all of " << total << " pairs" << endl; |
279 | | - } |
280 | | - break; |
281 | | - } |
282 | | - data[count] = read; |
283 | | - count++; |
284 | | - total++; |
285 | | - // a full pack |
286 | | - if(count == PACK_SIZE){ |
287 | | - ReadPairPack* pack = new ReadPairPack; |
288 | | - pack->data = data; |
289 | | - pack->count = count; |
290 | | - producePack(pack); |
291 | | - //re-initialize data for next pack |
292 | | - data = new ReadPair*[PACK_SIZE]; |
293 | | - memset(data, 0, sizeof(ReadPair*)*PACK_SIZE); |
294 | | - // reset count to 0 |
295 | | - count = 0; |
296 | | - // if the consumer is far behind this producer, sleep and wait to limit memory usage |
297 | | - while(mRepo.writePos - mRepo.readPos > PACK_IN_MEM_LIMIT){ |
298 | | - //cout<<"sleep"<<endl; |
299 | | - slept++; |
300 | | - usleep(1000); |
301 | | - } |
302 | | - if(GlobalSettings::verbose && total % 1000000 == 0) { |
303 | | - cerr << "Loaded " << total << " pairs" << endl; |
304 | | - } |
305 | | - } |
306 | | - } |
307 | | - |
308 | | - std::unique_lock<std::mutex> lock(mRepo.mtx); |
309 | | - mProduceFinished = true; |
310 | | - // ensure all waiting consommers exit |
311 | | - mRepo.repoNotEmpty.notify_all(); |
312 | | - lock.unlock(); |
313 | | - |
314 | | - // if the last data initialized is not used, free it |
315 | | - if(data != NULL) |
316 | | - delete data; |
317 | | -} |
318 | | - |
319 | | -void PairEndScanner::consumerTask() |
320 | | -{ |
321 | | - while(true) { |
322 | | - std::unique_lock<std::mutex> lock(mRepo.mtx); |
323 | | - if(mProduceFinished && mRepo.writePos == mRepo.readPos){ |
324 | | - lock.unlock(); |
325 | | - break; |
326 | | - } |
327 | | - lock.unlock(); |
328 | | - consumePack(); |
329 | | - } |
330 | | -} |
331 | | - |
332 | | -void PairEndScanner::textReport(vector<Mutation>& mutationList, vector<Match*> *mutationMatches) { |
333 | | - //output result |
334 | | - bool found = false; |
335 | | - int undetected = 0; |
336 | | - for(size_t i=0;i<mutationList.size();i++){ |
337 | | - vector<Match*> matches = mutationMatches[i]; |
338 | | - if((ssize_t)matches.size()>=GlobalSettings::minReadSupport){ |
339 | | - found = true; |
340 | | - cout<<endl<<"---------------"<<endl; |
341 | | - mutationList[i].print(); |
342 | | - for(size_t m=0; m<matches.size(); m++){ |
343 | | - cout<<m+1<<", "; |
344 | | - matches[m]->print(mutationList[i].mLeft.length(), mutationList[i].mCenter.length(), mutationList[i].mRight.length()); |
345 | | - } |
346 | | - } else { |
347 | | - undetected++; |
348 | | - } |
349 | | - } |
350 | | - if(found == false) { |
351 | | - cout << "MutScan didn't find any mutation" << endl; |
352 | | - } |
353 | | - // if processing VCF, output those with no supporting reads found |
354 | | - if(GlobalSettings::processingVCF && undetected>0) { |
355 | | - cerr << undetected << " mutations of this VCF are not detected" << endl; |
356 | | - for(size_t i=0;i<mutationList.size();i++){ |
357 | | - vector<Match*> matches = mutationMatches[i]; |
358 | | - if((ssize_t)matches.size()<GlobalSettings::minReadSupport){ |
359 | | - Mutation m = mutationList[i]; |
360 | | - cerr <<m.mChr << " " <<m.mName<<" "<<m.mLeft<<" "<<m.mCenter<<" "<<m.mRight <<endl; |
361 | | - } |
362 | | - } |
363 | | - } |
364 | | -} |
365 | | - |
366 | | -void PairEndScanner::jsonReport(vector<Mutation>& mutationList, vector<Match*> *mutationMatches) { |
367 | | - if(mJsonFile == "") |
368 | | - return; |
369 | | - |
370 | | - JsonReporter reporter(mJsonFile, mutationList, mutationMatches); |
371 | | - reporter.run(); |
372 | | -} |
373 | | - |
374 | | -void PairEndScanner::htmlReport(vector<Mutation>& mutationList, vector<Match*> *mutationMatches) { |
375 | | - if(mHtmlFile == "") |
376 | | - return; |
377 | | - |
378 | | - // display a warning for too many reads with standalone mode |
379 | | - if(GlobalSettings::standaloneHtml){ |
380 | | - int totalReadSize = 0; |
381 | | - for(size_t i=0;i<mutationList.size();i++){ |
382 | | - vector<Match*> matches = mutationMatches[i]; |
383 | | - totalReadSize += matches.size(); |
384 | | - } |
385 | | - if(totalReadSize > WARN_STANDALONE_READ_LIMIT) |
386 | | - cerr << "WARNING: found too many (" << totalReadSize << ") reads. The standalone HTML report file will be very big and not readable. Remove -s or --standalone to generate multiple HTML files for each mutation."<<endl; |
387 | | - } |
388 | | - |
389 | | - if(!GlobalSettings::standaloneHtml) { |
390 | | - MultiHtmlReporter reporter(mHtmlFile, mutationList, mutationMatches); |
391 | | - reporter.run(); |
392 | | - } else { |
393 | | - HtmlReporter reporter(mHtmlFile, mutationList, mutationMatches); |
394 | | - reporter.run(); |
395 | | - } |
396 | | -} |
0 commit comments