Attempts at multithreading lead to various errors #30

@adkinsrs

Description

I have a tool (https://github.com/IGS/FADU) that I would like to enable multithreading for. In an earlier iteration of the tool I was using the BioAlignments package for BAM-file processing, and I believe multithreading was working properly then (at least implicitly, in that I did not have to call @threads or @spawn in my own code). Since the BAM functionality was moved to XAM.jl, I updated my packages, and that update seems to have broken multithreading. I am currently following some tutorials (https://julialang.org/blog/2023/07/PSA-dont-use-threadid/#better_fix_work_directly_with_tasks) in an effort to get multithreading working again, but have not had much luck.

I wasn't sure which BioJulia repo to put this in, but judging from (BioJulia/XAM.jl#44) and (BioJulia/BioAlignments.jl#4) it seems multithreaded reading has not been implemented. The error I cite comes from the BGZFStreams package, so I am writing it here, but I can copy it over to XAM if it should live there.

Expected Behavior

If I run the following code without --threads=N, I expect it to run successfully without erroring. The same goes for running with julia --threads=N.
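
For concreteness, the two invocations I am testing (tst2.jl is the reproducer script given below):

julia tst2.jl
julia --threads=2 tst2.jl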

Current Behavior

If my GFF3 file has exactly one feature to process, the test script runs as expected (with or without the --threads command-line argument).

When there are two or more features to process, then if --threads is omitted or --threads=1 is used, I get the following:

ERROR: LoadError: TaskFailedException
Stacktrace:
 [1] wait
   @ ./task.jl:349 [inlined]
 [2] fetch
   @ ./task.jl:369 [inlined]
 [3] _broadcast_getindex_evalf
   @ ./broadcast.jl:683 [inlined]
 [4] _broadcast_getindex
   @ ./broadcast.jl:656 [inlined]
 [5] getindex
   @ ./broadcast.jl:610 [inlined]
 [6] copy
   @ ./broadcast.jl:912 [inlined]
 [7] materialize
   @ ./broadcast.jl:873 [inlined]
 [8] process_all_features(features::Vector{GFF3.Record}, bam_file::String)
   @ Main ~/git/FADU/tst2.jl:92
 [9] top-level scope
   @ ~/git/FADU/tst2.jl:103

    nested task error: ArgumentError: too large in-block offset
    Stacktrace:
     [1] seek(stream::BGZFStreams.BGZFStream{IOStream}, voffset::BGZFStreams.VirtualOffset)
       @ BGZFStreams ~/.julia/packages/BGZFStreams/aVKm9/src/bgzfstream.jl:232
     [2] seek
       @ ~/.julia/packages/XAM/l6LT2/src/bam/reader.jl:68 [inlined]
     [3] iterate(iter::XAM.BAM.OverlapIterator{IOStream})
       @ XAM.BAM ~/.julia/packages/XAM/l6LT2/src/bam/overlap.jl:61
     [4] process_feature(feature::GFF3.Record, reader::XAM.BAM.Reader{IOStream})
       @ Main ~/git/FADU/tst2.jl:62
     [5] macro expansion
       @ ~/git/FADU/tst2.jl:85 [inlined]
     [6] (::var"#4#6"{SubArray{GFF3.Record, 1, Vector{GFF3.Record}, Tuple{UnitRange{Int64}}, true}, XAM.BAM.Reader{IOStream}})()
       @ Main ./threadingconstructs.jl:410
in expression starting at /Users/sadkins/git/FADU/tst2.jl:103

If --threads=2 is used, I get this:

ERROR: LoadError: TaskFailedException
Stacktrace:
    ### same "wait" stacktrace as above
nested task error: zlib failed to inflate a compressed block
    Stacktrace:
     [1] error(s::String)
       @ Base ./error.jl:35
     [2] read_blocks!(stream::BGZFStreams.BGZFStream{IOStream})
       @ BGZFStreams ~/.julia/packages/BGZFStreams/aVKm9/src/bgzfstream.jl:402
     [3] seek(stream::BGZFStreams.BGZFStream{IOStream}, voffset::BGZFStreams.VirtualOffset)
       @ BGZFStreams ~/.julia/packages/BGZFStreams/aVKm9/src/bgzfstream.jl:229
     [4] seek
       @ ~/.julia/packages/XAM/l6LT2/src/bam/reader.jl:68 [inlined]
     [5] iterate(iter::XAM.BAM.OverlapIterator{IOStream})
       @ XAM.BAM ~/.julia/packages/XAM/l6LT2/src/bam/overlap.jl:61
     [6] process_feature(feature::GFF3.Record, reader::XAM.BAM.Reader{IOStream})
       @ Main ~/git/FADU/tst2.jl:196
     [7] macro expansion
       @ ~/git/FADU/tst2.jl:219 [inlined]
     [8] (::var"#4#6"{SubArray{GFF3.Record, 1, Vector{GFF3.Record}, Tuple{UnitRange{Int64}}, true}, XAM.BAM.Reader{IOStream}})()
       @ Main ./threadingconstructs.jl:410
in expression starting at /Users/sadkins/git/FADU/tst2.jl:103

Possible Solution / Implementation

I'm not sure of a proper fix at the moment, though a possible workaround is sketched below.
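
One thing I have considered but not yet verified: both failures happen inside seek on the one BGZFStream that all spawned tasks share through a single BAM.Reader, so giving each task its own reader might avoid the interleaved seeks. A rough, untested sketch (it reuses the process_feature function from the reproducer below; process_all_features_per_task is just a name I made up for this sketch):

using Base.Threads: nthreads, @spawn
using Base.Iterators: partition

using XAM
using GFF3

function process_all_features_per_task(features::Vector{GFF3.Record}, bam_file::String)
    bai_file = bam_file * ".bai"
    tasks_per_thread = 2
    chunk_size = max(1, length(features) ÷ (tasks_per_thread * nthreads()))
    data_chunks = partition(features, chunk_size)

    tasks = map(data_chunks) do chunk
        @spawn begin
            # Each task opens its own BAM.Reader, so no two tasks ever seek
            # the same underlying BGZFStream concurrently.
            reader = open(BAM.Reader, bam_file, index = bai_file)
            try
                for feature in chunk
                    process_feature(feature, reader)
                end
            finally
                close(reader)
            end
        end
    end

    foreach(wait, tasks) # propagate any task failure
end

Opening one reader per task costs extra file handles and repeated index loads, so even if it works it may not be the right long-term answer.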

Steps to Reproduce (for bugs)

using Base.Threads: nthreads, @spawn
using Base.Iterators: partition

using XAM
using GFF3
using GenomicFeatures

function process_feature(feature::GFF3.Record, reader::BAM.Reader)
    count = 0
    for alignment in eachoverlap(reader, feature)   # from XAM.jl   (BGZFStreams fails here)
        # Process the alignment here...
        count += 1
    end

    println("Processed feature: ", GFF3.attributes(feature, "ID"), " with ", count, " overlaps")
end

# this is inefficient, but it's just an example
function process_all_features(features::Vector{GFF3.Record}, bam_file::String)
    bai_file = bam_file * ".bai"
    reader = open(BAM.Reader, bam_file, index = bai_file)

    tasks_per_thread = 2

    chunk_size = max(1, length(features) ÷ (tasks_per_thread * nthreads()))
    data_chunks = partition(features, chunk_size) # partition your data into chunks that
                                                  # individual tasks will deal with
    tasks = map(data_chunks) do chunk
        # Each chunk of your data gets its own spawned task that does its own local, sequential work
        # and then returns the result
        @spawn begin
            for feature in chunk
                process_feature(feature, reader)
            end
            return
        end
    end

    fetch.(tasks) # get all the values returned by the individual tasks. fetch is type
                  # unstable, so you may optionally want to assert a specific return type.
    close(reader)
end

# Example usage:
gff_file = "test_data/small.gff3"   # path to your GFF3 file, with 2 or more gene features
bam_file = "test_data/small.bam"    # path to your BAM file; for my test I took the first 100 lines of a larger BAM file

features = open(collect, GFF3.Reader, gff_file)    # Array of GFF3.Record objects
filter!(x -> GFF3.featuretype(x) == "gene", features)   # Filter to only gene features
process_all_features(features, bam_file)

I also realize I may simply be doing this incorrectly; it has been a while since I have used Julia.

Context

Currently, with the FADU tool I have worked on, we are seeing an increase in cases where run times are extremely long (e.g. IGS/FADU#12) due to huge numbers of BAM read/GFF feature overlaps that need to be processed, and fixing the multithreading would go a long way toward remedying this. I don't want to make it seem like this is the sole cause, as I am actively looking for other ways to speed up execution as well.

Your Environment

  • Package Version used:
  • Julia Version used: v1.9.3
  • Operating System and version (desktop or mobile): macOS 14.2.1
  • Link to your project: https://github.com/IGS/FADU, though the code I am editing is not committed. My minimal test example showing how I would use multithreading is above.

The packages I am working with on my desktop are a bit newer than the ones pinned in my tool. Everything is stable; it's just the multithreading that isn't working.

[c7e460c6] ArgParse v1.1.4
[8e4a8c10] BED v0.3.0
[28d598bf] BGZFStreams v0.3.2
[af1dc308] GFF3 v0.2.3
⌅ [899a7d2d] GenomicFeatures v2.1.0
⌅ [4ffb77ac] Indexes v0.1.3
[09ab397b] StructArrays v0.6.17
[28d57a85] Transducers v0.4.81
⌃ [d759349c] XAM v0.3.1 - I'm aware there is an XAM v0.4, but Pkg.up does not update it.
