diff --git a/benchmarks/python/python_benchmark_sync.py b/benchmarks/python/python_benchmark_sync.py new file mode 100644 index 0000000000..a6331191a5 --- /dev/null +++ b/benchmarks/python/python_benchmark_sync.py @@ -0,0 +1,344 @@ +# Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 + +import argparse +import asyncio +import functools +import json +import math +import random +import time +from datetime import datetime, timezone +from enum import Enum +from pathlib import Path +from statistics import mean +from typing import List + +import numpy as np +import redis as redispy # type: ignore +from glide import ( + GlideClientConfiguration, + GlideClusterClientConfiguration, + SyncClient, + Logger, + LogLevel, + NodeAddress, +) + + +class ChosenAction(Enum): + GET_NON_EXISTING = 1 + GET_EXISTING = 2 + SET = 3 + + +PORT = 6379 + +arguments_parser = argparse.ArgumentParser() +arguments_parser.add_argument( + "--resultsFile", + help="Where to write the results file", + required=False, + default="../results/python-results.json", +) +arguments_parser.add_argument( + "--dataSize", help="Size of data to set", required=False, default="100" +) +arguments_parser.add_argument( + "--concurrentTasks", + help="List of number of concurrent tasks to run", + nargs="+", + required=False, + default=("1", "10", "100", "1000"), +) +arguments_parser.add_argument( + "--clients", help="Which clients should run", required=False, default="all" +) +arguments_parser.add_argument( + "--host", help="What host to target", required=False, default="localhost" +) +arguments_parser.add_argument( + "--clientCount", + help="Number of clients to run concurrently", + nargs="+", + required=False, + default=("1"), +) +arguments_parser.add_argument( + "--tls", + help="Should benchmark a TLS server", + action="store_true", + required=False, + default=False, +) +arguments_parser.add_argument( + "--clusterModeEnabled", + help="Should benchmark a cluster mode enabled cluster", + action="store_true", + required=False, + default=False, +) +arguments_parser.add_argument( + "--port", + default=PORT, + type=int, + required=False, + help="Which port to connect to, defaults to `%(default)s`", +) +arguments_parser.add_argument( + "--minimal", help="Should run a minimal benchmark", action="store_true" +) +args = arguments_parser.parse_args() + +PROB_GET = 0.8 +PROB_GET_EXISTING_KEY = 0.8 +SIZE_GET_KEYSPACE = 3750000 # 3.75 million +SIZE_SET_KEYSPACE = 3000000 # 3 million +started_tasks_counter = 0 +running_tasks = set() +bench_json_results: List[str] = [] + + +def truncate_decimal(number: float, digits: int = 3) -> float: + stepper = 10**digits + return math.floor(number * stepper) / stepper + + +def generate_value(size): + return str("0" * size) + + +def generate_key_set(): + return str(random.randint(1, SIZE_SET_KEYSPACE + 1)) + + +def generate_key_get(): + return str(random.randint(SIZE_SET_KEYSPACE, SIZE_GET_KEYSPACE + 1)) + + +def choose_action(): + if random.random() > PROB_GET: + return ChosenAction.SET + if random.random() > PROB_GET_EXISTING_KEY: + return ChosenAction.GET_NON_EXISTING + return ChosenAction.GET_EXISTING + + +def calculate_latency(latency_list, percentile): + return round(np.percentile(np.array(latency_list), percentile), 4) + + +def process_results(): + global bench_json_results + global args + + # write json results to a file + res_file_path = args.resultsFile + with open(res_file_path, "w+") as f: + json.dump(bench_json_results, f) + + +def timer(func): + @functools.wraps(func) + def 
wrapper(*args, **kwargs):
+        tic = time.perf_counter()
+        func(*args, **kwargs)
+        toc = time.perf_counter()
+        return toc - tic
+
+    return wrapper
+
+
+@timer
+def execute_commands(clients, total_commands, data_size, action_latencies):
+    global started_tasks_counter
+    while started_tasks_counter < total_commands:
+        started_tasks_counter += 1
+        chosen_action = choose_action()
+        client = clients[started_tasks_counter % len(clients)]
+        tic = time.perf_counter()
+        if chosen_action == ChosenAction.GET_EXISTING:
+            client.get(generate_key_set())
+        elif chosen_action == ChosenAction.GET_NON_EXISTING:
+            client.get(generate_key_get())
+        elif chosen_action == ChosenAction.SET:
+            client.set(generate_key_set(), generate_value(data_size))
+        toc = time.perf_counter()
+        execution_time_milli = (toc - tic) * 1000
+        action_latencies[chosen_action].append(truncate_decimal(execution_time_milli))
+    return True
+
+
+@timer
+def create_and_run_concurrent_tasks(
+    clients, total_commands, num_of_concurrent_tasks, data_size, action_latencies
+):
+    # Retained for parity with the async benchmark; the sync flow in
+    # run_clients() below calls execute_commands() directly. Since
+    # execute_commands() is a blocking function, it cannot be scheduled as a
+    # task directly; each worker runs it on the default thread pool instead.
+    # The workers share the module-level counter and together issue
+    # total_commands commands.
+    global started_tasks_counter
+    started_tasks_counter = 0
+
+    async def run_workers():
+        worker_loop = asyncio.get_running_loop()
+        for _ in range(num_of_concurrent_tasks):
+            task = worker_loop.run_in_executor(
+                None,
+                functools.partial(
+                    execute_commands, clients, total_commands, data_size, action_latencies
+                ),
+            )
+            running_tasks.add(task)
+            task.add_done_callback(running_tasks.discard)
+        await asyncio.gather(*running_tasks)
+
+    asyncio.run(run_workers())
+
+
+def latency_results(prefix, latencies):
+    result = {}
+    result[prefix + "_p50_latency"] = calculate_latency(latencies, 50)
+    result[prefix + "_p90_latency"] = calculate_latency(latencies, 90)
+    result[prefix + "_p99_latency"] = calculate_latency(latencies, 99)
+    result[prefix + "_average_latency"] = truncate_decimal(mean(latencies))
+    result[prefix + "_std_dev"] = truncate_decimal(np.std(latencies))
+
+    return result
+
+
+def create_clients(client_count, action):
+    return [action() for _ in range(client_count)]
+
+
+def run_clients(
+    clients,
+    client_name,
+    total_commands,
+    data_size,
+    is_cluster,
+):
+    now = datetime.now(timezone.utc).strftime("%H:%M:%S")
+    print(
+        f"Starting {client_name} data size: {data_size} "
+        f"client count: {len(clients)} {now}"
+    )
+    action_latencies = {
+        ChosenAction.GET_NON_EXISTING: list(),
+        ChosenAction.GET_EXISTING: list(),
+        ChosenAction.SET: list(),
+    }
+
+    # execute_commands is wrapped by @timer and returns its elapsed
+    # wall-clock time in seconds.
+    elapsed_seconds = execute_commands(
+        clients, total_commands, data_size, action_latencies
+    )
+
+    tps = int(started_tasks_counter / elapsed_seconds)
+    get_non_existing_latencies = action_latencies[ChosenAction.GET_NON_EXISTING]
+    get_non_existing_latency_results = latency_results(
+        "get_non_existing", get_non_existing_latencies
+    )
+
+    get_existing_latencies = action_latencies[ChosenAction.GET_EXISTING]
+    get_existing_latency_results = latency_results(
+        "get_existing", get_existing_latencies
+    )
+
+    set_latencies = action_latencies[ChosenAction.SET]
+    set_results = latency_results("set", set_latencies)
+
+    json_res = {
+        **{
+            "client": client_name,
+            "data_size": data_size,
+            "tps": tps,
+            "client_count": len(clients),
+            "is_cluster": is_cluster,
+        },
+        **get_existing_latency_results,
+        **get_non_existing_latency_results,
+        **set_results,
+    }
+
+    bench_json_results.append(json_res)
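+
+# A quick illustration of the @timer decorator above (hypothetical snippet,
+# not used by the benchmark): the wrapped function returns its wall-clock
+# duration in seconds instead of its original return value.
+#
+#   @timer
+#   def sleep_briefly():
+#       time.sleep(0.1)
+#
+#   elapsed = sleep_briefly()  # roughly 0.1
+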
+def main(
+    total_commands,
+    data_size,
+    clients_to_run,
+    host,
+    client_count,
+    use_tls,
+    is_cluster,
+):
+    if clients_to_run == "all" or clients_to_run == "redispy":
+        client_class = redispy.RedisCluster if is_cluster else redispy.Redis
+        clients = create_clients(
+            client_count,
+            lambda: client_class(
+                host=host, port=port, decode_responses=False, ssl=use_tls
+            ),
+        )
+
+        run_clients(
+            clients,
+            "redispy",
+            total_commands,
+            data_size,
+            is_cluster,
+        )
+
+        for client in clients:
+            client.close()
+
+    if clients_to_run == "all" or clients_to_run == "glide":
+        config = (
+            GlideClusterClientConfiguration(
+                [NodeAddress(host=host, port=port)], use_tls=use_tls
+            )
+            if is_cluster
+            else GlideClientConfiguration(
+                [NodeAddress(host=host, port=port)], use_tls=use_tls
+            )
+        )
+        clients = create_clients(
+            client_count,
+            lambda: SyncClient.create(config),
+        )
+        run_clients(
+            clients,
+            "glide_sync",
+            total_commands,
+            data_size,
+            is_cluster,
+        )
+
+
+def number_of_iterations(num_of_concurrent_tasks):
+    return min(max(100000, num_of_concurrent_tasks * 10000), 5000000)
+
+
+if __name__ == "__main__":
+    data_size = int(args.dataSize)
+    clients_to_run = args.clients
+    client_count = args.clientCount
+    host = args.host
+    use_tls = args.tls
+    port = args.port
+    is_cluster = args.clusterModeEnabled
+
+    # Setting the internal logger to log every log that has a level of info and above,
+    # and save the logs to a file with the name of the results file.
+    Logger.set_logger_config(LogLevel.INFO, Path(args.resultsFile).stem)
+
+    # --concurrentTasks is accepted for compatibility with the benchmark
+    # harness, but the sync benchmark runs a single logical task per client
+    # set, so the iteration count is computed for one task; --minimal trims
+    # it for smoke runs.
+    iterations = 1000 if args.minimal else number_of_iterations(1)
+
+    for number_of_clients in client_count:
+        main(
+            iterations,
+            data_size,
+            clients_to_run,
+            host,
+            int(number_of_clients),
+            use_tls,
+            is_cluster,
+        )
+
+    process_results()
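+
+# Example invocation (assumes a server reachable on localhost:6379; adjust
+# --host, --port, and --tls for your setup):
+#
+#   python python_benchmark_sync.py --clients all --dataSize 100 --clientCount 1 4
+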
diff --git a/python/python/glide/__init__.py b/python/python/glide/__init__.py
index f2ecc3da4e..3db5a4f96b 100644
--- a/python/python/glide/__init__.py
+++ b/python/python/glide/__init__.py
@@ -152,6 +152,7 @@
     TimeoutError,
 )
 from glide.glide_client import GlideClient, GlideClusterClient, TGlideClient
+from glide.glide_client_sync import SyncClient
 from glide.logger import Level as LogLevel
 from glide.logger import Logger
 from glide.routes import (
@@ -171,6 +172,7 @@
 __all__ = [
     # Client
+    "SyncClient",
     "GlideClient",
     "GlideClusterClient",
     "Transaction",
diff --git a/python/python/glide/glide_client.py b/python/python/glide/glide_client.py
index 5ed558e709..693d35a13a 100644
--- a/python/python/glide/glide_client.py
+++ b/python/python/glide/glide_client.py
@@ -77,7 +77,7 @@ def __init__(self, config: BaseClientConfiguration):
         self._pending_push_notifications: List[Response] = list()
 
     @classmethod
-    async def create(cls, config: BaseClientConfiguration) -> Self:
+    async def create(
+        cls,
+        config: BaseClientConfiguration,
+        loop: Optional[asyncio.AbstractEventLoop] = None,
+    ) -> Self:
         """Creates a Glide client.
 
         Args:
@@ -90,8 +90,8 @@ async def create(cls, config: BaseClientConfiguration) -> Self:
         config = config
         self = cls(config)
-        init_future: asyncio.Future = asyncio.Future()
-        loop = asyncio.get_event_loop()
-
+        # Resolve the loop first so the future is created on the loop that
+        # will await it (the sync wrapper passes its background loop here).
+        loop = loop if loop is not None else asyncio.get_running_loop()
+        self.loop = loop
+        init_future: asyncio.Future = loop.create_future()
+
         def init_callback(socket_path: Optional[str], err: Optional[str]):
             if err is not None:
                 raise ClosingError(err)
@@ -105,18 +105,22 @@ def init_callback(socket_path: Optional[str], err: Optional[str]):
                 loop.call_soon_threadsafe(init_future.set_result, True)
 
         start_socket_listener_external(init_callback=init_callback)
-
-        # will log if the logger was created (wrapper or costumer) on info
-        # level or higher
-        ClientLogger.log(LogLevel.INFO, "connection info", "new connection established")
-        # Wait for the socket listener to complete its initialization
-        await init_future
-        # Create UDS connection
-        await self._create_uds_connection()
-        # Start the reader loop as a background task
-        self._reader_task = asyncio.create_task(self._reader_loop())
-        # Set the client configurations
-        await self._set_connection_configurations()
+
+        async def wait_for_callback_create_conn():
+            # will log if the logger was created (wrapper or customer) on info
+            # level or higher
+            ClientLogger.log(
+                LogLevel.INFO, "connection info", "new connection established"
+            )
+            # Wait for the socket listener to complete its initialization
+            await init_future
+            # Create UDS connection
+            await self._create_uds_connection()
+
+        if loop is asyncio.get_running_loop():
+            # Plain async usage: stay on the current loop. Blocking on
+            # future.result() here would deadlock the loop we are running on.
+            await wait_for_callback_create_conn()
+            # Start the reader loop as a background task
+            self._reader_task = asyncio.create_task(self._reader_loop())
+            # Set the client configurations
+            await self._set_connection_configurations()
+        else:
+            # Called on behalf of the sync wrapper: `loop` runs on another
+            # thread, so schedule the coroutines there and block this thread
+            # on their results.
+            future = asyncio.run_coroutine_threadsafe(
+                wait_for_callback_create_conn(), loop
+            )
+            future.result()
+            # Run the reader task on the event loop without awaiting it
+            self._reader_task = asyncio.run_coroutine_threadsafe(
+                self._reader_loop(), loop
+            )
+            # Set the client configurations
+            future = asyncio.run_coroutine_threadsafe(
+                self._set_connection_configurations(), loop
+            )
+            future.result()
         return self
 
     async def _create_uds_connection(self) -> None:
diff --git a/python/python/glide/glide_client_sync.py b/python/python/glide/glide_client_sync.py
new file mode 100644
index 0000000000..bdeed76904
--- /dev/null
+++ b/python/python/glide/glide_client_sync.py
@@ -0,0 +1,78 @@
+# Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
+
+import asyncio
+import sys
+import threading
+from typing import Optional, Union
+
+from glide.async_commands.core import CoreCommands
+from glide.config import BaseClientConfiguration, GlideClusterClientConfiguration
+from glide.constants import TEncodable
+# Import from the concrete modules rather than the `glide` package to avoid
+# a circular import while the package is initializing.
+from glide.glide_client import GlideClient, GlideClusterClient
+
+if sys.version_info >= (3, 11):
+    from typing import Self
+else:
+    from typing_extensions import Self
+
+
+class SyncClient(CoreCommands):
+    def __init__(self, config: BaseClientConfiguration):
+        """
+        To create a new client, use the `create` classmethod
+        """
+        # All protocol state lives on the wrapped async client; the facade
+        # only tracks its configuration, background loop, and inner client.
+        self.config: BaseClientConfiguration = config
+        self.loop: Optional[asyncio.AbstractEventLoop] = None
+        self.inner_client: Optional[Union[GlideClient, GlideClusterClient]] = None
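+    # The facade parks an asyncio event loop on a daemon thread and bridges
+    # every call to the wrapped async client with
+    # asyncio.run_coroutine_threadsafe(). A minimal sketch of that pattern
+    # (illustrative names only, not part of this module):
+    #
+    #   loop = asyncio.new_event_loop()
+    #   threading.Thread(target=loop.run_forever, daemon=True).start()
+    #   future = asyncio.run_coroutine_threadsafe(some_coroutine(), loop)
+    #   result = future.result()  # blocks only the calling thread
+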
+    @classmethod
+    def create(cls, config: BaseClientConfiguration) -> Self:
+        """Creates a sync Glide client.
+
+        Args:
+            config (BaseClientConfiguration): The client configurations.
+                If no configuration is provided, a default client to "localhost":6379 will be created.
+
+        Returns:
+            Self: a SyncClient instance.
+        """
+        self = cls(config)
+        loop = asyncio.new_event_loop()
+        self.loop = loop
+
+        def start_event_loop():
+            asyncio.set_event_loop(loop)
+            loop.run_forever()
+
+        # Start the event loop in a separate daemon thread
+        thread = threading.Thread(target=start_event_loop, daemon=True)
+        thread.start()
+        # Drive the async client's create() to completion, handing it the
+        # background loop so later commands can be scheduled onto it.
+        if isinstance(config, GlideClusterClientConfiguration):
+            self.inner_client = asyncio.run(GlideClusterClient.create(config, loop))
+        else:
+            self.inner_client = asyncio.run(GlideClient.create(config, loop))
+        return self
+
+    def set(self, key: TEncodable, value: TEncodable):
+        future = asyncio.run_coroutine_threadsafe(
+            self.inner_client.set(key, value), self.loop
+        )
+        return future.result()
+
+    def get(self, key: TEncodable):
+        future = asyncio.run_coroutine_threadsafe(
+            self.inner_client.get(key), self.loop
+        )
+        return future.result()
diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py
index 3db7e965db..ff4052e5ae 100644
--- a/python/python/tests/test_async_client.py
+++ b/python/python/tests/test_async_client.py
@@ -6,12 +6,13 @@
 import asyncio
 import copy
 import math
+import threading
 import time
 from datetime import date, datetime, timedelta, timezone
 from typing import Any, Dict, List, Mapping, Optional, Tuple, Union, cast
 
 import pytest
-from glide import ClosingError, RequestError, Script
+from glide import ClosingError, NodeAddress, RequestError, Script
 from glide.async_commands.bitmap import (
     BitFieldGet,
     BitFieldIncrBy,
@@ -91,6 +92,7 @@
     SlotKeyRoute,
     SlotType,
 )
+from glide.glide_client_sync import SyncClient
 from tests.conftest import create_client
 from tests.utils.utils import (
     check_function_list_response,
@@ -10084,6 +10086,20 @@ async def test_sscan(self, glide_client: GlideClusterClient):
         # Negative count
         with pytest.raises(RequestError):
             await glide_client.sscan(key2, initial_cursor, count=-1)
+
+    def test_sync_set_get_works_with_multithreaded(self):
+        # Assertion failures inside worker threads would otherwise be
+        # swallowed, so collect them and re-assert on the main thread.
+        errors: List[AssertionError] = []
+
+        def worker(client):
+            try:
+                assert client.set("foo", "bar") == OK
+                assert client.get("foo") == b"bar"
+            except AssertionError as e:
+                errors.append(e)
+
+        config = GlideClientConfiguration([NodeAddress("localhost", 6379)])
+        client = SyncClient.create(config)
+        threads = [threading.Thread(target=worker, args=(client,)) for _ in range(2)]
+        for thread in threads:
+            thread.start()
+        for thread in threads:
+            thread.join()
+        assert not errors
 
 @pytest.mark.parametrize("cluster_mode", [True, False])
 @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
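A minimal usage sketch for the new SyncClient (assumptions: a server reachable
on localhost:6379, and only `set`/`get` are exposed on the facade so far):

    from glide import GlideClientConfiguration, NodeAddress, SyncClient

    config = GlideClientConfiguration([NodeAddress("localhost", 6379)])
    client = SyncClient.create(config)
    assert client.set("key", "value") == "OK"
    assert client.get("key") == b"value"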