diff --git a/.bumpversion.cfg b/.bumpversion.cfg index e5ee2df..cecb753 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.2.0 +current_version = 0.6.0 commit = True tag = True tag_name = {new_version} diff --git a/.coveragerc b/.coveragerc index 101904c..4fdc383 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,2 +1,8 @@ [run] branch = true + +[report] +# https://github.com/nedbat/coveragepy/issues/831#issuecomment-517778185 +exclude_lines = + pragma: no cover + if TYPE_CHECKING: diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 240ffca..820c29d 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -13,11 +13,11 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set up Python - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: - python-version: '3.x' + python-version: '3.13' - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 1ac85a6..b056a24 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -16,12 +16,12 @@ jobs: strategy: fail-fast: false matrix: - python-version: ['3.7', '3.8', '3.9', '3.10'] + python-version: ['3.9', '3.10', '3.11', '3.12', '3.13'] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install dependencies @@ -33,20 +33,22 @@ jobs: tox -e py - name: coverage if: ${{ success() }} - run: bash <(curl -s https://codecov.io/bash) + uses: codecov/codecov-action@v4.0.1 + with: + token: ${{ secrets.CODECOV_TOKEN }} check: runs-on: ubuntu-latest strategy: fail-fast: false matrix: - python-version: ['3.10'] + python-version: ['3.12'] # Keep in sync with .readthedocs.yml 
tox-job: ["mypy", "docs"] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install dependencies diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..c2700c5 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,19 @@ +repos: +- repo: https://github.com/PyCQA/isort + rev: 5.13.2 + hooks: + - id: isort +- repo: https://github.com/psf/black + rev: 24.10.0 + hooks: + - id: black +- repo: https://github.com/pycqa/flake8 + rev: 7.1.1 + hooks: + - id: flake8 +- repo: https://github.com/adamchainz/blacken-docs + rev: 1.19.0 + hooks: + - id: blacken-docs + additional_dependencies: + - black==24.10.0 diff --git a/.readthedocs.yml b/.readthedocs.yml new file mode 100644 index 0000000..f81f402 --- /dev/null +++ b/.readthedocs.yml @@ -0,0 +1,14 @@ +version: 2 +formats: all +sphinx: + configuration: docs/conf.py +build: + os: ubuntu-22.04 + tools: + # For available versions, see: + # https://docs.readthedocs.io/en/stable/config-file/v2.html#build-tools-python + python: "3.12" # Keep in sync with .github/workflows/test.yml +python: + install: + - requirements: docs/requirements.txt + - path: . diff --git a/CHANGES.rst b/CHANGES.rst index 3dbc42c..fb5fff1 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,142 @@ Changes ======= +0.6.0 (2024-05-29) +------------------ + +* Improved how the :ref:`default retry policy ` handles + :ref:`temporary download errors `. + Before, 3 HTTP 429 responses followed by a single HTTP 520 response would + have prevented a retry. Now, unrelated responses and errors do not count + towards the HTTP 520 retry limit. + +* Improved how the :ref:`default retry policy ` handles + network errors. Before, after 15 minutes of unsuccessful responses (e.g. HTTP + 429), any network error would prevent a retry. 
Now, network errors must happen + 15 minutes in a row, without different errors in between, to stop retries. + +* Implemented an optional :ref:`aggressive retry policy + `, which retries more errors more often, and could + be useful for long crawls or websites with a low success rate. + +* Improved the exception that is raised when passing an invalid retrying policy + object to a :ref:`Python client `. + +0.5.2 (2024-05-10) +------------------ + +* :class:`~zyte_api.RequestError` now has a :data:`~zyte_api.RequestError.query` + attribute with the Zyte API request parameters that caused the error. + +0.5.1 (2024-04-16) +------------------ + +* :class:`~zyte_api.ZyteAPI` and :class:`~zyte_api.AsyncZyteAPI` sessions no + longer need to be used as context managers, and can instead be closed with a + ``close()`` method. + +0.5.0 (2024-04-05) +------------------ + +* Removed Python 3.7 support. + +* Added :class:`~zyte_api.ZyteAPI` and :class:`~zyte_api.AsyncZyteAPI` to + provide both sync and async Python interfaces with a cleaner API. + +* Deprecated ``zyte_api.aio``: + + * Replace ``zyte_api.aio.client.AsyncClient`` with the new + :class:`~zyte_api.AsyncZyteAPI` class. + + * Replace ``zyte_api.aio.client.create_session`` with the new + :meth:`AsyncZyteAPI.session ` method. + + * Import ``zyte_api.aio.errors.RequestError``, + ``zyte_api.aio.retry.RetryFactory`` and + ``zyte_api.aio.retry.zyte_api_retrying`` directly from ``zyte_api`` now. + +* When using the command-line interface, you can now use ``--store-errors`` to + have error responses be stored alongside successful responses. + +* Improved the documentation. + +0.4.8 (2023-11-02) +------------------ + +* Include the Zyte API request ID value in a new ``.request_id`` attribute + in ``zyte_api.aio.errors.RequestError``. + +0.4.7 (2023-09-26) +------------------ + +* ``AsyncClient`` now lets you set a custom user agent to send to Zyte API. 
+ +0.4.6 (2023-09-26) +------------------ + +* Increased the client timeout to match the server’s. +* Mentioned the ``api_key`` parameter of ``AsyncClient`` in the docs example. + +0.4.5 (2023-01-03) +------------------ + +* w3lib >= 2.1.1 is required in install_requires, to ensure that URLs + are escaped properly. +* unnecessary ``requests`` library is removed from install_requires +* fixed tox 4 support + +0.4.4 (2022-12-01) +------------------ + +* Fixed an issue with submitting URLs which contain unescaped symbols +* New "retrying" argument for AsyncClient.__init__, which allows to set + custom retrying policy for the client +* ``--dont-retry-errors`` argument in the CLI tool + +0.4.3 (2022-11-10) +------------------ + +* Connections are no longer reused between requests. + This reduces the amount of ``ServerDisconnectedError`` exceptions. + +0.4.2 (2022-10-28) +------------------ +* Bump minimum ``aiohttp`` version to 3.8.0, as earlier versions don't support + brotli decompression of responses +* Declared Python 3.11 support + +0.4.1 (2022-10-16) +------------------ + +* Network errors, like server timeouts or disconnections, are now retried for + up to 15 minutes, instead of 5 minutes. + +0.4.0 (2022-09-20) +------------------ + +* Require to install ``Brotli`` as a dependency. This changes the requests to + have ``Accept-Encoding: br`` and automatically decompress brotli responses. + +0.3.0 (2022-07-29) +------------------ + +Internal AggStats class is cleaned up: + +* ``AggStats.n_extracted_queries`` attribute is removed, as it was a duplicate + of ``AggStats.n_results`` +* ``AggStats.n_results`` is renamed to ``AggStats.n_success`` +* ``AggStats.n_input_queries`` is removed as redundant and misleading; + AggStats got a new ``AggStats.n_processed`` property instead. + +This change is backwards incompatible if you used stats directly. 
+ +0.2.1 (2022-07-29) +------------------ + +* ``aiohttp.client_exceptions.ClientConnectorError`` is now treated as a + network error and retried accordingly. +* Removed the unused ``zyte_api.sync`` module. + 0.2.0 (2022-07-14) ------------------ diff --git a/README.rst b/README.rst index d20d06b..e52e951 100644 --- a/README.rst +++ b/README.rst @@ -18,32 +18,101 @@ python-zyte-api :target: https://codecov.io/gh/zytedata/zyte-api :alt: Coverage report -Python client libraries for `Zyte Data API`_. +.. description-start -Command-line utility and asyncio-based library are provided by this package. +Command-line client and Python client library for `Zyte API`_. + +.. _Zyte API: https://docs.zyte.com/zyte-api/get-started.html + +.. description-end Installation ============ -:: +.. install-start + +.. code-block:: shell pip install zyte-api -``zyte-api`` requires Python 3.7+. +.. note:: Python 3.9+ is required. + +.. install-end + +Basic usage +=========== + +.. basic-start + +Set your API key +---------------- + +.. key-get-start + +After you `sign up for a Zyte API account +`_, copy `your API key +`_. + +.. key-get-end + + +Use the command-line client +--------------------------- + +Then you can use the zyte-api command-line client to send Zyte API requests. +First create a text file with a list of URLs: + +.. code-block:: none + + https://books.toscrape.com + https://quotes.toscrape.com + +And then call ``zyte-api`` from your shell: + +.. code-block:: shell -API key -======= + zyte-api url-list.txt --api-key YOUR_API_KEY --output results.jsonl -Make sure you have an API key for the `Zyte Data API`_ service. -You can set ``ZYTE_API_KEY`` environment -variable with the key to avoid passing it around explicitly. -Read the `documentation `_ for more information. +Use the Python sync API +----------------------- -License is BSD 3-clause. +For very basic Python scripts, use the sync API: + +.. 
code-block:: python + + from zyte_api import ZyteAPI + + client = ZyteAPI(api_key="YOUR_API_KEY") + response = client.get({"url": "https://toscrape.com", "httpResponseBody": True}) + + +Use the Python async API +------------------------ + +For asyncio code, use the async API: + +.. code-block:: python + + import asyncio + + from zyte_api import AsyncZyteAPI + + + async def main(): + client = AsyncZyteAPI(api_key="YOUR_API_KEY") + response = await client.get( + {"url": "https://toscrape.com", "httpResponseBody": True} + ) + + + asyncio.run(main()) + +.. basic-end + +Read the `documentation `_ for more +information. * Documentation: https://python-zyte-api.readthedocs.io * Source code: https://github.com/zytedata/python-zyte-api * Issue tracker: https://github.com/zytedata/python-zyte-api/issues - -.. _Zyte Data API: https://docs.zyte.com/zyte-api/get-started.html diff --git a/docs/_ext/__init__.py b/docs/_ext/__init__.py new file mode 100644 index 0000000..ee080d2 --- /dev/null +++ b/docs/_ext/__init__.py @@ -0,0 +1,43 @@ +import re + +from docutils import nodes +from docutils.parsers.rst.roles import set_classes + + +def http_api_reference_role( + name, rawtext, text, lineno, inliner, options={}, content=[] +): + match = re.search( + r"(?s)^(.+?)\s*<\s*((?:request|response):[a-zA-Z.]+)\s*>\s*$", text + ) + if match: + display_text = match[1] + reference = match[2] + else: + display_text = None + reference = text + if reference.startswith("request:"): + request_or_response = "request" + elif reference.startswith("response:"): + request_or_response = "response/200" + else: + raise ValueError( + f":http: directive reference must start with request: or " + f"response:, got {reference} from {text!r}." 
+ ) + + field = reference.split(":", maxsplit=1)[1] + if not display_text: + display_text = field + refuri = ( + f"https://docs.zyte.com/zyte-api/usage/reference.html" + f"#operation/extract/{request_or_response}/{field}" + ) + set_classes(options) + node = nodes.reference(rawtext, display_text, refuri=refuri, **options) + return [node], [] + + +def setup(app): + # https://github.com/scrapy-plugins/scrapy-zyte-api/blob/2bfb2bef2e43293a62f47781914331bc4fa08f06/docs/_ext/__init__.py#L42 + app.add_role("http", http_api_reference_role) diff --git a/docs/_templates/custom-class-template.rst b/docs/_templates/custom-class-template.rst deleted file mode 100644 index b30e1a0..0000000 --- a/docs/_templates/custom-class-template.rst +++ /dev/null @@ -1,35 +0,0 @@ -.. - Template based in the original one, with some changes - proposed on https://stackoverflow.com/a/62613202/3887420 - -{{ fullname | escape | underline}} - -.. currentmodule:: {{ module }} - -.. autoclass:: {{ objname }} - :members: - :show-inheritance: - - {% block methods %} - .. automethod:: __init__ - - {% if methods %} - .. rubric:: {{ _('Methods') }} - - .. autosummary:: - {% for item in methods %} - ~{{ name }}.{{ item }} - {%- endfor %} - {% endif %} - {% endblock %} - - {% block attributes %} - {% if attributes %} - .. rubric:: {{ _('Attributes') }} - - .. autosummary:: - {% for item in attributes %} - ~{{ name }}.{{ item }} - {%- endfor %} - {% endif %} - {% endblock %} diff --git a/docs/_templates/custom-module-template.rst b/docs/_templates/custom-module-template.rst deleted file mode 100644 index f4d4155..0000000 --- a/docs/_templates/custom-module-template.rst +++ /dev/null @@ -1,70 +0,0 @@ -.. - Template based in the original one, with some changes - proposed on https://stackoverflow.com/a/62613202/3887420 - -{{ fullname | escape | underline}} - -.. automodule:: {{ fullname }} - - {% block attributes %} - {% if attributes %} - .. rubric:: {{ _('Module Attributes') }} - - .. 
autosummary:: - :toctree: - {% for item in attributes %} - {{ item }} - {%- endfor %} - {% endif %} - {% endblock %} - - {% block functions %} - {% if functions %} - .. rubric:: {{ _('Functions') }} - - .. autosummary:: - :toctree: - {% for item in functions %} - {{ item }} - {%- endfor %} - {% endif %} - {% endblock %} - - {% block classes %} - {% if classes %} - .. rubric:: {{ _('Classes') }} - - .. autosummary:: - :toctree: - :template: custom-class-template.rst - {% for item in classes %} - {{ item }} - {%- endfor %} - {% endif %} - {% endblock %} - - {% block exceptions %} - {% if exceptions %} - .. rubric:: {{ _('Exceptions') }} - - .. autosummary:: - :toctree: - {% for item in exceptions %} - {{ item }} - {%- endfor %} - {% endif %} - {% endblock %} - -{% block modules %} -{% if modules %} -.. rubric:: Modules - -.. autosummary:: - :toctree: - :template: custom-module-template.rst - :recursive: -{% for item in modules %} - {{ item }} -{%- endfor %} -{% endif %} -{% endblock %} diff --git a/docs/api_reference.rst b/docs/api_reference.rst deleted file mode 100644 index 3b18395..0000000 --- a/docs/api_reference.rst +++ /dev/null @@ -1,13 +0,0 @@ -============= -API Reference -============= - -.. - Based on ideas found on https://stackoverflow.com/a/62613202/3887420 - -.. autosummary:: - :toctree: _autosummary - :template: custom-module-template.rst - :recursive: - - zyte_api diff --git a/docs/asyncio_api.rst b/docs/asyncio_api.rst deleted file mode 100644 index 07ba88c..0000000 --- a/docs/asyncio_api.rst +++ /dev/null @@ -1,67 +0,0 @@ -.. _`asyncio_api`: - -=========== -asyncio API -=========== - -Create an instance of the ``AsyncClient`` to use the asyncio client API. -You can use the method ``request_raw`` to perform individual requests: - -.. 
code-block:: python - - import asyncio - from zyte_api.aio.client import AsyncClient - - client = AsyncClient() - - async def single_request(url): - return await client.request_raw({ - 'url': url, - 'browserHtml': True - }) - - response = asyncio.run(single_request("https://books.toscrape.com")) - # Do something with the response .. - -There is also ``request_parallel_as_completed`` method, which allows -to process many URLs in parallel, using multiple connections: - -.. code-block:: python - - import asyncio - import json - import sys - - from zyte_api.aio.client import AsyncClient, create_session - from zyte_api.aio.errors import RequestError - - async def extract_from(urls, n_conn): - client = AsyncClient(n_conn=n_conn) - requests = [ - {"url": url, "browserHtml": True} - for url in urls - ] - async with create_session(n_conn) as session: - res_iter = client.request_parallel_as_completed(requests, session=session) - for fut in res_iter: - try: - res = await fut - # do something with a result, e.g. - print(json.dumps(res)) - except RequestError as e: - print(e, file=sys.stderr) - raise - - urls = ["https://toscrape.com", "https://books.toscrape.com"] - asyncio.run(extract_from(urls, n_conn=15)) - -``request_parallel_as_completed`` is modelled after ``asyncio.as_completed`` -(see https://docs.python.org/3/library/asyncio-task.html#asyncio.as_completed), -and actually uses it under the hood. - -``request_parallel_as_completed`` and ``request_raw`` methods handle -throttling (http 429 errors) and network errors, retrying a request in -these cases. - -CLI interface implementation (``zyte_api/__main__.py``) can serve -as an usage example. diff --git a/docs/command_line.rst b/docs/command_line.rst deleted file mode 100644 index 0bddd20..0000000 --- a/docs/command_line.rst +++ /dev/null @@ -1,98 +0,0 @@ -.. _`command_line`: - -====================== -Command-line interface -====================== - -The most basic way to use the client is from a command line. 
- -First, create a file with urls, an URL per line (e.g. ``urls.txt``). - -Second, set ``ZYTE_API_KEY`` env variable with your -API key (you can also pass API key as ``--api-key`` script -argument). - -Then run a script, to get the results: - -.. code-block:: shell - - zyte-api urls.txt --output res.jsonl - -.. note:: You may use ``python -m zyte_api`` instead of ``zyte-api``. - -Requests to get browser HTML from those input URLs will be sent to Zyte Data -API, using up to 20 parallel connections, and the API responses will be stored -in the ``res.jsonl`` `JSON Lines`_ file, 1 response per line. - -.. _JSON Lines: https://jsonlines.org/ - -The results may be stored in an order which is different from the input order. -If you need to match the output results to the input URLs, the best way is to -use the ``echoData`` field (see below); it is passed through, and returned -as-is in the ``echoData`` attribute. By default it will contain the input URL -the content belongs to. - -If you need more flexibility, you can customize the requests by creating -a JSON Lines file with queries: a JSON object per line. You can pass any -`Zyte Data API`_ options there. For example, you could create the following -``requests.jsonl`` file: - -.. code-block:: json - - {"url": "https://example.com", "browserHtml": true, "geolocation": "GB", "echoData": "homepage"} - {"url": "https://example.com/foo", "browserHtml": true, "javascript": false} - {"url": "https://example.com/bar", "browserHtml": true, "geolocation": "US"} - -See `API docs`_ for a description of all supported parameters. - -.. _API docs: https://docs.zyte.com/zyte-api/openapi.html -.. _Zyte Data API: https://docs.zyte.com/zyte-api/get-started.html - -To get results for this ``requests.jsonl`` file, run: - -.. code-block:: shell - - zyte-api requests.jsonl --output res.jsonl - -Processing speed -~~~~~~~~~~~~~~~~ - -Each API key has a limit on RPS. 
To get your URLs processed faster you can -increase the number concurrent connections. - -Best options depend on the RPS limit and on websites you're extracting -data from. For example, if your API key has a limit of 3RPS, and average -response time you observe for your websites is 10s, then to get to these -3RPS you may set the number of concurrent connections to 30. - -To set these options in the CLI, use the ``--n-conn`` argument: - -.. code-block:: shell - - zyte-api urls.txt --n-conn 30 --output res.jsonl - -If too many requests are being processed in parallel, you'll be getting -throttling errors. They are handled by CLI automatically, but they make -extraction less efficient; please tune the concurrency options to -not hit the throttling errors (HTTP 429) often. - -You may be also limited by the website speed. The Zyte Data API tries not to hit -any individual website too hard, but it could be better to limit this on -a client side as well. If you're extracting data from a single website, -it could make sense to decrease the amount of parallel requests; it can ensure -higher success ratio overall. - -If you're extracting data from multiple websites, it makes sense to spread the -load across time: if you have websites A, B and C, don't send requests in -AAAABBBBCCCC order, send them in ABCABCABCABC order instead. - -To do so, you can change the order of the queries in your input file. -Alternatively, you can pass ``--shuffle`` options; it randomly shuffles -input queries before sending them to the API: - -.. code-block:: shell - - zyte-api urls.txt --shuffle --output res.jsonl - -Run ``zyte-api --help`` to get description of all supported -options. 
diff --git a/docs/conf.py b/docs/conf.py index 17a8f56..a652286 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -12,19 +12,21 @@ # import os import sys -sys.path.insert(0, os.path.abspath('../')) +from pathlib import Path + +sys.path.insert(0, os.path.abspath("../")) # -- Project information ----------------------------------------------------- -project = u'python-zyte-api' -copyright = u'2021, Zyte Group Ltd' -author = u'Zyte Group Ltd' +project = "python-zyte-api" +copyright = "2021, Zyte Group Ltd" +author = "Zyte Group Ltd" # The short X.Y version -version = u'' +version = "" # The full version, including alpha/beta/rc tags -release = u'0.2.0' +release = "0.6.0" # -- General configuration --------------------------------------------------- @@ -36,38 +38,40 @@ # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. +sys.path.insert(0, str(Path(__file__).parent.absolute())) # _ext extensions = [ - 'sphinx.ext.autodoc', - 'sphinx.ext.intersphinx', - 'sphinx.ext.ifconfig', - 'sphinx.ext.viewcode', - 'sphinx.ext.githubpages', - 'sphinx.ext.autosummary', + "_ext", + "sphinx.ext.autodoc", + "sphinx.ext.intersphinx", + "sphinx.ext.ifconfig", + "sphinx.ext.viewcode", + "sphinx.ext.githubpages", + "sphinxarg.ext", ] # Add any paths that contain templates here, relative to this directory. -templates_path = ['_templates'] +# templates_path = ["_templates"] # The suffix(es) of source filenames. # You can specify multiple suffix as a list of string: # # source_suffix = ['.rst', '.md'] -source_suffix = '.rst' +source_suffix = {".rst": "restructuredtext"} # The master toctree document. -master_doc = 'index' +master_doc = "index" # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. # # This is also used if you do content translation via gettext catalogs. # Usually you set "language" from the command line for these cases. 
-language = 'en' +language = "en" # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. # This pattern also affects html_static_path and html_extra_path. -exclude_patterns = [u'_build', 'Thumbs.db', '.DS_Store'] +exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"] # The name of the Pygments (syntax highlighting) style to use. pygments_style = None @@ -78,13 +82,7 @@ # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. # -html_theme = 'sphinx_rtd_theme' - -# Add any paths that contain custom themes here, relative to this directory. -# Add path to the RTD explicitly to robustify builds (otherwise might -# fail in a clean Debian build env) -import sphinx_rtd_theme -html_theme_path = [sphinx_rtd_theme.get_html_theme_path()] +html_theme = "sphinx_rtd_theme" # Theme options are theme-specific and customize the look and feel of a theme # further. For a list of options available for each theme, see the @@ -111,7 +109,7 @@ # -- Options for HTMLHelp output --------------------------------------------- # Output file base name for HTML help builder. -htmlhelp_basename = 'python-zyte-apidoc' +htmlhelp_basename = "python-zyte-apidoc" # -- Options for LaTeX output ------------------------------------------------ @@ -120,15 +118,12 @@ # The paper size ('letterpaper' or 'a4paper'). # # 'papersize': 'letterpaper', - # The font size ('10pt', '11pt' or '12pt'). # # 'pointsize': '10pt', - # Additional stuff for the LaTeX preamble. # # 'preamble': '', - # Latex figure (float) alignment # # 'figure_align': 'htbp', @@ -138,8 +133,13 @@ # (source start file, target name, title, # author, documentclass [howto, manual, or own class]). 
latex_documents = [ - (master_doc, 'python-zyte-api.tex', u'python-zyte-api Documentation', - u'Zyte Group Ltd', 'manual'), + ( + master_doc, + "python-zyte-api.tex", + "python-zyte-api Documentation", + "Zyte Group Ltd", + "manual", + ), ] @@ -148,8 +148,7 @@ # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). man_pages = [ - (master_doc, 'python-zyte-api', u'python-zyte-api Documentation', - [author], 1) + (master_doc, "python-zyte-api", "python-zyte-api Documentation", [author], 1) ] @@ -159,9 +158,15 @@ # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - (master_doc, 'python-zyte-api', u'python-zyte-api Documentation', - author, 'python-zyte-api', 'One line description of project.', - 'Miscellaneous'), + ( + master_doc, + "python-zyte-api", + "python-zyte-api Documentation", + author, + "python-zyte-api", + "One line description of project.", + "Miscellaneous", + ), ] @@ -180,23 +185,35 @@ # epub_uid = '' # A list of files that should not be packed into the epub file. 
-epub_exclude_files = ['search.html'] +epub_exclude_files = ["search.html"] # -- Extension configuration ------------------------------------------------- # -- Options for intersphinx extension --------------------------------------- intersphinx_mapping = { - 'python': ('https://docs.python.org/3', None, ), - 'aiohttp': ('https://docs.aiohttp.org/en/stable/', None, ), - 'tenacity': ('https://tenacity.readthedocs.io/en/latest/', None, ), + "python": ( + "https://docs.python.org/3", + None, + ), + "aiohttp": ( + "https://docs.aiohttp.org/en/stable/", + None, + ), + "tenacity": ( + "https://tenacity.readthedocs.io/en/latest/", + None, + ), + "zyte": ( + "https://docs.zyte.com", + None, + ), } autodoc_default_options = { # 'special-members': '__init__,__call__', # 'undoc-members': True, - 'exclude-members': '__weakref__' + "exclude-members": "__weakref__" } add_module_names = False -autosummary_generate = True diff --git a/docs/index.rst b/docs/index.rst index f066ab3..7fe3951 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -2,27 +2,35 @@ python-zyte-api =============== -Python client libraries for `Zyte Data API`_. +.. include:: ../README.rst + :start-after: description-start + :end-before: description-end -Command-line utility and asyncio-based library are provided by this package. +.. toctree:: + :caption: Getting started + :maxdepth: 1 -:ref:`license` is BSD 3-clause. + intro/install + intro/basic .. toctree:: - :caption: Getting started + :caption: Usage :maxdepth: 1 - install - command_line - asyncio_api + use/key + use/cli + use/api + +.. toctree:: + :caption: Reference + :maxdepth: 1 + + ref/cli + ref/api .. toctree:: :caption: All the rest :maxdepth: 1 - api_reference contributing changelog - license - -.. _Zyte Data API: https://docs.zyte.com/zyte-api/get-started.html \ No newline at end of file diff --git a/docs/install.rst b/docs/install.rst deleted file mode 100644 index 7816f09..0000000 --- a/docs/install.rst +++ /dev/null @@ -1,20 +0,0 @@ -.. 
_`install`: - -============ -Installation -============ - -:: - - pip install zyte-api - -``zyte-api`` requires Python 3.7+. - -API key -======= - -Make sure you have an API key for the `Zyte Data API`_ service. -You can set ``ZYTE_API_KEY`` environment -variable with the key to avoid passing it around explicitly. - -.. _Zyte Data API: https://docs.zyte.com/zyte-api/get-started.html diff --git a/docs/intro/basic.rst b/docs/intro/basic.rst new file mode 100644 index 0000000..32ff015 --- /dev/null +++ b/docs/intro/basic.rst @@ -0,0 +1,9 @@ +.. _basic: + +=========== +Basic usage +=========== + +.. include:: /../README.rst + :start-after: basic-start + :end-before: basic-end diff --git a/docs/intro/install.rst b/docs/intro/install.rst new file mode 100644 index 0000000..be46008 --- /dev/null +++ b/docs/intro/install.rst @@ -0,0 +1,9 @@ +.. _install: + +============ +Installation +============ + +.. include:: /../README.rst + :start-after: install-start + :end-before: install-end diff --git a/docs/license.rst b/docs/license.rst deleted file mode 100644 index e6a41ca..0000000 --- a/docs/license.rst +++ /dev/null @@ -1,7 +0,0 @@ -.. _`license`: - -======= -License -======= - -.. include:: ../LICENSE diff --git a/docs/ref/api.rst b/docs/ref/api.rst new file mode 100644 index 0000000..70164ed --- /dev/null +++ b/docs/ref/api.rst @@ -0,0 +1,44 @@ +.. _api-ref: + +============= +API reference +============= + +.. module:: zyte_api + +Sync API +======== + +.. autoclass:: ZyteAPI + :members: + + +Async API +========= + +.. autoclass:: AsyncZyteAPI + :members: + + +Retries +======= + +.. autodata:: zyte_api_retrying + :no-value: + +.. autodata:: aggressive_retrying + :no-value: + +.. autoclass:: RetryFactory + +.. autoclass:: AggressiveRetryFactory + + +Errors +====== + +.. autoexception:: RequestError + :members: + +.. 
autoclass:: ParsedError + :members: diff --git a/docs/ref/cli.rst b/docs/ref/cli.rst new file mode 100644 index 0000000..7afd4f5 --- /dev/null +++ b/docs/ref/cli.rst @@ -0,0 +1,13 @@ +.. _cli-ref: + +============= +CLI reference +============= + +zyte-api +======== + +.. argparse:: + :ref: zyte_api.__main__._get_argument_parser + :prog: zyte-api + :nodefault: diff --git a/docs/requirements.txt b/docs/requirements.txt index 8861680..164cfa9 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,4 +1,5 @@ -tenacity aiohttp >= 3.6.0 Sphinx >= 4.2.0 +sphinx-argparse sphinx-rtd-theme >= 0.4 +tenacity diff --git a/docs/use/api.rst b/docs/use/api.rst new file mode 100644 index 0000000..c9419bd --- /dev/null +++ b/docs/use/api.rst @@ -0,0 +1,204 @@ +.. _api: + +.. currentmodule:: zyte_api + +===================== +Python client library +===================== + +Once you have :ref:`installed python-zyte-api ` and :ref:`configured +your API key `, you can use one of its APIs from Python code: + +- The :ref:`sync API ` can be used to build simple, proof-of-concept or + debugging Python scripts. + +- The :ref:`async API ` can be used from :ref:`coroutines + `, and is meant for production usage, as well as for asyncio + environments like `Jupyter notebooks`_. + + .. _Jupyter notebooks: https://jupyter.org/ + +.. _sync: + +Sync API +======== + +Create a :class:`ZyteAPI` object, and use its +:meth:`~ZyteAPI.get` method to perform a single request: + +.. code-block:: python + + from zyte_api import ZyteAPI + + client = ZyteAPI() + result = client.get({"url": "https://toscrape.com", "httpResponseBody": True}) + +To perform multiple requests, use a :meth:`~ZyteAPI.session` for +better performance, and use :meth:`~ZyteAPI.iter` to send multiple +requests in parallel: + +.. 
code-block:: python + + from zyte_api import ZyteAPI, RequestError + + client = ZyteAPI() + with client.session() as session: + queries = [ + {"url": "https://toscrape.com", "httpResponseBody": True}, + {"url": "https://books.toscrape.com", "httpResponseBody": True}, + ] + for result_or_exception in session.iter(queries): + if isinstance(result_or_exception, dict): + ... + elif isinstance(result_or_exception, RequestError): + ... + else: + assert isinstance(result_or_exception, Exception) + ... + +.. tip:: :meth:`~ZyteAPI.iter` yields results as they come, not + necessarily in their original order. Use :http:`request:echoData` to track + the source request. + +.. _asyncio_api: + +Async API +========= + +Create an :class:`AsyncZyteAPI` object, and use its +:meth:`~AsyncZyteAPI.get` method to perform a single request: + +.. code-block:: python + + import asyncio + + from zyte_api import AsyncZyteAPI + + + async def main(): + client = AsyncZyteAPI() + result = await client.get({"url": "https://toscrape.com", "httpResponseBody": True}) + + + asyncio.run(main()) + +To perform multiple requests, use a :meth:`~AsyncZyteAPI.session` for +better performance, and use :meth:`~AsyncZyteAPI.iter` to send +multiple requests in parallel: + +.. code-block:: python + + import asyncio + + from zyte_api import ZyteAPI, RequestError + + + async def main(): + client = ZyteAPI() + async with client.session() as session: + queries = [ + {"url": "https://toscrape.com", "httpResponseBody": True}, + {"url": "https://books.toscrape.com", "httpResponseBody": True}, + ] + for future in session.iter(queries): + try: + result = await future + except RequestError as e: + ... + except Exception as e: + ... + + + asyncio.run(main()) + +.. tip:: :meth:`~AsyncZyteAPI.iter` yields results as they come, not + necessarily in their original order. Use :http:`request:echoData` to track + the source request. + + +.. 
_api-optimize: + +Optimization +============ + +:class:`ZyteAPI` and :class:`AsyncZyteAPI` use 15 +concurrent connections by default. + +To change that, use the ``n_conn`` parameter when creating your client object: + +.. code-block:: python + + client = ZyteAPI(n_conn=30) + +The number of concurrent connections is enforced across all method calls, +including different sessions of the same client. + +For guidelines on how to choose the optimal value for you, and other +optimization tips, see :ref:`zapi-optimize`. + + +Errors and retries +================== + +Methods of :class:`ZyteAPI` and :class:`AsyncZyteAPI` automatically handle +retries for :ref:`rate-limiting ` and :ref:`unsuccessful +` responses, as well as network errors. + +.. _retry-policy: +.. _default-retry-policy: + +The default retry policy, :data:`~zyte_api.zyte_api_retrying`, does the +following for each request: + +- Retries :ref:`rate-limiting responses ` forever. + +- Retries :ref:`temporary download errors ` + up to 3 times. :ref:`Permanent download errors + ` also count towards this retry limit. + +- Retries permanent download errors up to 3 times per request. + +- Retries network errors until they have happened for 15 minutes straight. + +- Retries error responses with an HTTP status code in the 500-599 range (503, + 520 and 521 excluded) up to 3 times. + +- Disallows new requests if undocumented error responses are more than 10 + *and* more than 1% of all responses. + +All retries are done with an exponential backoff algorithm. + +.. _aggressive-retry-policy: + +If some :ref:`unsuccessful responses ` exceed +maximum retries with the default retry policy, try using +:data:`~zyte_api.aggressive_retrying` instead, which duplicates attempts for +all retry scenarios. 
+ +Alternatively, the reference documentation of :class:`~zyte_api.RetryFactory` +and :class:`~zyte_api.AggressiveRetryFactory` features some examples of custom +retry policies, and you can always build your own +:class:`~tenacity.AsyncRetrying` object from scratch. + +To use :data:`~zyte_api.aggressive_retrying` or a custom retry policy, pass an +instance of your :class:`~tenacity.AsyncRetrying` subclass when creating your +client object: + +.. code-block:: python + + from zyte_api import ZyteAPI, aggressive_retrying + + client = ZyteAPI(retrying=aggressive_retrying) + +When retries are exceeded for a given request, an exception is raised. Except +for the :meth:`~ZyteAPI.iter` method of the :ref:`sync API `, which +yields exceptions instead of raising them, to prevent exceptions from +interrupting the entire iteration. + +The type of exception depends on the issue that caused the final request +attempt to fail. Unsuccessful responses trigger a :exc:`RequestError` and +network errors trigger :ref:`aiohttp exceptions `. +Other exceptions could be raised; for example, from a custom retry policy. + + +.. seealso:: :ref:`api-ref` diff --git a/docs/use/cli.rst b/docs/use/cli.rst new file mode 100644 index 0000000..abf2479 --- /dev/null +++ b/docs/use/cli.rst @@ -0,0 +1,114 @@ +.. _command_line: + +=================== +Command-line client +=================== + +Once you have :ref:`installed python-zyte-api ` and :ref:`configured +your API key `, you can use the ``zyte-api`` command-line client. + +To use ``zyte-api``, pass an :ref:`input file ` as the first +parameter and specify an :ref:`output file ` with ``--output``. +For example: + +.. code-block:: shell + + zyte-api urls.txt --output result.jsonl + +.. _input-file: + +Input file +========== + +The input file can be either of the following: + +- A plain-text file with a list of target URLs, one per line. For example: + + .. 
code-block:: none + + https://books.toscrape.com + https://quotes.toscrape.com + + For each URL, a Zyte API request will be sent with + :http:`request:browserHtml` set to ``True``. + +- A `JSON Lines `_ file with an object of :ref:`Zyte + API request parameters ` per line. For example: + + .. code-block:: json + + {"url": "https://a.example", "browserHtml": true, "geolocation": "GB"} + {"url": "https://b.example", "httpResponseBody": true} + {"url": "https://books.toscrape.com", "productNavigation": true} + + +.. _output-file: + +Output file +=========== + +You can specify the path to an output file with the ``--output``/``-o`` switch. +If not specified, the output is printed on the standard output. + +.. warning:: The output path is overwritten. + +The output file is in `JSON Lines`_ format. Each line contains a JSON object +with a response from Zyte API. + +By default, ``zyte-api`` uses multiple concurrent connections for +:ref:`performance reasons ` and, as a result, the order of +responses will probably not match the order of the source requests from the +:ref:`input file `. If you need to match the output results to the +input requests, the best way is to use :http:`request:echoData`. By default, +``zyte-api`` fills :http:`request:echoData` with the input URL. + + +.. _cli-optimize: + +Optimization +============ + +By default, ``zyte-api`` uses 20 concurrent connections for requests. Use the +``--n-conn`` switch to change that: + +.. code-block:: shell + + zyte-api --n-conn 40 … + +The ``--shuffle`` option can be useful if you target multiple websites and your +:ref:`input file ` is sorted by website, to randomize the request +order and hence distribute the load somewhat evenly: + +.. code-block:: shell + + zyte-api urls.txt --shuffle … + +For guidelines on how to choose the optimal ``--n-conn`` value for you, and +other optimization tips, see :ref:`zapi-optimize`. 
+ + +Errors and retries +================== + +``zyte-api`` automatically handles retries for :ref:`rate-limiting +` and :ref:`unsuccessful +` responses, as well as network errors, +following the :ref:`default retry policy `. + +Use ``--dont-retry-errors`` to disable the retrying of error responses, and +retrying only :ref:`rate-limiting responses `: + +.. code-block:: shell + + zyte-api --dont-retry-errors … + +By default, errors are only logged in the standard error output (``stderr``). +If you want to include error responses in the output file, use +``--store-errors``: + +.. code-block:: shell + + zyte-api --store-errors … + + +.. seealso:: :ref:`cli-ref` diff --git a/docs/use/key.rst b/docs/use/key.rst new file mode 100644 index 0000000..c632895 --- /dev/null +++ b/docs/use/key.rst @@ -0,0 +1,49 @@ +.. _api-key: + +======= +API key +======= + +.. include:: /../README.rst + :start-after: key-get-start + :end-before: key-get-end + +It is recommended to configure your API key through an environment variable, so +that it can be picked by both the :ref:`command-line client ` and +the :ref:`Python client library `: + +- On Windows: + + .. code-block:: shell + + > set ZYTE_API_KEY=YOUR_API_KEY + +- On macOS and Linux: + + .. code-block:: shell + + $ export ZYTE_API_KEY=YOUR_API_KEY + +Alternatively, you may pass your API key to the clients directly: + +- To pass your API key directly to the command-line client, use the + ``--api-key`` switch: + + .. code-block:: shell + + zyte-api --api-key YOUR_API_KEY … + +- To pass your API key directly to the Python client classes, use the + ``api_key`` parameter when creating a client object: + + .. code-block:: python + + from zyte_api import ZyteAPI + + client = ZyteAPI(api_key="YOUR_API_KEY") + + .. 
code-block:: python + + from zyte_api import AsyncZyteAPI + + client = AsyncZyteAPI(api_key="YOUR_API_KEY") diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..e882810 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,9 @@ +[tool.isort] +profile = "black" +multi_line_output = 3 + +[tool.black] +target-version = ["py39", "py310", "py311", "py312", "py313"] + +[tool.mypy] +check_untyped_defs = true diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..c7cb0a7 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,4 @@ +[pytest] +filterwarnings = + ignore:The zyte_api\.aio module is deprecated:DeprecationWarning + diff --git a/requirements-test.txt b/requirements-test.txt deleted file mode 100644 index 847062e..0000000 --- a/requirements-test.txt +++ /dev/null @@ -1,3 +0,0 @@ -pytest -pytest-cov -responses diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..43b4ff1 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,14 @@ +[flake8] +ignore = + # Style issues handled by black. 
+ E501, + E203, + W503, + +per-file-ignores = + # F401: Ignore "imported but unused" errors in __init__ files, as those + # imports are there to expose submodule functions so they can be imported + # directly from that module + zyte_api/__init__.py:F401 + zyte_api/aio/errors.py:F401 + zyte_api/aio/retry.py:F401 \ No newline at end of file diff --git a/setup.py b/setup.py index d97d3eb..612d6d4 100755 --- a/setup.py +++ b/setup.py @@ -1,47 +1,49 @@ -#!/usr/bin/env python import os -from setuptools import setup, find_packages + +from setuptools import find_packages, setup def get_version(): about = {} here = os.path.abspath(os.path.dirname(__file__)) - with open(os.path.join(here, 'zyte_api/__version__.py')) as f: + with open(os.path.join(here, "zyte_api/__version__.py")) as f: exec(f.read(), about) - return about['__version__'] + return about["__version__"] setup( - name='zyte-api', + name="zyte-api", version=get_version(), - description='Python interface to Zyte Data API', - long_description=open('README.rst').read() + "\n\n" + open('CHANGES.rst').read(), - long_description_content_type='text/x-rst', - author='Zyte Group Ltd', - author_email='opensource@zyte.com', - url='https://github.com/zytedata/python-zyte-api', - packages=find_packages(exclude=['tests', 'examples']), - entry_points = { - 'console_scripts': ['zyte-api=zyte_api.__main__:_main'], + description="Python interface to Zyte API", + long_description=open("README.rst").read(), + long_description_content_type="text/x-rst", + author="Zyte Group Ltd", + author_email="opensource@zyte.com", + url="https://github.com/zytedata/python-zyte-api", + packages=find_packages(exclude=["tests", "examples"]), + entry_points={ + "console_scripts": ["zyte-api=zyte_api.__main__:_main"], }, install_requires=[ - 'requests', - 'tenacity', - 'aiohttp >= 3.6.0', - 'tqdm', - 'attrs', - 'runstats', + "aiohttp >= 3.8.0", + "attrs", + "brotli", + "runstats", + "tenacity", + "tqdm", + "w3lib >= 2.1.1", ], classifiers=[ - 
'Development Status :: 3 - Alpha', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: BSD License', - 'Natural Language :: English', - 'Operating System :: OS Independent', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 3.8', - 'Programming Language :: Python :: 3.9', - 'Programming Language :: Python :: 3.10', + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: BSD License", + "Natural Language :: English", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", ], ) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..db2b302 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,9 @@ +import pytest + + +@pytest.fixture(scope="session") +def mockserver(): + from .mockserver import MockServer + + with MockServer() as server: + yield server diff --git a/tests/mockserver.py b/tests/mockserver.py new file mode 100644 index 0000000..ce84138 --- /dev/null +++ b/tests/mockserver.py @@ -0,0 +1,201 @@ +import argparse +import json +import socket +import sys +import time +from base64 import b64encode +from importlib import import_module +from subprocess import PIPE, Popen +from typing import Any, Dict, cast +from urllib.parse import urlparse + +from twisted.internet import reactor +from twisted.internet.defer import Deferred +from twisted.internet.interfaces import IReactorTime +from twisted.internet.task import deferLater +from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET, Site + + +# 
https://github.com/scrapy/scrapy/blob/02b97f98e74a994ad3e4d74e7ed55207e508a576/tests/mockserver.py#L27C1-L33C19 +def getarg(request, name, default=None, type=None): + if name in request.args: + value = request.args[name][0] + if type is not None: + value = type(value) + return value + return default + + +def get_ephemeral_port(): + s = socket.socket() + s.bind(("", 0)) + return s.getsockname()[1] + + +class DropResource(Resource): + isLeaf = True + + def deferRequest(self, request, delay, f, *a, **kw): + def _cancelrequest(_): + # silence CancelledError + d.addErrback(lambda _: None) + d.cancel() + + d: Deferred = deferLater(cast(IReactorTime, reactor), delay, f, *a, **kw) + request.notifyFinish().addErrback(_cancelrequest) + return d + + def render_POST(self, request): + request.setHeader(b"Content-Length", b"1024") + self.deferRequest(request, 0, self._delayedRender, request) + return NOT_DONE_YET + + def _delayedRender(self, request): + abort = getarg(request, b"abort", 0, type=int) + request.write(b"this connection will be dropped\n") + tr = request.channel.transport + try: + if abort and hasattr(tr, "abortConnection"): + tr.abortConnection() + else: + tr.loseConnection() + finally: + request.finish() + + +class DefaultResource(Resource): + request_count = 0 + + def getChild(self, path, request): + return self + + def render_POST(self, request): + request_data = json.loads(request.content.read()) + + request.responseHeaders.setRawHeaders( + b"Content-Type", + [b"application/json"], + ) + request.responseHeaders.setRawHeaders( + b"request-id", + [b"abcd1234"], + ) + + url = request_data["url"] + domain = urlparse(url).netloc + response_data: Dict[str, Any] + if domain == "e429.example": + request.setResponseCode(429) + response_data = {"status": 429, "type": "/limits/over-user-limit"} + return json.dumps(response_data).encode() + if domain == "e500.example": + request.setResponseCode(500) + return "" + if domain == "e520.example": + request.setResponseCode(520) 
+ response_data = {"status": 520, "type": "/download/temporary-error"} + return json.dumps(response_data).encode() + if domain == "e521.example": + request.setResponseCode(521) + response_data = {"status": 521, "type": "/download/internal-error"} + return json.dumps(response_data).encode() + if domain == "exception.example": + request.setResponseCode(401) + response_data = { + "status": 401, + "type": "/auth/key-not-found", + "title": "Authentication Key Not Found", + "detail": "The authentication key is not valid or can't be matched.", + } + return json.dumps(response_data).encode() + if domain == "empty-body-exception.example": + request.setResponseCode(500) + return b"" + if domain == "nonjson.example": + request.setResponseCode(200) + return b"foo" + if domain == "nonjson-exception.example": + request.setResponseCode(500) + return b"foo" + if domain == "array-exception.example": + request.setResponseCode(500) + return b'["foo"]' + + response_data = { + "url": url, + } + + html = "Hello

World!

" + if "httpResponseBody" in request_data: + body = b64encode(html.encode()).decode() + response_data["httpResponseBody"] = body + else: + assert "browserHtml" in request_data + response_data["browserHtml"] = html + + return json.dumps(response_data).encode() + + +class MockServer: + def __init__(self, resource=None, port=None): + resource = resource or DefaultResource + self.resource = "{}.{}".format(resource.__module__, resource.__name__) + self.proc = None + self.host = socket.gethostbyname(socket.gethostname()) + self.port = port or get_ephemeral_port() + self.root_url = "http://%s:%d" % (self.host, self.port) + + def __enter__(self): + self.proc = Popen( + [ + sys.executable, + "-u", + "-m", + "tests.mockserver", + self.resource, + "--port", + str(self.port), + ], + stdout=PIPE, + ) + assert self.proc.stdout is not None + self.proc.stdout.readline() + return self + + def __exit__(self, exc_type, exc_value, traceback): + assert self.proc is not None + self.proc.kill() + self.proc.wait() + time.sleep(0.2) + + def urljoin(self, path): + return self.root_url + path + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("resource") + parser.add_argument("--port", type=int) + args = parser.parse_args() + module_name, name = args.resource.rsplit(".", 1) + sys.path.append(".") + resource = getattr(import_module(module_name), name)() + # Typing issue: https://github.com/twisted/twisted/issues/9909 + http_port = reactor.listenTCP(args.port, Site(resource)) # type: ignore[attr-defined] + + def print_listening(): + host = http_port.getHost() + print( + "Mock server {} running at http://{}:{}".format( + resource, host.host, host.port + ) + ) + + # Typing issue: https://github.com/twisted/twisted/issues/9909 + reactor.callWhenRunning(print_listening) # type: ignore[attr-defined] + reactor.run() # type: ignore[attr-defined] + + +if __name__ == "__main__": + main() diff --git a/tests/test_async.py b/tests/test_async.py new file mode 100644 index 
0000000..771c0f1 --- /dev/null +++ b/tests/test_async.py @@ -0,0 +1,341 @@ +import asyncio +from unittest.mock import AsyncMock + +import pytest + +from zyte_api import ( + AggressiveRetryFactory, + AsyncZyteAPI, + RequestError, + TooManyUndocumentedErrors, +) +from zyte_api._retry import ZyteAsyncRetrying +from zyte_api.aio.client import AsyncClient +from zyte_api.apikey import NoApiKey +from zyte_api.errors import ParsedError +from zyte_api.utils import USER_AGENT + + +@pytest.mark.parametrize( + ("client_cls",), + ( + (AsyncZyteAPI,), + (AsyncClient,), + ), +) +@pytest.mark.parametrize( + "user_agent,expected", + ( + ( + None, + USER_AGENT, + ), + ( + f"scrapy-zyte-api/0.11.1 {USER_AGENT}", + f"scrapy-zyte-api/0.11.1 {USER_AGENT}", + ), + ), +) +def test_user_agent(client_cls, user_agent, expected): + client = client_cls(api_key="123", api_url="http:\\test", user_agent=user_agent) + assert client.user_agent == expected + + +@pytest.mark.parametrize( + ("client_cls",), + ( + (AsyncZyteAPI,), + (AsyncClient,), + ), +) +def test_api_key(client_cls): + client_cls(api_key="a") + with pytest.raises(NoApiKey): + client_cls() + + +@pytest.mark.parametrize( + ("client_cls", "get_method"), + ( + (AsyncZyteAPI, "get"), + (AsyncClient, "request_raw"), + ), +) +@pytest.mark.asyncio +async def test_get(client_cls, get_method, mockserver): + client = client_cls(api_key="a", api_url=mockserver.urljoin("/")) + expected_result = { + "url": "https://a.example", + "httpResponseBody": "PGh0bWw+PGJvZHk+SGVsbG88aDE+V29ybGQhPC9oMT48L2JvZHk+PC9odG1sPg==", + } + actual_result = await getattr(client, get_method)( + {"url": "https://a.example", "httpResponseBody": True} + ) + assert actual_result == expected_result + + +@pytest.mark.parametrize( + ("client_cls", "get_method"), + ( + (AsyncZyteAPI, "get"), + (AsyncClient, "request_raw"), + ), +) +@pytest.mark.asyncio +async def test_get_request_error(client_cls, get_method, mockserver): + client = client_cls(api_key="a", 
api_url=mockserver.urljoin("/")) + with pytest.raises(RequestError) as request_error_info: + await getattr(client, get_method)( + {"url": "https://exception.example", "browserHtml": True}, + ) + parsed_error = request_error_info.value.parsed + assert isinstance(parsed_error, ParsedError) + assert parsed_error.data == { + "detail": "The authentication key is not valid or can't be matched.", + "status": 401, + "title": "Authentication Key Not Found", + "type": "/auth/key-not-found", + } + + +@pytest.mark.parametrize( + ("client_cls", "get_method"), + ( + (AsyncZyteAPI, "get"), + (AsyncClient, "request_raw"), + ), +) +@pytest.mark.asyncio +async def test_get_request_error_empty_body(client_cls, get_method, mockserver): + client = client_cls(api_key="a", api_url=mockserver.urljoin("/")) + with pytest.raises(RequestError) as request_error_info: + await getattr(client, get_method)( + {"url": "https://empty-body-exception.example", "browserHtml": True}, + ) + parsed_error = request_error_info.value.parsed + assert isinstance(parsed_error, ParsedError) + assert parsed_error.data is None + + +@pytest.mark.parametrize( + ("client_cls", "get_method"), + ( + (AsyncZyteAPI, "get"), + (AsyncClient, "request_raw"), + ), +) +@pytest.mark.asyncio +async def test_get_request_error_non_json(client_cls, get_method, mockserver): + client = client_cls(api_key="a", api_url=mockserver.urljoin("/")) + with pytest.raises(RequestError) as request_error_info: + await getattr(client, get_method)( + {"url": "https://nonjson-exception.example", "browserHtml": True}, + ) + parsed_error = request_error_info.value.parsed + assert isinstance(parsed_error, ParsedError) + assert parsed_error.data is None + + +@pytest.mark.parametrize( + ("client_cls", "get_method"), + ( + (AsyncZyteAPI, "get"), + (AsyncClient, "request_raw"), + ), +) +@pytest.mark.asyncio +async def test_get_request_error_unexpected_json(client_cls, get_method, mockserver): + client = client_cls(api_key="a", 
api_url=mockserver.urljoin("/")) + with pytest.raises(RequestError) as request_error_info: + await getattr(client, get_method)( + {"url": "https://array-exception.example", "browserHtml": True}, + ) + parsed_error = request_error_info.value.parsed + assert isinstance(parsed_error, ParsedError) + assert parsed_error.data is None + + +@pytest.mark.parametrize( + ("client_cls", "iter_method"), + ( + (AsyncZyteAPI, "iter"), + (AsyncClient, "request_parallel_as_completed"), + ), +) +@pytest.mark.asyncio +async def test_iter(client_cls, iter_method, mockserver): + client = client_cls(api_key="a", api_url=mockserver.urljoin("/")) + queries = [ + {"url": "https://a.example", "httpResponseBody": True}, + {"url": "https://exception.example", "httpResponseBody": True}, + {"url": "https://b.example", "httpResponseBody": True}, + ] + expected_results = [ + { + "url": "https://a.example", + "httpResponseBody": "PGh0bWw+PGJvZHk+SGVsbG88aDE+V29ybGQhPC9oMT48L2JvZHk+PC9odG1sPg==", + }, + Exception, + { + "url": "https://b.example", + "httpResponseBody": "PGh0bWw+PGJvZHk+SGVsbG88aDE+V29ybGQhPC9oMT48L2JvZHk+PC9odG1sPg==", + }, + ] + actual_results = [] + for future in getattr(client, iter_method)(queries): + try: + actual_result = await future + except Exception as exception: + actual_result = exception + actual_results.append(actual_result) + assert len(actual_results) == len(expected_results) + for actual_result in actual_results: + if isinstance(actual_result, Exception): + assert Exception in expected_results + else: + assert actual_result in expected_results + + +@pytest.mark.parametrize( + ("client_cls", "get_method", "iter_method"), + ( + (AsyncZyteAPI, "get", "iter"), + (AsyncClient, "request_raw", "request_parallel_as_completed"), + ), +) +@pytest.mark.asyncio +async def test_semaphore(client_cls, get_method, iter_method, mockserver): + client = client_cls(api_key="a", api_url=mockserver.urljoin("/")) + client._semaphore = AsyncMock(wraps=client._semaphore) + queries = [ + 
{"url": "https://a.example", "httpResponseBody": True}, + {"url": "https://b.example", "httpResponseBody": True}, + {"url": "https://c.example", "httpResponseBody": True}, + ] + futures = [ + getattr(client, get_method)(queries[0]), + next(iter(getattr(client, iter_method)(queries[1:2]))), + getattr(client, get_method)(queries[2]), + ] + for future in asyncio.as_completed(futures): + await future + assert client._semaphore.__aenter__.call_count == len(queries) + assert client._semaphore.__aexit__.call_count == len(queries) + + +@pytest.mark.asyncio +async def test_session_context_manager(mockserver): + client = AsyncZyteAPI(api_key="a", api_url=mockserver.urljoin("/")) + queries = [ + {"url": "https://a.example", "httpResponseBody": True}, + {"url": "https://exception.example", "httpResponseBody": True}, + {"url": "https://b.example", "httpResponseBody": True}, + ] + expected_results = [ + { + "url": "https://a.example", + "httpResponseBody": "PGh0bWw+PGJvZHk+SGVsbG88aDE+V29ybGQhPC9oMT48L2JvZHk+PC9odG1sPg==", + }, + Exception, + { + "url": "https://b.example", + "httpResponseBody": "PGh0bWw+PGJvZHk+SGVsbG88aDE+V29ybGQhPC9oMT48L2JvZHk+PC9odG1sPg==", + }, + ] + actual_results = [] + async with client.session() as session: + assert session._session.connector.limit == client.n_conn + actual_results.append(await session.get(queries[0])) + for future in session.iter(queries[1:]): + try: + result = await future + except Exception as e: + result = e + actual_results.append(result) + aiohttp_session = session._session + assert not aiohttp_session.closed + assert aiohttp_session.closed + + with pytest.raises(RuntimeError): + await session.get(queries[0]) + + with pytest.raises(RuntimeError): + future = next(iter(session.iter(queries[1:]))) + await future + + assert len(actual_results) == len(expected_results) + for actual_result in actual_results: + if isinstance(actual_result, Exception): + assert Exception in expected_results + else: + assert actual_result in 
expected_results + + +@pytest.mark.asyncio +async def test_session_no_context_manager(mockserver): + client = AsyncZyteAPI(api_key="a", api_url=mockserver.urljoin("/")) + queries = [ + {"url": "https://a.example", "httpResponseBody": True}, + {"url": "https://exception.example", "httpResponseBody": True}, + {"url": "https://b.example", "httpResponseBody": True}, + ] + expected_results = [ + { + "url": "https://a.example", + "httpResponseBody": "PGh0bWw+PGJvZHk+SGVsbG88aDE+V29ybGQhPC9oMT48L2JvZHk+PC9odG1sPg==", + }, + Exception, + { + "url": "https://b.example", + "httpResponseBody": "PGh0bWw+PGJvZHk+SGVsbG88aDE+V29ybGQhPC9oMT48L2JvZHk+PC9odG1sPg==", + }, + ] + actual_results = [] + session = client.session() + assert session._session.connector.limit == client.n_conn + actual_results.append(await session.get(queries[0])) + for future in session.iter(queries[1:]): + try: + result = await future + except Exception as e: + result = e + actual_results.append(result) + aiohttp_session = session._session + assert not aiohttp_session.closed + await session.close() + assert aiohttp_session.closed + + with pytest.raises(RuntimeError): + await session.get(queries[0]) + + with pytest.raises(RuntimeError): + future = next(iter(session.iter(queries[1:]))) + await future + + assert len(actual_results) == len(expected_results) + for actual_result in actual_results: + if isinstance(actual_result, Exception): + assert Exception in expected_results + else: + assert actual_result in expected_results + + +def test_retrying_class(): + """A descriptive exception is raised when creating a client with an + AsyncRetrying subclass or similar instead of an instance of it.""" + with pytest.raises(ValueError): + AsyncZyteAPI(api_key="foo", retrying=AggressiveRetryFactory) # type: ignore[arg-type] + + +@pytest.mark.asyncio +async def test_too_many_undocumented_errors(mockserver): + ZyteAsyncRetrying._total_outcomes = 9 + ZyteAsyncRetrying._total_undocumented_errors = 9 + + client = 
AsyncZyteAPI(api_key="a", api_url=mockserver.urljoin("/")) + + await client.get({"url": "https://a.example", "httpResponseBody": True}) + with pytest.raises(TooManyUndocumentedErrors): + await client.get({"url": "https://e500.example", "httpResponseBody": True}) + with pytest.raises(TooManyUndocumentedErrors): + await client.get({"url": "https://a.example", "httpResponseBody": True}) diff --git a/tests/test_main.py b/tests/test_main.py new file mode 100644 index 0000000..d8c4df2 --- /dev/null +++ b/tests/test_main.py @@ -0,0 +1,269 @@ +import json +import os +import subprocess +from json import JSONDecodeError +from tempfile import NamedTemporaryFile +from unittest.mock import AsyncMock, Mock, patch + +import pytest + +from zyte_api.__main__ import run +from zyte_api.aio.errors import RequestError + + +def get_json_content(file_object): + if not file_object: + return + + file_path = file_object.name + try: + with open(file_path, "r") as file: + return json.load(file) + except JSONDecodeError: + pass + + +def delete_file(file_path): + try: + os.remove(file_path) + print(f"File '{file_path}' has been deleted successfully.") + except FileNotFoundError: + print(f"File '{file_path}' not found. Unable to delete.") + + +def forbidden_domain_response(): + response_str = { + "type": "/download/temporary-error", + "title": "Temporary Downloading Error", + "status": 520, + "detail": "There is a downloading problem which might be temporary. 
Retry in N seconds from 'Retry-After' header or open a support ticket from https://support.zyte.com/support/tickets/new if it fails consistently.", + } + return response_str + + +async def fake_exception(value=True): + # Simulating an error condition + if value: + raise RequestError( + query={"url": "https://example.com", "httpResponseBody": True}, + response_content=json.dumps(forbidden_domain_response()).encode(), + request_info=None, + history=None, + ) + + create_session_mock = AsyncMock() + return await create_session_mock.coroutine() + + +@pytest.mark.parametrize( + "queries,expected_response,store_errors,exception", + ( + ( + # test if it stores the error(s) also by adding flag + ( + [ + { + "url": "https://forbidden.example", + "browserHtml": True, + "echoData": "https://forbidden.example", + } + ], + forbidden_domain_response(), + True, + fake_exception, + ), + # test with store_errors=False + ( + [ + { + "url": "https://forbidden.example", + "browserHtml": True, + "echoData": "https://forbidden.example", + } + ], + None, # expected response should be None + False, + fake_exception, + ), + ) + ), +) +@pytest.mark.asyncio +async def test_run(queries, expected_response, store_errors, exception): + tmp_path = "temporary_file.jsonl" + temporary_file = open(tmp_path, "w") + n_conn = 5 + api_url = "https://example.com" + api_key = "fake_key" + retry_errors = True + + # Create a mock for AsyncZyteAPI + async_client_mock = Mock() + + # Create a mock for the iter method + request_parallel_mock = Mock() + async_client_mock.return_value.iter = request_parallel_mock + + # Patch the AsyncZyteAPI class in __main__ with the mock + with ( + patch("zyte_api.__main__.AsyncZyteAPI", async_client_mock), + patch("zyte_api.__main__.create_session") as create_session_mock, + ): + # Mock create_session to return an AsyncMock + create_session_mock.return_value = AsyncMock() + + # Set up the AsyncZyteAPI instance to return the mocked iterator + 
async_client_mock.return_value.iter.return_value = [ + exception(), + ] + + # Call the run function with the mocked AsyncZyteAPI + await run( + queries=queries, + out=temporary_file, + n_conn=n_conn, + api_url=api_url, + api_key=api_key, + retry_errors=retry_errors, + store_errors=store_errors, + ) + + assert get_json_content(temporary_file) == expected_response + os.unlink(tmp_path) + + +@pytest.mark.asyncio +async def test_run_stop_on_errors_false(mockserver): + queries = [{"url": "https://exception.example", "httpResponseBody": True}] + with NamedTemporaryFile("w") as output_file: + with pytest.warns( + DeprecationWarning, match=r"^The stop_on_errors parameter is deprecated\.$" + ): + await run( + queries=queries, + out=output_file, + n_conn=1, + api_url=mockserver.urljoin("/"), + api_key="a", + stop_on_errors=False, + ) + + +@pytest.mark.asyncio +async def test_run_stop_on_errors_true(mockserver): + query = {"url": "https://exception.example", "httpResponseBody": True} + queries = [query] + with NamedTemporaryFile("w") as output_file: + with pytest.warns( + DeprecationWarning, match=r"^The stop_on_errors parameter is deprecated\.$" + ): + with pytest.raises(RequestError) as exc_info: + await run( + queries=queries, + out=output_file, + n_conn=1, + api_url=mockserver.urljoin("/"), + api_key="a", + stop_on_errors=True, + ) + assert exc_info.value.query == query + + +def _run(*, input, mockserver, cli_params=None): + cli_params = cli_params or tuple() + with NamedTemporaryFile("w") as url_list: + url_list.write(input) + url_list.flush() + # Note: Using “python -m zyte_api” instead of “zyte-api” enables + # coverage tracking to work. 
+ result = subprocess.run( + [ + "python", + "-m", + "zyte_api", + "--api-key", + "a", + "--api-url", + mockserver.urljoin("/"), + url_list.name, + *cli_params, + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + return result + + +def test_empty_input(mockserver): + result = _run(input="", mockserver=mockserver) + assert result.returncode + assert result.stdout == b"" + assert result.stderr == b"No input queries found. Is the input file empty?\n" + + +def test_intype_txt_implicit(mockserver): + result = _run(input="https://a.example", mockserver=mockserver) + assert not result.returncode + assert ( + result.stdout + == b'{"url": "https://a.example", "browserHtml": "Hello

World!

"}\n' + ) + + +def test_intype_txt_explicit(mockserver): + result = _run( + input="https://a.example", mockserver=mockserver, cli_params=["--intype", "txt"] + ) + assert not result.returncode + assert ( + result.stdout + == b'{"url": "https://a.example", "browserHtml": "Hello

World!

"}\n' + ) + + +def test_intype_jsonl_implicit(mockserver): + result = _run( + input='{"url": "https://a.example", "browserHtml": true}', mockserver=mockserver + ) + assert not result.returncode + assert ( + result.stdout + == b'{"url": "https://a.example", "browserHtml": "Hello

World!

"}\n' + ) + + +def test_intype_jsonl_explicit(mockserver): + result = _run( + input='{"url": "https://a.example", "browserHtml": true}', + mockserver=mockserver, + cli_params=["--intype", "jl"], + ) + assert not result.returncode + assert ( + result.stdout + == b'{"url": "https://a.example", "browserHtml": "Hello

World!

"}\n' + ) + + +@pytest.mark.flaky(reruns=16) +def test_limit_and_shuffle(mockserver): + result = _run( + input="https://a.example\nhttps://b.example", + mockserver=mockserver, + cli_params=["--limit", "1", "--shuffle"], + ) + assert not result.returncode + assert ( + result.stdout + == b'{"url": "https://b.example", "browserHtml": "Hello

World!

"}\n' + ) + + +def test_run_non_json_response(mockserver): + result = _run( + input="https://nonjson.example", + mockserver=mockserver, + ) + assert not result.returncode + assert result.stdout == b"" + assert b"json.decoder.JSONDecodeError" in result.stderr diff --git a/tests/test_retry.py b/tests/test_retry.py new file mode 100644 index 0000000..a86fc9b --- /dev/null +++ b/tests/test_retry.py @@ -0,0 +1,524 @@ +from collections import deque +from copy import copy +from unittest.mock import patch + +import pytest +from aiohttp.client_exceptions import ServerConnectionError +from tenacity import AsyncRetrying + +from zyte_api import ( + AggressiveRetryFactory, + AsyncZyteAPI, + RequestError, + RetryFactory, + TooManyUndocumentedErrors, + aggressive_retrying, + zyte_api_retrying, +) +from zyte_api._retry import ZyteAsyncRetrying + +from .mockserver import DropResource, MockServer + + +def reset_totals(): + ZyteAsyncRetrying._total_outcomes = 0 + ZyteAsyncRetrying._total_undocumented_errors = 0 + + +def test_deprecated_imports(): + from zyte_api import RetryFactory, zyte_api_retrying + from zyte_api.aio.retry import RetryFactory as DeprecatedRetryFactory + from zyte_api.aio.retry import zyte_api_retrying as deprecated_zyte_api_retrying + + assert RetryFactory is DeprecatedRetryFactory + assert zyte_api_retrying is deprecated_zyte_api_retrying + + +UNSET = object() + + +class OutlierException(RuntimeError): + pass + + +@pytest.mark.parametrize( + ("value", "exception"), + ( + (UNSET, OutlierException), + (True, OutlierException), + (False, RequestError), + ), +) +@pytest.mark.asyncio +async def test_get_handle_retries(value, exception, mockserver): + kwargs = {} + if value is not UNSET: + kwargs["handle_retries"] = value + + def broken_stop(_): + raise OutlierException + + retrying = AsyncRetrying(stop=broken_stop) + client = AsyncZyteAPI( + api_key="a", api_url=mockserver.urljoin("/"), retrying=retrying + ) + with pytest.raises(exception): + await client.get( + 
{"url": "https://exception.example", "browserHtml": True}, + **kwargs, + ) + + +@pytest.mark.parametrize( + ("retry_factory", "status", "waiter"), + ( + (RetryFactory, 429, "throttling"), + (RetryFactory, 520, "temporary_download_error"), + (AggressiveRetryFactory, 429, "throttling"), + (AggressiveRetryFactory, 500, "undocumented_error"), + (AggressiveRetryFactory, 520, "download_error"), + ), +) +@pytest.mark.asyncio +async def test_retry_wait(retry_factory, status, waiter, mockserver): + + def broken_wait(self, retry_state): + raise OutlierException + + class CustomRetryFactory(retry_factory): # type: ignore[valid-type, misc] + pass + + setattr(CustomRetryFactory, f"{waiter}_wait", broken_wait) + retrying = CustomRetryFactory().build() + client = AsyncZyteAPI( + api_key="a", api_url=mockserver.urljoin("/"), retrying=retrying + ) + with pytest.raises(OutlierException): + await client.get( + {"url": f"https://e{status}.example", "browserHtml": True}, + ) + + +@pytest.mark.parametrize( + ("retry_factory",), + ( + (RetryFactory,), + (AggressiveRetryFactory,), + ), +) +@pytest.mark.asyncio +async def test_retry_wait_network_error(retry_factory): + waiter = "network_error" + + def broken_wait(self, retry_state): + raise OutlierException + + class CustomRetryFactory(retry_factory): # type: ignore[valid-type, misc] + pass + + setattr(CustomRetryFactory, f"{waiter}_wait", broken_wait) + + retrying = CustomRetryFactory().build() + with MockServer(resource=DropResource) as mockserver: + client = AsyncZyteAPI( + api_key="a", api_url=mockserver.urljoin("/"), retrying=retrying + ) + with pytest.raises(OutlierException): + await client.get( + {"url": "https://example.com", "browserHtml": True}, + ) + + +def mock_request_error(*, status=200): + return RequestError( + history=None, + request_info=None, + response_content=None, + status=status, + query={}, + ) + + +# Number of times to test request errors that must be retried forever. 
+FOREVER_TIMES = 100 + + +class fast_forward: + def __init__(self, time): + self.time = time + + +class scale: + + def __init__(self, factor): + self.factor = factor + + def __call__(self, number, add=0): + return int(number * self.factor) + add + + +@pytest.mark.parametrize( + ("retrying", "outcomes", "exhausted"), + ( + # Shared behaviors of all retry policies + *( + (retrying, outcomes, exhausted) + for retrying in (zyte_api_retrying, aggressive_retrying) + for outcomes, exhausted in ( + # Rate limiting is retried forever. + ( + (mock_request_error(status=429),) * FOREVER_TIMES, + False, + ), + ( + (mock_request_error(status=503),) * FOREVER_TIMES, + False, + ), + # Network errors are retried until there have only been network + # errors (of any kind) for 15 minutes straight or more. + ( + ( + ServerConnectionError(), + fast_forward(15 * 60 - 1), + ServerConnectionError(), + ), + False, + ), + ( + ( + ServerConnectionError(), + fast_forward(15 * 60), + ServerConnectionError(), + ), + True, + ), + ( + ( + mock_request_error(status=429), + fast_forward(15 * 60 - 1), + ServerConnectionError(), + ), + False, + ), + ( + ( + mock_request_error(status=429), + fast_forward(15 * 60), + ServerConnectionError(), + ), + False, + ), + ( + ( + ServerConnectionError(), + fast_forward(7 * 60), + mock_request_error(status=429), + fast_forward(8 * 60 - 1), + ServerConnectionError(), + ), + False, + ), + ( + ( + ServerConnectionError(), + fast_forward(7 * 60), + mock_request_error(status=429), + fast_forward(8 * 60), + ServerConnectionError(), + ), + False, + ), + ( + ( + ServerConnectionError(), + fast_forward(7 * 60), + mock_request_error(status=429), + fast_forward(8 * 60), + ServerConnectionError(), + fast_forward(15 * 60 - 1), + ServerConnectionError(), + ), + False, + ), + ( + ( + ServerConnectionError(), + fast_forward(7 * 60), + mock_request_error(status=429), + fast_forward(8 * 60), + ServerConnectionError(), + fast_forward(15 * 60), + ServerConnectionError(), + ), + 
True, + ), + ) + ), + # Scaled behaviors, where the default retry policy uses half as many + # attempts as the aggressive retry policy. + *( + (retrying, outcomes, exhausted) + for retrying, scaled in ( + (zyte_api_retrying, scale(0.5)), + (aggressive_retrying, scale(1)), + ) + for outcomes, exhausted in ( + # Temporary download errors are retried until they have + # happened 8*factor times in total. Permanent download errors + # also count towards that limit. + ( + (mock_request_error(status=520),) * scaled(8, -1), + False, + ), + ( + (mock_request_error(status=520),) * scaled(8), + True, + ), + ( + ( + *(mock_request_error(status=429),) * scaled(8, -2), + mock_request_error(status=520), + ), + False, + ), + ( + ( + *(mock_request_error(status=429),) * scaled(8, -1), + mock_request_error(status=520), + ), + False, + ), + ( + ( + *( + mock_request_error(status=429), + mock_request_error(status=520), + ) + * scaled(8, -1), + ), + False, + ), + ( + ( + *( + mock_request_error(status=429), + mock_request_error(status=520), + ) + * scaled(8), + ), + True, + ), + ( + ( + *(mock_request_error(status=520),) * scaled(8, -3), + *(mock_request_error(status=521),) * 1, + *(mock_request_error(status=520),) * 1, + ), + False, + ), + ( + ( + *(mock_request_error(status=520),) * scaled(8, -2), + *(mock_request_error(status=521),) * 1, + *(mock_request_error(status=520),) * 1, + ), + True, + ), + ( + ( + *(mock_request_error(status=520),) * scaled(8, -2), + *(mock_request_error(status=521),) * 1, + ), + False, + ), + ( + ( + *(mock_request_error(status=520),) * scaled(8, -1), + *(mock_request_error(status=521),) * 1, + ), + True, + ), + # Permanent download errors are retried until they have + # happened 4*factor times in total. + ( + (*(mock_request_error(status=521),) * scaled(4, -1),), + False, + ), + ( + (*(mock_request_error(status=521),) * scaled(4),), + True, + ), + # Undocumented 5xx errors are retried until they have happened + # 4*factor times. 
+ *( + scenario + for status in ( + 500, + 502, + 504, + ) + for scenario in ( + ( + (*(mock_request_error(status=status),) * scaled(4, -1),), + False, + ), + ( + (*(mock_request_error(status=status),) * scaled(4),), + True, + ), + ( + ( + *(mock_request_error(status=status),) * scaled(4, -2), + mock_request_error(status=429), + mock_request_error(status=503), + ServerConnectionError(), + mock_request_error(status=status), + ), + False, + ), + ( + ( + *(mock_request_error(status=status),) * scaled(4, -1), + mock_request_error(status=429), + mock_request_error(status=503), + ServerConnectionError(), + mock_request_error(status=status), + ), + True, + ), + ( + ( + mock_request_error(status=555), + *(mock_request_error(status=status),) * scaled(4, -2), + ), + False, + ), + ( + ( + mock_request_error(status=555), + *(mock_request_error(status=status),) * scaled(4, -1), + ), + True, + ), + ) + ), + ) + ), + ), +) +@pytest.mark.asyncio +@patch("time.monotonic") +async def test_retry_stop(monotonic_mock, retrying, outcomes, exhausted): + reset_totals() + monotonic_mock.return_value = 0 + last_outcome = outcomes[-1] + outcomes = deque(outcomes) + + def wait(retry_state): + return 0.0 + + retrying = copy(retrying) + retrying.wait = wait + + async def run(): + while True: + try: + outcome = outcomes.popleft() + except IndexError: + return + else: + if isinstance(outcome, fast_forward): + monotonic_mock.return_value += outcome.time + continue + raise outcome + + run = retrying.wraps(run) + try: + await run() + except Exception as outcome: + assert exhausted, outcome + assert outcome is last_outcome + else: + assert not exhausted + + +mock_good_response = object() + + +@pytest.mark.parametrize( + ("retrying", "outcome_sequences", "exhausted"), + ( + # A ZyteAPIError exception is raised when, of all responses, + # undocumented 5xx responses are at least 10 and at least 1%. 
+ # + # 9, 100%: + ( + zyte_api_retrying, + ((mock_request_error(status=500),),) * 9, + False, + ), + # 10, 100%: + ( + zyte_api_retrying, + ((mock_request_error(status=500),),) * 10, + True, + ), + # 10, <1%: + ( + zyte_api_retrying, + ((mock_request_error(status=500),),) * 9 # 9 / 18 (50%) + + ((mock_good_response,),) * (982) # + 0 / 982 = 9 / 1000 (0.9%) + + ((mock_request_error(status=500),),) * 1, # + 1 / 1 = 10 / 1001 (0.999…%) + False, + ), + # 10, ≥1%: + ( + zyte_api_retrying, + ((mock_request_error(status=500),),) * 9 # 9 / 18 (50%) + + ((mock_good_response,),) * (981) # + 0 / 981 = 9 / 999 (0.9%) + + ((mock_request_error(status=500),),) * 1, # + 1 / 1 = 10 / 1000 (1%) + True, + ), + ), +) +@pytest.mark.asyncio +@patch("time.monotonic") +async def test_retry_stop_global_parallel( + monotonic_mock, retrying, outcome_sequences, exhausted +): + reset_totals() + monotonic_mock.return_value = 0 + last_outcome = outcome_sequences[-1][-1] + outcome_sequences = tuple(deque(outcomes) for outcomes in outcome_sequences) + + def wait(retry_state): + return 0.0 + + retrying = copy(retrying) + retrying.wait = wait + + async def run(outcomes): + while True: + try: + outcome = outcomes.popleft() + except IndexError: + return + else: + if isinstance(outcome, fast_forward): + monotonic_mock.return_value += outcome.time + continue + if outcome is mock_good_response: + continue + raise outcome + + run = retrying.wraps(run) + + try: + for outcomes in outcome_sequences: + await run(outcomes) + except Exception as exc: + assert exhausted, exc + assert isinstance(exc, TooManyUndocumentedErrors) + assert exc.outcome is last_outcome + else: + assert not exhausted diff --git a/tests/test_sync.py b/tests/test_sync.py new file mode 100644 index 0000000..8e014e2 --- /dev/null +++ b/tests/test_sync.py @@ -0,0 +1,150 @@ +from types import GeneratorType +from unittest.mock import AsyncMock + +import pytest + +from zyte_api import ZyteAPI +from zyte_api.apikey import NoApiKey + + +def 
test_api_key(): + ZyteAPI(api_key="a") + with pytest.raises(NoApiKey): + ZyteAPI() + + +def test_get(mockserver): + client = ZyteAPI(api_key="a", api_url=mockserver.urljoin("/")) + expected_result = { + "url": "https://a.example", + "httpResponseBody": "PGh0bWw+PGJvZHk+SGVsbG88aDE+V29ybGQhPC9oMT48L2JvZHk+PC9odG1sPg==", + } + actual_result = client.get({"url": "https://a.example", "httpResponseBody": True}) + assert actual_result == expected_result + + +def test_iter(mockserver): + client = ZyteAPI(api_key="a", api_url=mockserver.urljoin("/")) + queries = [ + {"url": "https://a.example", "httpResponseBody": True}, + {"url": "https://exception.example", "httpResponseBody": True}, + {"url": "https://b.example", "httpResponseBody": True}, + ] + expected_results = [ + { + "url": "https://a.example", + "httpResponseBody": "PGh0bWw+PGJvZHk+SGVsbG88aDE+V29ybGQhPC9oMT48L2JvZHk+PC9odG1sPg==", + }, + Exception, + { + "url": "https://b.example", + "httpResponseBody": "PGh0bWw+PGJvZHk+SGVsbG88aDE+V29ybGQhPC9oMT48L2JvZHk+PC9odG1sPg==", + }, + ] + actual_results = client.iter(queries) + assert isinstance(actual_results, GeneratorType) + actual_results_list = list(actual_results) + assert len(actual_results_list) == len(expected_results) + for actual_result in actual_results_list: + if isinstance(actual_result, Exception): + assert Exception in expected_results + else: + assert actual_result in expected_results + + +def test_semaphore(mockserver): + client = ZyteAPI(api_key="a", api_url=mockserver.urljoin("/")) + client._async_client._semaphore = AsyncMock(wraps=client._async_client._semaphore) + queries = [ + {"url": "https://a.example", "httpResponseBody": True}, + {"url": "https://b.example", "httpResponseBody": True}, + {"url": "https://c.example", "httpResponseBody": True}, + ] + client.get(queries[0]) + next(iter(client.iter(queries[1:2]))) + client.get(queries[2]) + assert client._async_client._semaphore.__aenter__.call_count == len(queries) + assert 
client._async_client._semaphore.__aexit__.call_count == len(queries) + + +def test_session_context_manager(mockserver): + client = ZyteAPI(api_key="a", api_url=mockserver.urljoin("/")) + queries = [ + {"url": "https://a.example", "httpResponseBody": True}, + {"url": "https://exception.example", "httpResponseBody": True}, + {"url": "https://b.example", "httpResponseBody": True}, + ] + expected_results = [ + { + "url": "https://a.example", + "httpResponseBody": "PGh0bWw+PGJvZHk+SGVsbG88aDE+V29ybGQhPC9oMT48L2JvZHk+PC9odG1sPg==", + }, + Exception, + { + "url": "https://b.example", + "httpResponseBody": "PGh0bWw+PGJvZHk+SGVsbG88aDE+V29ybGQhPC9oMT48L2JvZHk+PC9odG1sPg==", + }, + ] + actual_results = [] + with client.session() as session: + assert session._session.connector.limit == client._async_client.n_conn + actual_results.append(session.get(queries[0])) + for result in session.iter(queries[1:]): + actual_results.append(result) + aiohttp_session = session._session + assert not aiohttp_session.closed + assert aiohttp_session.closed + + with pytest.raises(RuntimeError): + session.get(queries[0]) + + assert isinstance(next(iter(session.iter(queries[1:]))), RuntimeError) + + assert len(actual_results) == len(expected_results) + for actual_result in actual_results: + if isinstance(actual_result, Exception): + assert Exception in expected_results + else: + assert actual_result in expected_results + + +def test_session_no_context_manager(mockserver): + client = ZyteAPI(api_key="a", api_url=mockserver.urljoin("/")) + queries = [ + {"url": "https://a.example", "httpResponseBody": True}, + {"url": "https://exception.example", "httpResponseBody": True}, + {"url": "https://b.example", "httpResponseBody": True}, + ] + expected_results = [ + { + "url": "https://a.example", + "httpResponseBody": "PGh0bWw+PGJvZHk+SGVsbG88aDE+V29ybGQhPC9oMT48L2JvZHk+PC9odG1sPg==", + }, + Exception, + { + "url": "https://b.example", + "httpResponseBody": 
"PGh0bWw+PGJvZHk+SGVsbG88aDE+V29ybGQhPC9oMT48L2JvZHk+PC9odG1sPg==", + }, + ] + actual_results = [] + session = client.session() + assert session._session.connector.limit == client._async_client.n_conn + actual_results.append(session.get(queries[0])) + for result in session.iter(queries[1:]): + actual_results.append(result) + aiohttp_session = session._session + assert not aiohttp_session.closed + session.close() + assert aiohttp_session.closed + + with pytest.raises(RuntimeError): + session.get(queries[0]) + + assert isinstance(next(iter(session.iter(queries[1:]))), RuntimeError) + + assert len(actual_results) == len(expected_results) + for actual_result in actual_results: + if isinstance(actual_result, Exception): + assert Exception in expected_results + else: + assert actual_result in expected_results diff --git a/tests/test_utils.py b/tests/test_utils.py index af3c739..9228c57 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,6 +1,18 @@ import pytest +from aiohttp import TCPConnector +from pytest import raises -from zyte_api.utils import _guess_intype +from zyte_api._utils import create_session +from zyte_api.utils import _guess_intype, _process_query + + +@pytest.mark.asyncio +async def test_create_session_custom_connector(): + # Declare a connector with a random parameter to avoid it matching the + # default one. + custom_connector = TCPConnector(limit=1850) + session = create_session(connector=custom_connector) + assert session.connector == custom_connector @pytest.mark.parametrize( @@ -55,3 +67,59 @@ ) def test_guess_intype(file_name, first_line, expected): assert _guess_intype(file_name, [first_line]) == expected + + +@pytest.mark.parametrize( + "input,output", + ( + # Unsafe URLs in the url field are modified, while left untouched on + # other fields. 
+ ( + { + "a": {"b", "c"}, + "d": "https://example.com/ a", + "url": "https://example.com/ a", + }, + { + "a": {"b", "c"}, + "d": "https://example.com/ a", + "url": "https://example.com/%20a", + }, + ), + # Safe URLs are returned unmodified. + ( + {"url": "https://example.com"}, + {"url": "https://example.com"}, + ), + # URL fragments are kept. + ( + {"url": "https://example.com#a"}, + {"url": "https://example.com#a"}, + ), + # If no URL is passed, nothing is done. + ( + {"a": "b"}, + {"a": "b"}, + ), + # NOTE: We use w3lib.url.safe_url_string for escaping. Tests covering + # the URL escaping logic exist upstream. + ), +) +def test_process_query(input, output): + assert _process_query(input) == output + + +def test_process_query_bytes(): + with raises(ValueError): + _process_query({"url": b"https://example.com"}) + + +@pytest.mark.asyncio # https://github.com/aio-libs/aiohttp/pull/1468 +async def test_deprecated_create_session(): + from zyte_api.aio.client import create_session as _create_session + + with pytest.warns( + DeprecationWarning, + match=r"^zyte_api\.aio\.client\.create_session is deprecated", + ): + _create_session() diff --git a/tox.ini b/tox.ini index c4cebf1..33663b7 100644 --- a/tox.ini +++ b/tox.ini @@ -1,32 +1,49 @@ [tox] -envlist = py37,py38,py39,py310,mypy,docs +envlist = py39,py310,py311,py312,py313,mypy,docs,twine [testenv] deps = - -rrequirements-test.txt + pytest + pytest-asyncio + pytest-cov + pytest-rerunfailures + pytest-twisted + responses + twisted commands = py.test \ - --cov-report=term --cov-report=html --cov-report=xml --cov=zyte_api \ + --cov-report=term-missing --cov-report=html --cov-report=xml --cov=zyte_api \ --doctest-modules \ {posargs:zyte_api tests} [testenv:mypy] deps = - mypy==0.910 + mypy==1.12.0 + pytest==8.3.3 + Twisted==24.7.0 + types-tqdm==4.66.0.20240417 -commands = mypy --ignore-missing-imports --no-warn-no-return \ +commands = mypy --ignore-missing-imports \ zyte_api \ tests -[docs] +[testenv:docs] changedir = 
docs deps = -rdocs/requirements.txt - -[testenv:docs] basepython = python3 -changedir = {[docs]changedir} -deps = {[docs]deps} commands = sphinx-build -W -b html . {envtmpdir}/html + +[testenv:pre-commit] +deps = pre-commit +commands = pre-commit run --all-files --show-diff-on-failure + +[testenv:twine] +deps = + twine==5.1.1 + build==1.2.2 +commands = + python -m build --sdist + twine check dist/* diff --git a/zyte_api/__init__.py b/zyte_api/__init__.py index 74f9aaa..0e9b55b 100644 --- a/zyte_api/__init__.py +++ b/zyte_api/__init__.py @@ -1,3 +1,24 @@ """ -Python client libraries and command line utilities for Zyte Data API -""" \ No newline at end of file +Python client libraries and command line utilities for Zyte API +""" + +from ._async import AsyncZyteAPI +from ._errors import RequestError +from ._retry import AggressiveRetryFactory, RetryFactory, TooManyUndocumentedErrors +from ._retry import aggressive_retrying as _aggressive_retrying +from ._retry import ( + stop_after_uninterrupted_delay, + stop_on_count, + stop_on_download_error, +) +from ._retry import zyte_api_retrying as _zyte_api_retrying +from ._sync import ZyteAPI +from .errors import ParsedError + +# We re-define the variables here for Sphinx to pick the documentation. + +#: :ref:`Default retry policy `. +zyte_api_retrying = _zyte_api_retrying + +#: :ref:`Aggresive retry policy `. +aggressive_retrying = _aggressive_retrying diff --git a/zyte_api/__main__.py b/zyte_api/__main__.py index b47b5f4..94bab28 100644 --- a/zyte_api/__main__.py +++ b/zyte_api/__main__.py @@ -1,51 +1,86 @@ -""" Basic command-line interface for Zyte Data APIs. """ +""" Basic command-line interface for Zyte API. 
""" import argparse -import json -import sys import asyncio +import json import logging import random +import sys +from warnings import warn import tqdm +from tenacity import retry_if_exception -from zyte_api.aio.client import ( - create_session, - AsyncClient -) -from zyte_api.constants import ENV_VARIABLE, API_URL +from zyte_api import RequestError +from zyte_api._async import AsyncZyteAPI +from zyte_api._retry import RetryFactory, _is_throttling_error +from zyte_api._utils import create_session +from zyte_api.constants import API_URL from zyte_api.utils import _guess_intype -logger = logging.getLogger('zyte_api') +class DontRetryErrorsFactory(RetryFactory): + retry_condition = retry_if_exception(_is_throttling_error) + + +logger = logging.getLogger("zyte_api") _UNSET = object() -async def run(queries, out, n_conn, stop_on_errors, api_url, - api_key=None): +async def run( + queries, + out, + *, + n_conn, + stop_on_errors=_UNSET, + api_url, + api_key=None, + retry_errors=True, + store_errors=None, +): + if stop_on_errors is not _UNSET: + warn( + "The stop_on_errors parameter is deprecated.", + DeprecationWarning, + stacklevel=2, + ) + else: + stop_on_errors = False - client = AsyncClient(n_conn=n_conn, api_key=api_key, api_url=api_url) + def write_output(content): + json.dump(content, out, ensure_ascii=False) + out.write("\n") + out.flush() + pbar.update() + + retrying = None if retry_errors else DontRetryErrorsFactory().build() + client = AsyncZyteAPI( + n_conn=n_conn, api_key=api_key, api_url=api_url, retrying=retrying + ) async with create_session(connection_pool_size=n_conn) as session: - result_iter = client.request_parallel_as_completed( + result_iter = client.iter( queries=queries, session=session, ) - pbar = tqdm.tqdm(smoothing=0, leave=True, total=len(queries), miniters=1, - unit="url") + pbar = tqdm.tqdm( + smoothing=0, leave=True, total=len(queries), miniters=1, unit="url" + ) pbar.set_postfix_str(str(client.agg_stats)) try: for fut in result_iter: try: 
result = await fut - json.dump(result, out, ensure_ascii=False) - out.write("\n") - out.flush() - pbar.update() except Exception as e: + if store_errors and isinstance(e, RequestError): + write_output(e.parsed.data) + if stop_on_errors: raise - logger.error(str(e)) + + logger.exception("Exception raised during response handling") + else: + write_output(result) finally: pbar.set_postfix_str(str(client.agg_stats)) finally: @@ -59,16 +94,15 @@ async def run(queries, out, n_conn, stop_on_errors, api_url, def read_input(input_fp, intype): assert intype in {"txt", "jl", _UNSET} lines = input_fp.readlines() + if not lines: + return [] if intype is _UNSET: intype = _guess_intype(input_fp.name, lines) if intype == "txt": urls = [u.strip() for u in lines if u.strip()] records = [{"url": url, "browserHtml": True} for url in urls] else: - records = [ - json.loads(line.strip()) - for line in lines if line.strip() - ] + records = [json.loads(line.strip()) for line in lines if line.strip()] # Automatically replicating the url in echoData to being able to # to match URLs with content in the responses for record in records: @@ -76,72 +110,121 @@ def read_input(input_fp, intype): return records -def _main(program_name='zyte-api'): - """ Process urls from input file through Zyte Data API """ +def _get_argument_parser(program_name="zyte-api"): p = argparse.ArgumentParser( prog=program_name, - description=""" - Process input URLs from a file using Zyte Data API. - """, + description="Send Zyte API requests.", ) - p.add_argument("input", - type=argparse.FileType("r", encoding='utf8'), - help="Input file with urls, url per line by default. The " - "Format can be changed using `--intype` argument.") - p.add_argument("--intype", default=_UNSET, choices=["txt", "jl"], - help="Type of the input file. " - "Allowed values are 'txt' (1 URL per line) and 'jl' " - "(JSON Lines file, each object describing the " - "parameters of a request). 
" - "If not specified, the input type is guessed based on " - "the input file name extension (.jl, .jsonl, .txt) or " - "content, and assumed to be txt if guessing fails.") - p.add_argument("--limit", type=int, - help="Max number of URLs to take from the input") - p.add_argument("--output", "-o", - default=sys.stdout, - type=argparse.FileType("w", encoding='utf8'), - help=".jsonlines file to store extracted data. " - "By default, results are printed to stdout.") - p.add_argument("--n-conn", type=int, default=20, - help="number of connections to the API server " - "(default: %(default)s)") - p.add_argument("--api-key", - help="Zyte Data API key. " - "You can also set %s environment variable instead " - "of using this option." % ENV_VARIABLE) - p.add_argument("--api-url", - help="Zyte Data API endpoint (default: %(default)s)", - default=API_URL) - p.add_argument("--loglevel", "-L", default="INFO", - choices=["DEBUG", "INFO", "WARNING", "ERROR"], - help="log level (default: %(default)s)") - p.add_argument("--shuffle", help="Shuffle input URLs", action="store_true") - args = p.parse_args() - logging.basicConfig( - stream=sys.stderr, - level=getattr(logging, args.loglevel) + p.add_argument( + "INPUT", + type=argparse.FileType("r", encoding="utf8"), + help=( + "Path to an input file (see 'Command-line client > Input file' in " + "the docs for details)." + ), + ) + p.add_argument( + "--intype", + default=_UNSET, + choices=["txt", "jl"], + help=( + "Type of the input file, either 'txt' (plain text) or 'jl' (JSON " + "Lines).\n" + "\n" + "If not specified, the input type is guessed based on the input " + "file extension ('.jl', '.jsonl', or '.txt'), or in its content, " + "with 'txt' as fallback." + ), ) + p.add_argument("--limit", type=int, help="Maximum number of requests to send.") + p.add_argument( + "--output", + "-o", + default=sys.stdout, + type=argparse.FileType("w", encoding="utf8"), + help=( + "Path for the output file. 
Results are written into the output " + "file in JSON Lines format.\n" + "\n" + "If not specified, results are printed to the standard output." + ), + ) + p.add_argument( + "--n-conn", + type=int, + default=20, + help=("Number of concurrent connections to use (default: %(default)s)."), + ) + p.add_argument( + "--api-key", + help="Zyte API key.", + ) + p.add_argument( + "--api-url", help="Zyte API endpoint (default: %(default)s).", default=API_URL + ) + p.add_argument( + "--loglevel", + "-L", + default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR"], + help="Log level (default: %(default)s).", + ) + p.add_argument( + "--shuffle", + help="Shuffle request order.", + action="store_true", + ) + p.add_argument( + "--dont-retry-errors", + help="Do not retry unsuccessful responses and network errors, only rate-limiting responses.", + action="store_true", + ) + p.add_argument( + "--store-errors", + help=( + "Store error responses in the output file.\n" + "\n" + "If omitted, only successful responses are stored." + ), + action="store_true", + ) + return p + + +def _main(program_name="zyte-api"): + """Process urls from input file through Zyte API""" + p = _get_argument_parser(program_name=program_name) + args = p.parse_args() + logging.basicConfig(stream=sys.stderr, level=getattr(logging, args.loglevel)) + + queries = read_input(args.INPUT, args.intype) + if not queries: + print("No input queries found. 
Is the input file empty?", file=sys.stderr) + sys.exit(-1) - queries = read_input(args.input, args.intype) if args.shuffle: random.shuffle(queries) if args.limit: - queries = queries[:args.limit] + queries = queries[: args.limit] - logger.info(f"Loaded {len(queries)} urls from {args.input.name}; shuffled: {args.shuffle}") - logger.info(f"Running Zyte Data API (connections: {args.n_conn})") + logger.info( + f"Loaded {len(queries)} urls from {args.INPUT.name}; shuffled: {args.shuffle}" + ) + logger.info(f"Running Zyte API (connections: {args.n_conn})") loop = asyncio.get_event_loop() - coro = run(queries, - out=args.output, - n_conn=args.n_conn, - stop_on_errors=False, - api_url=args.api_url, - api_key=args.api_key) + coro = run( + queries, + out=args.output, + n_conn=args.n_conn, + api_url=args.api_url, + api_key=args.api_key, + retry_errors=not args.dont_retry_errors, + store_errors=args.store_errors, + ) loop.run_until_complete(coro) loop.close() -if __name__ == '__main__': - _main(program_name='python -m zyte_api') +if __name__ == "__main__": + _main(program_name="python -m zyte_api") diff --git a/zyte_api/__version__.py b/zyte_api/__version__.py index 7fd229a..906d362 100644 --- a/zyte_api/__version__.py +++ b/zyte_api/__version__.py @@ -1 +1 @@ -__version__ = '0.2.0' +__version__ = "0.6.0" diff --git a/zyte_api/_async.py b/zyte_api/_async.py new file mode 100644 index 0000000..f6f30e6 --- /dev/null +++ b/zyte_api/_async.py @@ -0,0 +1,228 @@ +from __future__ import annotations + +import asyncio +import time +from asyncio import Future +from functools import partial +from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional + +import aiohttp +from tenacity import AsyncRetrying + +from ._errors import RequestError +from ._retry import TooManyUndocumentedErrors, zyte_api_retrying +from ._utils import _AIO_API_TIMEOUT, create_session +from .apikey import get_apikey +from .constants import API_URL +from .stats import AggStats, ResponseStats +from .utils 
import USER_AGENT, _process_query + +if TYPE_CHECKING: + _ResponseFuture = Future[Dict[str, Any]] + + +def _post_func(session): + """Return a function to send a POST request""" + if session is None: + return partial(aiohttp.request, method="POST", timeout=_AIO_API_TIMEOUT) + else: + return session.post + + +class _AsyncSession: + def __init__(self, client, **session_kwargs): + self._client = client + self._session = create_session(client.n_conn, **session_kwargs) + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc_info): + await self._session.close() + + async def close(self): + await self._session.close() + + async def get( + self, + query: dict, + *, + endpoint: str = "extract", + handle_retries=True, + retrying: Optional[AsyncRetrying] = None, + ): + return await self._client.get( + query=query, + endpoint=endpoint, + handle_retries=handle_retries, + retrying=retrying, + session=self._session, + ) + + def iter( + self, + queries: List[dict], + *, + endpoint: str = "extract", + handle_retries=True, + retrying: Optional[AsyncRetrying] = None, + ) -> Iterator[Future]: + return self._client.iter( + queries=queries, + endpoint=endpoint, + session=self._session, + handle_retries=handle_retries, + retrying=retrying, + ) + + +class AsyncZyteAPI: + """:ref:`Asynchronous Zyte API client `. + + Parameters work the same as for :class:`ZyteAPI`. + """ + + def __init__( + self, + *, + api_key=None, + api_url=API_URL, + n_conn=15, + retrying: Optional[AsyncRetrying] = None, + user_agent: Optional[str] = None, + ): + if retrying is not None and not isinstance(retrying, AsyncRetrying): + raise ValueError( + "The retrying parameter, if defined, must be an instance of " + "AsyncRetrying." 
+ ) + self.api_key = get_apikey(api_key) + self.api_url = api_url + self.n_conn = n_conn + self.agg_stats = AggStats() + self.retrying = retrying or zyte_api_retrying + self.user_agent = user_agent or USER_AGENT + self._semaphore = asyncio.Semaphore(n_conn) + self._disabling_exception: TooManyUndocumentedErrors | None = None + + async def get( + self, + query: dict, + *, + endpoint: str = "extract", + session=None, + handle_retries=True, + retrying: Optional[AsyncRetrying] = None, + ) -> _ResponseFuture: + """Asynchronous equivalent to :meth:`ZyteAPI.get`.""" + if self._disabling_exception is not None: + raise self._disabling_exception + + retrying = retrying or self.retrying + post = _post_func(session) + auth = aiohttp.BasicAuth(self.api_key) + headers = {"User-Agent": self.user_agent, "Accept-Encoding": "br"} + + response_stats = [] + start_global = time.perf_counter() + + async def request(): + stats = ResponseStats.create(start_global) + self.agg_stats.n_attempts += 1 + + safe_query = _process_query(query) + post_kwargs = dict( + url=self.api_url + endpoint, + json=safe_query, + auth=auth, + headers=headers, + ) + + try: + async with self._semaphore: + async with post(**post_kwargs) as resp: + stats.record_connected(resp.status, self.agg_stats) + if resp.status >= 400: + content = await resp.read() + resp.release() + stats.record_read() + stats.record_request_error(content, self.agg_stats) + + raise RequestError( + request_info=resp.request_info, + history=resp.history, + status=resp.status, + message=resp.reason, + headers=resp.headers, + response_content=content, + query=safe_query, + ) + + response = await resp.json() + stats.record_read(self.agg_stats) + return response + except Exception as e: + if not isinstance(e, RequestError): + self.agg_stats.n_errors += 1 + stats.record_exception(e, agg_stats=self.agg_stats) + raise + finally: + response_stats.append(stats) + + if handle_retries: + request = retrying.wraps(request) + + try: + # Try to make a request 
+ result = await request() + self.agg_stats.n_success += 1 + except Exception as exc: + if isinstance(exc, TooManyUndocumentedErrors): + self._disabling_exception = exc + self.agg_stats.n_fatal_errors += 1 + raise + + return result + + def iter( + self, + queries: List[dict], + *, + endpoint: str = "extract", + session: Optional[aiohttp.ClientSession] = None, + handle_retries=True, + retrying: Optional[AsyncRetrying] = None, + ) -> Iterator[_ResponseFuture]: + """Asynchronous equivalent to :meth:`ZyteAPI.iter`. + + .. note:: Yielded futures, when awaited, do raise their exceptions, + instead of only returning them. + """ + + def _request(query): + return self.get( + query, + endpoint=endpoint, + session=session, + handle_retries=handle_retries, + retrying=retrying, + ) + + return asyncio.as_completed([_request(query) for query in queries]) + + def session(self, **kwargs): + """Asynchronous equivalent to :meth:`ZyteAPI.session`. + + You do not need to use :meth:`~AsyncZyteAPI.session` as an async + context manager as long as you await ``close()`` on the object it + returns when you are done: + + .. code-block:: python + + session = client.session() + try: + ... + finally: + await session.close() + """ + return _AsyncSession(client=self, **kwargs) diff --git a/zyte_api/_errors.py b/zyte_api/_errors.py new file mode 100644 index 0000000..13e33be --- /dev/null +++ b/zyte_api/_errors.py @@ -0,0 +1,41 @@ +import logging +from typing import Any, Dict, Optional + +from aiohttp import ClientResponseError + +from zyte_api.errors import ParsedError + +logger = logging.getLogger("zyte_api") + + +class RequestError(ClientResponseError): + """Exception raised upon receiving a :ref:`rate-limiting + ` or :ref:`unsuccessful + ` response from Zyte API.""" + + def __init__(self, *args, **kwargs): + #: Query sent to Zyte API. + #: + #: May be slightly different from the input query due to + #: pre-processing logic on the client side. 
+ self.query: Dict[str, Any] = kwargs.pop("query") + + #: Request ID. + self.request_id: Optional[str] = kwargs.get("headers", {}).get("request-id") + + #: Response body. + self.response_content: Optional[bytes] = kwargs.pop("response_content") + + super().__init__(*args, **kwargs) + + @property + def parsed(self): + """Response as a :class:`ParsedError` object.""" + return ParsedError.from_body(self.response_content or b"") + + def __str__(self): + return ( + f"RequestError: {self.status}, message={self.message}, " + f"headers={self.headers}, body={self.response_content!r}, " + f"request_id={self.request_id}" + ) diff --git a/zyte_api/_retry.py b/zyte_api/_retry.py new file mode 100644 index 0000000..d7cd5ce --- /dev/null +++ b/zyte_api/_retry.py @@ -0,0 +1,392 @@ +from __future__ import annotations + +import asyncio +import logging +from collections import Counter +from datetime import timedelta +from itertools import count +from typing import TYPE_CHECKING, Any, Union + +from aiohttp import client_exceptions +from tenacity import ( + AsyncRetrying, + DoAttempt, + DoSleep, + RetryCallState, + after_log, + before_log, + before_sleep_log, + retry_base, + retry_if_exception, + wait_chain, + wait_fixed, + wait_random, + wait_random_exponential, +) +from tenacity.stop import stop_base, stop_never + +from ._errors import RequestError + +if TYPE_CHECKING: + from tenacity import RetryBaseT as SyncRetryBaseT + from tenacity.asyncio import RetryBaseT + from tenacity.stop import StopBaseT + from tenacity.wait import WaitBaseT + +logger = logging.getLogger(__name__) + +_IDS = count() + + +_NETWORK_ERRORS = ( + asyncio.TimeoutError, # could happen while reading the response body + client_exceptions.ClientResponseError, + client_exceptions.ClientOSError, + client_exceptions.ServerConnectionError, + client_exceptions.ServerDisconnectedError, + client_exceptions.ServerTimeoutError, + client_exceptions.ClientPayloadError, + client_exceptions.ClientConnectorSSLError, + 
client_exceptions.ClientConnectorError, +) + + +def _is_network_error(exc: BaseException) -> bool: + if isinstance(exc, RequestError): + # RequestError is ClientResponseError, which is in the + # _NETWORK_ERRORS list, but it should be handled + # separately. + return False + return isinstance(exc, _NETWORK_ERRORS) + + +def _is_throttling_error(exc: BaseException) -> bool: + return isinstance(exc, RequestError) and exc.status in (429, 503) + + +class stop_on_count(stop_base): + """Keep a call count with the specified counter name, and stop after the + specified number os calls. + + Unlike stop_after_attempt, this callable does not take into account + attempts for which a different stop callable was used. + """ + + def __init__(self, max_count: int) -> None: + self._max_count = max_count + self._counter_id = next(_IDS) + + def __call__(self, retry_state: "RetryCallState") -> bool: + if not hasattr(retry_state, "counter"): + retry_state.counter = Counter() # type: ignore + retry_state.counter[self._counter_id] += 1 # type: ignore + if retry_state.counter[self._counter_id] >= self._max_count: # type: ignore + return True + return False + + +time_unit_type = Union[int, float, timedelta] + + +def to_seconds(time_unit: time_unit_type) -> float: + return float( + time_unit.total_seconds() if isinstance(time_unit, timedelta) else time_unit + ) + + +class stop_after_uninterrupted_delay(stop_base): + """Stop when this stop callable has been called for the specified time + uninterrupted, i.e. without calls to different stop callables. + + Unlike stop_after_delay, this callable resets its timer after any attempt + for which a different stop callable was used. 
+ """ + + def __init__(self, max_delay: time_unit_type) -> None: + self._max_delay = to_seconds(max_delay) + self._timer_id = next(_IDS) + + def __call__(self, retry_state: "RetryCallState") -> bool: + if not hasattr(retry_state, "uninterrupted_start_times"): + retry_state.uninterrupted_start_times = {} # type: ignore + if self._timer_id not in retry_state.uninterrupted_start_times: # type: ignore + # First time. + retry_state.uninterrupted_start_times[self._timer_id] = [ # type: ignore + retry_state.attempt_number, + retry_state.outcome_timestamp, + ] + return False + attempt_number, start_time = retry_state.uninterrupted_start_times[ # type: ignore + self._timer_id + ] + if retry_state.attempt_number - attempt_number > 1: + # There was a different stop reason since the last attempt, + # resetting the timer. + retry_state.uninterrupted_start_times[self._timer_id] = [ # type: ignore + retry_state.attempt_number, + retry_state.outcome_timestamp, + ] + return False + if retry_state.outcome_timestamp - start_time < self._max_delay: + # Within time, do not stop, only increase the attempt count. 
+ retry_state.uninterrupted_start_times[self._timer_id][0] += 1 # type: ignore + return False + return True + + +class stop_on_download_error(stop_base): + """Stop after the specified max numbers of total or permanent download + errors.""" + + def __init__(self, max_total: int, max_permanent: int) -> None: + self._max_total = max_total + self._max_permanent = max_permanent + + def __call__(self, retry_state: "RetryCallState") -> bool: + if not hasattr(retry_state, "counter"): + retry_state.counter = Counter() # type: ignore + assert retry_state.outcome, "Unexpected empty outcome" + exc = retry_state.outcome.exception() + assert exc, "Unexpected empty exception" + if exc.status == 521: # type: ignore + retry_state.counter["permanent_download_error"] += 1 # type: ignore + if retry_state.counter["permanent_download_error"] >= self._max_permanent: # type: ignore + return True + retry_state.counter["download_error"] += 1 # type: ignore + if retry_state.counter["download_error"] >= self._max_total: # type: ignore + return True + return False + + +def _download_error(exc: BaseException) -> bool: + return isinstance(exc, RequestError) and exc.status in {520, 521} + + +def _undocumented_error(exc: BaseException) -> bool: + return ( + isinstance(exc, RequestError) + and exc.status >= 500 + and exc.status not in {503, 520, 521} + ) + + +class TooManyUndocumentedErrors(RuntimeError): + def __init__(self, outcome, errors, total): + msg = ( + f"Too many undocumented error responses received from Zyte API " + f"({errors} out of {total}, {errors / total:.2%}). This process " + f"will no longer be able to send Zyte API requests. 
Please, " + f"monitor https://status.zyte.com/ or contact support " + f"(https://support.zyte.com/support/tickets/new) before sending " + f"more requests like the ones causing these error responses.\n" + f"Last offending query: {outcome.query}\n" + f"Last offending response: {outcome}" + ) + self.outcome = outcome + super().__init__(msg) + + +class ZyteAsyncRetrying(AsyncRetrying): + _total_outcomes = 0 + _total_undocumented_errors = 0 + + def __init__( + self, + stop: StopBaseT, + wait: WaitBaseT, + retry: SyncRetryBaseT | RetryBaseT, + reraise: bool, + **kwargs, + ): + kwargs.setdefault("before", before_log(logger, logging.DEBUG)) + kwargs.setdefault("after", after_log(logger, logging.DEBUG)) + kwargs.setdefault("before_sleep", before_sleep_log(logger, logging.DEBUG)) + super().__init__( + stop=stop, + wait=wait, + retry=retry, + reraise=reraise, + **kwargs, + ) + + async def iter(self, retry_state: RetryCallState) -> DoAttempt | DoSleep | Any: + do = await super().iter(retry_state) + retry_cls = retry_state.retry_object.__class__ + if retry_state.outcome is not None: + retry_cls._total_outcomes += 1 # type: ignore[attr-defined] + try: + retry_state.outcome.result() + except Exception as exc: + if _undocumented_error(exc): + retry_cls._total_undocumented_errors += 1 # type: ignore[attr-defined] + errors = retry_cls._total_undocumented_errors # type: ignore[attr-defined] + total = retry_cls._total_outcomes # type: ignore[attr-defined] + if errors >= 10 and errors / total >= 0.01: + raise TooManyUndocumentedErrors( + outcome=exc, + errors=errors, + total=total, + ) + return do + + +class RetryFactory: + """Factory class that builds the :class:`tenacity.AsyncRetrying` object + that defines the :ref:`default retry policy `. + + To create a custom retry policy, you can subclass this factory class, + modify it as needed, and then call :meth:`build` on your subclass to get + the corresponding :class:`tenacity.AsyncRetrying` object. 
+ + For example, to double the number of attempts for :ref:`temporary + download errors ` and the time network + errors are retried: + + .. code-block:: python + + from zyte_api import ( + RetryFactory, + stop_after_uninterrupted_delay, + stop_on_count, + ) + + + class CustomRetryFactory(RetryFactory): + network_error_stop = stop_after_uninterrupted_delay(30 * 60) + temporary_download_error_stop = stop_on_count(8) + + + CUSTOM_RETRY_POLICY = CustomRetryFactory().build() + """ + + retry_condition: retry_base = ( + retry_if_exception(_is_throttling_error) + | retry_if_exception(_is_network_error) + | retry_if_exception(_download_error) + | retry_if_exception(_undocumented_error) + ) + # throttling + throttling_wait = wait_chain( + # always wait 20-40s first + wait_fixed(20) + wait_random(0, 20), + # wait 20-40s again + wait_fixed(20) + wait_random(0, 20), + # wait from 30 to 630s, with full jitter and exponentially + # increasing max wait time + wait_fixed(30) + wait_random_exponential(multiplier=1, max=600), + ) + + # connection errors, other client and server failures + network_error_wait = ( + # wait from 3s to ~1m + wait_random(3, 7) + + wait_random_exponential(multiplier=1, max=55) + ) + temporary_download_error_wait = network_error_wait + throttling_stop = stop_never + network_error_stop = stop_after_uninterrupted_delay(15 * 60) + temporary_download_error_stop = stop_on_download_error(max_total=4, max_permanent=2) + + undocumented_error_stop = stop_on_count(2) + undocumented_error_wait = network_error_wait + + def wait(self, retry_state: RetryCallState) -> float: + assert retry_state.outcome, "Unexpected empty outcome" + exc = retry_state.outcome.exception() + assert exc, "Unexpected empty exception" + if _is_throttling_error(exc): + return self.throttling_wait(retry_state=retry_state) + if _is_network_error(exc): + return self.network_error_wait(retry_state=retry_state) + if _undocumented_error(exc): + return 
self.undocumented_error_wait(retry_state=retry_state) + assert _download_error(exc) # See retry_condition + return self.temporary_download_error_wait(retry_state=retry_state) + + def stop(self, retry_state: RetryCallState) -> bool: + assert retry_state.outcome, "Unexpected empty outcome" + exc = retry_state.outcome.exception() + assert exc, "Unexpected empty exception" + if _is_throttling_error(exc): + return self.throttling_stop(retry_state) + if _is_network_error(exc): + return self.network_error_stop(retry_state) + if _undocumented_error(exc): + return self.undocumented_error_stop(retry_state) + assert _download_error(exc) # See retry_condition + return self.temporary_download_error_stop(retry_state) + + def reraise(self) -> bool: + return True + + def build(self) -> AsyncRetrying: + return ZyteAsyncRetrying( + wait=self.wait, + retry=self.retry_condition, + stop=self.stop, + reraise=self.reraise(), + ) + + +zyte_api_retrying: AsyncRetrying = RetryFactory().build() + + +class AggressiveRetryFactory(RetryFactory): + """Factory class that builds the :class:`tenacity.AsyncRetrying` object + that defines the :ref:`aggressive retry policy `. + + To create a custom retry policy, you can subclass this factory class, + modify it as needed, and then call :meth:`build` on your subclass to get + the corresponding :class:`tenacity.AsyncRetrying` object. + + For example, to double the maximum number of attempts for all error + responses and double the time network errors are retried: + + .. 
code-block:: python + + from zyte_api import ( + AggressiveRetryFactory, + stop_after_uninterrupted_delay, + stop_on_count, + stop_on_download_error, + ) + + + class CustomRetryFactory(AggressiveRetryFactory): + download_error_stop = stop_on_download_error(max_total=16, max_permanent=8) + network_error_stop = stop_after_uninterrupted_delay(30 * 60) + undocumented_error_stop = stop_on_count(8) + + + CUSTOM_RETRY_POLICY = CustomRetryFactory().build() + """ + + retry_condition = ( + RetryFactory.retry_condition + | retry_if_exception(_download_error) + | retry_if_exception(_undocumented_error) + ) + + download_error_stop = stop_on_download_error(max_total=8, max_permanent=4) + download_error_wait = RetryFactory.temporary_download_error_wait + + undocumented_error_stop = stop_on_count(4) + + def stop(self, retry_state: RetryCallState) -> bool: + assert retry_state.outcome, "Unexpected empty outcome" + exc = retry_state.outcome.exception() + assert exc, "Unexpected empty exception" + if _download_error(exc): + return self.download_error_stop(retry_state) + return super().stop(retry_state) + + def wait(self, retry_state: RetryCallState) -> float: + assert retry_state.outcome, "Unexpected empty outcome" + exc = retry_state.outcome.exception() + assert exc, "Unexpected empty exception" + if _download_error(exc): + return self.download_error_wait(retry_state) + return super().wait(retry_state) + + +aggressive_retrying = AggressiveRetryFactory().build() diff --git a/zyte_api/_sync.py b/zyte_api/_sync.py new file mode 100644 index 0000000..413618d --- /dev/null +++ b/zyte_api/_sync.py @@ -0,0 +1,215 @@ +import asyncio +from typing import Generator, List, Optional, Union + +from aiohttp import ClientSession +from tenacity import AsyncRetrying + +from ._async import AsyncZyteAPI +from .constants import API_URL + + +def _get_loop(): + try: + return asyncio.get_event_loop() + except RuntimeError: # pragma: no cover (tests always have a running loop) + loop = 
asyncio.new_event_loop() + asyncio.set_event_loop(loop) + return loop + + +class _Session: + def __init__(self, client, **session_kwargs): + self._client = client + + # https://github.com/aio-libs/aiohttp/pull/1468 + async def create_session(): + return client._async_client.session(**session_kwargs)._session + + loop = _get_loop() + self._session = loop.run_until_complete(create_session()) + + def __enter__(self): + return self + + def __exit__(self, *exc_info): + loop = _get_loop() + loop.run_until_complete(self._session.close()) + + def close(self): + loop = _get_loop() + loop.run_until_complete(self._session.close()) + + def get( + self, + query: dict, + *, + endpoint: str = "extract", + handle_retries=True, + retrying: Optional[AsyncRetrying] = None, + ): + return self._client.get( + query=query, + endpoint=endpoint, + handle_retries=handle_retries, + retrying=retrying, + session=self._session, + ) + + def iter( + self, + queries: List[dict], + *, + endpoint: str = "extract", + handle_retries=True, + retrying: Optional[AsyncRetrying] = None, + ) -> Generator[Union[dict, Exception], None, None]: + return self._client.iter( + queries=queries, + endpoint=endpoint, + session=self._session, + handle_retries=handle_retries, + retrying=retrying, + ) + + +class ZyteAPI: + """:ref:`Synchronous Zyte API client `. + + *api_key* is your Zyte API key. If not specified, it is read from the + ``ZYTE_API_KEY`` environment variable. See :ref:`api-key`. + + *api_url* is the Zyte API base URL. + + *n_conn* is the maximum number of concurrent requests to use. See + :ref:`api-optimize`. + + *retrying* is the retry policy for requests. Defaults to + :data:`~zyte_api.zyte_api_retrying`. + + *user_agent* is the user agent string reported to Zyte API. Defaults to + ``python-zyte-api/``. + + .. tip:: To change the ``User-Agent`` header sent to a target website, use + :http:`request:customHttpRequestHeaders` instead. 
+ """ + + def __init__( + self, + *, + api_key=None, + api_url=API_URL, + n_conn=15, + retrying: Optional[AsyncRetrying] = None, + user_agent: Optional[str] = None, + ): + self._async_client = AsyncZyteAPI( + api_key=api_key, + api_url=api_url, + n_conn=n_conn, + retrying=retrying, + user_agent=user_agent, + ) + + def get( + self, + query: dict, + *, + endpoint: str = "extract", + session: Optional[ClientSession] = None, + handle_retries: bool = True, + retrying: Optional[AsyncRetrying] = None, + ) -> dict: + """Send *query* to Zyte API and return the result. + + *endpoint* is the Zyte API endpoint path relative to the client object + *api_url*. + + *session* is the network session to use. Consider using + :meth:`session` instead of this parameter. + + *handle_retries* determines whether or not a :ref:`retry policy + ` should be used. + + *retrying* is the :ref:`retry policy ` to use, provided + *handle_retries* is ``True``. If not specified, the :ref:`default retry + policy ` is used. + """ + loop = _get_loop() + future = self._async_client.get( + query=query, + endpoint=endpoint, + session=session, + handle_retries=handle_retries, + retrying=retrying, + ) + return loop.run_until_complete(future) + + def iter( + self, + queries: List[dict], + *, + endpoint: str = "extract", + session: Optional[ClientSession] = None, + handle_retries: bool = True, + retrying: Optional[AsyncRetrying] = None, + ) -> Generator[Union[dict, Exception], None, None]: + """Send multiple *queries* to Zyte API in parallel and iterate over + their results as they come. + + The number of *queries* can exceed the *n_conn* parameter set on the + client object. Extra queries will be queued, there will be only up to + *n_conn* requests being processed in parallel at a time. + + Results may come an a different order from the original list of + *queries*. You can use :http:`request:echoData` to attach metadata to + queries, and later use that metadata to restore their original order. 
+ + When exceptions occur, they are yielded, not raised. + + The remaining parameters work the same as in :meth:`get`. + """ + loop = _get_loop() + for future in self._async_client.iter( + queries=queries, + endpoint=endpoint, + session=session, + handle_retries=handle_retries, + retrying=retrying, + ): + try: + yield loop.run_until_complete(future) + except Exception as exception: + yield exception + + def session(self, **kwargs): + """:ref:`Context manager ` to create a session. + + A session is an object that has the same API as the client object, + except: + + - :meth:`get` and :meth:`iter` do not have a *session* parameter, + the session creates an :class:`aiohttp.ClientSession` object and + passes it to :meth:`get` and :meth:`iter` automatically. + + - It does not have a :meth:`session` method. + + Using the same :class:`aiohttp.ClientSession` object for all Zyte API + requests improves performance by keeping a pool of reusable connections + to Zyte API. + + The :class:`aiohttp.ClientSession` object is created with sane defaults + for Zyte API, but you can use *kwargs* to pass additional parameters to + :class:`aiohttp.ClientSession` and even override those sane defaults. + + You do not need to use :meth:`session` as a context manager as long as + you call ``close()`` on the object it returns when you are done: + + .. code-block:: python + + session = client.session() + try: + ... + finally: + session.close() + """ + return _Session(client=self, **kwargs) diff --git a/zyte_api/_utils.py b/zyte_api/_utils.py new file mode 100644 index 0000000..e090724 --- /dev/null +++ b/zyte_api/_utils.py @@ -0,0 +1,32 @@ +from warnings import warn + +import aiohttp +from aiohttp import TCPConnector + +from .constants import API_TIMEOUT + +# 120 seconds is probably too long, but we are concerned about the case with +# many concurrent requests and some processing logic running in the same reactor, +# thus, saturating the CPU. This will make timeouts more likely. 
+_AIO_API_TIMEOUT = aiohttp.ClientTimeout(total=API_TIMEOUT + 120) + + +def deprecated_create_session( + connection_pool_size=100, **kwargs +) -> aiohttp.ClientSession: + warn( + ( + "zyte_api.aio.client.create_session is deprecated, use " + "ZyteAPI.session or AsyncZyteAPI.session instead." + ), + DeprecationWarning, + ) + return create_session(connection_pool_size=connection_pool_size, **kwargs) + + +def create_session(connection_pool_size=100, **kwargs) -> aiohttp.ClientSession: + """Create a session with parameters suited for Zyte API""" + kwargs.setdefault("timeout", _AIO_API_TIMEOUT) + if "connector" not in kwargs: + kwargs["connector"] = TCPConnector(limit=connection_pool_size, force_close=True) + return aiohttp.ClientSession(**kwargs) diff --git a/zyte_api/aio/__init__.py b/zyte_api/aio/__init__.py index db16ed4..16a6133 100644 --- a/zyte_api/aio/__init__.py +++ b/zyte_api/aio/__init__.py @@ -1,3 +1,18 @@ """ -Asyncio client for Zyte Data API -""" \ No newline at end of file +Asyncio client for Zyte API +""" + +from warnings import warn + +warn( + ( + "The zyte_api.aio module is deprecated. Replace " + "zyte_api.aio.client.AsyncClient with zyte_api.AsyncZyteAPI (note " + "that method names are different), zyte_api.aio.client.create_session " + "with zyte_api.AsyncZyteAPI.session, zyte_api.aio.errors.RequestError " + "with zyte_api.RequestError, zyte_api.aio.retry.RetryFactory with " + "zyte_api.RetryFactory, and zyte_api.aio.retry.zyte_api_retrying with " + "zyte_api.zyte_api_retrying." 
+ ), + DeprecationWarning, +) diff --git a/zyte_api/aio/client.py b/zyte_api/aio/client.py index e82c832..208cb38 100644 --- a/zyte_api/aio/client.py +++ b/zyte_api/aio/client.py @@ -1,150 +1,7 @@ -""" -Asyncio client for Zyte Data API -""" +from .._async import AsyncZyteAPI +from .._utils import deprecated_create_session as create_session # noqa: F401 -import asyncio -import time -from functools import partial -from typing import Optional, Iterator, List -import aiohttp -from aiohttp import TCPConnector -from tenacity import AsyncRetrying - -from .errors import RequestError -from .retry import zyte_api_retrying -from ..apikey import get_apikey -from ..constants import API_URL, API_TIMEOUT -from ..stats import AggStats, ResponseStats -from ..utils import user_agent - - -# 120 seconds is probably too long, but we are concerned about the case with -# many concurrent requests and some processing logic running in the same reactor, -# thus, saturating the CPU. This will make timeouts more likely. 
-AIO_API_TIMEOUT = aiohttp.ClientTimeout(total=API_TIMEOUT + 120) - - -def create_session(connection_pool_size=100, **kwargs) -> aiohttp.ClientSession: - """ Create a session with parameters suited for Zyte API """ - kwargs.setdefault('timeout', AIO_API_TIMEOUT) - if "connector" not in kwargs: - kwargs["connector"] = TCPConnector(limit=connection_pool_size) - return aiohttp.ClientSession(**kwargs) - - -def _post_func(session): - """ Return a function to send a POST request """ - if session is None: - return partial(aiohttp.request, - method='POST', - timeout=AIO_API_TIMEOUT) - else: - return session.post - - -class AsyncClient: - def __init__(self, *, - api_key=None, - api_url=API_URL, - n_conn=15, - ): - self.api_key = get_apikey(api_key) - self.api_url = api_url - self.n_conn = n_conn - self.agg_stats = AggStats() - - async def request_raw(self, query: dict, *, - endpoint: str = 'extract', - session=None, - handle_retries=True, - retrying: Optional[AsyncRetrying] = None, - ): - retrying = retrying or zyte_api_retrying - post = _post_func(session) - auth = aiohttp.BasicAuth(self.api_key) - headers = {'User-Agent': user_agent(aiohttp)} - - response_stats = [] - start_global = time.perf_counter() - - async def request(): - stats = ResponseStats.create(start_global) - self.agg_stats.n_attempts += 1 - - post_kwargs = dict( - url=self.api_url + endpoint, - json=query, - auth=auth, - headers=headers, - ) - - try: - async with post(**post_kwargs) as resp: - stats.record_connected(resp.status, self.agg_stats) - if resp.status >= 400: - content = await resp.read() - resp.release() - stats.record_read() - stats.record_request_error(content, self.agg_stats) - - raise RequestError( - request_info=resp.request_info, - history=resp.history, - status=resp.status, - message=resp.reason, - headers=resp.headers, - response_content=content - ) - - response = await resp.json() - stats.record_read(self.agg_stats) - return response - except Exception as e: - if not isinstance(e, 
RequestError): - self.agg_stats.n_errors += 1 - stats.record_exception(e, agg_stats=self.agg_stats) - raise - finally: - response_stats.append(stats) - - if handle_retries: - request = retrying.wraps(request) - - try: - # Try to make a request - result = await request() - self.agg_stats.n_extracted_queries += 1 - except Exception: - self.agg_stats.n_fatal_errors += 1 - raise - finally: - self.agg_stats.n_input_queries += 1 - - self.agg_stats.n_results += 1 - return result - - def request_parallel_as_completed(self, - queries: List[dict], - *, - endpoint: str = 'extract', - session: Optional[aiohttp.ClientSession] = None, - ) -> Iterator[asyncio.Future]: - """ Send multiple requests to Zyte Data API in parallel. - Return an `asyncio.as_completed` iterator. - - ``queries`` is a list of requests to process (dicts). - - ``session`` is an optional aiohttp.ClientSession object; - use it to enable HTTP Keep-Alive. Set the session TCPConnector - limit to a value greater than the number of connections. - """ - sem = asyncio.Semaphore(self.n_conn) - - async def _request(query): - async with sem: - return await self.request_raw(query, - endpoint=endpoint, - session=session) - - return asyncio.as_completed([_request(query) for query in queries]) +class AsyncClient(AsyncZyteAPI): + request_raw = AsyncZyteAPI.get + request_parallel_as_completed = AsyncZyteAPI.iter diff --git a/zyte_api/aio/errors.py b/zyte_api/aio/errors.py index a7c03cf..987d8ba 100644 --- a/zyte_api/aio/errors.py +++ b/zyte_api/aio/errors.py @@ -1,26 +1 @@ -# -*- coding: utf-8 -*- -import logging - -from aiohttp import ClientResponseError - -from zyte_api.errors import ParsedError - -logger = logging.getLogger(__name__) - - -class RequestError(ClientResponseError): - """ Exception which is raised when Request-level error is returned. - In contrast with ClientResponseError, it allows to inspect response - content. 
- """ - def __init__(self, *args, **kwargs): - self.response_content = kwargs.pop("response_content") - super().__init__(*args, **kwargs) - - @property - def parsed(self): - return ParsedError.from_body(self.response_content) - - def __str__(self): - return f"RequestError: {self.status}, message={self.message}, " \ - f"headers={self.headers}, body={self.response_content}" +from .._errors import RequestError diff --git a/zyte_api/aio/retry.py b/zyte_api/aio/retry.py index aaaa959..5cd22a7 100644 --- a/zyte_api/aio/retry.py +++ b/zyte_api/aio/retry.py @@ -1,135 +1 @@ -# -*- coding: utf-8 -*- -""" -Zyte Data Extraction retrying logic. - -TODO: add sync support; only aio is supported at the moment. -TODO: Implement retry logic for temparary errors (520) using the proposed retry-after header. -""" -import asyncio -import logging - -from aiohttp import client_exceptions -from tenacity import ( - wait_chain, - wait_fixed, - wait_random_exponential, - wait_random, - stop_after_attempt, - stop_after_delay, - retry_if_exception, - RetryCallState, - before_sleep_log, - after_log, AsyncRetrying, before_log, -) -from tenacity.stop import stop_never - -from .errors import RequestError - - -logger = logging.getLogger(__name__) - - -_NETWORK_ERRORS = ( - asyncio.TimeoutError, # could happen while reading the response body - client_exceptions.ClientResponseError, - client_exceptions.ClientOSError, - client_exceptions.ServerConnectionError, - client_exceptions.ServerDisconnectedError, - client_exceptions.ServerTimeoutError, - client_exceptions.ClientPayloadError, - client_exceptions.ClientConnectorSSLError, - client_exceptions.ClientConnectorError, -) - - -def _is_network_error(exc: BaseException) -> bool: - if isinstance(exc, RequestError): - # RequestError is ClientResponseError, which is in the - # _NETWORK_ERRORS list, but it should be handled - # separately. 
- return False - return isinstance(exc, _NETWORK_ERRORS) - - -def _is_throttling_error(exc: BaseException) -> bool: - return isinstance(exc, RequestError) and exc.status in (429, 503) - - -def _is_temporary_download_error(exc: BaseException) -> bool: - return isinstance(exc, RequestError) and exc.status == 520 - - -class RetryFactory: - """ - Build custom retry configuration - """ - retry_condition = ( - retry_if_exception(_is_throttling_error) - | retry_if_exception(_is_network_error) - | retry_if_exception(_is_temporary_download_error) - ) - # throttling - throttling_wait = wait_chain( - # always wait 20-40s first - wait_fixed(20) + wait_random(0, 20), - - # wait 20-40s again - wait_fixed(20) + wait_random(0, 20), - - # wait from 30 to 630s, with full jitter and exponentially - # increasing max wait time - wait_fixed(30) + wait_random_exponential(multiplier=1, max=600) - ) - - # connection errors, other client and server failures - network_error_wait = ( - # wait from 3s to ~1m - wait_random(3, 7) + wait_random_exponential(multiplier=1, max=55) - ) - temporary_download_error_wait = network_error_wait - throttling_stop = stop_never - network_error_stop = stop_after_delay(5 * 60) - temporary_download_error_stop = stop_after_attempt(4) - - def wait(self, retry_state: RetryCallState) -> float: - assert retry_state.outcome, "Unexpected empty outcome" - exc = retry_state.outcome.exception() - assert exc, "Unexpected empty exception" - if _is_throttling_error(exc): - return self.throttling_wait(retry_state=retry_state) - elif _is_network_error(exc): - return self.network_error_wait(retry_state=retry_state) - elif _is_temporary_download_error(exc): - return self.temporary_download_error_wait(retry_state=retry_state) - else: - raise RuntimeError("Invalid retry state exception: %s" % exc) - - def stop(self, retry_state: RetryCallState) -> bool: - assert retry_state.outcome, "Unexpected empty outcome" - exc = retry_state.outcome.exception() - assert exc, "Unexpected empty 
exception" - if _is_throttling_error(exc): - return self.throttling_stop(retry_state) - elif _is_network_error(exc): - return self.network_error_stop(retry_state) - elif _is_temporary_download_error(exc): - return self.temporary_download_error_stop(retry_state) - else: - raise RuntimeError("Invalid retry state exception: %s" % exc) - - def reraise(self) -> bool: - return True - - def build(self) -> AsyncRetrying: - return AsyncRetrying( - wait=self.wait, - retry=self.retry_condition, - stop=self.stop, - reraise=self.reraise(), - before=before_log(logger, logging.DEBUG), - after=after_log(logger, logging.DEBUG), - before_sleep=before_sleep_log(logger, logging.DEBUG), - ) - - -zyte_api_retrying: AsyncRetrying = RetryFactory().build() +from .._retry import RetryFactory, zyte_api_retrying diff --git a/zyte_api/apikey.py b/zyte_api/apikey.py index f9b0f80..c1cc70b 100644 --- a/zyte_api/apikey.py +++ b/zyte_api/apikey.py @@ -10,11 +10,13 @@ class NoApiKey(Exception): def get_apikey(key: Optional[str] = None) -> str: - """ Return API key, probably loading it from an environment variable """ + """Return API key, probably loading it from an environment variable""" if key is not None: return key try: return os.environ[ENV_VARIABLE] except KeyError: - raise NoApiKey("API key not found. Please set {} " - "environment variable.".format(ENV_VARIABLE)) + raise NoApiKey( + "API key not found. Please set {} " + "environment variable.".format(ENV_VARIABLE) + ) diff --git a/zyte_api/constants.py b/zyte_api/constants.py index db24f4b..a433302 100644 --- a/zyte_api/constants.py +++ b/zyte_api/constants.py @@ -1,10 +1,10 @@ # -*- coding: utf-8 -*- # Name of the environment variable with the API key -ENV_VARIABLE = 'ZYTE_API_KEY' +ENV_VARIABLE = "ZYTE_API_KEY" # API URL -API_URL = 'https://api.zyte.com/v1/' +API_URL = "https://api.zyte.com/v1/" # Default timeout that server uses. Client timeouts should be larger than that. 
-API_TIMEOUT = 60 +API_TIMEOUT = 200 diff --git a/zyte_api/errors.py b/zyte_api/errors.py index eab2183..0dce512 100644 --- a/zyte_api/errors.py +++ b/zyte_api/errors.py @@ -6,13 +6,25 @@ @attr.s(auto_attribs=True) class ParsedError: - """ Parsed error from Zyte Data API """ + """Parsed error response body from Zyte API.""" + + #: Raw response body from Zyte API. response_body: bytes + + #: JSON-decoded response body. + #: + #: If ``None``, :data:`parse_error` indicates the reason. data: Optional[dict] + + #: If :data:`data` is ``None``, this indicates whether the reason is that + #: :data:`response_body` is not valid JSON (``"bad_json"``) or that it is + #: not a JSON object (``"bad_format"``). parse_error: Optional[str] @classmethod - def from_body(cls, response_body: bytes) -> 'ParsedError': + def from_body(cls, response_body: bytes) -> "ParsedError": + """Return a :class:`ParsedError` object built out of the specified + error response body.""" data = None parse_error = None @@ -25,12 +37,10 @@ def from_body(cls, response_body: bytes) -> 'ParsedError': except (json.JSONDecodeError, UnicodeDecodeError) as _: # noqa: F841 parse_error = "bad_json" - return cls( - response_body=response_body, - data=data, - parse_error=parse_error - ) + return cls(response_body=response_body, data=data, parse_error=parse_error) @property def type(self) -> Optional[str]: - return (self.data or {}).get('type', None) + """ID of the error type, e.g. 
``"/limits/over-user-limit"`` or + ``"/download/temporary-error"``.""" + return (self.data or {}).get("type", None) diff --git a/zyte_api/stats.py b/zyte_api/stats.py index d2d804f..6b23107 100644 --- a/zyte_api/stats.py +++ b/zyte_api/stats.py @@ -1,8 +1,8 @@ # -*- coding: utf-8 -*- -from typing import Optional -from collections import Counter import functools import time +from collections import Counter +from typing import Optional import attr from runstats import Statistics @@ -17,6 +17,7 @@ def wrapper(*args, **kwargs): return meth(*args, **kwargs) except ZeroDivisionError: return 0 + return wrapper @@ -24,20 +25,21 @@ class AggStats: def __init__(self): self.time_connect_stats = Statistics() self.time_total_stats = Statistics() - self.n_results = 0 - self.n_fatal_errors = 0 - self.n_attempts = 0 - self.n_429 = 0 - self.n_errors = 0 + self.n_success = 0 # number of successful results returned to the user + self.n_fatal_errors = ( + 0 # number of errors returned to the user, after all retries + ) - self.n_input_queries = 0 - self.n_extracted_queries = 0 # Queries answered without any type of error - self.n_query_responses = 0 + self.n_attempts = ( + 0 # total amount of requests made to Zyte API, including retries + ) + self.n_429 = 0 # number of 429 (throttling) responses + self.n_errors = 0 # number of errors, including errors which were retried - self.status_codes = Counter() - self.exception_types = Counter() - self.api_error_types = Counter() + self.status_codes: Counter = Counter() + self.exception_types: Counter = Counter() + self.api_error_types: Counter = Counter() def __str__(self): return "conn:{:0.2f}s, resp:{:0.2f}s, throttle:{:.1%}, err:{}+{}({:.1%}) | success:{}/{}({:.1%})".format( @@ -47,27 +49,31 @@ def __str__(self): self.n_errors - self.n_fatal_errors, self.n_fatal_errors, self.error_ratio(), - self.n_extracted_queries, - self.n_input_queries, - self.success_ratio() + self.n_success, + self.n_processed, + self.success_ratio(), ) def 
summary(self): return ( - "\n" + - "Summary\n" + - "-------\n" + - "Mean connection time: {:0.2f}\n".format(self.time_connect_stats.mean()) + - "Mean response time: {:0.2f}\n".format(self.time_total_stats.mean()) + - "Throttle ratio: {:0.1%}\n".format(self.throttle_ratio()) + - "Attempts: {}\n".format(self.n_attempts) + - "Errors: {:0.1%}, fatal: {}, non fatal: {}\n".format( + "\n" + + "Summary\n" + + "-------\n" + + "Mean connection time: {:0.2f}\n".format( + self.time_connect_stats.mean() + ) + + "Mean response time: {:0.2f}\n".format(self.time_total_stats.mean()) + + "Throttle ratio: {:0.1%}\n".format(self.throttle_ratio()) + + "Attempts: {}\n".format(self.n_attempts) + + "Errors: {:0.1%}, fatal: {}, non fatal: {}\n".format( self.error_ratio(), self.n_fatal_errors, - self.n_errors - self.n_fatal_errors) + - "Successful URLs: {} of {}\n".format( - self.n_extracted_queries, self.n_input_queries) + - "Success ratio: {:0.1%}\n".format(self.success_ratio()) + self.n_errors - self.n_fatal_errors, + ) + + "Successful URLs: {} of {}\n".format( + self.n_success, self.n_processed + ) + + "Success ratio: {:0.1%}\n".format(self.success_ratio()) ) @zero_on_division_error @@ -80,7 +86,12 @@ def error_ratio(self): @zero_on_division_error def success_ratio(self): - return self.n_extracted_queries / self.n_input_queries + return self.n_success / self.n_processed + + @property + def n_processed(self): + """Total number of processed URLs""" + return self.n_success + self.n_fatal_errors @attr.s diff --git a/zyte_api/utils.py b/zyte_api/utils.py index 2be98de..1b24a60 100644 --- a/zyte_api/utils.py +++ b/zyte_api/utils.py @@ -1,8 +1,12 @@ import re from os.path import splitext +from w3lib.url import safe_url_string + from .__version__ import __version__ +USER_AGENT = f"python-zyte-api/{__version__}" + def _guess_intype(file_name, lines): _, dot_extension = splitext(file_name) @@ -12,14 +16,31 @@ def _guess_intype(file_name, lines): if extension == "txt": return "txt" - if 
re.search(r'^\s*\{', lines[0]): + if re.search(r"^\s*\{", lines[0]): return "jl" return "txt" -def user_agent(library): - return 'python-zyte-api/{} {}/{}'.format( - __version__, - library.__name__, - library.__version__) +def _process_query(query): + """Given a query to be sent to Zyte API, return a functionally-equivalent + query that fixes any known issue. + + Specifically, unsafe characters in the query URL are escaped to make sure + they are safe not only for the end server, but also for Zyte API, which + requires URLs compatible with RFC 2396. + + *query* is never modified in place, but the returned object is not + guaranteed to be a copy of *query*: it could be *query* itself if no + changes where needed, or a shallow copy of *query* with some common nested + objects (e.g. shared ``actions`` list). + """ + url = query.get("url", None) + if url is None: + return query + if not isinstance(url, str): + raise ValueError(f"Expected a str URL parameter, got {type(url)}") + safe_url = safe_url_string(url) + if url == safe_url: + return query + return {**query, "url": safe_url}