2
2
import json
3
3
from pathlib import Path
4
4
from collections import defaultdict
5
- from typing import Dict , List , Optional
5
+ from typing import Dict , List , Optional , Tuple , Union
6
+ from concurrent .futures import ProcessPoolExecutor , as_completed
7
+
8
+ from tqdm import tqdm
9
+
10
+
11
+ import mimetypes
12
+
13
def is_text_mimetype(path: Union[str, Path]) -> bool:
    """Return True when the guessed MIME type of *path* is a ``text/*`` type.

    The guess comes from ``mimetypes.guess_type``; the file itself is never
    opened here.

    Args:
        path: File name or path, given as a string or a ``Path``.

    Returns:
        True if a MIME type was guessed and it starts with ``"text/"``;
        False when no type could be guessed or the type is not textual.
    """
    # Normalise to str so both plain strings and Path objects are handled the
    # same way by guess_type.
    mime, _ = mimetypes.guess_type(str(path))
    return mime is not None and mime.startswith("text/")
16
+
17
def process_file(filepath: str, encoding: str) -> Optional[Tuple[str, int, int]]:
    """Count the total and non-blank lines of a single file.

    Args:
        filepath: Path of the file to read.
        encoding: Text encoding used to open the file; undecodable bytes are
            ignored (``errors="ignore"``).

    Returns:
        A ``(extension, total_lines, non_blank_lines)`` tuple, or ``None`` if
        the file could not be read. The extension is lowercased; a file with
        no suffix gets the lowercased placeholder (``"no_ext"``).
    """
    ext = (Path(filepath).suffix or "NO_EXT").lower()

    total_lines = 0
    non_blank_lines = 0
    try:
        with open(filepath, "r", encoding=encoding, errors="ignore") as f:
            # Single pass: tally both counters together instead of reading
            # the file, seeking back to the start and reading it again.
            for line in f:
                total_lines += 1
                if line.strip():
                    non_blank_lines += 1
    except Exception:
        # Best-effort: any failure on one file is reported as None so the
        # caller can skip it rather than abort.
        return None

    return ext, total_lines, non_blank_lines
28
+
29
def scan_files(directory: Path, ignore_folders: set, ignore_exts: set) -> List[str]:
    """Recursively collect the paths of text files under *directory*.

    Symbolic links are not followed, neither for directories nor for files.

    Args:
        directory: Root directory to scan.
        ignore_folders: Directory names that are not descended into.
        ignore_exts: Lowercased file extensions (including the dot) to skip;
            files without a suffix are matched as ``"NO_EXT"``.

    Returns:
        String paths of every regular file that is not ignored and whose
        guessed MIME type is textual according to ``is_text_mimetype``.
    """
    file_list: List[str] = []

    def _recursive_scan(path: Path) -> None:
        try:
            entries = os.scandir(path)
        except OSError:
            # A directory that cannot be listed (e.g. permission denied or
            # removed mid-scan) is skipped instead of aborting the whole scan.
            return

        # The context manager closes the directory handle deterministically.
        with entries:
            for entry in entries:
                entry_path = Path(entry.path)
                if entry.is_dir(follow_symlinks=False):
                    if entry_path.name not in ignore_folders:
                        _recursive_scan(entry_path)
                elif entry.is_file(follow_symlinks=False):
                    ext = entry_path.suffix.lower() or "NO_EXT"
                    if ext in ignore_exts:
                        continue
                    path_str = str(entry_path)
                    # Hand the helper a plain string, matching its annotation.
                    if is_text_mimetype(path_str):
                        file_list.append(path_str)

    _recursive_scan(directory)
    return file_list
6
46
7
47
8
48
class LineCounter :
9
def __init__(
    self,
    ignore_extensions: Optional[List[str]] = None,
    ignore_folder: Optional[List[str]] = None,
    encoding: str = "utf-8",
    use_progress: bool = True,
    max_workers: Optional[int] = None,
):
    """Set up the counter's configuration and its empty tallies.

    Args:
        ignore_extensions: Extensions to skip; stored as a set.
        ignore_folder: Folder names to skip; stored as a set.
        encoding: Encoding stored for reading files.
        use_progress: Whether a tqdm progress bar should be shown.
        max_workers: Worker count stored for the process pool.
    """
    # Configuration supplied by the caller.
    self.encoding = encoding
    self.ignore_folder = set(ignore_folder or ())
    self.ignore_extensions = set(ignore_extensions or ())

    # Per-extension tallies; missing keys start at zero.
    self.with_spaces: Dict[str, int] = defaultdict(int)
    self.without_spaces: Dict[str, int] = defaultdict(int)
    self.file_count: Dict[str, int] = defaultdict(int)

    # Progress reporting is only enabled when requested and tqdm is usable.
    self.use_progress = use_progress and (tqdm is not None)
    self.max_workers = max_workers
16
65
17
def count_lines(self, directory: Union[str, Path]) -> Dict[str, Dict[str, int]]:
    """Count lines for every scanned file under *directory*.

    Files are discovered with ``scan_files`` and each one is handed to
    ``process_file`` in a process pool. Results are added to this instance's
    per-extension tallies.

    Args:
        directory: Root directory to analyse, as a string or ``Path``.

    Returns:
        Whatever ``self._build_result()`` produces from the tallies.

    Raises:
        ValueError: If *directory* is not an existing directory.
    """
    directory = Path(directory)
    if not directory.is_dir():
        raise ValueError(f"{directory} is not a valid directory")

    targets = scan_files(directory, self.ignore_folder, self.ignore_extensions)

    with ProcessPoolExecutor(max_workers=self.max_workers) as pool:
        pending = []
        for target in targets:
            pending.append(pool.submit(process_file, target, self.encoding))

        finished = as_completed(pending)
        if self.use_progress:
            finished = tqdm(finished, total=len(pending), desc="Counting lines")

        for done in finished:
            outcome = done.result()
            if not outcome:
                # process_file returned None: the file could not be read.
                continue
            ext, with_spaces, without_spaces = outcome
            self.file_count[ext] += 1
            self.with_spaces[ext] += with_spaces
            self.without_spaces[ext] += without_spaces

    return self._build_result()
42
92
@@ -53,7 +103,7 @@ def _build_result(self) -> Dict[str, Dict[str, int]]:
53
103
@staticmethod
54
104
def to_json (data : Dict ) -> str :
55
105
return json .dumps (data , indent = 2 )
56
-
106
+
57
107
@staticmethod
58
108
def to_csv (data : Dict ) -> str :
59
109
import csv
@@ -67,12 +117,11 @@ def to_csv(data: Dict) -> str:
67
117
writer .writerow ([ext , counts ["with_spaces" ], counts ["without_spaces" ], counts ["file_count" ]])
68
118
69
119
return output .getvalue ()
70
-
120
+
71
121
@staticmethod
72
122
def to_markdown (data : Dict ) -> str :
73
123
output = "| Extension | With Spaces | Without Spaces | File Count |\n "
74
124
output += "|-----------|-------------|----------------|------------|\n "
75
125
for ext , counts in data .items ():
76
126
output += f"| { ext } | { counts ['with_spaces' ]} | { counts ['without_spaces' ]} | { counts ['file_count' ]} |\n "
77
-
78
127
return output
0 commit comments