Skip to content

Commit 000df4b

Browse files
integration tests
1 parent 6084a1e commit 000df4b

File tree

5 files changed

+110
-9
lines changed

5 files changed

+110
-9
lines changed

elasticsearch/_async/helpers.py

Lines changed: 13 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@
3131
Tuple,
3232
TypeVar,
3333
Union,
34+
cast,
3435
)
3536

3637
from ..compat import safe_task
@@ -41,6 +42,7 @@
4142
_TYPE_BULK_ACTION_HEADER,
4243
_TYPE_BULK_ACTION_HEADER_AND_BODY,
4344
_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY,
45+
_TYPE_BULK_ACTION_WITH_META,
4446
BulkMeta,
4547
_ActionChunker,
4648
_process_bulk_chunk_error,
@@ -195,7 +197,10 @@ async def azip(
195197

196198
async def async_streaming_bulk(
197199
client: AsyncElasticsearch,
198-
actions: Union[Iterable[_TYPE_BULK_ACTION], AsyncIterable[_TYPE_BULK_ACTION]],
200+
actions: Union[
201+
Iterable[_TYPE_BULK_ACTION_WITH_META],
202+
AsyncIterable[_TYPE_BULK_ACTION_WITH_META],
203+
],
199204
chunk_size: int = 500,
200205
max_chunk_bytes: int = 100 * 1024 * 1024,
201206
flush_after_seconds: Optional[float] = None,
@@ -257,9 +262,14 @@ async def async_streaming_bulk(
257262
if isinstance(retry_on_status, int):
258263
retry_on_status = (retry_on_status,)
259264

260-
async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
265+
async def map_actions() -> (
266+
AsyncIterable[_TYPE_BULK_ACTION_HEADER_WITH_META_AND_BODY]
267+
):
261268
async for item in aiter(actions):
262-
yield expand_action_callback(item)
269+
if isinstance(item, BulkMeta):
270+
yield item, None
271+
else:
272+
yield expand_action_callback(cast(_TYPE_BULK_ACTION, item))
263273

264274
serializer = client.transport.serializers.get_serializer("application/json")
265275

elasticsearch/compat.py

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@
2222
from contextlib import asynccontextmanager, contextmanager
2323
from pathlib import Path
2424
from threading import Thread
25-
from typing import Tuple, Type, Union
25+
from typing import Any, AsyncIterator, Callable, Coroutine, Iterator, Tuple, Type, Union
2626

2727
string_types: Tuple[Type[str], Type[bytes]] = (str, bytes)
2828

@@ -80,15 +80,17 @@ def warn_stacklevel() -> int:
8080

8181

8282
@contextmanager
83-
def safe_thread(target, *args, **kwargs):
83+
def safe_thread(
84+
target: Callable[..., Any], *args: Any, **kwargs: Any
85+
) -> Iterator[Thread]:
8486
"""Run a thread within a context manager block.
8587
8688
The thread is automatically joined when the block ends. If the thread raised
8789
an exception, it is raised in the caller's context.
8890
"""
8991
captured_exception = None
9092

91-
def run():
93+
def run() -> None:
9294
try:
9395
target(*args, **kwargs)
9496
except BaseException as exc:
@@ -97,14 +99,14 @@ def run():
9799

98100
thread = Thread(target=run)
99101
thread.start()
100-
yield
102+
yield thread
101103
thread.join()
102104
if captured_exception:
103105
raise captured_exception
104106

105107

106108
@asynccontextmanager
107-
async def safe_task(coro):
109+
async def safe_task(coro: Coroutine[Any, Any, Any]) -> AsyncIterator[asyncio.Task[Any]]:
108110
"""Run a background task within a context manager block.
109111
110112
The task is awaited when the block ends.

elasticsearch/helpers/actions.py

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,6 @@
2020
import time
2121
from enum import Enum
2222
from operator import methodcaller
23-
from threading import Thread
2423
from typing import (
2524
Any,
2625
Callable,

test_elasticsearch/test_async/test_server/test_helpers.py

Lines changed: 44 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717

1818
import asyncio
1919
import logging
20+
import time
2021
from datetime import datetime, timedelta, timezone
2122
from unittest.mock import MagicMock, call, patch
2223

@@ -123,6 +124,49 @@ def sync_gen():
123124
"_source"
124125
]
125126

127+
async def test_explicit_flushes(self, async_client):
128+
async def async_gen():
129+
for x in range(2):
130+
yield {"answer": x, "_id": x}
131+
yield helpers.BULK_FLUSH
132+
await asyncio.sleep(0.1)
133+
for x in range(2):
134+
yield {"answer": x + 2, "_id": x + 2}
135+
136+
timestamps = []
137+
async for ok, item in helpers.async_streaming_bulk(
138+
async_client, async_gen(), index="test-index", refresh=True
139+
):
140+
assert ok
141+
timestamps.append(time.time())
142+
143+
assert timestamps[1] - timestamps[0] < 0.1
144+
assert timestamps[2] - timestamps[1] > 0.1
145+
assert timestamps[3] - timestamps[2] < 0.1
146+
147+
async def test_timeout_flushes(self, async_client):
148+
async def async_gen():
149+
for x in range(2):
150+
yield {"answer": x, "_id": x}
151+
await asyncio.sleep(0.5)
152+
for x in range(2):
153+
yield {"answer": x + 2, "_id": x + 2}
154+
155+
timestamps = []
156+
async for ok, item in helpers.async_streaming_bulk(
157+
async_client,
158+
async_gen(),
159+
index="test-index",
160+
refresh=True,
161+
flush_after_seconds=0.05,
162+
):
163+
assert ok
164+
timestamps.append(time.time())
165+
166+
assert timestamps[1] - timestamps[0] < 0.1
167+
assert timestamps[2] - timestamps[1] > 0.1
168+
assert timestamps[3] - timestamps[2] < 0.1
169+
126170
async def test_all_errors_from_chunk_are_raised_on_failure(self, async_client):
127171
await async_client.indices.create(
128172
index="i",

test_elasticsearch/test_server/test_helpers.py

Lines changed: 46 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@
1616
# under the License.
1717

1818
import json
19+
import time
1920
from datetime import datetime, timedelta
2021
from unittest.mock import call, patch
2122

@@ -75,6 +76,51 @@ def test_bulk_all_documents_get_inserted(sync_client):
7576
assert {"answer": 42} == sync_client.get(index="test-index", id=42)["_source"]
7677

7778

79+
def test_explicit_flushes(sync_client):
80+
def sync_gen():
81+
for x in range(2):
82+
yield {"answer": x, "_id": x}
83+
yield helpers.BULK_FLUSH
84+
time.sleep(0.1)
85+
for x in range(2):
86+
yield {"answer": x + 2, "_id": x + 2}
87+
88+
timestamps = []
89+
for ok, item in helpers.streaming_bulk(
90+
sync_client, sync_gen(), index="test-index", refresh=True
91+
):
92+
assert ok
93+
timestamps.append(time.time())
94+
95+
assert timestamps[1] - timestamps[0] < 0.1
96+
assert timestamps[2] - timestamps[1] > 0.1
97+
assert timestamps[3] - timestamps[2] < 0.1
98+
99+
100+
def test_timeout_flushes(sync_client):
101+
def sync_gen():
102+
for x in range(2):
103+
yield {"answer": x, "_id": x}
104+
time.sleep(0.5)
105+
for x in range(2):
106+
yield {"answer": x + 2, "_id": x + 2}
107+
108+
timestamps = []
109+
for ok, item in helpers.streaming_bulk(
110+
sync_client,
111+
sync_gen(),
112+
index="test-index",
113+
refresh=True,
114+
flush_after_seconds=0.05,
115+
):
116+
assert ok
117+
timestamps.append(time.time())
118+
119+
assert timestamps[1] - timestamps[0] < 0.1
120+
assert timestamps[2] - timestamps[1] > 0.1
121+
assert timestamps[3] - timestamps[2] < 0.1
122+
123+
78124
def test_bulk_all_errors_from_chunk_are_raised_on_failure(sync_client):
79125
sync_client.indices.create(
80126
index="i",

0 commit comments

Comments
 (0)