1717from pathlib import Path
1818from gzip import open as gzopen
1919from bz2 import open as bzopen
20- from typing import Generator
21-
22- from Bio .SeqIO .FastaIO import SimpleFastaParser
20+ from typing import Generator , TextIO
2321
2422from kaptive .log import log , quit_with_error , bold_cyan
2523
2624# Constants -----------------------------------------------------------------------------------------------------------
27- _COMPRESSION_MAGIC = {b'\x1f \x8b ' : 'gz' , b'\x42 \x5a ' : 'bz2' , b'\x50 \x4b ' : 'zip' }
# Leading "magic" bytes that identify a compressed/archive file, mapped to a short format name.
# Signatures have different lengths, so callers should match with ``bytes.startswith``.
_COMPRESSION_MAGIC = {
    b'\x1f\x8b': 'gz',
    b'\x42\x5a': 'bz2',
    b'\x50\x4b': 'zip',
    b'\x37\x7a': '7z',
    # xz stream header (FD 37 7A 58 5A 00); b'\x78\x01' is a zlib header, not xz
    b'\xfd\x37\x7a\x58\x5a\x00': 'xz',
}
# Number of leading bytes to read so that the longest signature above can be matched
_READ_N_BYTES = max(len(x) for x in _COMPRESSION_MAGIC)
2827_LOGO = r""" _ __ _ ____ _____ _____ _______
2928 | |/ / / \ | _ \_ _|_ _\ \ / / ____|
3029 | ' / / _ \ | |_) || | | | \ \ / /| _|
@@ -60,7 +59,7 @@ def check_file(path: str | Path) -> Path:
6059 return path .absolute ()
6160
6261
63- def check_cpus (cpus : int | str | None ) -> int :
62+ def check_cpus (cpus : int | str | None = 0 ) -> int :
6463 if not cpus :
6564 return os .cpu_count ()
6665 try :
@@ -72,15 +71,26 @@ def check_cpus(cpus: int | str | None) -> int:
7271 return min (cpus , os .cpu_count ())
7372
7473
75- def check_dir (path : str , parents : bool = True , exist_ok : bool = True ) -> Path :
74+ def check_out (path : str , mode : str = "at" , parents : bool = True , exist_ok : bool = True ) -> Path | TextIO :
7675 """
77- Check if a directory exists, and create it if not
76+ Check if the user wants to create/append a file or directory.
77+ If it looks like/is already a file (has an extension), return the file object.
78+ If it looks like/is already a directory, return the directory path.
7879 """
79- try :
80- (path := Path (path )).mkdir (parents = parents , exist_ok = exist_ok )
81- return path
82- except Exception as e :
83- quit_with_error (f"Could not create directory { path } : { e } " )
80+ # This may also be sys.stdout
81+ if path == '-' :
82+ return sys .stdout
83+ if (path := Path (path )).suffix :
84+ try :
85+ return path .open (mode )
86+ except Exception as e :
87+ quit_with_error (f'Could not open { path } : { e } ' )
88+ if not path .exists ():
89+ try :
90+ path .mkdir (parents = parents , exist_ok = exist_ok )
91+ except Exception as e :
92+ quit_with_error (f'Could not create { path } : { e } ' )
93+ return path
8494
8595
8696def check_python_version (major : int = 3 , minor : int = 8 ):
@@ -97,27 +107,20 @@ def check_biopython_version(major: int = 1, minor: int = 79):
97107 quit_with_error (f'Biopython version { major } .{ minor } or greater required, got { major_version } .{ minor_version } ' )
98108
99109
100- def parse_fasta (fasta : Path , skip_plasmids : bool = False , verbose : bool = False ) -> Generator [tuple [str , str , str ], None , None ]:
101- log (f'Parsing { fasta .name } ' , verbose )
102- with open (fasta , 'rb' ) as f : # Read the first two bytes to determine the compression format
103- compression = _COMPRESSION_MAGIC .get (f .read (2 ), 'uncompressed' ) # Default to uncompressed
104- if compression == 'uncompressed' :
105- opener = open # Use the built-in open function
def opener(file: Path | str, check: bool = True, verbose: bool = False, *args, **kwargs):
    """
    Open ``file`` with the open function that matches its compression format.

    The first ``_READ_N_BYTES`` bytes are read and compared against the signatures in
    ``_COMPRESSION_MAGIC``. Files with no matching signature are opened with the built-in
    ``open``, 'gz' with ``gzip.open`` and 'bz2' with ``bz2.open``. Any other recognised
    format is reported through ``quit_with_error``.

    :param file: Path of the file to open
    :param check: If true, validate ``file`` with ``check_file`` before sniffing it
    :param verbose: Passed to ``log`` for the diagnostic message
    :param args: Extra positional arguments forwarded to the chosen open function
    :param kwargs: Extra keyword arguments forwarded to the chosen open function
    :return: The file object returned by the chosen open function

    NOTE: ``check`` and ``verbose`` come before ``*args``, so the second and third positional
    arguments bind to them; pass the mode by keyword (e.g. ``mode='rt'``).
    NOTE: with no mode given, built-in ``open`` defaults to text while ``gzip.open`` and
    ``bz2.open`` default to binary.
    """
    sniff_target = check_file(file) if check else file
    with open(sniff_target, 'rb') as handle:
        file_start = handle.read(_READ_N_BYTES)
    # First signature that prefixes the file start wins; fall back to 'no' compression
    compression = 'no'
    for magic, fmt in _COMPRESSION_MAGIC.items():
        if file_start.startswith(magic):
            compression = fmt
            break
    log(f'Opening {file} with {compression} compression, file start: {file_start}', verbose)
    open_functions = {'no': open, 'gz': gzopen, 'bz2': bzopen}
    if compression in open_functions:
        return open_functions[compression](file, *args, **kwargs)
    quit_with_error(f'Unsupported compression format: {compression}')
112- try :
113- plasmid_markers = {'plasmid' , '__pl' }
114- with opener (fasta , 'rt' ) as f :
115- for header , sequence in SimpleFastaParser (f ):
116- if skip_plasmids and any (i in header for i in plasmid_markers ):
117- continue
118- yield (x := header .split (' ' , 1 ))[0 ], x [1 ] if len (x ) == 2 else '' , sequence
119- except Exception as e :
120- quit_with_error (f'Error reading { fasta } : { e } ' )
121124
122125
123126def get_logo (message : str , width : int = 43 ) -> str : # 43 is the width of the logo
0 commit comments