Skip to content

Commit 1e4ef3d

Browse files
fixed and version to 4.0.1
- open many chrome tab in a time when a risk control happened for media platform - add time gap for wb fetching note detail - solved wrong pb migrate setting for db
1 parent 658f6e6 commit 1e4ef3d

File tree

223 files changed

+342
-389
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

223 files changed

+342
-389
lines changed

core/general_process.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ def wrapper():
318318
if 'wb' not in recorder.item_source:
319319
recorder.item_source['wb'] = 0
320320
recorder.item_source['wb'] += 1
321+
await asyncio.sleep(1)
321322
elif source_name == KUAISHOU_PLATFORM_NAME:
322323
existings[KUAISHOU_PLATFORM_NAME].update(un_related_urls)
323324
for video in un_related_urls:
@@ -327,6 +328,8 @@ def wrapper():
327328
if 'ks' not in recorder.item_source:
328329
recorder.item_source['ks'] = 0
329330
recorder.item_source['ks'] += 1
331+
# it seems that kuaishou can afford more requests per second...
332+
# await asyncio.sleep(1)
330333
except Exception as e:
331334
wis_logger.error(f"Error Extracting Post List: {e}")
332335
continue

core/wis/kuaishou/client.py

Lines changed: 48 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
import asyncio
33
import json
44
import traceback
5-
from typing import Callable, Dict, List, Optional, Union
5+
from typing import Callable, Dict, List, Optional, Union, cast
66
from urllib.parse import urlencode
7-
7+
import time
88
import httpx
99
from httpx import Response
1010
from tenacity import RetryError, retry, stop_after_attempt, wait_fixed
@@ -35,6 +35,8 @@ def __init__(
3535
self._graphql = KuaiShouGraphQL()
3636
self.account_with_ip_pool = account_with_ip_pool
3737
self.account_info: Optional[AccountWithIpModel] = None
38+
self._account_update_lock = asyncio.Lock()
39+
self._account_updated_at = None
3840

3941
@property
4042
def headers(self):
@@ -66,20 +68,35 @@ async def update_account_info(self, force_login: bool = False):
6668
Returns:
6769
6870
"""
69-
have_account = False
70-
wis_logger.debug("try to get a new account")
71-
account_info = await self.account_with_ip_pool.get_account_with_ip_info(force_login)
72-
self.account_info = account_info
73-
have_account = await self.pong()
74-
if have_account:
71+
if self._account_updated_at and time.time() - self._account_updated_at < 60:
7572
return
76-
wis_logger.info(f"current account {account_info.account.account_name} is invalid, try to get a new one")
77-
await self.mark_account_invalid(account_info)
78-
account_info = await self.account_with_ip_pool.get_account_with_ip_info(force_login=True)
79-
self.account_info = account_info
80-
have_account = await self.pong()
81-
if not have_account:
82-
raise DataFetchError("cannot get any valid account, we have to quit")
73+
74+
# Use async lock to ensure that only one account is being updated
75+
async with self._account_update_lock:
76+
if self.account_with_ip_pool.proxy_ip_pool:
77+
try:
78+
await self.account_with_ip_pool.mark_ip_invalid(
79+
cast(IpInfoModel, self.account_info.ip_info)
80+
)
81+
proxy_ip_pool: ProxyIpPool = cast(
82+
ProxyIpPool, self.account_with_ip_pool.proxy_ip_pool
83+
)
84+
self.account_info.ip_info = await proxy_ip_pool.get_proxy()
85+
if await self.pong():
86+
self._account_updated_at = time.time()
87+
wis_logger.info("update account by changing ip success, will continue the job")
88+
return
89+
except Exception as e:
90+
wis_logger.error(f"update IP failed, err:{e}")
91+
92+
# in this case, we change ip and account together to ensure success -- follow ajiang's suggestion
93+
await self.mark_account_invalid(self.account_info)
94+
account_info = await self.account_with_ip_pool.get_account_with_ip_info(force_login=force_login)
95+
self.account_info = account_info
96+
if not await self.pong():
97+
raise DataFetchError("cannot get any valid account, we have to quit")
98+
wis_logger.info("update account by changing ip and account success, will continue the job")
99+
self._account_updated_at = time.time()
83100

84101
async def mark_account_invalid(self, account_with_ip: AccountWithIpModel):
85102
"""
@@ -156,48 +173,18 @@ async def get(self, uri: str, params=None, **kwargs) -> Dict:
156173
method="GET", url=f"{KUAISHOU_API}{final_uri}", **kwargs
157174
)
158175
except RetryError as e:
159-
# 获取原始异常
160-
original_exception = e.last_attempt.exception()
161-
traceback.print_exception(
162-
type(original_exception),
163-
original_exception,
164-
original_exception.__traceback__,
165-
)
166-
wis_logger.error(
167-
f"请求uri:{uri} 重试均失败了,尝试更换账号与IP再次发起重试"
168-
)
176+
wis_logger.info(
177+
f"[KuaiShouApiClient.get] get uri:{uri} failed many times, will try to update account")
178+
# let the bull fly for a while, in case the account is updating by other thread
179+
await asyncio.sleep(1)
180+
await self.update_account_info(force_login=True)
169181
try:
170-
wis_logger.info(
171-
f"请求uri:{uri} 尝试更换IP再次发起重试..."
172-
)
173-
await self.account_with_ip_pool.mark_ip_invalid(
174-
self.account_info.ip_info
175-
)
176-
self.account_info.ip_info = (
177-
await self.account_with_ip_pool.proxy_ip_pool.get_proxy()
178-
)
182+
wis_logger.info("try to update account by changing ip")
179183
return await self.request(
180184
method="GET", url=f"{KUAISHOU_API}{final_uri}", **kwargs
181185
)
182186
except Exception as e:
183-
# 获取原始异常
184-
"""
185-
# in version 4.0, we do not have ip proxy yet
186-
original_exception = ee.last_attempt.exception()
187-
traceback.print_exception(
188-
type(original_exception),
189-
original_exception,
190-
original_exception.__traceback__,
191-
)
192-
"""
193-
wis_logger.error(
194-
f"请求uri:{uri},IP更换后还是失败,尝试更换账号与IP再次发起重试"
195-
)
196-
await self.mark_account_invalid(self.account_info)
197-
await self.update_account_info(force_login=True)
198-
return await self.request(
199-
method="GET", url=f"{KUAISHOU_API}{final_uri}", **kwargs
200-
)
187+
wis_logger.error(f"account and ip still invalid, have to quit, err:{e}")
201188

202189
async def post(self, uri: str, data: dict, **kwargs) -> Dict:
203190
"""
@@ -219,53 +206,21 @@ async def post(self, uri: str, data: dict, **kwargs) -> Dict:
219206
headers=self.headers,
220207
)
221208
except RetryError as e:
222-
# 获取原始异常
223-
original_exception = e.last_attempt.exception()
224-
traceback.print_exception(
225-
type(original_exception),
226-
original_exception,
227-
original_exception.__traceback__,
228-
)
229-
230-
wis_logger.error(
231-
f"请求uri:{uri} 重试均失败了,尝试更换账号与IP再次发起重试"
232-
)
209+
wis_logger.info(
210+
f"[KuaiShouApiClient.post] post uri:{uri} failed many times, will try to update account")
211+
# let the bull fly for a while, in case the account is updating by other thread
212+
await asyncio.sleep(1)
213+
await self.update_account_info(force_login=True)
233214
try:
234-
wis_logger.info(
235-
f"请求uri:{uri} 尝试更换IP再次发起重试..."
236-
)
237-
await self.account_with_ip_pool.mark_ip_invalid(
238-
self.account_info.ip_info
239-
)
240-
self.account_info.ip_info = (
241-
await self.account_with_ip_pool.proxy_ip_pool.get_proxy()
242-
)
243-
215+
wis_logger.info("try to update account by changing ip")
244216
return await self.request(
245217
method="POST",
246218
url=f"{KUAISHOU_API}{uri}",
247219
data=json_str,
248-
**kwargs,
249-
)
250-
except Exception as ee:
251-
# 获取原始异常
252-
"""
253-
# in version 4.0, we do not have ip proxy yet
254-
original_exception = ee.last_attempt.exception()
255-
traceback.print_exception(
256-
type(original_exception),
257-
original_exception,
258-
original_exception.__traceback__,
259-
)
260-
"""
261-
wis_logger.error(
262-
"no IP proxy available, try to get a new account"
263-
)
264-
await self.mark_account_invalid(self.account_info)
265-
await self.update_account_info(force_login=True)
266-
return await self.request(
267-
method="POST", url=f"{KUAISHOU_API}{uri}", data=json_str, **kwargs
220+
headers=self.headers,
268221
)
222+
except Exception as e:
223+
wis_logger.error(f"account and ip still invalid, have to quit, err:{e}")
269224

270225
async def pong(self) -> bool:
271226
"""

core/wis/kuaishou/core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ async def async_initialize(self):
4545
await account_with_ip_pool.async_initialize()
4646

4747
self.ks_client.account_with_ip_pool = account_with_ip_pool
48-
await self.ks_client.update_account_info()
48+
self.ks_client.account_info = await account_with_ip_pool.get_account_with_ip_info()
4949

5050
async def posts_list(self,
5151
keywords: List[str],

core/wis/llmuse.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,11 @@ def perform_completion_with_backoff(messages: List, model: str = '', **kwargs):
3838
if hasattr(e, 'status_code'):
3939
if e.status_code in [400, 401, 413]:
4040
# client error, no need to retry
41-
error_msg = f"{model} Client error: {e.status_code}. Detail: {str(e)}"
42-
if 'Image url should be a valid url or should like data:image/TYPE;base64' not in str(e):
41+
error_msg = f"{model} API error: {e.status_code}. Detail: {str(e)}"
42+
if (
43+
'Image url should be a valid url or should like data:image/TYPE;base64' not in error_msg and
44+
'Failed to process image URL: data:' not in error_msg
45+
):
4346
# image url probility is that server cannot fetch the image, so we don't need to worry about it
4447
wis_logger.error(error_msg)
4548
wis_logger.info(f"messages: {messages}")

core/wis/mc_commen/account_pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ async def get_active_account(self, force_login: bool = False) -> AccountInfoMode
5252
account = self._account_list.pop(0)
5353
if account.status.value == AccountStatusEnum.NORMAL.value:
5454
wis_logger.debug(
55-
f"from saved, get active account {account}"
55+
f"from account pool get active account {account}"
5656
)
5757
return account
5858

core/wis/nodriver_helper.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ async def start(self):
4646
'headless': False,
4747
'browser_args': [
4848
'--lang=zh-CN',
49-
'--no-sandbox',
49+
# '--no-sandbox',
5050
'--disable-translate', # 禁用翻译
5151
'--no-first-run', # 禁用首次运行向导
5252
'--no-default-browser-check'

core/wis/weibo/client.py

Lines changed: 46 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
from .exception import DataFetchError
1414
from .field import SearchType
1515
from urllib.parse import parse_qs, unquote, urlencode
16+
import time
1617

1718

1819
class WeiboClient:
1920
account_info: AccountWithIpModel
21+
_account_update_lock: asyncio.Lock
2022

2123
def __init__(
2224
self,
@@ -34,6 +36,8 @@ def __init__(
3436
self.timeout = timeout
3537
# self._user_agent = user_agent or utils.get_user_agent()
3638
self.account_with_ip_pool = account_with_ip_pool
39+
self._account_update_lock = asyncio.Lock()
40+
self._account_updated_at = None
3741

3842
@property
3943
def headers(self):
@@ -61,24 +65,40 @@ def _cookies(self):
6165

6266
async def update_account_info(self, force_login: bool = False):
6367
"""
64-
更新客户端的账号信息, 该方法会一直尝试获取新的账号信息,直到获取到一个有效的账号信息
68+
Args:
69+
force_login: 是否强制登录获取新账号
6570
Returns:
66-
6771
"""
68-
have_account = False
69-
wis_logger.debug("try to get a new account")
70-
account_info = await self.account_with_ip_pool.get_account_with_ip_info(force_login)
71-
self.account_info = account_info
72-
have_account = await self.pong()
73-
if have_account:
72+
if self._account_updated_at and time.time() - self._account_updated_at < 60:
7473
return
75-
wis_logger.info(f"current account {account_info.account.account_name} is invalid, try to get a new one")
76-
await self.mark_account_invalid(account_info)
77-
account_info = await self.account_with_ip_pool.get_account_with_ip_info(force_login=True)
78-
self.account_info = account_info
79-
have_account = await self.pong()
80-
if not have_account:
81-
raise DataFetchError("cannot get any valid account, we have to quit")
74+
75+
# Use async lock to ensure that only one account is being updated
76+
async with self._account_update_lock:
77+
if self.account_with_ip_pool.proxy_ip_pool:
78+
try:
79+
await self.account_with_ip_pool.mark_ip_invalid(
80+
cast(IpInfoModel, self.account_info.ip_info)
81+
)
82+
proxy_ip_pool: ProxyIpPool = cast(
83+
ProxyIpPool, self.account_with_ip_pool.proxy_ip_pool
84+
)
85+
self.account_info.ip_info = await proxy_ip_pool.get_proxy()
86+
if await self.pong():
87+
wis_logger.info("update account by changing ip success, will continue the job")
88+
self._account_updated_at = time.time()
89+
return
90+
except Exception as e:
91+
wis_logger.error(f"update IP failed, err:{e}")
92+
93+
# in this case, we change ip and account together to ensure success -- follow ajiang's suggestion
94+
await self.mark_account_invalid(self.account_info)
95+
account_info = await self.account_with_ip_pool.get_account_with_ip_info(force_login=force_login)
96+
self.account_info = account_info
97+
if not await self.pong():
98+
raise DataFetchError("cannot get any valid account, we have to quit")
99+
100+
wis_logger.info("update account by changing ip and account success, will continue the job")
101+
self._account_updated_at = time.time()
82102

83103
async def mark_account_invalid(self, account_with_ip: AccountWithIpModel):
84104
"""
@@ -172,28 +192,17 @@ async def get(self, uri: str, params=None, **kwargs) -> Union[Response, Dict]:
172192
)
173193
return res
174194
except RetryError:
175-
wis_logger.error(
176-
f"[WeiboClient.get] 请求uri:{uri} 重试均失败了,尝试更换账号与IP再次发起重试"
177-
)
195+
wis_logger.info(
196+
f"[WeiboClient.get] get uri:{uri} failed many times, will try to update account")
197+
# let the bull fly for a while, in case the account is updating by other thread
198+
await asyncio.sleep(1)
199+
await self.update_account_info(force_login=True)
178200
try:
179-
wis_logger.info("尝试更换IP再次发起重试...")
180-
await self.account_with_ip_pool.mark_ip_invalid(
181-
cast(IpInfoModel, self.account_info.ip_info)
182-
)
183-
proxy_ip_pool: ProxyIpPool = cast(
184-
ProxyIpPool, self.account_with_ip_pool.proxy_ip_pool
185-
)
186-
self.account_info.ip_info = await proxy_ip_pool.get_proxy()
187201
return await self.request(
188202
method="GET", url=f"{WEIBO_API_URL}{final_uri}", **kwargs
189203
)
190204
except Exception as e:
191-
wis_logger.error(f"尝试更换账号再次发起重试, err:{e}")
192-
await self.mark_account_invalid(self.account_info)
193-
await self.update_account_info(force_login=True)
194-
return await self.request(
195-
method="GET", url=f"{WEIBO_API_URL}{final_uri}", **kwargs
196-
)
205+
wis_logger.error(f"account and ip still invalid, have to quit, err:{e}")
197206

198207
async def post(self, uri: str, data: Dict, **kwargs) -> Union[Response, Dict]:
199208
"""
@@ -212,30 +221,20 @@ async def post(self, uri: str, data: Dict, **kwargs) -> Union[Response, Dict]:
212221
)
213222
return res
214223
except RetryError:
215-
wis_logger.error(
216-
f"[WeiboClient.post] 请求uri:{uri} 重试均失败了,尝试更换IP再次发起重试"
224+
wis_logger.info(
225+
f"[WeiboClient.post] post uri:{uri} failed many times, will try to update account"
217226
)
227+
await asyncio.sleep(1)
228+
await self.update_account_info(force_login=True)
218229
try:
219-
await self.account_with_ip_pool.mark_ip_invalid(
220-
cast(IpInfoModel, self.account_info.ip_info)
221-
)
222-
proxy_ip_pool: ProxyIpPool = cast(
223-
ProxyIpPool, self.account_with_ip_pool.proxy_ip_pool
224-
)
225-
self.account_info.ip_info = await proxy_ip_pool.get_proxy()
226230
return await self.request(
227231
method="POST",
228232
url=f"{WEIBO_API_URL}{uri}",
229233
data=json_str,
230234
**kwargs,
231235
)
232236
except Exception as e:
233-
wis_logger.error(f"尝试更换账号再次发起重试, err:{e}")
234-
await self.mark_account_invalid(self.account_info)
235-
await self.update_account_info(force_login=True)
236-
return await self.request(
237-
method="POST", url=f"{WEIBO_API_URL}{uri}", data=json_str, **kwargs
238-
)
237+
wis_logger.error(f"account and ip still invalid, have to quit, err:{e}")
239238

240239
async def pong(self) -> bool:
241240
"""get a note to check if login state is ok"""

0 commit comments

Comments
 (0)