Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 221 additions & 0 deletions 3.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
import json
import os
import re
import time
import sys
from urllib.parse import urlparse
import requests
import csv
import argparse
from multiprocessing import Pool, Manager
from loguru import logger

# Suppress urllib3's InsecureRequestWarning — every request in this tool
# is sent with verify=False.
requests.packages.urllib3.disable_warnings()

logger.remove()
handler_id = logger.add(sys.stderr, level="DEBUG")  # set the console output level

# Dummy value substituted for each swagger parameter type when building
# query strings and path parameters.
payload_array = {"string": "1", "boolean": "true", "integer": "1", "array": "1", "number": "1", "object": ""}

# For POST-type requests, this canned JSON body is sent.
json_payload = """{
"code": "string",
"createTime": "2021-02-05T10:34:37.691Z",
"delFlag": "string",
"deptId": 0,
"fullName": "string",
"fullPathCode": "string",
"fullPathName": "string",
"isVirtual": true,
"name": "string",
"outCode": "string",
"outParentCode": "string",
"parentCode": "string",
"parentId": 0,
"parentName": "string",
"sort": 0,
"updateTime": "2021-02-05T10:34:37.691Z"
}"""

def banner():
    """Log the ASCII-art banner and a basic usage hint."""
    logger.info('''
 _ _
_____ ____ _ __ _ __ _ ___ _ __ | |__ __ _ ___| | __
/ __\ \ /\ / / _` |/ _` |/ _` |/ _ \ '__|____| '_ \ / _` |/ __| |/ /
\__ \\ V V / (_| | (_| | (_| | __/ | |_____| | | | (_| | (__| <
|___/ \_/\_/ \__,_|\__, |\__, |\___|_| |_| |_|\__,_|\___|_|\_\\
|___/ |___/
by jayus
python swagger.py -h
---------------------------------------------------------------------
''')

def check(url, headers):
    """Classify what kind of swagger endpoint *url* points at.

    Returns:
        3 -- swagger HTML index page ("<html" in the body)
        2 -- api-docs JSON document ('"parameters"' in the body)
        1 -- swagger-resources listing ('"location"' in the body)
        0 -- request failed or the body matched none of the markers
    """
    try:
        res = requests.get(url=url, headers=headers, timeout=5, verify=False)
        if "<html" in res.text:
            logger.debug("[+] 输入url为swagger首页,开始解析api文档地址")
            return 3  # html index page
        elif "\"parameters\"" in res.text:
            logger.debug("[+] 输入url为api文档地址,开始构造请求发包")
            return 2  # api doc
        elif "\"location\"" in res.text:
            logger.debug("[+] 输入url为resource地址,开始解析api文档地址")
            return 1  # resource listing
    except Exception as e:
        logger.error(f"Error checking URL: {e}")
    # Fix: explicit fall-through. Previously a response matching none of the
    # markers produced an implicit None, which run() would misclassify as the
    # html type (its final else branch).
    return 0

def get_api_docs_paths(resource_url, headers):
    """Resolve a swagger-resources URL into a list of full api-docs URLs.

    Returns an empty list when the resource listing cannot be fetched or
    parsed.
    """
    parts = urlparse(resource_url)
    base = parts.scheme + "://" + parts.netloc
    try:
        res = requests.get(url=resource_url, headers=headers, verify=False, timeout=10)
        resources = json.loads(res.text)
    except Exception as e:
        logger.error(f"Error getting API docs paths: {e}")
        return []

    # Format differs across swagger versions: some wrap the list in an
    # {"apis": [...]} object keyed by "path", others return a bare list of
    # entries keyed by "location".
    if isinstance(resources, dict) and "apis" in resources.keys():
        return [base + entry['path'] for entry in resources['apis']]
    return [base + entry['location'] for entry in resources]

def go_docs(url, global_data, headers):
    """Fetch one api-docs document and probe every path/method it declares.

    Results are pushed onto *global_data* (a shared queue) by the per-method
    handlers; failures are logged and never propagate.
    """
    try:
        parsed = urlparse(url)
        root = parsed.scheme + "://" + parsed.netloc
        response = requests.get(url=url, headers=headers, timeout=5, verify=False)
        doc = json.loads(response.text)

        # swagger 2.x exposes "basePath"; OpenAPI 3.x nests it in servers[0].url.
        base_path = doc.get("basePath", '') or doc.get("servers", [{}])[0].get("url", '')
        paths = doc['paths']
        logger.info(f"[+] {url} has {len(paths)} paths")

        for path, method_map in paths.items():
            logger.debug(f"test on {url} => {path}")
            for method, spec in method_map.items():
                try:
                    process_path(root, base_path, path, method, spec, global_data, headers)
                except Exception as e:
                    logger.error(f"Error processing path {path} with method {method}: {e}")
    except Exception as e:
        logger.error(f"Error in go_docs: {e}")

def process_path(domain, basePath, path, method, details, global_data, headers):
    """Dispatch a single path/method pair to the matching request builder."""
    summary = details.get('summary', path)
    # Count declared parameters via occurrences of the "'in':" marker in the
    # stringified spec.
    param_num = str(details).count("'in':")

    dispatch = {
        'post': process_post_put,
        'put': process_post_put,
        'get': process_get_delete,
        'delete': process_get_delete,
    }
    handler = dispatch.get(method)
    if handler is None:
        logger.error(f"[!] 未知的请求方法: {method}")
        return
    handler(domain, basePath, path, method, details, param_num, summary, global_data, headers)

def process_post_put(domain, basePath, path, method, details, param_num, summary, global_data, headers):
    """Send a probe POST/PUT request for one api path and queue the result.

    When the spec declares a body parameter, the canned json_payload document
    is sent; otherwise an empty form body is used. The result row layout
    matches the CSV header written by output_to_csv.
    """
    req_headers = headers
    if "'in': 'body'" in str(details):
        data = json_payload
        # Fix: declare the body as JSON. Previously the JSON document was sent
        # with no Content-Type, so servers expecting application/json would
        # misparse or reject it. Copy the dict so the caller's shared headers
        # are not mutated.
        req_headers = dict(headers)
        req_headers['Content-Type'] = 'application/json'
    else:
        data = {}

    url = domain + basePath + path
    if method == 'post':
        req = requests.post(url=url, headers=req_headers, data=data, timeout=5, verify=False)
    else:
        req = requests.put(url=url, headers=req_headers, data=data, timeout=5, verify=False)

    hhh = [url, summary, path, method, url, param_num, data, req.status_code, req.text]
    global_data.put(hhh)

def process_get_delete(domain, basePath, path, method, details, param_num, summary, global_data, headers):
    """Send a probe GET/DELETE request for one api path and queue the result.

    Builds a query string from the declared parameters using the dummy values
    in payload_array, and substitutes {placeholder} path parameters the same
    way. The result row layout matches the CSV header written by
    output_to_csv.
    """
    query_string = ""
    param_map = {}
    if "parameters" in details:
        parameters = details['parameters']
        for param in parameters:
            # swagger 2.x puts the type on the parameter itself; OpenAPI 3.x
            # nests it under "schema".
            p_type = param.get('type', '') or param.get('schema', {}).get('type', '')
            p_name = param['name']
            param_map[p_name] = payload_array.get(p_type, '')

    for key in param_map.keys():
        query_string += f"{key}={param_map[key]}&"

    if "{" in path:
        # Fix: raw string — the previous "\{[^\}]*\}" literal relies on
        # invalid escape sequences (a warning/error on modern Python).
        tmps = re.findall(r"\{[^}]*\}", path)
        for tmp in tmps:
            path = path.replace(tmp, param_map[tmp[1:-1]])

    # Fix: `url` was never defined in this function, so the global_data.put
    # line below raised NameError on every GET/DELETE probe.
    url = domain + basePath + path
    query_url = f"{url}?{query_string[:-1]}"
    if method == 'get':
        req = requests.get(url=query_url, headers=headers, timeout=5, verify=False)
    else:
        req = requests.delete(url=query_url, headers=headers, timeout=5, verify=False)

    hhh = [url, summary, path, method, query_url, param_num, param_map, req.status_code, req.text]
    global_data.put(hhh)

def run(data, headers):
    """Worker entry point: classify one URL and dispatch to a handler.

    data is a [url, shared_result_queue] pair assembled by run_pool.
    """
    url = data[0]
    q = data[1]
    url_type = check(url, headers)
    if url_type == 0:
        logger.error("[!] Error")
    elif url_type == 1:
        logger.success(f"working on {url}, type: source")
        # NOTE(review): go_source is not defined anywhere in this file, so
        # this branch raises NameError at runtime. It presumably should
        # resolve the listing via get_api_docs_paths and feed each doc URL
        # to go_docs — confirm and implement.
        go_source(url, q, headers)
    elif url_type == 2:
        logger.success(f"working on {url}, type: api-docs")
        go_docs(url, q, headers)
    else:
        logger.success(f"working on {url}, type: html")
        # NOTE(review): go_html is also undefined in this file — NameError
        # at runtime for swagger index pages. Confirm and implement.
        go_html(url, q, headers)

def print_error(value):
    """error_callback for the process pool: log the failure reason."""
    logger.error("进程池出错, 出错原因为: " + str(value))

def run_pool(urls, headers):
    """Fan the target URLs out over an 8-worker process pool, then dump the
    shared result queue to swagger.csv."""
    manager = Manager()
    results = manager.Queue()
    pool = Pool(8)
    for raw_url in urls:
        pool.apply_async(run, args=([raw_url.strip(), results], headers), error_callback=print_error)
    pool.close()
    pool.join()
    output_to_csv(results)

def output_to_csv(global_data):
    """Drain the shared result queue into swagger.csv (UTF-8, with header)."""
    header = ["api-doc-url", "summary", "path", "method", "query_url", "num of params", "data", "status_code", "response"]
    with open('swagger.csv', 'w', newline='', encoding='utf-8') as out_file:
        writer = csv.writer(out_file)
        writer.writerow(header)
        while not global_data.empty():
            writer.writerow(global_data.get())

if __name__ == '__main__':
    # Command-line interface: a single target URL, or a file of targets,
    # with an optional bearer token for authenticated swagger endpoints.
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("-u", "--url", dest='target_url', help="resource地址 or api文档地址 or swagger首页地址")
    arg_parser.add_argument("-f", "--file", dest='url_file', help="批量测试")
    arg_parser.add_argument("-t", "--token", dest='auth_token', help="认证token")
    options = arg_parser.parse_args()

    logger.add("file.log", format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}")
    banner()

    headers = {}
    if options.auth_token:
        headers['Authorization'] = f"Bearer {options.auth_token}"

    if options.target_url:
        run_pool([options.target_url], headers)
    elif options.url_file:
        with open(options.url_file, 'r') as f:
            run_pool(f.readlines(), headers)