Skip to content

Commit 9c36534

Browse files
authored
viewer stress test (#26479)
1 parent bcd70e0 commit 9c36534

File tree

6 files changed

+156
-0
lines changed

6 files changed

+156
-0
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# -*- coding: utf-8 -*-
2+
import argparse
3+
from ydb.tests.stress.viewer.workload import Workload
4+
5+
if __name__ == '__main__':
6+
text = """\033[92mViewer workload\x1b[0m"""
7+
parser = argparse.ArgumentParser(description=text, formatter_class=argparse.RawDescriptionHelpFormatter)
8+
parser.add_argument('--mon_endpoint', default='http://localhost:8765', help="An endpoint to be used")
9+
parser.add_argument('--database', default=None, help='A database to connect')
10+
parser.add_argument('--duration', default=10 ** 9, type=lambda x: int(x), help='A duration of workload in seconds.')
11+
args = parser.parse_args()
12+
with Workload(args.mon_endpoint, args.database, args.duration) as workload:
13+
workload.start()
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# -*- coding: utf-8 -*-
2+
import pytest
3+
import os
4+
import yatest
5+
from ydb.tests.library.stress.fixtures import StressFixture
6+
7+
8+
class TestYdbWorkload(StressFixture):
9+
@pytest.fixture(autouse=True, scope="function")
10+
def setup(self):
11+
yield from self.setup_cluster()
12+
13+
def test(self):
14+
if not self.mon_endpoint:
15+
self.mon_endpoint = "http://localhost:8765"
16+
cmd = [
17+
yatest.common.binary_path(os.getenv("YDB_TEST_PATH")),
18+
"--mon_endpoint", self.mon_endpoint,
19+
"--database", self.database,
20+
"--duration", "60",
21+
]
22+
yatest.common.execute(cmd, wait=True)
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
PY3TEST()
2+
INCLUDE(${ARCADIA_ROOT}/ydb/tests/ydbd_dep.inc)
3+
ENV(YDB_TEST_PATH="ydb/tests/stress/viewer/viewer")
4+
5+
TEST_SRCS(
6+
test_workload.py
7+
)
8+
9+
IF (SANITIZER_TYPE)
10+
REQUIREMENTS(ram:32)
11+
ENDIF()
12+
13+
SIZE(MEDIUM)
14+
15+
DEPENDS(
16+
ydb/tests/stress/viewer
17+
)
18+
19+
PEERDIR(
20+
ydb/tests/library
21+
ydb/tests/library/stress
22+
ydb/tests/stress/viewer/workload
23+
)
24+
25+
END()
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# -*- coding: utf-8 -*-
2+
import time
3+
import threading
4+
import queue
5+
import traceback
6+
import requests
7+
from urllib.parse import urljoin
8+
from concurrent.futures import ThreadPoolExecutor
9+
10+
class Workload:
11+
def __init__(self, mon_endpoint, database, duration):
12+
self.mon_endpoint = mon_endpoint
13+
self.database = database
14+
self.round_size = 1000
15+
self.duration = duration
16+
self.delayed_events = queue.Queue()
17+
self.pool_semaphore = threading.BoundedSemaphore(value=100)
18+
self.worker_exception = []
19+
self.pool = None
20+
21+
def wrapper(self, f, *args, **kwargs):
22+
try:
23+
if len(self.worker_exception) == 0:
24+
result = f(*args, **kwargs)
25+
if result.status_code != 200:
26+
raise AssertionError(f"Bad response code {result.status_code}, text: {result.text}")
27+
except Exception:
28+
with self.lock:
29+
self.worker_exception.append(traceback.format_exc())
30+
finally:
31+
self.pool_semaphore.release()
32+
33+
def call_viewer_api_post(self, url, body=None, headers=None):
34+
if body is None:
35+
body = {}
36+
if self.database:
37+
body["database"] = self.database
38+
return requests.post(self.mon_endpoint + url, json=body, headers=headers)
39+
40+
def loop(self):
41+
started_at = time.time()
42+
43+
while time.time() - started_at < self.duration:
44+
yield self.call_viewer_api_post, "/viewer/capabilities"
45+
yield self.call_viewer_api_post, "/viewer/whoami"
46+
yield self.call_viewer_api_post, "/viewer/nodelist"
47+
yield self.call_viewer_api_post, "/viewer/nodes?fields_required=all"
48+
yield self.call_viewer_api_post, "/viewer/groups?fields_required=all"
49+
if self.database:
50+
yield self.call_viewer_api_post, "/viewer/describe", {"path": self.database}
51+
52+
def __enter__(self):
53+
return self
54+
55+
def __exit__(self, exc_type, exc_val, exc_tb):
56+
if self.pool:
57+
self.pool.shutdown(wait=True)
58+
self.pool = None
59+
60+
def start(self):
61+
self.pool = ThreadPoolExecutor(max_workers=100)
62+
for call in self.loop():
63+
if len(self.worker_exception) == 0:
64+
self.pool_semaphore.acquire()
65+
func, *args = call
66+
self.pool.submit(self.wrapper, func, *args)
67+
else:
68+
raise AssertionError(f"Worker exceptions {self.worker_exception}")
69+
self.pool.shutdown(wait=True)
70+
self.pool = None
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
PY3_LIBRARY()
2+
3+
PY_SRCS(
4+
__init__.py
5+
)
6+
7+
PEERDIR(
8+
contrib/python/requests
9+
)
10+
11+
END()

ydb/tests/stress/viewer/ya.make

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
PY3_PROGRAM(viewer)
2+
3+
PY_SRCS(
4+
__main__.py
5+
)
6+
7+
PEERDIR(
8+
ydb/tests/stress/viewer/workload
9+
)
10+
11+
END()
12+
13+
RECURSE_FOR_TESTS(
14+
tests
15+
)

0 commit comments

Comments
 (0)