-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathLinajes.py
More file actions
executable file
·132 lines (117 loc) · 5.38 KB
/
Linajes.py
File metadata and controls
executable file
·132 lines (117 loc) · 5.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import pandas as pd
import gzip
import os
import re
import csv
def parse_vcf(vcf_file):
"""Parsea un archivo VCF comprimido y extrae las variantes relevantes."""
variants = []
with gzip.open(vcf_file, 'rt') as f:
for line in f:
if not line.startswith('#'):
fields = line.strip().split('\t')
chrom = fields[0]
try: #manejo de error si no se puede convertir a entero
pos = int(fields[1])
except ValueError:
print(f"Error: No se pudo convertir la posición a entero en la línea: {line.strip()} del archivo {vcf_file}")
continue #salta a la siguiente linea
ref = fields[3]
alt = fields[4]
variants.append({'chrom': chrom, 'pos': pos, 'ref': ref, 'alt': alt})
return variants
def classify_with_matches(variants, clade_df, strategy="combined"):
"""Clasifica el linaje y devuelve también el diccionario matching_clades."""
matching_clades = {}
for index, row in clade_df.iterrows():
clade_site = int(row['site'])
clade_alt = row['alt']
for variant in variants:
if variant['pos'] == clade_site and variant['alt'] == clade_alt:
if row['clade'] not in matching_clades:
matching_clades[row['clade']] = 0
matching_clades[row['clade']] += 1
break
if not matching_clades:
return None, {}
if strategy == "max_matches":
best_clade = max(matching_clades, key=matching_clades.get)
elif strategy == "longest_name":
best_clade = max(matching_clades, key=len)
elif strategy == "manual_priority":
priority_list = ["3I", "3I_A", "3I_A.1", "3I_A.1.1", "3I_A.1.2", "3I_B", "3II", "3II_B", "3III", "3III_A.1", "3III_B.1", "3III_C", "3III_C.2", "3III_C.2.1", "3III_C.2.2"]
for clade in priority_list:
if clade in matching_clades:
best_clade = clade
break
else:
best_clade = max(matching_clades, key=matching_clades.get)
elif strategy == "combined":
max_count = max(matching_clades.values())
best_clades = [clade for clade, count in matching_clades.items() if count == max_count]
best_clade = max(best_clades, key=len)
else:
raise ValueError("Estrategia no válida.")
return best_clade, matching_clades
def export_clades_matches(matching_clades, output_file):
"""Exporta el diccionario 'matching_clades' a un archivo CSV."""
if matching_clades:
try:
with open(output_file, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['Linaje', 'Coincidencias'])
for lineage, count in matching_clades.items():
writer.writerow([lineage, count])
print(f"Coincidencias por linaje exportadas a: {output_file}")
except Exception as e:
print(f"Error al exportar coincidencias por linaje a CSV: {e}")
else:
print(f"No matching clades found. No CSV file created for this sample.")
def main():
"""Función principal para coordinar el análisis y exportar resultados."""
clade_file = '/home/ics2/Descargas/Nextclade/DENV3/clades.tsv'
vcf_dir = '/home/ics2/Descargas/Nextclade/DENV3/variant_calling_analysis/vcfs'
output_file = 'resultados_clasificacion.csv'
try:
clade_df = pd.read_csv(clade_file, sep='\t')
except FileNotFoundError:
print(f"Error: Archivo {clade_file} no encontrado.")
return
except pd.errors.ParserError:
print(f"Error: Formato incorrecto en {clade_file}.")
return
vcf_files = [os.path.join(vcf_dir, f) for f in os.listdir(vcf_dir) if f.endswith('.vcf.gz')]
if not vcf_files:
print(f"No se encontraron archivos .vcf.gz en el directorio: {vcf_dir}")
return
strategy = "combined"
results = {}
matches_results={} #diccionario para guardar los matching clades
for vcf_file in vcf_files:
try:
sample_name = re.search(r'calls_(\d+).vcf.gz', os.path.basename(vcf_file)).group(1)
variants = parse_vcf(vcf_file)
lineage, matching_clades = classify_with_matches(variants, clade_df, strategy)
results[sample_name] = lineage
matches_results[sample_name] = matching_clades #guardar los matching clades
export_clades_matches(matching_clades, f"clades_matches_{sample_name}.csv")
except AttributeError:
print(f"Error: Nombre de archivo VCF no válido: {vcf_file}. Asegúrese de que los nombres sigan el patrón 'calls_#.vcf.gz'")
except Exception as e:
print(f"Error al procesar {vcf_file}: {e}")
try:
with open(output_file, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['Muestra', 'Linaje Asignado'])
for sample, lineage in results.items():
writer.writerow([sample, lineage])
print(f"Resultados exportados a {output_file}")
except Exception as e:
print(f"Error al exportar a CSV: {e}")
for sample, lineage in results.items():
if lineage:
print(f"Muestra {sample}: Linaje asignado: {lineage}")
else:
print(f"Muestra {sample}: No se pudo asignar un linaje.")
if __name__ == "__main__":
main()