267 changes: 154 additions & 113 deletions tools/check_links.py
@@ -1,189 +1,230 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
FlyPython Link Checker Tool
Periodically checks the validity of all external links in README files
"""

import re
import time
import json
import os
from pathlib import Path
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class LinkChecker:
    def __init__(self, timeout: int = 10, max_workers: int = 10):
        self.session = self._create_session()
        self.timeout = timeout
        self.max_workers = max_workers
        self.results = {
            'working': [],
            'broken': [],
            'redirect': [],
            'timeout': [],
            'unknown': []
        }
        self.processed_urls = set()

    def _create_session(self) -> requests.Session:
        """Create a requests session with retry strategy and headers"""
        session = requests.Session()

        # Configure retry strategy
        retry_strategy = Retry(
            total=2,
            backoff_factor=0.5,
            status_forcelist=[429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount('http://', adapter)
        session.mount('https://', adapter)

        # Set user agent
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

        return session

    def extract_links_from_file(self, filename: str) -> List[Dict]:
        """Extract all external links from a markdown file"""
        filepath = Path(filename)

        if not filepath.exists():
            print(f"File not found: {filename}")
            return []

        try:
            content = filepath.read_text(encoding='utf-8')
        except Exception as e:
            print(f"Failed to read {filename}: {e}")
            return []

        links = []

        # Extract markdown links [text](url)
        markdown_links = re.findall(r'\[([^\]]*)\]\(([^)]+)\)', content)
        for text, url in markdown_links:
            if url.startswith('http'):
                links.append({
                    'text': text,
                    'url': url,
                    'file': str(filepath),
                    'type': 'markdown'
                })

        # Extract plain URLs
        plain_urls = re.findall(r'https?://[^\s\])\}]+', content)
        seen = {link['url'] for link in links}

        for url in plain_urls:
            if url not in seen:
                links.append({
                    'text': url,
                    'url': url,
                    'file': str(filepath),
                    'type': 'plain'
                })
                seen.add(url)

        return links

    def check_link(self, link: Dict) -> Dict:
        """Check the status of a single link"""
        url = link['url']

        if url in self.processed_urls:
            return link

        self.processed_urls.add(url)

        try:
            # Try HEAD request first (faster)
            response = self.session.head(url, timeout=self.timeout, allow_redirects=True)
            return self._process_response(link, response)

        except requests.exceptions.Timeout:
            link['status'] = 'timeout'
            link['error'] = 'Request timeout'
            self.results['timeout'].append(link)
            return link

        except requests.exceptions.RequestException as e:
            # Fall back to GET request for servers that don't support HEAD
            try:
                response = self.session.get(url, timeout=self.timeout)
                return self._process_response(link, response)
            except requests.exceptions.RequestException:
                link['status'] = 'unknown'
                link['error'] = str(e)
                self.results['unknown'].append(link)
                return link

    def _process_response(self, link: Dict, response: requests.Response) -> Dict:
        """Process HTTP response and categorize link"""
        status_code = response.status_code

        if status_code == 200:
            link['status'] = 'working'
            self.results['working'].append(link)
        elif 300 <= status_code < 400:
            link['status'] = 'redirect'
            link['final_url'] = response.url
            self.results['redirect'].append(link)
        else:
            link['status'] = 'broken'
            self.results['broken'].append(link)

        link['status_code'] = status_code
        return link

    def check_all_links(self, links: List[Dict]) -> None:
        """Concurrently check all links"""
        print(f"Checking {len(links)} links...\n")

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {executor.submit(self.check_link, link): link for link in links}

            for i, future in enumerate(as_completed(futures), 1):
                link = futures[future]
                try:
                    result = future.result()
                    status = result.get('status', 'unknown').upper()
                    print(f"[{i}/{len(links)}] {status}: {result['url']}")
                except Exception as e:
                    print(f"Error checking {link['url']}: {e}")

    def generate_report(self, output_dir: str = 'reports') -> None:
        """Generate and save detailed report"""
        total = sum(len(links) for links in self.results.values())

        report = f"""
{'='*60}
Link Check Report
{'='*60}
Total Links: {total}
✓ Working: {len(self.results['working'])}
→ Redirects: {len(self.results['redirect'])}
✗ Broken: {len(self.results['broken'])}
⏱ Timeouts: {len(self.results['timeout'])}
? Unknown: {len(self.results['unknown'])}
{'='*60}
"""
        print(report)

        # Save detailed results
        output_path = Path(output_dir)
        output_path.mkdir(exist_ok=True)

        results_file = output_path / 'link_check_results.json'
        with open(results_file, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, ensure_ascii=False, indent=2)

        print(f"Detailed results saved to: {results_file}")

    def deduplicate_links(self, links: List[Dict]) -> List[Dict]:
        """Remove duplicate links by URL"""
        seen = set()
        unique = []
        for link in links:
            if link['url'] not in seen:
                unique.append(link)
                seen.add(link['url'])
        return unique


def main():
    files = ['../README.md', '../README_cn.md']

    # Extract links
    checker = LinkChecker(timeout=10, max_workers=10)
    all_links = []

    for filename in files:
        print(f"Extracting links from {filename}...")
        links = checker.extract_links_from_file(filename)
        if links:
            all_links.extend(links)
        print(f"Found {len(links)} links\n")

    if not all_links:
        print("No links found!")
        return

    # Deduplicate and check
    unique_links = checker.deduplicate_links(all_links)
    print(f"Checking {len(unique_links)} unique links\n")

    checker.check_all_links(unique_links)
    checker.generate_report()


if __name__ == '__main__':
    main()
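
For reference, a minimal usage sketch of driving the checker against a single file from another script (assumptions, not part of this change: the code is run from the tools/ directory so the relative README paths resolve, and this file is importable as check_links):

# Hypothetical usage sketch, assuming the module is importable as `check_links`
from check_links import LinkChecker

checker = LinkChecker(timeout=5, max_workers=4)
links = checker.extract_links_from_file('../README.md')
unique = checker.deduplicate_links(links)
checker.check_all_links(unique)
checker.generate_report(output_dir='reports')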