diff --git a/3.0 b/3.0 new file mode 100644 index 0000000..a8f3ace --- /dev/null +++ b/3.0 @@ -0,0 +1,221 @@ +import json +import os +import re +import time +import sys +from urllib.parse import urlparse +import requests +import csv +import argparse +from multiprocessing import Pool, Manager +from loguru import logger + +requests.packages.urllib3.disable_warnings() + +logger.remove() +handler_id = logger.add(sys.stderr, level="DEBUG") # 设置输出级别 + +payload_array = {"string": "1", "boolean": "true", "integer": "1", "array": "1", "number": "1", "object": ""} + +# POST类型就发送这个数据 +json_payload = """{ + "code": "string", + "createTime": "2021-02-05T10:34:37.691Z", + "delFlag": "string", + "deptId": 0, + "fullName": "string", + "fullPathCode": "string", + "fullPathName": "string", + "isVirtual": true, + "name": "string", + "outCode": "string", + "outParentCode": "string", + "parentCode": "string", + "parentId": 0, + "parentName": "string", + "sort": 0, + "updateTime": "2021-02-05T10:34:37.691Z" +}""" + +def banner(): + logger.info(''' + _ _ + _____ ____ _ __ _ __ _ ___ _ __ | |__ __ _ ___| | __ +/ __\ \ /\ / / _` |/ _` |/ _` |/ _ \ '__|____| '_ \ / _` |/ __| |/ / +\__ \\ V V / (_| | (_| | (_| | __/ | |_____| | | | (_| | (__| < +|___/ \_/\_/ \__,_|\__, |\__, |\___|_| |_| |_|\__,_|\___|_|\_\\ + |___/ |___/ + by jayus +python swagger.py -h +--------------------------------------------------------------------- + ''') + +def check(url, headers): + try: + res = requests.get(url=url, headers=headers, timeout=5, verify=False) + if " {path}") + for method, details in methods.items(): + try: + process_path(domain, basePath, path, method, details, global_data, headers) + except Exception as e: + logger.error(f"Error processing path {path} with method {method}: {e}") + except Exception as e: + logger.error(f"Error in go_docs: {e}") + +def process_path(domain, basePath, path, method, details, global_data, headers): + summary = details.get('summary', path) + param_num = str(details).count("'in':") + + if method in ['post', 'put']: + process_post_put(domain, basePath, path, method, details, param_num, summary, global_data, headers) + elif method in ['get', 'delete']: + process_get_delete(domain, basePath, path, method, details, param_num, summary, global_data, headers) + else: + logger.error(f"[!] 未知的请求方法: {method}") + +def process_post_put(domain, basePath, path, method, details, param_num, summary, global_data, headers): + # 实现对post和put方法的处理 + if "'in': 'body'" in str(details): + data = json_payload + else: + data = {} + + url = domain + basePath + path + if method == 'post': + req = requests.post(url=url, headers=headers, data=data, timeout=5, verify=False) + else: + req = requests.put(url=url, headers=headers, data=data, timeout=5, verify=False) + + hhh = [url, summary, path, method, url, param_num, data, req.status_code, req.text] + global_data.put(hhh) + +def process_get_delete(domain, basePath, path, method, details, param_num, summary, global_data, headers): + # 实现对get和delete方法的处理 + query_string = "" + param_map = {} + if "parameters" in details: + parameters = details['parameters'] + for param in parameters: + p_type = param.get('type', '') or param.get('schema', {}).get('type', '') + p_name = param['name'] + param_map[p_name] = payload_array.get(p_type, '') + + for key in param_map.keys(): + query_string += f"{key}={param_map[key]}&" + + if "{" in path: + tmps = re.findall("\{[^\}]*\}", path) + for tmp in tmps: + path = path.replace(tmp, param_map[tmp[1:-1]]) + + query_url = f"{domain}{basePath}{path}?{query_string[:-1]}" + if method == 'get': + req = requests.get(url=query_url, headers=headers, timeout=5, verify=False) + else: + req = requests.delete(url=query_url, headers=headers, timeout=5, verify=False) + + hhh = [url, summary, path, method, query_url, param_num, param_map, req.status_code, req.text] + global_data.put(hhh) + +def run(data, headers): + url = data[0] + q = data[1] + url_type = check(url, headers) + if url_type == 0: + logger.error("[!] Error") + elif url_type == 1: + logger.success(f"working on {url}, type: source") + go_source(url, q, headers) + elif url_type == 2: + logger.success(f"working on {url}, type: api-docs") + go_docs(url, q, headers) + else: + logger.success(f"working on {url}, type: html") + go_html(url, q, headers) + +def print_error(value): + logger.error(f"进程池出错, 出错原因为: {value}") + +def run_pool(urls, headers): + p = Pool(8) + manager = Manager() + q = manager.Queue() + for url in urls: + url = url.strip() + param = [url, q] + p.apply_async(run, args=(param, headers), error_callback=print_error) + p.close() + p.join() + output_to_csv(q) + +def output_to_csv(global_data): + with open('swagger.csv', 'w', newline='', encoding='utf-8') as f: + writer = csv.writer(f) + writer.writerow(["api-doc-url", "summary", "path", "method", "query_url", "num of params", "data", "status_code", "response"]) + while not global_data.empty(): + writer.writerow(global_data.get()) + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument("-u", "--url", dest='target_url', help="resource地址 or api文档地址 or swagger首页地址") + parser.add_argument("-f", "--file", dest='url_file', help="批量测试") + parser.add_argument("-t", "--token", dest='auth_token', help="认证token") + args = parser.parse_args() + + logger.add("file.log", format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}") + banner() + + headers = {} + if args.auth_token: + headers['Authorization'] = f"Bearer {args.auth_token}" + + if args.target_url: + run_pool([args.target_url], headers) + elif args.url_file: + with open(args.url_file, 'r') as f: + urls = f.readlines() + run_pool(urls, headers)