Skip to content

Commit 107e7d1

Browse files
committed
More tweaks to cram threading.
The previous commit, while valid, also revealed more woes of multi-slice containers, specifically when cancelling the current read by seeking (draining the read-ahead decode queue).
1 parent 651a936 commit 107e7d1

File tree

3 files changed

+45
-6
lines changed

3 files changed

+45
-6
lines changed

cram/cram_decode.c

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3150,22 +3150,23 @@ static cram_slice *cram_next_slice(cram_fd *fd, cram_container **cp) {
31503150
if (s_next->hdr->ref_seq_id != fd->range.refid) {
31513151
fd->ooc = 1;
31523152
cram_free_slice(s_next);
3153-
s_next = NULL;
3153+
c_next->slice = s_next = NULL;
31543154
break;
31553155
}
31563156

31573157
// position beyond end of range; bail out
31583158
if (s_next->hdr->ref_seq_start > fd->range.end) {
31593159
fd->ooc = 1;
31603160
cram_free_slice(s_next);
3161-
s_next = NULL;
3161+
c_next->slice = s_next = NULL;
31623162
break;
31633163
}
31643164

31653165
// before start of range; skip to next slice
31663166
if (s_next->hdr->ref_seq_start + s_next->hdr->ref_seq_span-1 <
31673167
fd->range.start) {
31683168
cram_free_slice(s_next);
3169+
c_next->slice = s_next = NULL;
31693170
continue;
31703171
}
31713172
}
@@ -3346,14 +3347,27 @@ int cram_get_bam_seq(cram_fd *fd, bam_seq_t **bam) {
33463347
* Drains and frees the decode read-queue for a multi-threaded reader.
33473348
*/
33483349
void cram_drain_rqueue(cram_fd *fd) {
3350+
cram_container *lc = NULL;
3351+
33493352
if (!fd->pool)
33503353
return;
33513354

33523355
// drain queue of any in-flight decode jobs
33533356
while (!hts_tpool_process_empty(fd->rqueue)) {
33543357
hts_tpool_result *r = hts_tpool_next_result_wait(fd->rqueue);
33553358
cram_decode_job *j = (cram_decode_job *)hts_tpool_result_data(r);
3356-
cram_free_container(j->c);
3359+
if (j->c->slice == j->s)
3360+
j->c->slice = NULL;
3361+
if (j->c != lc) {
3362+
if (lc) {
3363+
if (fd->ctr == lc)
3364+
fd->ctr = NULL;
3365+
if (fd->ctr_mt == lc)
3366+
fd->ctr_mt = NULL;
3367+
cram_free_container(lc);
3368+
}
3369+
lc = j->c;
3370+
}
33573371
cram_free_slice(j->s);
33583372
hts_tpool_delete_result(r, 1);
33593373
}
@@ -3362,9 +3376,28 @@ void cram_drain_rqueue(cram_fd *fd) {
33623376
// due to the input queue being full.
33633377
if (fd->job_pending) {
33643378
cram_decode_job *j = (cram_decode_job *)fd->job_pending;
3365-
cram_free_container(j->c);
3379+
if (j->c->slice == j->s)
3380+
j->c->slice = NULL;
3381+
if (j->c != lc) {
3382+
if (lc) {
3383+
if (fd->ctr == lc)
3384+
fd->ctr = NULL;
3385+
if (fd->ctr_mt == lc)
3386+
fd->ctr_mt = NULL;
3387+
cram_free_container(lc);
3388+
}
3389+
lc = j->c;
3390+
}
33663391
cram_free_slice(j->s);
33673392
free(j);
33683393
fd->job_pending = NULL;
33693394
}
3395+
3396+
if (lc) {
3397+
if (fd->ctr == lc)
3398+
fd->ctr = NULL;
3399+
if (fd->ctr_mt == lc)
3400+
fd->ctr_mt = NULL;
3401+
cram_free_container(lc);
3402+
}
33703403
}

cram/cram_index.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,8 @@ int cram_seek_to_refpos(cram_fd *fd, cram_range *r) {
497497

498498
if (fd->ctr) {
499499
cram_free_container(fd->ctr);
500+
if (fd->ctr_mt && fd->ctr_mt != fd->ctr)
501+
cram_free_container(fd->ctr_mt);
500502
fd->ctr = NULL;
501503
fd->ctr_mt = NULL;
502504
fd->ooc = 0;

cram/cram_io.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2746,6 +2746,7 @@ void cram_free_container(cram_container *c) {
27462746
if (c->comp_hdr_block)
27472747
cram_free_block(c->comp_hdr_block);
27482748

2749+
// Free the slices; filled out by encoder only
27492750
if (c->slices) {
27502751
for (i = 0; i < c->max_slice; i++) {
27512752
if (c->slices[i])
@@ -2756,8 +2757,10 @@ void cram_free_container(cram_container *c) {
27562757
free(c->slices);
27572758
}
27582759

2760+
// Free the current slice; set by both encoder & decoder
27592761
if (c->slice) {
2760-
cram_free_slice(c->slice);
2762+
cram_free_slice(c->slice);
2763+
c->slice = NULL;
27612764
}
27622765

27632766
for (id = DS_RN; id < DS_TN; id++)
@@ -3136,7 +3139,7 @@ static int cram_flush_result(cram_fd *fd) {
31363139
if (0 != cram_flush_container2(fd, c))
31373140
return -1;
31383141

3139-
/* Free the container */
3142+
// Free the slices; filled out by encoder only
31403143
if (c->slices) {
31413144
for (i = 0; i < c->max_slice; i++) {
31423145
if (c->slices[i])
@@ -3147,6 +3150,7 @@ static int cram_flush_result(cram_fd *fd) {
31473150
}
31483151
}
31493152

3153+
// Free the current slice; set by both encoder & decoder
31503154
if (c->slice) {
31513155
cram_free_slice(c->slice);
31523156
c->slice = NULL;

0 commit comments

Comments
 (0)