| 
1 |  | -Using asyncio with Elasticsearch  | 
2 |  | -================================  | 
 | 1 | +Async Elasticsearch API  | 
 | 2 | +=======================  | 
3 | 3 | 
 
  | 
4 | 4 |  .. py:module:: elasticsearch  | 
5 | 5 |     :no-index:  | 
6 | 6 | 
 
  | 
7 |  | -The ``elasticsearch`` package supports async/await with  | 
8 |  | -`asyncio <https://docs.python.org/3/library/asyncio.html>`_ and `aiohttp <https://docs.aiohttp.org>`_.  | 
9 |  | -You can either install ``aiohttp`` directly or use the ``[async]`` extra:  | 
10 |  | - | 
11 |  | - .. code-block:: bash  | 
12 |  | -
  | 
13 |  | -    $ python -m pip install elasticsearch aiohttp  | 
14 |  | -
  | 
15 |  | -    # - OR -  | 
16 |  | -
  | 
17 |  | -    $ python -m pip install elasticsearch[async]  | 
18 |  | -
  | 
19 |  | -Getting Started with Async  | 
20 |  | ---------------------------  | 
21 |  | - | 
22 |  | -After installation all async API endpoints are available via :class:`~elasticsearch.AsyncElasticsearch`  | 
23 |  | -and are used in the same way as other APIs, just with an extra ``await``:  | 
24 |  | - | 
25 |  | - .. code-block:: python  | 
26 |  | -
  | 
27 |  | -    import asyncio  | 
28 |  | -    from elasticsearch import AsyncElasticsearch  | 
29 |  | -
  | 
30 |  | -    client = AsyncElasticsearch()  | 
31 |  | -
  | 
32 |  | -    async def main():  | 
33 |  | -        resp = await client.search(  | 
34 |  | -            index="documents",  | 
35 |  | -            body={"query": {"match_all": {}}},  | 
36 |  | -            size=20,  | 
37 |  | -        )  | 
38 |  | -        print(resp)  | 
39 |  | -
  | 
40 |  | -    loop = asyncio.get_event_loop()  | 
41 |  | -    loop.run_until_complete(main())  | 
42 |  | -
  | 
43 |  | -All APIs that are available under the sync client are also available under the async client.  | 
44 |  | - | 
45 |  | -ASGI Applications and Elastic APM  | 
46 |  | ----------------------------------  | 
47 |  | - | 
48 |  | -`ASGI <https://asgi.readthedocs.io>`_ (Asynchronous Server Gateway Interface) is a new way to  | 
49 |  | -serve Python web applications making use of async I/O to achieve better performance.  | 
50 |  | -Some examples of ASGI frameworks include FastAPI, Django 3.0+, and Starlette.  | 
51 |  | -If you're using one of these frameworks along with Elasticsearch then you  | 
52 |  | -should be using :py:class:`~elasticsearch.AsyncElasticsearch` to avoid blocking  | 
53 |  | -the event loop with synchronous network calls for optimal performance.  | 
54 |  | - | 
55 |  | -`Elastic APM <https://www.elastic.co/guide/en/apm/agent/python/current/index.html>`_  | 
56 |  | -also supports tracing of async Elasticsearch queries just the same as  | 
57 |  | -synchronous queries. For an example on how to configure ``AsyncElasticsearch`` with  | 
58 |  | -a popular ASGI framework `FastAPI <https://fastapi.tiangolo.com/>`_ and APM tracing  | 
59 |  | -there is a `pre-built example <https://github.com/elastic/elasticsearch-py/tree/master/examples/fastapi-apm>`_  | 
60 |  | -in the ``examples/fastapi-apm`` directory.  | 
61 |  | - | 
62 |  | -Frequently Asked Questions  | 
63 |  | ---------------------------  | 
64 |  | - | 
65 |  | -ValueError when initializing ``AsyncElasticsearch``?  | 
66 |  | -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~  | 
67 |  | - | 
68 |  | -If when trying to use ``AsyncElasticsearch`` you receive ``ValueError: You must  | 
69 |  | -have 'aiohttp' installed to use AiohttpHttpNode`` you should ensure that you  | 
70 |  | -have ``aiohttp`` installed in your environment (check with ``$ python -m pip  | 
71 |  | -freeze | grep aiohttp``). Otherwise, async support won't be available.  | 
72 |  | - | 
73 |  | -What about the ``elasticsearch-async`` package?  | 
74 |  | -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~  | 
75 |  | - | 
76 |  | -Previously asyncio was supported separately via the `elasticsearch-async <https://github.com/elastic/elasticsearch-py-async>`_  | 
77 |  | -package. The ``elasticsearch-async`` package has been deprecated in favor of  | 
78 |  | -``AsyncElasticsearch`` provided by the ``elasticsearch`` package  | 
79 |  | -in v7.8 and onwards.  | 
80 |  | - | 
81 |  | -Receiving 'Unclosed client session / connector' warning?  | 
82 |  | -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~  | 
83 |  | - | 
84 |  | -This warning is created by ``aiohttp`` when an open HTTP connection is  | 
85 |  | -garbage collected. You'll typically run into this when closing your application.  | 
86 |  | -To resolve the issue ensure that :meth:`~elasticsearch.AsyncElasticsearch.close`  | 
87 |  | -is called before the :py:class:`~elasticsearch.AsyncElasticsearch` instance is garbage collected.  | 
88 |  | - | 
89 |  | -For example if using FastAPI that might look like this:  | 
90 |  | - | 
91 |  | - .. code-block:: python  | 
92 |  | -
  | 
93 |  | -    import os  | 
94 |  | -    from contextlib import asynccontextmanager  | 
95 |  | -
  | 
96 |  | -    from fastapi import FastAPI  | 
97 |  | -    from elasticsearch import AsyncElasticsearch  | 
98 |  | -
  | 
99 |  | -    ELASTICSEARCH_URL = os.environ["ELASTICSEARCH_URL"]  | 
100 |  | -    client = None  | 
101 |  | -
  | 
102 |  | -    @asynccontextmanager  | 
103 |  | -    async def lifespan(app: FastAPI):  | 
104 |  | -        global client  | 
105 |  | -        client = AsyncElasticsearch(ELASTICSEARCH_URL)  | 
106 |  | -        yield  | 
107 |  | -        await client.close()  | 
108 |  | -
  | 
109 |  | -    app = FastAPI(lifespan=lifespan)  | 
110 |  | -
  | 
111 |  | -    @app.get("/")  | 
112 |  | -    async def main():  | 
113 |  | -        return await client.info()  | 
114 |  | -
  | 
115 |  | -You can run this example by saving it to ``main.py`` and executing  | 
116 |  | -``ELASTICSEARCH_URL=http://localhost:9200 uvicorn main:app``.  | 
117 |  | - | 
118 |  | - | 
119 |  | -Async Helpers  | 
120 |  | --------------  | 
121 |  | - | 
122 |  | -Async variants of all helpers are available in ``elasticsearch.helpers``  | 
123 |  | -and are all prefixed with ``async_*``. You'll notice that these APIs  | 
124 |  | -are identical to the ones in the sync :ref:`helpers` documentation.  | 
125 |  | - | 
126 |  | -All async helpers that accept an iterator or generator also accept async iterators  | 
127 |  | -and async generators.  | 
128 |  | - | 
129 |  | - .. py:module:: elasticsearch.helpers  | 
130 |  | -    :no-index:  | 
131 |  | - | 
132 |  | -Bulk and Streaming Bulk  | 
133 |  | -~~~~~~~~~~~~~~~~~~~~~~~  | 
134 |  | - | 
135 |  | - .. autofunction:: async_bulk  | 
136 |  | - | 
137 |  | - .. code-block:: python  | 
138 |  | -
  | 
139 |  | -    import asyncio  | 
140 |  | -    from elasticsearch import AsyncElasticsearch  | 
141 |  | -    from elasticsearch.helpers import async_bulk  | 
142 |  | -
  | 
143 |  | -    client = AsyncElasticsearch()  | 
144 |  | -
  | 
145 |  | -    async def gendata():  | 
146 |  | -        mywords = ['foo', 'bar', 'baz']  | 
147 |  | -        for word in mywords:  | 
148 |  | -            yield {  | 
149 |  | -                "_index": "mywords",  | 
150 |  | -                "doc": {"word": word},  | 
151 |  | -            }  | 
152 |  | -
  | 
153 |  | -    async def main():  | 
154 |  | -        await async_bulk(client, gendata())  | 
155 |  | -
  | 
156 |  | -    loop = asyncio.get_event_loop()  | 
157 |  | -    loop.run_until_complete(main())  | 
158 |  | -
  | 
159 |  | - .. autofunction:: async_streaming_bulk  | 
160 |  | - | 
161 |  | - .. code-block:: python  | 
162 |  | -
  | 
163 |  | -    import asyncio  | 
164 |  | -    from elasticsearch import AsyncElasticsearch  | 
165 |  | -    from elasticsearch.helpers import async_streaming_bulk  | 
166 |  | -
  | 
167 |  | -    client = AsyncElasticsearch()  | 
168 |  | -
  | 
169 |  | -    async def gendata():  | 
170 |  | -        mywords = ['foo', 'bar', 'baz']  | 
171 |  | -        for word in mywords:  | 
172 |  | -            yield {  | 
173 |  | -                "_index": "mywords",  | 
174 |  | -                "word": word,  | 
175 |  | -            }  | 
176 |  | -
  | 
177 |  | -    async def main():  | 
178 |  | -        async for ok, result in async_streaming_bulk(client, gendata()):  | 
179 |  | -            action, result = result.popitem()  | 
180 |  | -            if not ok:  | 
181 |  | -                print("failed to %s document %s" % ())  | 
182 |  | -
  | 
183 |  | -    loop = asyncio.get_event_loop()  | 
184 |  | -    loop.run_until_complete(main())  | 
185 |  | -
  | 
186 |  | -Scan  | 
187 |  | -~~~~  | 
188 |  | - | 
189 |  | - .. autofunction:: async_scan  | 
190 |  | - | 
191 |  | - .. code-block:: python  | 
192 |  | -
  | 
193 |  | -    import asyncio  | 
194 |  | -    from elasticsearch import AsyncElasticsearch  | 
195 |  | -    from elasticsearch.helpers import async_scan  | 
196 |  | -
  | 
197 |  | -    client = AsyncElasticsearch()  | 
198 |  | -
  | 
199 |  | -    async def main():  | 
200 |  | -        async for doc in async_scan(  | 
201 |  | -            client=client,  | 
202 |  | -            query={"query": {"match": {"title": "python"}}},  | 
203 |  | -            index="orders-*"  | 
204 |  | -        ):  | 
205 |  | -            print(doc)  | 
206 |  | -
  | 
207 |  | -    loop = asyncio.get_event_loop()  | 
208 |  | -    loop.run_until_complete(main())  | 
209 |  | -
  | 
210 |  | -Reindex  | 
211 |  | -~~~~~~~  | 
212 |  | - | 
213 |  | - .. autofunction:: async_reindex  | 
214 |  | - | 
215 |  | - | 
216 |  | -API Reference  | 
217 |  | --------------  | 
218 |  | - | 
219 |  | - .. py:module:: elasticsearch  | 
220 |  | -    :no-index:  | 
221 |  | - | 
222 |  | -The API of :class:`~elasticsearch.AsyncElasticsearch` is nearly identical  | 
223 |  | -to the API of :class:`~elasticsearch.Elasticsearch` with the exception that  | 
224 |  | -every API call like :py:func:`~elasticsearch.AsyncElasticsearch.search` is  | 
225 |  | -an ``async`` function and requires an ``await`` to properly return the response  | 
226 |  | -body.  | 
227 |  | - | 
228 |  | -AsyncElasticsearch  | 
229 |  | -~~~~~~~~~~~~~~~~~~  | 
230 |  | - | 
231 | 7 |  .. note::  | 
232 | 8 | 
 
  | 
233 | 9 |     To reference Elasticsearch APIs that are namespaced like ``.indices.create()``  | 
234 | 10 |     refer to the sync API reference. These APIs are identical between sync and async.  | 
235 | 11 | 
 
  | 
 | 12 | +Elasticsearch  | 
 | 13 | +-------------  | 
236 | 14 |  .. autoclass:: AsyncElasticsearch  | 
237 | 15 |    :members:  | 
0 commit comments