fix: Also use error_handler for context pipeline errors by janbuchar · Pull Request #331 · apify/crawlee-python

@@ -155,8 +155,8 @@ def __init__(
         self._context_pipeline = (_context_pipeline or ContextPipeline()).compose(self._check_url_after_redirects)
-        self._error_handler: ErrorHandler[TCrawlingContext] | None = None
-        self._failed_request_handler: FailedRequestHandler[TCrawlingContext] | None = None
+        self._error_handler: ErrorHandler[TCrawlingContext | BasicCrawlingContext] | None = None
+        self._failed_request_handler: FailedRequestHandler[TCrawlingContext | BasicCrawlingContext] | None = None
         self._max_request_retries = max_request_retries
         self._max_requests_per_crawl = max_requests_per_crawl

@@ -309,13 +309,15 @@ async def get_key_value_store(
         """Return the key-value store with the given ID or name. If none is provided, return the default KVS."""
         return await KeyValueStore.open(id=id, name=name)
-    def error_handler(self, handler: ErrorHandler[TCrawlingContext]) -> ErrorHandler[TCrawlingContext]:
+    def error_handler(
+        self, handler: ErrorHandler[TCrawlingContext | BasicCrawlingContext]
+    ) -> ErrorHandler[TCrawlingContext]:
         """Decorator for configuring an error handler (called after a request handler error and before retrying)."""
         self._error_handler = handler
         return handler

     def failed_request_handler(
-        self, handler: FailedRequestHandler[TCrawlingContext]
+        self, handler: FailedRequestHandler[TCrawlingContext | BasicCrawlingContext]
     ) -> FailedRequestHandler[TCrawlingContext]:
         """Decorator for configuring a failed request handler (called after max retries are reached)."""
         self._failed_request_handler = handler

@@ -575,7 +577,9 @@ def _check_url_patterns(
         # The URL does not match any `include` pattern - reject it
         return False
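With this change, handlers registered through these decorators may also receive a plain `BasicCrawlingContext` when the failure happened before the full crawling context was built. A minimal usage sketch (the `HttpCrawler` class and the import paths are illustrative and may differ between crawlee versions):

```python
import logging

from crawlee.basic_crawler import BasicCrawlingContext  # illustrative import path
from crawlee.http_crawler import HttpCrawler  # any BasicCrawler subclass works

logger = logging.getLogger(__name__)
crawler = HttpCrawler()

@crawler.error_handler
async def log_before_retry(context: BasicCrawlingContext, error: Exception) -> None:
    # After this PR the handler also fires for context pipeline errors, in
    # which case only BasicCrawlingContext attributes (e.g. `request`,
    # `session`) are guaranteed to be present.
    logger.warning('Request to %s failed (%r), a retry may follow', context.request.url, error)
```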
-    async def _handle_request_error(self, crawling_context: TCrawlingContext, error: Exception) -> None:
+    async def _handle_request_retries(
+        self, crawling_context: TCrawlingContext | BasicCrawlingContext, error: Exception
+    ) -> None:
         request_provider = await self.get_request_provider()
         request = crawling_context.request

@@ -605,7 +609,39 @@ async def _handle_request_error(self, crawling_context: TCrawlingContext, error
             await self._handle_failed_request(crawling_context, error)
             self._statistics.record_request_processing_failure(request.id or request.unique_key)

-    async def _handle_failed_request(self, crawling_context: TCrawlingContext, error: Exception) -> None:
+    async def _handle_request_error(
+        self, crawling_context: TCrawlingContext | BasicCrawlingContext, error: Exception
+    ) -> None:
+        try:
+            crawling_context.request.state = RequestState.ERROR_HANDLER
+
+            await wait_for(
+                partial(self._handle_request_retries, crawling_context, error),
+                timeout=self._internal_timeout,
+                timeout_message='Handling request failure timed out after '
+                f'{self._internal_timeout.total_seconds()} seconds',
+                logger=self._logger,
+            )
+
+            crawling_context.request.state = RequestState.DONE
+        except UserDefinedErrorHandlerError:
+            crawling_context.request.state = RequestState.ERROR
+            raise
+        except Exception as secondary_error:
+            self._logger.exception(
+                'An exception occurred during handling of failed request. This places the crawler '
+                'and its underlying storages into an unknown state and crawling will be terminated.',
+                exc_info=secondary_error,
+            )
+            crawling_context.request.state = RequestState.ERROR
+            raise
+
+        if crawling_context.session:
+            crawling_context.session.mark_bad()
+
+    async def _handle_failed_request(
+        self, crawling_context: TCrawlingContext | BasicCrawlingContext, error: Exception
+    ) -> None:
         self._logger.exception('Request failed and reached maximum retries', exc_info=error)
         self._statistics.error_tracker.add(error)
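In short, the retry bookkeeping keeps its old body under the new name `_handle_request_retries`, while the renamed `_handle_request_error` wraps it with the `RequestState` transitions and the timeout that previously lived in `__run_task_function`. A condensed sketch of that wrapper, approximating crawlee's internal `wait_for` helper with stdlib `asyncio.wait_for` (all names outside the diff are hypothetical):

```python
import asyncio
from typing import Awaitable, Callable

async def handle_request_error(
    run_retries: Callable[[], Awaitable[None]],  # stands in for _handle_request_retries
    set_state: Callable[[str], None],            # stands in for request.state updates
    timeout_s: float,                            # stands in for self._internal_timeout
) -> None:
    # Mark the request as being processed by the error handler, run the retry
    # logic under a timeout, and record DONE or ERROR depending on the outcome.
    set_state('ERROR_HANDLER')
    try:
        await asyncio.wait_for(run_retries(), timeout=timeout_s)
        set_state('DONE')
    except Exception:
        set_state('ERROR')
        raise
```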
@@ -753,34 +789,11 @@ async def __run_task_function(self) -> None:
                     RequestHandlerError[TCrawlingContext], primary_error
                 )  # valid thanks to ContextPipeline

-                try:
-                    request.state = RequestState.ERROR_HANDLER
-
-                    await wait_for(
-                        partial(
-                            self._handle_request_error, primary_error.crawling_context, primary_error.wrapped_exception
-                        ),
-                        timeout=self._internal_timeout,
-                        timeout_message='Handling request failure timed out after '
-                        f'{self._internal_timeout.total_seconds()} seconds',
-                        logger=self._logger,
-                    )
-
-                    request.state = RequestState.DONE
-                except UserDefinedErrorHandlerError:
-                    request.state = RequestState.ERROR
-                    raise
-                except Exception as secondary_error:
-                    self._logger.exception(
-                        'An exception occurred during handling of failed request. This places the crawler '
-                        'and its underlying storages into an unknown state and crawling will be terminated.',
-                        exc_info=secondary_error,
-                    )
-                    request.state = RequestState.ERROR
-                    raise
-
-                if crawling_context.session:
-                    crawling_context.session.mark_bad()
+                self._logger.debug(
+                    'An exception occurred in the user-defined request handler',
+                    exc_info=primary_error.wrapped_exception,
+                )
+                await self._handle_request_error(primary_error.crawling_context, primary_error.wrapped_exception)
             except SessionError as session_error:
                 if not crawling_context.session:
                     raise RuntimeError('SessionError raised in a crawling context without a session') from session_error

@@ -822,30 +835,11 @@ async def __run_task_function(self) -> None:
                         max_retries=3,
                     )
             except ContextPipelineInitializationError as initialization_error:
-                if self._should_retry_request(crawling_context, initialization_error):
-                    self._logger.debug(
-                        'An exception occurred during the initialization of crawling context, a retry is in order',
-                        exc_info=initialization_error,
-                    )
-                    request = crawling_context.request
-                    request.retry_count += 1
-                    request.state = RequestState.DONE
-                    await request_provider.reclaim_request(request)
-                else:
-                    self._logger.exception('Request failed and reached maximum retries', exc_info=initialization_error)
-
-                    await wait_for(
-                        lambda: request_provider.mark_request_as_handled(crawling_context.request),
-                        timeout=self._internal_timeout,
-                        timeout_message='Marking request as handled timed out after '
-                        f'{self._internal_timeout.total_seconds()} seconds',
-                        logger=self._logger,
-                        max_retries=3,
-                    )
-
-                    if crawling_context.session:
-                        crawling_context.session.mark_bad()
+                self._logger.debug(
+                    'An exception occurred during the initialization of crawling context',
+                    exc_info=initialization_error,
+                )
+                await self._handle_request_error(crawling_context, initialization_error)
             except Exception as internal_error:
                 self._logger.exception(
                     'An exception occurred during handling of a request. This places the crawler '
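Taken together, request handler failures and context pipeline initialization failures now converge on the same `_handle_request_error` path, so both kinds of error reach the user-registered handlers. Continuing the sketch from above, a failed request handler that observes the final give-up (hypothetical names, same hedges as before):

```python
@crawler.failed_request_handler
async def on_give_up(context: BasicCrawlingContext, error: Exception) -> None:
    # Invoked once max_request_retries is exhausted; after this PR it is also
    # reached for requests whose context pipeline never finished initializing.
    logger.error('Giving up on %s: %r', context.request.url, error)
```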