Fix too many bad inits handling during session refreshing #214

Merged
scrapy_zyte_api/_session.py (232 changes: 119 additions & 113 deletions)
@@ -506,6 +506,32 @@
         )
 
 
+class FatalErrorHandler:
+
+    def __init__(self, crawler):
+        self.crawler = crawler
+
+    def __enter__(self):
+        return None
+
+    def __exit__(self, exc_type, exc, tb):
+        if exc_type is None:
+            return
+        from twisted.internet import reactor
+        from twisted.internet.interfaces import IReactorCore
+
+        reactor = cast(IReactorCore, reactor)
+        close = partial(
+            reactor.callLater, 0, self.crawler.engine.close_spider, self.crawler.spider
+        )
+        if issubclass(exc_type, TooManyBadSessionInits):
+            close("bad_session_inits")
+        elif issubclass(exc_type, PoolError):
+            close("pool_error")
+        elif issubclass(exc_type, CloseSpider):
+            close(exc.reason)
+
+
 session_config_registry = SessionConfigRulesRegistry()
 session_config = session_config_registry.session_config
 

Codecov / codecov/patch annotation: the added close("pool_error") line (line 530) was not covered by tests.
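A note on the pattern above: this is Python's synchronous context-manager protocol, replacing the async variant deleted further down in this diff. __exit__ receives any in-flight exception, schedules the spider close on the Twisted reactor, and, because it returns a falsy value, still lets the exception propagate to the caller. A minimal self-contained sketch of the same idea, with a plain callable standing in for the reactor and crawler (names here are illustrative, not from the patch):

    # Sketch only: `close` stands in for
    # reactor.callLater(0, crawler.engine.close_spider, crawler.spider).
    class TooManyBadSessionInits(RuntimeError):
        pass


    class FatalErrorHandlerSketch:
        def __init__(self, close):
            self.close = close

        def __enter__(self):
            return None

        def __exit__(self, exc_type, exc, tb):
            if exc_type is None:
                return  # no exception raised in the with block
            if issubclass(exc_type, TooManyBadSessionInits):
                self.close("bad_session_inits")
            # Returning None (falsy) lets the original exception propagate.


    try:
        with FatalErrorHandlerSketch(close=print):
            raise TooManyBadSessionInits
    except TooManyBadSessionInits:
        print("still propagated")  # printed after "bad_session_inits"
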
@@ -592,6 +618,8 @@
 
         self._setting_params = settings.getdict("ZYTE_API_SESSION_PARAMS")
 
+        self._fatal_error_handler = FatalErrorHandler(crawler)
+
     def _get_session_config(self, request: Request) -> SessionConfig:
         try:
             return self._session_config_cache[request]
@@ -686,18 +714,21 @@
         return result
 
     async def _create_session(self, request: Request, pool: str) -> str:
-        while True:
-            session_id = str(uuid4())
-            session_init_succeeded = await self._init_session(session_id, request, pool)
-            if session_init_succeeded:
-                self._pools[pool].add(session_id)
-                self._bad_inits[pool] = 0
-                break
-            self._bad_inits[pool] += 1
-            if self._bad_inits[pool] >= self._max_bad_inits[pool]:
-                raise TooManyBadSessionInits
-            self._queues[pool].append(session_id)
-        return session_id
+        with self._fatal_error_handler:
+            while True:
+                session_id = str(uuid4())
+                session_init_succeeded = await self._init_session(
+                    session_id, request, pool
+                )
+                if session_init_succeeded:
+                    self._pools[pool].add(session_id)
+                    self._bad_inits[pool] = 0
+                    break
+                self._bad_inits[pool] += 1
+                if self._bad_inits[pool] >= self._max_bad_inits[pool]:
+                    raise TooManyBadSessionInits
+                self._queues[pool].append(session_id)
+            return session_id
 
     async def _next_from_queue(self, request: Request, pool: str) -> str:
         session_id = None
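If I read the change right, the fix is to move the guard into _SessionManager._create_session itself, so that every caller is covered, including internal session-refresh paths that never went through the middleware's old async-with wrapper. A toy illustration of callee-side guarding (all names hypothetical):

    class Guard:
        def __init__(self):
            self.close_reason = None

        def __enter__(self):
            return None

        def __exit__(self, exc_type, exc, tb):
            if exc_type is not None:
                self.close_reason = "bad_session_inits"  # stand-in for closing the spider


    class Manager:
        def __init__(self):
            self.guard = Guard()

        def create_session(self):
            with self.guard:  # guarded here, regardless of the caller
                raise RuntimeError("too many bad session inits")

        def refresh_session(self):
            self.create_session()  # internal path that no middleware hook wraps


    manager = Manager()
    try:
        manager.refresh_session()
    except RuntimeError:
        pass
    assert manager.guard.close_reason == "bad_session_inits"
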
@@ -794,111 +825,91 @@
         """Check the response for signs of session expiration, update the
         internal session pool accordingly, and return ``False`` if the session
         has expired or ``True`` if the session passed validation."""
-        if self.is_init_request(request):
-            return True
-        session_config = self._get_session_config(request)
-        if not session_config.enabled(request):
-            return True
-        pool = self._get_pool(request)
-        try:
-            passed = session_config.check(response, request)
-        except CloseSpider:
-            raise
-        except Exception:
-            self._crawler.stats.inc_value(
-                f"scrapy-zyte-api/sessions/pools/{pool}/use/check-error"
-            )
-            logger.exception(
-                f"Unexpected exception raised while checking session "
-                f"validity on response {response}."
-            )
-        else:
-            outcome = "passed" if passed else "failed"
-            self._crawler.stats.inc_value(
-                f"scrapy-zyte-api/sessions/pools/{pool}/use/check-{outcome}"
-            )
-            if passed:
-                return True
-        self._start_request_session_refresh(request, pool)
-        return False
+        with self._fatal_error_handler:
+            if self.is_init_request(request):
+                return True
+            session_config = self._get_session_config(request)
+            if not session_config.enabled(request):
+                return True
+            pool = self._get_pool(request)
+            try:
+                passed = session_config.check(response, request)
+            except CloseSpider:
+                raise
+            except Exception:
+                self._crawler.stats.inc_value(
+                    f"scrapy-zyte-api/sessions/pools/{pool}/use/check-error"
+                )
+                logger.exception(
+                    f"Unexpected exception raised while checking session "
+                    f"validity on response {response}."
+                )
+            else:
+                outcome = "passed" if passed else "failed"
+                self._crawler.stats.inc_value(
+                    f"scrapy-zyte-api/sessions/pools/{pool}/use/check-{outcome}"
+                )
+                if passed:
+                    return True
+            self._start_request_session_refresh(request, pool)
+            return False
 
     async def assign(self, request: Request):
         """Assign a working session to *request*."""
-        if self.is_init_request(request):
-            return
-        session_config = self._get_session_config(request)
-        if not session_config.enabled(request):
-            self._crawler.stats.inc_value("scrapy-zyte-api/sessions/use/disabled")
-            return
-        session_id = await self._next(request)
-        # Note: If there is a session set already (e.g. a request being
-        # retried), it is overridden.
-        request.meta.setdefault("zyte_api_provider", {})["session"] = {"id": session_id}
-        if (
-            "zyte_api" in request.meta
-            or request.meta.get("zyte_api_automap", None) is False
-            or (
-                "zyte_api_automap" not in request.meta
-                and self._transparent_mode is False
-            )
-        ):
-            meta_key = "zyte_api"
-        else:
-            meta_key = "zyte_api_automap"
-        request.meta.setdefault(meta_key, {})
-        if not isinstance(request.meta[meta_key], dict):
-            request.meta[meta_key] = {}
-        request.meta[meta_key]["session"] = {"id": session_id}
-        request.meta.setdefault("dont_merge_cookies", True)
+        with self._fatal_error_handler:
+            if self.is_init_request(request):
+                return
+            session_config = self._get_session_config(request)
+            if not session_config.enabled(request):
+                self._crawler.stats.inc_value("scrapy-zyte-api/sessions/use/disabled")
+                return
+            session_id = await self._next(request)
+            # Note: If there is a session set already (e.g. a request being
+            # retried), it is overridden.
+            request.meta.setdefault("zyte_api_provider", {})["session"] = {
+                "id": session_id
+            }
+            if (
+                "zyte_api" in request.meta
+                or request.meta.get("zyte_api_automap", None) is False
+                or (
+                    "zyte_api_automap" not in request.meta
+                    and self._transparent_mode is False
+                )
+            ):
+                meta_key = "zyte_api"
+            else:
+                meta_key = "zyte_api_automap"
+            request.meta.setdefault(meta_key, {})
+            if not isinstance(request.meta[meta_key], dict):
+                request.meta[meta_key] = {}
+            request.meta[meta_key]["session"] = {"id": session_id}
+            request.meta.setdefault("dont_merge_cookies", True)
 
     def is_enabled(self, request: Request) -> bool:
         session_config = self._get_session_config(request)
         return session_config.enabled(request)
 
     def handle_error(self, request: Request):
-        pool = self._get_pool(request)
-        self._crawler.stats.inc_value(
-            f"scrapy-zyte-api/sessions/pools/{pool}/use/failed"
-        )
-        session_id = self._get_request_session_id(request)
-        if session_id is not None:
-            self._errors[session_id] += 1
-            if self._errors[session_id] < self._max_errors:
-                return
-        self._start_request_session_refresh(request, pool)
+        with self._fatal_error_handler:
+            pool = self._get_pool(request)
+            self._crawler.stats.inc_value(
+                f"scrapy-zyte-api/sessions/pools/{pool}/use/failed"
+            )
+            session_id = self._get_request_session_id(request)
+            if session_id is not None:
+                self._errors[session_id] += 1
+                if self._errors[session_id] < self._max_errors:
+                    return
+            self._start_request_session_refresh(request, pool)
 
     def handle_expiration(self, request: Request):
-        pool = self._get_pool(request)
-        self._crawler.stats.inc_value(
-            f"scrapy-zyte-api/sessions/pools/{pool}/use/expired"
-        )
-        self._start_request_session_refresh(request, pool)
-
-
-class FatalErrorHandler:
-
-    def __init__(self, crawler):
-        self.crawler = crawler
-
-    async def __aenter__(self):
-        return None
-
-    async def __aexit__(self, exc_type, exc, tb):
-        if exc_type is None:
-            return
-        from twisted.internet import reactor
-        from twisted.internet.interfaces import IReactorCore
-
-        reactor = cast(IReactorCore, reactor)
-        close = partial(
-            reactor.callLater, 0, self.crawler.engine.close_spider, self.crawler.spider
-        )
-        if issubclass(exc_type, TooManyBadSessionInits):
-            close("bad_session_inits")
-        elif issubclass(exc_type, PoolError):
-            close("pool_error")
-        elif issubclass(exc_type, CloseSpider):
-            close(exc.reason)
+        with self._fatal_error_handler:
+            pool = self._get_pool(request)
+            self._crawler.stats.inc_value(
+                f"scrapy-zyte-api/sessions/pools/{pool}/use/expired"
+            )
+            self._start_request_session_refresh(request, pool)
 
 
 class ScrapyZyteAPISessionDownloaderMiddleware:
@@ -910,19 +921,16 @@
     def __init__(self, crawler: Crawler):
        self._crawler = crawler
         self._sessions = _SessionManager(crawler)
-        self._fatal_error_handler = FatalErrorHandler(crawler)
 
     async def process_request(self, request: Request, spider: Spider) -> None:
-        async with self._fatal_error_handler:
-            await self._sessions.assign(request)
+        await self._sessions.assign(request)
 
     async def process_response(
         self, request: Request, response: Response, spider: Spider
     ) -> Union[Request, Response, None]:
         if isinstance(response, DummyResponse):
             return response
-        async with self._fatal_error_handler:
-            passed = await self._sessions.check(response, request)
+        passed = await self._sessions.check(response, request)
         if not passed:
             new_request_or_none = get_retry_request(
                 request,
@@ -945,12 +953,10 @@
             return None
 
         if exception.parsed.type == "/problem/session-expired":
-            async with self._fatal_error_handler:
-                self._sessions.handle_expiration(request)
+            self._sessions.handle_expiration(request)
             reason = "session_expired"
         elif exception.status in {520, 521}:
-            async with self._fatal_error_handler:
-                self._sessions.handle_error(request)
+            self._sessions.handle_error(request)
             reason = "download_error"
         else:
             return None
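Since the manager methods now guard themselves, the middleware hooks in the two hunks above can delegate directly, and the old per-call async-with wrappers become redundant. Converting the handler from __aenter__/__aexit__ to the synchronous protocol also seems deliberate: __exit__ only calls the non-blocking reactor.callLater, so nothing needs awaiting, and a plain with block works both in synchronous methods such as handle_error and inside coroutines such as assign. A rough sketch of the resulting call shape (stub classes, hypothetical names):

    import asyncio


    class Guard:
        def __enter__(self):
            return None

        def __exit__(self, exc_type, exc, tb):
            if exc_type is not None:
                print("spider close scheduled")  # stand-in for reactor.callLater(...)


    class Sessions:
        async def assign(self):
            with Guard():  # the manager wraps itself now
                raise RuntimeError("fatal session error")


    async def process_request(sessions):
        await sessions.assign()  # no async-with wrapper needed at this level


    try:
        asyncio.run(process_request(Sessions()))
    except RuntimeError:
        pass
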