Skip to content

Commit a1d81d1

Browse files
authored
Fix too many bad inits handling during session refreshing (#214)
1 parent 106b099 commit a1d81d1

File tree

2 files changed

+205
-229
lines changed

2 files changed

+205
-229
lines changed

scrapy_zyte_api/_session.py

Lines changed: 119 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,32 @@ def session_config(
506506
)
507507

508508

509+
class FatalErrorHandler:
510+
511+
def __init__(self, crawler):
512+
self.crawler = crawler
513+
514+
def __enter__(self):
515+
return None
516+
517+
def __exit__(self, exc_type, exc, tb):
518+
if exc_type is None:
519+
return
520+
from twisted.internet import reactor
521+
from twisted.internet.interfaces import IReactorCore
522+
523+
reactor = cast(IReactorCore, reactor)
524+
close = partial(
525+
reactor.callLater, 0, self.crawler.engine.close_spider, self.crawler.spider
526+
)
527+
if issubclass(exc_type, TooManyBadSessionInits):
528+
close("bad_session_inits")
529+
elif issubclass(exc_type, PoolError):
530+
close("pool_error")
531+
elif issubclass(exc_type, CloseSpider):
532+
close(exc.reason)
533+
534+
509535
session_config_registry = SessionConfigRulesRegistry()
510536
session_config = session_config_registry.session_config
511537

@@ -592,6 +618,8 @@ def __init__(self, crawler: Crawler):
592618

593619
self._setting_params = settings.getdict("ZYTE_API_SESSION_PARAMS")
594620

621+
self._fatal_error_handler = FatalErrorHandler(crawler)
622+
595623
def _get_session_config(self, request: Request) -> SessionConfig:
596624
try:
597625
return self._session_config_cache[request]
@@ -686,18 +714,21 @@ async def _init_session(self, session_id: str, request: Request, pool: str) -> b
686714
return result
687715

688716
async def _create_session(self, request: Request, pool: str) -> str:
689-
while True:
690-
session_id = str(uuid4())
691-
session_init_succeeded = await self._init_session(session_id, request, pool)
692-
if session_init_succeeded:
693-
self._pools[pool].add(session_id)
694-
self._bad_inits[pool] = 0
695-
break
696-
self._bad_inits[pool] += 1
697-
if self._bad_inits[pool] >= self._max_bad_inits[pool]:
698-
raise TooManyBadSessionInits
699-
self._queues[pool].append(session_id)
700-
return session_id
717+
with self._fatal_error_handler:
718+
while True:
719+
session_id = str(uuid4())
720+
session_init_succeeded = await self._init_session(
721+
session_id, request, pool
722+
)
723+
if session_init_succeeded:
724+
self._pools[pool].add(session_id)
725+
self._bad_inits[pool] = 0
726+
break
727+
self._bad_inits[pool] += 1
728+
if self._bad_inits[pool] >= self._max_bad_inits[pool]:
729+
raise TooManyBadSessionInits
730+
self._queues[pool].append(session_id)
731+
return session_id
701732

702733
async def _next_from_queue(self, request: Request, pool: str) -> str:
703734
session_id = None
@@ -794,111 +825,91 @@ async def check(self, response: Response, request: Request) -> bool:
794825
"""Check the response for signs of session expiration, update the
795826
internal session pool accordingly, and return ``False`` if the session
796827
has expired or ``True`` if the session passed validation."""
797-
if self.is_init_request(request):
798-
return True
799-
session_config = self._get_session_config(request)
800-
if not session_config.enabled(request):
801-
return True
802-
pool = self._get_pool(request)
803-
try:
804-
passed = session_config.check(response, request)
805-
except CloseSpider:
806-
raise
807-
except Exception:
808-
self._crawler.stats.inc_value(
809-
f"scrapy-zyte-api/sessions/pools/{pool}/use/check-error"
810-
)
811-
logger.exception(
812-
f"Unexpected exception raised while checking session "
813-
f"validity on response {response}."
814-
)
815-
else:
816-
outcome = "passed" if passed else "failed"
817-
self._crawler.stats.inc_value(
818-
f"scrapy-zyte-api/sessions/pools/{pool}/use/check-{outcome}"
819-
)
820-
if passed:
828+
with self._fatal_error_handler:
829+
if self.is_init_request(request):
830+
return True
831+
session_config = self._get_session_config(request)
832+
if not session_config.enabled(request):
821833
return True
822-
self._start_request_session_refresh(request, pool)
834+
pool = self._get_pool(request)
835+
try:
836+
passed = session_config.check(response, request)
837+
except CloseSpider:
838+
raise
839+
except Exception:
840+
self._crawler.stats.inc_value(
841+
f"scrapy-zyte-api/sessions/pools/{pool}/use/check-error"
842+
)
843+
logger.exception(
844+
f"Unexpected exception raised while checking session "
845+
f"validity on response {response}."
846+
)
847+
else:
848+
outcome = "passed" if passed else "failed"
849+
self._crawler.stats.inc_value(
850+
f"scrapy-zyte-api/sessions/pools/{pool}/use/check-{outcome}"
851+
)
852+
if passed:
853+
return True
854+
self._start_request_session_refresh(request, pool)
823855
return False
824856

825857
async def assign(self, request: Request):
826858
"""Assign a working session to *request*."""
827-
if self.is_init_request(request):
828-
return
829-
session_config = self._get_session_config(request)
830-
if not session_config.enabled(request):
831-
self._crawler.stats.inc_value("scrapy-zyte-api/sessions/use/disabled")
832-
return
833-
session_id = await self._next(request)
834-
# Note: If there is a session set already (e.g. a request being
835-
# retried), it is overridden.
836-
request.meta.setdefault("zyte_api_provider", {})["session"] = {"id": session_id}
837-
if (
838-
"zyte_api" in request.meta
839-
or request.meta.get("zyte_api_automap", None) is False
840-
or (
841-
"zyte_api_automap" not in request.meta
842-
and self._transparent_mode is False
843-
)
844-
):
845-
meta_key = "zyte_api"
846-
else:
847-
meta_key = "zyte_api_automap"
848-
request.meta.setdefault(meta_key, {})
849-
if not isinstance(request.meta[meta_key], dict):
850-
request.meta[meta_key] = {}
851-
request.meta[meta_key]["session"] = {"id": session_id}
852-
request.meta.setdefault("dont_merge_cookies", True)
859+
with self._fatal_error_handler:
860+
if self.is_init_request(request):
861+
return
862+
session_config = self._get_session_config(request)
863+
if not session_config.enabled(request):
864+
self._crawler.stats.inc_value("scrapy-zyte-api/sessions/use/disabled")
865+
return
866+
session_id = await self._next(request)
867+
# Note: If there is a session set already (e.g. a request being
868+
# retried), it is overridden.
869+
request.meta.setdefault("zyte_api_provider", {})["session"] = {
870+
"id": session_id
871+
}
872+
if (
873+
"zyte_api" in request.meta
874+
or request.meta.get("zyte_api_automap", None) is False
875+
or (
876+
"zyte_api_automap" not in request.meta
877+
and self._transparent_mode is False
878+
)
879+
):
880+
meta_key = "zyte_api"
881+
else:
882+
meta_key = "zyte_api_automap"
883+
request.meta.setdefault(meta_key, {})
884+
if not isinstance(request.meta[meta_key], dict):
885+
request.meta[meta_key] = {}
886+
request.meta[meta_key]["session"] = {"id": session_id}
887+
request.meta.setdefault("dont_merge_cookies", True)
853888

854889
def is_enabled(self, request: Request) -> bool:
855890
session_config = self._get_session_config(request)
856891
return session_config.enabled(request)
857892

858893
def handle_error(self, request: Request):
859-
pool = self._get_pool(request)
860-
self._crawler.stats.inc_value(
861-
f"scrapy-zyte-api/sessions/pools/{pool}/use/failed"
862-
)
863-
session_id = self._get_request_session_id(request)
864-
if session_id is not None:
865-
self._errors[session_id] += 1
866-
if self._errors[session_id] < self._max_errors:
867-
return
868-
self._start_request_session_refresh(request, pool)
894+
with self._fatal_error_handler:
895+
pool = self._get_pool(request)
896+
self._crawler.stats.inc_value(
897+
f"scrapy-zyte-api/sessions/pools/{pool}/use/failed"
898+
)
899+
session_id = self._get_request_session_id(request)
900+
if session_id is not None:
901+
self._errors[session_id] += 1
902+
if self._errors[session_id] < self._max_errors:
903+
return
904+
self._start_request_session_refresh(request, pool)
869905

870906
def handle_expiration(self, request: Request):
871-
pool = self._get_pool(request)
872-
self._crawler.stats.inc_value(
873-
f"scrapy-zyte-api/sessions/pools/{pool}/use/expired"
874-
)
875-
self._start_request_session_refresh(request, pool)
876-
877-
878-
class FatalErrorHandler:
879-
880-
def __init__(self, crawler):
881-
self.crawler = crawler
882-
883-
async def __aenter__(self):
884-
return None
885-
886-
async def __aexit__(self, exc_type, exc, tb):
887-
if exc_type is None:
888-
return
889-
from twisted.internet import reactor
890-
from twisted.internet.interfaces import IReactorCore
891-
892-
reactor = cast(IReactorCore, reactor)
893-
close = partial(
894-
reactor.callLater, 0, self.crawler.engine.close_spider, self.crawler.spider
895-
)
896-
if issubclass(exc_type, TooManyBadSessionInits):
897-
close("bad_session_inits")
898-
elif issubclass(exc_type, PoolError):
899-
close("pool_error")
900-
elif issubclass(exc_type, CloseSpider):
901-
close(exc.reason)
907+
with self._fatal_error_handler:
908+
pool = self._get_pool(request)
909+
self._crawler.stats.inc_value(
910+
f"scrapy-zyte-api/sessions/pools/{pool}/use/expired"
911+
)
912+
self._start_request_session_refresh(request, pool)
902913

903914

904915
class ScrapyZyteAPISessionDownloaderMiddleware:
@@ -910,19 +921,16 @@ def from_crawler(cls, crawler: Crawler):
910921
def __init__(self, crawler: Crawler):
911922
self._crawler = crawler
912923
self._sessions = _SessionManager(crawler)
913-
self._fatal_error_handler = FatalErrorHandler(crawler)
914924

915925
async def process_request(self, request: Request, spider: Spider) -> None:
916-
async with self._fatal_error_handler:
917-
await self._sessions.assign(request)
926+
await self._sessions.assign(request)
918927

919928
async def process_response(
920929
self, request: Request, response: Response, spider: Spider
921930
) -> Union[Request, Response, None]:
922931
if isinstance(response, DummyResponse):
923932
return response
924-
async with self._fatal_error_handler:
925-
passed = await self._sessions.check(response, request)
933+
passed = await self._sessions.check(response, request)
926934
if not passed:
927935
new_request_or_none = get_retry_request(
928936
request,
@@ -945,12 +953,10 @@ async def process_exception(
945953
return None
946954

947955
if exception.parsed.type == "/problem/session-expired":
948-
async with self._fatal_error_handler:
949-
self._sessions.handle_expiration(request)
956+
self._sessions.handle_expiration(request)
950957
reason = "session_expired"
951958
elif exception.status in {520, 521}:
952-
async with self._fatal_error_handler:
953-
self._sessions.handle_error(request)
959+
self._sessions.handle_error(request)
954960
reason = "download_error"
955961
else:
956962
return None

0 commit comments

Comments
 (0)