|
| 1 | +# -*- coding: utf-8 -*- |
| 2 | +import asyncio |
| 3 | +import csv |
| 4 | +import time |
| 5 | +from typing import Any, Dict, List |
| 6 | + |
| 7 | +import aiofiles |
| 8 | +import httpx |
| 9 | +from common import SymbolContent, make_req_params_and_headers |
| 10 | + |
| 11 | +HOST = "https://query1.finance.yahoo.com" |
| 12 | +SYMBOL_QUERY_API_URI = "/v1/finance/screener" |
| 13 | +PAGE_SIZE = 100 # 可选配置(25, 50, 100) |
| 14 | + |
| 15 | + |
| 16 | +def parse_symbol_content(quote_item: Dict) -> SymbolContent: |
| 17 | + """ |
| 18 | + 数据提取 |
| 19 | + :param quote_item: |
| 20 | + :return: |
| 21 | + """ |
| 22 | + symbol_content = SymbolContent() |
| 23 | + symbol_content.symbol = quote_item["symbol"] |
| 24 | + symbol_content.name = quote_item["shortName"] |
| 25 | + symbol_content.price = quote_item["regularMarketPrice"]["fmt"] |
| 26 | + symbol_content.change_price = quote_item["regularMarketChange"]["fmt"] |
| 27 | + symbol_content.change_percent = quote_item["regularMarketChangePercent"]["fmt"] |
| 28 | + symbol_content.market_price = quote_item["marketCap"]["fmt"] |
| 29 | + return symbol_content |
| 30 | + |
| 31 | + |
| 32 | +async def send_request(page_start: int, page_size: int) -> Dict[str, Any]: |
| 33 | + """ |
| 34 | + 公共的发送请求的函数 |
| 35 | + :param page_start: 分页起始位置 |
| 36 | + :param page_size: 每一页的长度 |
| 37 | + :return: |
| 38 | + """ |
| 39 | + # print(f"[send_request] page_start:{page_start}") |
| 40 | + req_url = HOST + SYMBOL_QUERY_API_URI |
| 41 | + common_params, headers, common_payload_data = make_req_params_and_headers() |
| 42 | + # 修改分页变动参数 |
| 43 | + common_payload_data["offset"] = page_start |
| 44 | + common_payload_data["size"] = page_size |
| 45 | + |
| 46 | + async with httpx.AsyncClient() as client: |
| 47 | + response = await client.post(url=req_url, params=common_params, json=common_payload_data, headers=headers, |
| 48 | + timeout=30) |
| 49 | + if response.status_code != 200: |
| 50 | + raise Exception("发起请求时发生异常,请求发生错误,原因:", response.text) |
| 51 | + try: |
| 52 | + response_dict: Dict = response.json() |
| 53 | + return response_dict |
| 54 | + except Exception as e: |
| 55 | + raise e |
| 56 | + |
| 57 | + |
| 58 | +async def fetch_currency_data_single(page_start: int) -> List[SymbolContent]: |
| 59 | + """ |
| 60 | + Fetch currency data for a single page. |
| 61 | + :param page_start: Page start index. |
| 62 | + :return: List of SymbolContent for the page. |
| 63 | + """ |
| 64 | + try: |
| 65 | + response_dict: Dict = await send_request(page_start=page_start, page_size=PAGE_SIZE) |
| 66 | + return [ |
| 67 | + parse_symbol_content(quote) for quote in response_dict["finance"]["result"][0]["quotes"] |
| 68 | + ] |
| 69 | + except Exception as e: |
| 70 | + print(f"Error fetching data for page_start={page_start}: {e}") |
| 71 | + return [] |
| 72 | + |
| 73 | + |
| 74 | +async def fetch_currency_data_list(max_total_count: int) -> List[SymbolContent]: |
| 75 | + """ |
| 76 | + Fetch currency data using asyncio. |
| 77 | + :param max_total_count: Maximum total count of currencies. |
| 78 | + :return: List of all SymbolContent. |
| 79 | + """ |
| 80 | + page_starts = list(range(0, max_total_count, PAGE_SIZE)) |
| 81 | + print(f"总共发起: {len(page_starts)} 次网络请求") |
| 82 | + |
| 83 | + tasks = [fetch_currency_data_single(page_start) for page_start in page_starts] |
| 84 | + results = await asyncio.gather(*tasks) |
| 85 | + |
| 86 | + # 扁平化结果列表 |
| 87 | + return [item for sublist in results for item in sublist] |
| 88 | + |
| 89 | + |
| 90 | +async def get_max_total_count() -> int: |
| 91 | + """ |
| 92 | + 获取所有币种总数量 |
| 93 | + :return: |
| 94 | + """ |
| 95 | + print("开始获取最大的币种数量") |
| 96 | + try: |
| 97 | + response_dict: Dict = await send_request(page_start=0, page_size=PAGE_SIZE) |
| 98 | + total_num: int = response_dict["finance"]["result"][0]["total"] |
| 99 | + print(f"获取到 {total_num} 种币种") |
| 100 | + return total_num |
| 101 | + except Exception as e: |
| 102 | + print("错误信息:", e) |
| 103 | + return 0 |
| 104 | + |
| 105 | + |
| 106 | +async def save_data_to_csv(save_file_name: str, currency_data_list: List[SymbolContent]) -> None: |
| 107 | + """ |
| 108 | + 保存数据存储到CSV文件中 |
| 109 | + :param save_file_name: 保存的文件名 |
| 110 | + :param currency_data_list: |
| 111 | + :return: |
| 112 | + """ |
| 113 | + async with aiofiles.open(save_file_name, mode='w', newline='', encoding='utf-8') as file: |
| 114 | + writer = csv.writer(file) |
| 115 | + # 写入标题行 |
| 116 | + await file.write(','.join(SymbolContent.get_fields()) + '\n') |
| 117 | + # 遍历数据列表,并将每个币种的名称写入CSV |
| 118 | + for symbol in currency_data_list: |
| 119 | + await file.write(f"{symbol.symbol},{symbol.name},{symbol.price},{symbol.change_price},{symbol.change_percent},{symbol.market_price}\n") |
| 120 | + |
| 121 | + |
| 122 | +async def run_crawler_async(save_file_name: str) -> None: |
| 123 | + """ |
| 124 | + 爬虫主流程(异步并发版本) |
| 125 | + :param save_file_name: |
| 126 | + :return: |
| 127 | + """ |
| 128 | + # step1 获取最大数据总量 |
| 129 | + max_total: int = await get_max_total_count() |
| 130 | + # step2 遍历每一页数据并解析存储到数据容器中 |
| 131 | + data_list: List[SymbolContent] = await fetch_currency_data_list(max_total) |
| 132 | + # step3 将数据容器中的数据保存csv |
| 133 | + await save_data_to_csv(save_file_name, data_list) |
| 134 | + |
| 135 | +async def main(): |
| 136 | + """ |
| 137 | + 主函数 |
| 138 | + :return: |
| 139 | + """ |
| 140 | + start_time = time.time() |
| 141 | + save_csv_file_name = f"symbol_data_{int(start_time)}.csv" |
| 142 | + await run_crawler_async(save_csv_file_name) |
| 143 | + end_time = time.time() |
| 144 | + print(f"asyncio调度协程执行程序耗时: {end_time - start_time} 秒") |
| 145 | + |
| 146 | + |
| 147 | +if __name__ == '__main__': |
| 148 | + asyncio.run(main()) |
| 149 | + |
0 commit comments