1
+ """
2
+ Validate async code patterns and detect common pitfalls.
3
+ """
4
+
5
+ import ast
6
+ import argparse
7
+ import json
8
+ from pathlib import Path
9
+ from typing import List , Dict , Any
10
+
11
+ class AsyncCodeValidator :
12
+ """Validate async code for common patterns and pitfalls."""
13
+
14
+ def __init__ (self ):
15
+ self .issues = []
16
+ self .suggestions = []
17
+
18
+ def validate_directory (self , source_dir : Path ) -> Dict [str , Any ]:
19
+ """Validate all Python files in directory."""
20
+
21
+ validation_results = {
22
+ 'files_checked' : 0 ,
23
+ 'issues_found' : 0 ,
24
+ 'suggestions' : 0 ,
25
+ 'details' : []
26
+ }
27
+
28
+ python_files = list (source_dir .rglob ("*.py" ))
29
+
30
+ for file_path in python_files :
31
+ if self ._should_skip_file (file_path ):
32
+ continue
33
+
34
+ file_results = self ._validate_file (file_path )
35
+ validation_results ['details' ].append (file_results )
36
+ validation_results ['files_checked' ] += 1
37
+ validation_results ['issues_found' ] += len (file_results ['issues' ])
38
+ validation_results ['suggestions' ] += len (file_results ['suggestions' ])
39
+
40
+ return validation_results
41
+
42
+ def _validate_file (self , file_path : Path ) -> Dict [str , Any ]:
43
+ """Validate a single Python file."""
44
+
45
+ file_results = {
46
+ 'file' : str (file_path ),
47
+ 'issues' : [],
48
+ 'suggestions' : []
49
+ }
50
+
51
+ try :
52
+ with open (file_path , 'r' , encoding = 'utf-8' ) as f :
53
+ source_code = f .read ()
54
+
55
+ tree = ast .parse (source_code , filename = str (file_path ))
56
+
57
+ # Analyze AST for async patterns
58
+ validator = AsyncPatternVisitor (file_path )
59
+ validator .visit (tree )
60
+
61
+ file_results ['issues' ] = validator .issues
62
+ file_results ['suggestions' ] = validator .suggestions
63
+
64
+ except Exception as e :
65
+ file_results ['issues' ].append ({
66
+ 'type' : 'parse_error' ,
67
+ 'message' : f"Failed to parse file: { str (e )} " ,
68
+ 'line' : 0
69
+ })
70
+
71
+ return file_results
72
+
73
+
74
+ def _should_skip_file (self , file_path : Path ) -> bool :
75
+ """Determine if a file should be skipped (e.g., __init__.py files)."""
76
+ return file_path .name == "__init__.py"
77
+
78
+ class AsyncPatternVisitor (ast .NodeVisitor ):
79
+ """AST visitor to detect async patterns and issues."""
80
+
81
+ def __init__ (self , file_path : Path ):
82
+ self .file_path = file_path
83
+ self .issues = []
84
+ self .suggestions = []
85
+ self .in_async_function = False
86
+
87
+ def visit_AsyncFunctionDef (self , node ):
88
+ """Visit async function definitions."""
89
+
90
+ self .in_async_function = True
91
+
92
+ # Check for blocking operations in async functions
93
+ self ._check_blocking_operations (node )
94
+
95
+ # Check for proper error handling
96
+ self ._check_error_handling (node )
97
+
98
+ self .generic_visit (node )
99
+ self .in_async_function = False
100
+
101
+ def visit_Call (self , node ):
102
+ """Visit function calls."""
103
+
104
+ if self .in_async_function :
105
+ # Check for potentially unawaited async calls
106
+ self ._check_unawaited_calls (node )
107
+
108
+ # Check for blocking I/O operations
109
+ self ._check_blocking_io (node )
110
+
111
+ self .generic_visit (node )
112
+
113
+ def _check_blocking_operations (self , node ):
114
+ """Check for blocking operations in async functions."""
115
+
116
+ blocking_patterns = [
117
+ 'time.sleep' ,
118
+ 'requests.get' , 'requests.post' ,
119
+ 'subprocess.run' , 'subprocess.call' ,
120
+ 'open' # File I/O without async
121
+ ]
122
+
123
+ for child in ast .walk (node ):
124
+ if isinstance (child , ast .Call ):
125
+ call_name = self ._get_call_name (child )
126
+ if call_name in blocking_patterns :
127
+ self .issues .append ({
128
+ 'type' : 'blocking_operation' ,
129
+ 'message' : f"Blocking operation '{ call_name } ' in async function" ,
130
+ 'line' : child .lineno ,
131
+ 'suggestion' : f"Use async equivalent of { call_name } "
132
+ })
133
+
134
+ def _check_unawaited_calls (self , node ):
135
+ """Check for potentially unawaited async calls."""
136
+
137
+ # Look for calls that might return coroutines
138
+ async_patterns = [
139
+ 'aiohttp' , 'asyncio' , 'asyncpg' ,
140
+ 'websockets' , 'motor' # Common async libraries
141
+ ]
142
+
143
+ call_name = self ._get_call_name (node )
144
+
145
+ for pattern in async_patterns :
146
+ if pattern in call_name :
147
+ # Check if this call is awaited
148
+ parent = getattr (node , 'parent' , None )
149
+ if not isinstance (parent , ast .Await ):
150
+ self .suggestions .append ({
151
+ 'type' : 'potentially_unawaited' ,
152
+ 'message' : f"Call to '{ call_name } ' might need await" ,
153
+ 'line' : node .lineno
154
+ })
155
+ break
156
+
157
+ def _get_call_name (self , node ):
158
+ """Extract the name of a function call."""
159
+
160
+ if isinstance (node .func , ast .Name ):
161
+ return node .func .id
162
+ elif isinstance (node .func , ast .Attribute ):
163
+ if isinstance (node .func .value , ast .Name ):
164
+ return f"{ node .func .value .id } .{ node .func .attr } "
165
+ else :
166
+ return node .func .attr
167
+ return "unknown"
168
+
169
+
170
+ if __name__ == "__main__" :
171
+ parser = argparse .ArgumentParser (description = "Validate async code patterns and detect common pitfalls." )
172
+ parser .add_argument ("--source" , type = Path , required = True , help = "Source directory to validate." )
173
+ parser .add_argument ("--report" , type = Path , required = True , help = "Path to the output validation report." )
174
+
175
+ args = parser .parse_args ()
176
+
177
+ validator = AsyncCodeValidator ()
178
+ results = validator .validate_directory (args .source )
179
+
180
+ with open (args .report , 'w' ) as f :
181
+ json .dump (results , f , indent = 4 )
182
+
183
+ print (f"Validation report saved to { args .report } " )
0 commit comments