Skip to content

Commit 9b509e4

Browse files
authored
Merge pull request #60 from EBI-Metagenomics/feature/chunk-contigs-and-ips
Chunk SanntiS - and "sync" inputs
2 parents b823a18 + 378faa7 commit 9b509e4

File tree

10 files changed

+532
-44
lines changed

10 files changed

+532
-44
lines changed

bin/filter_ips_by_contigs.py

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
4+
# Copyright 2026 EMBL - European Bioinformatics Institute
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
18+
import argparse
19+
import gzip
20+
import logging
21+
import sys
22+
from pathlib import Path
23+
from typing import Iterator
24+
25+
# Route all log output to stderr: stdout is reserved for the matched
# protein IDs that downstream tools (e.g. `seqkit grep -f -`) read from
# a pipe.
logging.basicConfig(
    level=logging.INFO,
    stream=sys.stderr,
    format="%(levelname)s: %(message)s",
)
30+
31+
32+
def _open(path: Path | str):
33+
"""Open a plain or gzip-compressed text file for reading.
34+
35+
:param path: Path to the file.
36+
:return: A text-mode file object.
37+
"""
38+
if Path(path).suffix == ".gz":
39+
return gzip.open(path, "rt")
40+
return open(path, "r")
41+
42+
43+
def _contig_ids_from_fasta(fasta_path: Path) -> set[str]:
    """Parse a FASTA file and return the set of sequence IDs.

    Only the first whitespace-delimited token of each header line is used,
    matching the behaviour of seqkit and other common tools.

    :param fasta_path: Path to the FASTA file (plain or gzip).
    :return: Set of contig IDs (without the leading '>').
    :raises SystemExit: If the file contains no sequences.
    """
    ids: set[str] = set()
    with _open(fasta_path) as handle:
        for line in handle:
            if line.startswith(">"):
                # A bare ">" header has no ID token; skip it rather than
                # crashing with an IndexError on split()[0].
                tokens = line[1:].split()
                if tokens:
                    ids.add(tokens[0])

    if not ids:
        logging.error(f"No sequences found in {fasta_path}")
        sys.exit(1)

    return ids
63+
64+
65+
def _contig_id_from_protein_id(protein_id: str, contig_ids: set[str]) -> str | None:
66+
"""Derive the contig ID from a protein ID, handling both CGC callers.
67+
68+
The CGC produces two protein ID formats:
69+
70+
- Pyrodigal: ``{contig_id}_{cds_n}`` — strip the last component.
71+
- FragGeneScanRS: ``{contig_id}_{start}_{end}_{strand}`` — strip the last three.
72+
73+
Both are tried in order; the first match against *contig_ids* is returned.
74+
Returns ``None`` if neither resolves to a known contig.
75+
76+
:param protein_id: Protein identifier from the IPS TSV.
77+
:param contig_ids: Set of known contig IDs.
78+
:return: Matching contig ID, or ``None``.
79+
"""
80+
for candidate in (protein_id.rsplit("_", 1)[0], protein_id.rsplit("_", 3)[0]):
81+
if candidate in contig_ids:
82+
return candidate
83+
return None
84+
85+
86+
def _iter_filtered_rows(
    ips_path: Path,
    contig_ids: set[str],
) -> Iterator[tuple[str, str]]:
    """Stream IPS TSV rows that belong to contigs in *contig_ids*.

    The protein ID is the first tab-separated column of the file. Both
    Pyrodigal (``{contig_id}_{cds_n}``) and FragGeneScanRS
    (``{contig_id}_{start}_{end}_{strand}``) protein ID formats are handled.

    :param ips_path: Path to the InterProScan TSV (plain or gzip).
    :param contig_ids: Set of contig IDs to keep.
    :return: Iterator of ``(protein_id, raw_line)`` tuples for matching rows.
    :raises SystemExit: If a malformed row (no tab separator) is encountered.
    """
    with _open(ips_path) as handle:
        for lineno, line in enumerate(handle, start=1):
            # Skip comment lines and blank/whitespace-only lines.
            if line.startswith("#") or not line.strip():
                continue
            protein_id, sep, _rest = line.partition("\t")
            if not sep:
                logging.error(
                    f"Malformed IPS row at line {lineno} (no tab separator): {line.rstrip()}"
                )
                sys.exit(1)
            if _contig_id_from_protein_id(protein_id, contig_ids) is not None:
                yield protein_id, line
115+
116+
117+
def filter_ips(
    contigs_path: Path,
    ips_path: Path,
    out_path: Path,
) -> None:
    """Filter *ips_path* to rows whose proteins belong to the provided contigs.

    Each matched protein ID is written to stdout (one per line) so the caller
    can pipe them into ``seqkit grep -f -``.

    :param contigs_path: FASTA file containing the contig subset.
    :param ips_path: Full InterProScan TSV to filter.
    :param out_path: Destination for the filtered TSV (written as gzip).
    """
    contig_ids = _contig_ids_from_fasta(contigs_path)

    with gzip.open(out_path, "wt") as out_fh:
        for protein_id, row in _iter_filtered_rows(ips_path, contig_ids):
            out_fh.write(row)
            # Flush per ID so the downstream consumer on the other end of
            # the pipe sees IDs as soon as they are matched.
            print(protein_id, flush=True)
138+
139+
140+
def main() -> None:
    """Filter an InterProScan TSV to rows whose proteins belong to a given set of contigs.

    Matched protein IDs are written to stdout so the caller can pipe them directly
    into ``seqkit grep -f -`` to subset a FAA file without creating a temporary file.
    """
    # Note: previously this description lived as a bare string statement inside
    # the __main__ guard, where it was a no-op; as a docstring it is introspectable.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--contigs",
        required=True,
        type=Path,
        metavar="FASTA",
        help="Contig chunk FASTA (plain or gzip).",
    )
    parser.add_argument(
        "--ips",
        required=True,
        type=Path,
        metavar="TSV",
        help="Full InterProScan TSV (plain or gzip).",
    )
    parser.add_argument(
        "--out",
        required=True,
        type=Path,
        metavar="TSV_GZ",
        help="Output filtered TSV (written as gzip).",
    )
    args = parser.parse_args()
    filter_ips(
        contigs_path=args.contigs,
        ips_path=args.ips,
        out_path=args.out,
    )


if __name__ == "__main__":
    main()

conf/modules.config

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -477,11 +477,14 @@ process {
477477
cpus = 4
478478
// TODO: we will tweak this one
479479
memory = { 80.GB * task.attempt }
480+
}
480481

482+
withName: CONCATENATE_SANNTIS_GFFS {
483+
ext.prefix = { "${meta.id}_sanntis" }
481484
publishDir = [
482485
path: { "${params.outdir}/${meta.id}/pathways-and-systems/sanntis" },
483486
mode: params.publish_dir_mode,
484-
saveAs: { filename -> filename.equals('versions.yml') ? null : filename },
487+
saveAs: { filename -> filename.equals('versions.yml') ? null : filename.replaceAll('_concatenated', '') },
485488
]
486489
}
487490

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Filter a full InterProScan TSV and FAA down to the proteins belonging to
// one contig chunk: the helper script writes the filtered TSV and streams
// matched protein IDs to stdout, which seqkit consumes to subset the FAA.
process FILTER_IPS_AND_FAA_BY_CONTIGS {

    tag "$meta.id"
    label 'process_medium'

    container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ?
        'oras://community.wave.seqera.io/library/seqkit_python:3dcb98fc808deba4' :
        'community.wave.seqera.io/library/seqkit_python:fa0b5f9a20082dfc' }"

    input:
    tuple val(meta), path(contigs_chunk), path(ips_tsv), path(faa)

    output:
    tuple val(meta), path("${prefix}_filtered_ips.tsv.gz"), path("${prefix}_filtered.faa.gz"), emit: filtered
    path "versions.yml", emit: versions

    script:
    prefix = task.ext.prefix ?: "${meta.id}"
    """
    filter_ips_by_contigs.py \\
        --contigs ${contigs_chunk} \\
        --ips ${ips_tsv} \\
        --out ${prefix}_filtered_ips.tsv.gz | \\
    seqkit grep \\
        --threads ${task.cpus} \\
        -f - \\
        ${faa} \\
        -o ${prefix}_filtered.faa.gz

    cat <<-END_VERSIONS > versions.yml
    "${task.process}":
        seqkit: \$( seqkit version | sed 's/seqkit v//' )
        python: \$( python3 --version | sed 's/Python //' )
    END_VERSIONS
    """

    stub:
    prefix = task.ext.prefix ?: "${meta.id}"
    """
    echo "" | gzip > ${prefix}_filtered_ips.tsv.gz
    echo "" | gzip > ${prefix}_filtered.faa.gz

    cat <<-END_VERSIONS > versions.yml
    "${task.process}":
        seqkit: \$( seqkit version | sed 's/seqkit v//' )
        python: \$( python3 --version | sed 's/Python //' )
    END_VERSIONS
    """
}

subworkflows/local/pathways_and_systems.nf

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ include { KEGGPATHWAYSCOMPLETENESS } from '../../modules/ebi-metagenomics/ke
1414
/* LOCAL */
1515
include { ANTISMASH_JSON_TO_GFF } from '../../modules/local/antismash_json_to_gff'
1616
include { CONCATENATE_GFFS as CONCATENATE_ANTISMASH_GFFS } from '../../modules/local/concatenate_gffs'
17+
include { CONCATENATE_GFFS as CONCATENATE_SANNTIS_GFFS } from '../../modules/local/concatenate_gffs'
1718
include { ANTISMASH_SUMMARY } from '../../modules/local/antismash_summary'
1819
include { SANNTIS_SUMMARY } from '../../modules/local/sanntis_summary'
1920
include { MERGE_ANTISMASH_JSON } from '../../modules/local/merge_antismash_json'
21+
include { FILTER_IPS_AND_FAA_BY_CONTIGS } from '../../modules/local/filter_ips_and_faa_by_contigs'
2022

2123
include { DRAM_DISTILL_SWF } from '../../subworkflows/local/dram_distill_swf'
2224

@@ -118,22 +120,30 @@ workflow PATHWAYS_AND_SYSTEMS {
118120
)
119121
ch_versions = ch_versions.mix(ANTISMASH_SUMMARY.out.versions)
120122

121-
// Note: same weirdness as antismash_channel
122-
ch_sanntis = ch_contigs_and_predicted_proteins.map { meta, _all_contigs_fasta, faa, _gff, ips_tsv ->
123-
[meta, ips_tsv, [], faa]
124-
}
123+
// For each chunk we filter the full IPS TSV and FAA to only the proteins belonging
124+
// to contigs in that chunk, then run SANNTIS in parallel and merge the results.
125+
ch_sanntis_chunks = ch_contigs_and_predicted_proteins
126+
.combine(SEQKIT_SPLIT2.out.assembly.transpose(), by: 0)
127+
.map { meta, _all_contigs_fasta, faa, _gff, ips_tsv, contigs_chunk ->
128+
[meta, contigs_chunk, ips_tsv, faa]
129+
}
130+
131+
FILTER_IPS_AND_FAA_BY_CONTIGS(ch_sanntis_chunks)
132+
ch_versions = ch_versions.mix(FILTER_IPS_AND_FAA_BY_CONTIGS.out.versions)
125133

126-
// We run SanntiS only once per assembly. To chunk it, we would need to ensure
127-
// that each protein chunk contains annotations for only one contig. Otherwise,
128-
// SanntiS might misannotate sequences, as there is no guarantee that all proteins
129-
// from a single contig will be present in the same faa chunk.
130134
SANNTIS(
131-
ch_sanntis
135+
FILTER_IPS_AND_FAA_BY_CONTIGS.out.filtered
136+
.map { meta, ips, faa -> [meta, ips, [], faa] }
132137
)
133138
ch_versions = ch_versions.mix(SANNTIS.out.versions)
134139

140+
CONCATENATE_SANNTIS_GFFS(
141+
SANNTIS.out.gff.groupTuple()
142+
)
143+
ch_versions = ch_versions.mix(CONCATENATE_SANNTIS_GFFS.out.versions)
144+
135145
SANNTIS_SUMMARY(
136-
SANNTIS.out.gff
146+
CONCATENATE_SANNTIS_GFFS.out.concatenated_gff
137147
)
138148
ch_versions = ch_versions.mix(SANNTIS_SUMMARY.out.versions)
139149

@@ -150,6 +160,6 @@ workflow PATHWAYS_AND_SYSTEMS {
150160

151161
emit:
152162
versions = ch_versions
153-
sanntis_gff = SANNTIS.out.gff
163+
sanntis_gff = CONCATENATE_SANNTIS_GFFS.out.concatenated_gff
154164
antismash_gff = CONCATENATE_ANTISMASH_GFFS.out.concatenated_gff
155165
}

tests/assembly_erz101.fasta.gz

5.72 KB
Binary file not shown.

0 commit comments

Comments
 (0)