|
19 | 19 | import random |
20 | 20 | from asyncio import Task |
21 | 21 | from typing import Dict, List, Optional, Tuple, Union |
| 22 | +from datetime import datetime, timedelta |
| 23 | +import pandas as pd |
22 | 24 |
|
23 | | -from playwright.async_api import (BrowserContext, BrowserType, Page, |
24 | | - async_playwright) |
| 25 | +from playwright.async_api import (BrowserContext, BrowserType, Page, async_playwright) |
25 | 26 |
|
26 | 27 | import config |
27 | 28 | from base.base_crawler import AbstractCrawler |
@@ -95,56 +96,122 @@ async def start(self): |
95 | 96 | utils.logger.info( |
96 | 97 | "[BilibiliCrawler.start] Bilibili Crawler finished ...") |
97 | 98 |
|
async def get_pubtime_datetime(self, start: str = config.START_DAY, end: str = config.END_DAY) -> Tuple[str, str]:
    """
    Build the bilibili search timestamps pubtime_begin_s and pubtime_end_s
    for an inclusive range of publish dates.
    ---
    :param start: first publish day to include, formatted YYYY-MM-DD
    :param end: last publish day to include, formatted YYYY-MM-DD
    :return: (pubtime_begin_s, pubtime_end_s), each an integer Unix timestamp rendered as a string
    :raises ValueError: if start is later than end, or if either string does not match YYYY-MM-DD

    Note
    ---
    - The range covers start through end, both days included.
    - pubtime_begin_s is 00:00:00 of start; pubtime_end_s is the last second
      (23:59:59) of end, i.e. end + 1 day - 1 second. When start == end this
      covers exactly that single day.
    - The dates are parsed as naive datetimes, so datetime.timestamp() interprets
      them in the local timezone of the machine running the crawler. The example
      values below hold for a UTC+8 local timezone.
    - Single day, 2024-01-05:
      pubtime_begin_s = 1704384000 (2024-01-05 00:00:00),
      pubtime_end_s = 1704470399 (2024-01-05 23:59:59)
    - Range, 2024-01-05 to 2024-01-06:
      pubtime_begin_s = 1704384000 (2024-01-05 00:00:00),
      pubtime_end_s = 1704556799 (2024-01-06 23:59:59)
    """
    # Parse both bounds; strptime raises ValueError on a malformed date string.
    start_day: datetime = datetime.strptime(start, '%Y-%m-%d')
    end_day: datetime = datetime.strptime(end, '%Y-%m-%d')
    if start_day > end_day:
        raise ValueError('Wrong time range, please check your start and end argument, to ensure that the start cannot exceed end')
    # Push the upper bound to the last second of the end day so that day is
    # fully included. This single expression also covers start == end.
    end_day = end_day + timedelta(days=1) - timedelta(seconds=1)
    # Convert back to whole-second timestamps, as strings.
    return str(int(start_day.timestamp())), str(int(end_day.timestamp()))
| 127 | + |
async def search(self):
    """
    Search bilibili videos for every keyword in config.KEYWORDS and store the results.

    The crawl mode is selected by config.ALL_DAY:
    - falsy: page through the unfiltered search results of each keyword,
      skipping pages before config.START_PAGE. Per the original author's note,
      a keyword returns at most 1000 results this way.
    - truthy: run one paged search per calendar day between config.START_DAY
      and config.END_DAY, filtering by publish time, so the 1000-video cap
      applies per day instead of per keyword.

    Side effect: config.CRAWLER_MAX_NOTES_COUNT is raised to the page size (20)
    when it is configured lower.
    :return:
    """
    utils.logger.info("[BilibiliCrawler.search] Begin search bilibli keywords")
    bili_limit_count = 20  # bilibili limit page fixed value
    if config.CRAWLER_MAX_NOTES_COUNT < bili_limit_count:
        config.CRAWLER_MAX_NOTES_COUNT = bili_limit_count
    start_page = config.START_PAGE  # start page number
    for keyword in config.KEYWORDS.split(","):
        source_keyword_var.set(keyword)
        utils.logger.info(f"[BilibiliCrawler.search] Current search keyword: {keyword}")

        if not config.ALL_DAY:
            # Plain paged search, no publish-time filter (both timestamps are 0).
            page = 1
            while (page - start_page + 1) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
                if page < start_page:
                    utils.logger.info(f"[BilibiliCrawler.search] Skip page: {page}")
                    page += 1
                    continue

                utils.logger.info(f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, page: {page}")
                has_results = await self._search_and_store_page(
                    keyword=keyword,
                    page=page,
                    page_size=bili_limit_count,
                    pubtime_begin_s=0,
                    pubtime_end_s=0,
                )
                if not has_results:
                    # An empty page means the result list is exhausted.
                    break
                page += 1
            continue

        # Day-by-day search between START_DAY and END_DAY.
        for day in pd.date_range(start=config.START_DAY, end=config.END_DAY, freq='D'):
            day_str = day.strftime('%Y-%m-%d')
            # Timestamps bounding this single day.
            pubtime_begin_s, pubtime_end_s = await self.get_pubtime_datetime(start=day_str, end=day_str)
            page = 1
            # Pages are deliberately NOT skipped here, to gather every video of the day.
            # NOTE(review): start_page still shifts this page limit even though no
            # page is skipped in this mode - confirm that is intended.
            while (page - start_page + 1) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
                utils.logger.info(f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, date: {day.ctime()}, page: {page}")
                try:
                    has_results = await self._search_and_store_page(
                        keyword=keyword,
                        page=page,
                        page_size=bili_limit_count,
                        pubtime_begin_s=pubtime_begin_s,
                        pubtime_end_s=pubtime_end_s,
                    )
                except Exception as e:
                    # Best effort: a failure on one day must not abort the whole
                    # crawl, so log it and move on to the next day.
                    utils.logger.error(f"[BilibiliCrawler.search] search failed, keyword: {keyword}, date: {day_str}, page: {page}, error: {e}")
                    break
                if not has_results:
                    # Nothing (more) published on this day; go to the next day.
                    break
                page += 1

async def _search_and_store_page(self, keyword: str, page: int, page_size: int,
                                 pubtime_begin_s: Union[int, str], pubtime_end_s: Union[int, str]) -> bool:
    """
    Fetch one page of search results, store each video and its uploader, then fetch the comments.

    :param keyword: search keyword
    :param page: 1-based page number to request
    :param page_size: number of results requested per page
    :param pubtime_begin_s: publish-time lower bound timestamp (0 when unfiltered)
    :param pubtime_end_s: publish-time upper bound timestamp (0 when unfiltered)
    :return: False when the response carries no results, True otherwise
    """
    videos_res = await self.bili_client.search_video_by_keyword(
        keyword=keyword,
        page=page,
        page_size=page_size,
        order=SearchOrderType.DEFAULT,
        pubtime_begin_s=pubtime_begin_s,
        pubtime_end_s=pubtime_end_s,
    )
    # Treat a missing or empty "result" as the end of the result list.
    video_list: List[Dict] = videos_res.get("result") or []
    if not video_list:
        return False

    semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
    task_list = [
        self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore)
        for video_item in video_list
    ]
    video_items = await asyncio.gather(*task_list)

    video_id_list: List[str] = []
    for video_item in video_items:
        if video_item:
            video_id_list.append(video_item.get("View").get("aid"))
            await bilibili_store.update_bilibili_video(video_item)
            await bilibili_store.update_up_info(video_item)
            await self.get_bilibili_video(video_item, semaphore)
    await self.batch_get_video_comments(video_id_list)
    return True
148 | 215 |
|
149 | 216 | async def batch_get_video_comments(self, video_id_list: List[str]): |
150 | 217 | """ |
|
0 commit comments