Skip to content

Commit 90abbae

Browse files
committed
Python POC: added async API using CFFI
1 parent b6668a9 commit 90abbae

File tree

9 files changed

+999
-80
lines changed

9 files changed

+999
-80
lines changed

benchmarks/python/all_async_100

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
[{"client": "redispy", "loop": "asyncio", "num_of_tasks": 10, "data_size": 100, "tps": 12261, "client_count": 1, "is_cluster": true, "get_existing_p50_latency": 0.757, "get_existing_p90_latency": 0.851, "get_existing_p99_latency": 0.69, "get_existing_average_latency": 0.798, "get_existing_std_dev": 3.012, "get_non_existing_p50_latency": 0.742, "get_non_existing_p90_latency": 0.837, "get_non_existing_p99_latency": 0.679, "get_non_existing_average_latency": 0.757, "get_non_existing_std_dev": 0.145, "set_p50_latency": 0.755, "set_p90_latency": 0.848, "set_p99_latency": 0.687, "set_average_latency": 0.813, "set_std_dev": 4.049}, {"client": "glide_socket", "loop": "asyncio", "num_of_tasks": 10, "data_size": 100, "tps": 14454, "client_count": 1, "is_cluster": true, "get_existing_p50_latency": 0.601, "get_existing_p90_latency": 0.92, "get_existing_p99_latency": 0.538, "get_existing_average_latency": 0.689, "get_existing_std_dev": 0.27, "get_non_existing_p50_latency": 0.596, "get_non_existing_p90_latency": 0.916, "get_non_existing_p99_latency": 0.534, "get_non_existing_average_latency": 0.686, "get_non_existing_std_dev": 0.298, "set_p50_latency": 0.598, "set_p90_latency": 0.916, "set_p99_latency": 0.537, "set_average_latency": 0.686, "set_std_dev": 0.295}, {"client": "glide_ffi", "loop": "asyncio", "num_of_tasks": 10, "data_size": 100, "tps": 13885, "client_count": 1, "is_cluster": true, "get_existing_p50_latency": 0.704, "get_existing_p90_latency": 0.89, "get_existing_p99_latency": 0.512, "get_existing_average_latency": 0.721, "get_existing_std_dev": 0.427, "get_non_existing_p50_latency": 0.69, "get_non_existing_p90_latency": 0.872, "get_non_existing_p99_latency": 0.494, "get_non_existing_average_latency": 0.706, "get_non_existing_std_dev": 0.532, "set_p50_latency": 0.697, "set_p90_latency": 0.879, "set_p99_latency": 0.5, "set_average_latency": 0.711, "set_std_dev": 0.378}, {"client": "redispy", "loop": "asyncio", "num_of_tasks": 100, "data_size": 100, "tps": 8658, "client_count": 1, "is_cluster": true, "get_existing_p50_latency": 6.738, "get_existing_p90_latency": 6.858, "get_existing_p99_latency": 6.635, "get_existing_average_latency": 9.442, "get_existing_std_dev": 93.62, "get_non_existing_p50_latency": 6.729, "get_non_existing_p90_latency": 6.846, "get_non_existing_p99_latency": 6.626, "get_non_existing_average_latency": 7.898, "get_non_existing_std_dev": 60.307, "set_p50_latency": 6.738, "set_p90_latency": 6.856, "set_p99_latency": 6.634, "set_average_latency": 10.012, "set_std_dev": 101.823}, {"client": "glide_socket", "loop": "asyncio", "num_of_tasks": 100, "data_size": 100, "tps": 24707, "client_count": 1, "is_cluster": true, "get_existing_p50_latency": 3.972, "get_existing_p90_latency": 4.138, "get_existing_p99_latency": 3.923, "get_existing_average_latency": 4.042, "get_existing_std_dev": 0.513, "get_non_existing_p50_latency": 3.971, "get_non_existing_p90_latency": 4.149, "get_non_existing_p99_latency": 3.922, "get_non_existing_average_latency": 4.037, "get_non_existing_std_dev": 0.485, "set_p50_latency": 3.974, "set_p90_latency": 4.129, "set_p99_latency": 3.925, "set_average_latency": 4.042, "set_std_dev": 0.509}, {"client": "glide_ffi", "loop": "asyncio", "num_of_tasks": 100, "data_size": 100, "tps": 13857, "client_count": 1, "is_cluster": true, "get_existing_p50_latency": 7.273, "get_existing_p90_latency": 7.79, "get_existing_p99_latency": 6.2921, "get_existing_average_latency": 7.215, "get_existing_std_dev": 2.779, "get_non_existing_p50_latency": 7.269, "get_non_existing_p90_latency": 7.791, "get_non_existing_p99_latency": 6.2847, "get_non_existing_average_latency": 7.217, "get_non_existing_std_dev": 2.738, "set_p50_latency": 7.275, "set_p90_latency": 7.774, "set_p99_latency": 6.297, "set_average_latency": 7.167, "set_std_dev": 2.257}, {"client": "redispy", "loop": "asyncio", "num_of_tasks": 1000, "data_size": 100, "tps": 1730, "client_count": 1, "is_cluster": true, "get_existing_p50_latency": 91.572, "get_existing_p90_latency": 92.46, "get_existing_p99_latency": 90.881, "get_existing_average_latency": 349.823, "get_existing_std_dev": 2868.364, "get_non_existing_p50_latency": 91.556, "get_non_existing_p90_latency": 92.4182, "get_non_existing_p99_latency": 90.865, "get_non_existing_average_latency": 336.043, "get_non_existing_std_dev": 2829.902, "set_p50_latency": 91.577, "set_p90_latency": 92.475, "set_p99_latency": 90.89, "set_average_latency": 348.878, "set_std_dev": 2900.974}, {"client": "glide_socket", "loop": "asyncio", "num_of_tasks": 1000, "data_size": 100, "tps": 17042, "client_count": 1, "is_cluster": true, "get_existing_p50_latency": 60.046, "get_existing_p90_latency": 61.8933, "get_existing_p99_latency": 41.5114, "get_existing_average_latency": 58.463, "get_existing_std_dev": 10.693, "get_non_existing_p50_latency": 60.04, "get_non_existing_p90_latency": 61.867, "get_non_existing_p99_latency": 41.516, "get_non_existing_average_latency": 58.406, "get_non_existing_std_dev": 10.671, "set_p50_latency": 60.093, "set_p90_latency": 61.798, "set_p99_latency": 41.5268, "set_average_latency": 58.358, "set_std_dev": 10.445}, {"client": "glide_ffi", "loop": "asyncio", "num_of_tasks": 1000, "data_size": 100, "tps": 12173, "client_count": 1, "is_cluster": true, "get_existing_p50_latency": 81.869, "get_existing_p90_latency": 85.655, "get_existing_p99_latency": 74.679, "get_existing_average_latency": 81.529, "get_existing_std_dev": 7.444, "get_non_existing_p50_latency": 81.87, "get_non_existing_p90_latency": 85.671, "get_non_existing_p99_latency": 74.5847, "get_non_existing_average_latency": 81.507, "get_non_existing_std_dev": 7.602, "set_p50_latency": 81.885, "set_p90_latency": 85.669, "set_p99_latency": 74.5914, "set_average_latency": 81.529, "set_std_dev": 7.391}]

benchmarks/python/python_benchmark.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from glide import (
1919
GlideClientConfiguration,
2020
GlideClusterClientConfiguration,
21+
GlideAsync,
2122
GlideClient,
2223
GlideClusterClient,
2324
Logger,
@@ -197,7 +198,12 @@ def latency_results(prefix, latencies):
197198

198199

199200
async def create_clients(client_count, action):
200-
return [await action() for _ in range(client_count)]
201+
try:
202+
return [await action() for _ in range(client_count)]
203+
except Exception as e:
204+
print(e)
205+
return [action() for _ in range(client_count)]
206+
201207

202208

203209
async def run_clients(
@@ -270,7 +276,7 @@ async def main(
270276
clients = await create_clients(
271277
client_count,
272278
lambda: client_class(
273-
host=host, port=port, decode_responses=True, ssl=use_tls
279+
host=host, port=port, decode_responses=False, ssl=use_tls
274280
),
275281
)
276282

@@ -285,7 +291,10 @@ async def main(
285291
)
286292

287293
for client in clients:
288-
await client.aclose()
294+
try:
295+
await client.aclose()
296+
except Exception:
297+
await client.close()
289298

290299
if clients_to_run == "all" or clients_to_run == "glide":
291300
# Glide Socket
@@ -301,7 +310,28 @@ async def main(
301310
)
302311
await run_clients(
303312
clients,
304-
"glide",
313+
"glide_socket",
314+
event_loop_name,
315+
total_commands,
316+
num_of_concurrent_tasks,
317+
data_size,
318+
is_cluster,
319+
)
320+
if clients_to_run == "all" or clients_to_run == "glide":
321+
# Glide Socket
322+
client_class = GlideAsync
323+
config = GlideClusterClientConfiguration(
324+
[NodeAddress(host=host, port=port)], use_tls=use_tls
325+
) if is_cluster else GlideClientConfiguration(
326+
[NodeAddress(host=host, port=port)], use_tls=use_tls
327+
)
328+
clients = await create_clients(
329+
client_count,
330+
lambda: client_class(config),
331+
)
332+
await run_clients(
333+
clients,
334+
"glide_ffi",
305335
event_loop_name,
306336
total_commands,
307337
num_of_concurrent_tasks,
@@ -311,6 +341,7 @@ async def main(
311341

312342

313343
def number_of_iterations(num_of_concurrent_tasks):
344+
return 100000
314345
return min(max(100000, num_of_concurrent_tasks * 10000), 5000000)
315346

316347

benchmarks/python/python_sync_benchmark.py

Lines changed: 0 additions & 27 deletions
This file was deleted.

go/src/lib.rs

Lines changed: 141 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,19 @@
22

33
#![deny(unsafe_op_in_unsafe_fn)]
44
use glide_core::client::Client as GlideClient;
5+
use glide_core::command_request::SimpleRoutes;
6+
use glide_core::command_request::{Routes, SlotTypes};
57
use glide_core::connection_request;
68
use glide_core::errors;
79
use glide_core::errors::RequestErrorType;
810
use glide_core::request_type::RequestType;
911
use glide_core::ConnectionRequest;
10-
use glide_core::client::{NodeAddress, TlsMode};
1112
use protobuf::Message;
12-
use redis::{RedisResult, Value};
13+
use redis::cluster_routing::{
14+
MultipleNodeRoutingInfo, Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr,
15+
};
16+
use redis::cluster_routing::{ResponsePolicy, Routable};
17+
use redis::{Cmd, RedisResult, Value};
1318
use std::slice::from_raw_parts;
1419
use std::{
1520
ffi::{c_void, CString},
@@ -113,27 +118,34 @@ pub type FailureCallback = unsafe extern "C" fn(
113118
error_type: RequestErrorType,
114119
) -> ();
115120

116-
/// The connection response.
117-
///
118-
/// It contains either a connection or an error. It is represented as a struct instead of a union for ease of use in the wrapper language.
119-
///
120-
/// The struct is freed by the external caller by using `free_connection_response` to avoid memory leaks.
121-
#[repr(C)]
122-
pub struct ConnectionResponse {
123-
conn_ptr: *const c_void,
124-
connection_error_message: *const c_char,
125-
}
121+
/// The connection response.
122+
///
123+
/// It contains either a connection or an error. It is represented as a struct instead of a union for ease of use in the wrapper language.
124+
///
125+
/// The struct is freed by the external caller by using `free_connection_response` to avoid memory leaks.
126+
#[repr(C)]
127+
pub struct ConnectionResponse {
128+
conn_ptr: *const c_void,
129+
connection_error_message: *const c_char,
130+
}
126131

127132
/// A `GlideClient` adapter.
128133
// TODO: Remove allow(dead_code) once connection logic is implemented
129134
#[allow(dead_code)]
130135
pub struct ClientAdapter {
131136
client: GlideClient,
137+
success_callback: SuccessCallback,
138+
failure_callback: FailureCallback,
132139
runtime: Runtime,
133140
}
134141

135142
fn create_client_internal(
143+
connection_request_bytes: &[u8],
144+
success_callback: SuccessCallback,
145+
failure_callback: FailureCallback,
136146
) -> Result<ClientAdapter, String> {
147+
let request = connection_request::ConnectionRequest::parse_from_bytes(connection_request_bytes)
148+
.map_err(|err| err.to_string())?;
137149
// TODO: optimize this using multiple threads instead of a single worker thread (e.g. by pinning each go thread to a rust thread)
138150
let runtime = Builder::new_multi_thread()
139151
.enable_all()
@@ -144,17 +156,13 @@ fn create_client_internal(
144156
let redis_error = err.into();
145157
errors::error_message(&redis_error)
146158
})?;
147-
let addresses = vec![NodeAddress { host: "localhost".to_string(), port: 6379}];
148-
let use_tls = true;
149159
let client = runtime
150-
.block_on(GlideClient::new(ConnectionRequest {addresses, tls_mode: if use_tls {
151-
Some(TlsMode::SecureTls)
152-
} else {
153-
Some(TlsMode::NoTls)
154-
}, cluster_mode_enabled: true, ..Default::default()}, None))
160+
.block_on(GlideClient::new(ConnectionRequest::from(request), None))
155161
.map_err(|err| err.to_string())?;
156162
Ok(ClientAdapter {
157163
client,
164+
success_callback,
165+
failure_callback,
158166
runtime,
159167
})
160168
}
@@ -177,10 +185,15 @@ fn create_client_internal(
177185
/// * Both the `success_callback` and `failure_callback` function pointers need to live while the client is open/active. The caller is responsible for freeing both callbacks.
178186
// TODO: Consider making this async
179187
#[no_mangle]
180-
pub unsafe extern "C" fn create_client() -> *const ConnectionResponse {
181-
// let request_bytes =
182-
// unsafe { std::slice::from_raw_parts(connection_request_bytes, connection_request_len) };
183-
let response = match create_client_internal() {
188+
pub unsafe extern "C" fn create_client(
189+
connection_request_bytes: *const u8,
190+
connection_request_len: usize,
191+
success_callback: SuccessCallback,
192+
failure_callback: FailureCallback,
193+
) -> *const ConnectionResponse {
194+
let request_bytes =
195+
unsafe { std::slice::from_raw_parts(connection_request_bytes, connection_request_len) };
196+
let response = match create_client_internal(request_bytes, success_callback, failure_callback) {
184197
Err(err) => ConnectionResponse {
185198
conn_ptr: std::ptr::null(),
186199
connection_error_message: CString::into_raw(
@@ -508,46 +521,131 @@ pub unsafe extern "C" fn command(
508521
arg_count: c_ulong,
509522
args: *const usize,
510523
args_len: *const c_ulong,
511-
) -> *mut CommandResponse {
524+
route_bytes: *const u8,
525+
route_bytes_len: usize,
526+
) {
512527
let client_adapter =
513528
unsafe { Box::leak(Box::from_raw(client_adapter_ptr as *mut ClientAdapter)) };
529+
// The safety of this needs to be ensured by the calling code. Cannot dispose of the pointer before
530+
// all operations have completed.
531+
let ptr_address = client_adapter_ptr as usize;
514532

515-
// Ensure the arguments are converted properly
516533
let arg_vec =
517534
unsafe { convert_double_pointer_to_vec(args as *const *const c_void, arg_count, args_len) };
518535

519536
let mut client_clone = client_adapter.client.clone();
520537

521538
// Create the command outside of the task to ensure that the command arguments passed
522-
// from the caller are still valid
539+
// from "go" are still valid
523540
let mut cmd = command_type
524541
.get_command()
525542
.expect("Couldn't fetch command type");
526-
527543
for command_arg in arg_vec {
528544
cmd.arg(command_arg);
529545
}
530546

531-
// Block on the async task to execute the command
532-
let result = client_adapter.runtime.block_on(async move {
533-
client_clone.send_command(&cmd, None).await
534-
});
547+
let r_bytes = unsafe { std::slice::from_raw_parts(route_bytes, route_bytes_len) };
548+
549+
let route = Routes::parse_from_bytes(r_bytes).unwrap();
550+
551+
client_adapter.runtime.spawn(async move {
552+
let result = client_clone
553+
.send_command(&cmd, get_route(route, Some(&cmd)))
554+
.await;
555+
let client_adapter = unsafe { Box::leak(Box::from_raw(ptr_address as *mut ClientAdapter)) };
556+
let value = match result {
557+
Ok(value) => value,
558+
Err(err) => {
559+
let message = errors::error_message(&err);
560+
let error_type = errors::error_type(&err);
561+
562+
let c_err_str = CString::into_raw(
563+
CString::new(message).expect("Couldn't convert error message to CString"),
564+
);
565+
unsafe { (client_adapter.failure_callback)(channel, c_err_str, error_type) };
566+
return;
567+
}
568+
};
569+
570+
let result: RedisResult<CommandResponse> = valkey_value_to_command_response(value);
535571

536-
match result {
537-
Ok(value) => {
538-
// Convert the value to a CommandResponse
539-
match valkey_value_to_command_response(value) {
540-
Ok(command_response) => Box::into_raw(Box::new(command_response)), // Return a pointer to the CommandResponse
572+
unsafe {
573+
match result {
574+
Ok(message) => {
575+
(client_adapter.success_callback)(channel, Box::into_raw(Box::new(message)))
576+
}
541577
Err(err) => {
542-
eprintln!("Error converting value to CommandResponse: {:?}", err);
543-
std::ptr::null_mut()
578+
let message = errors::error_message(&err);
579+
let error_type = errors::error_type(&err);
580+
581+
let c_err_str = CString::into_raw(
582+
CString::new(message).expect("Couldn't convert error message to CString"),
583+
);
584+
(client_adapter.failure_callback)(channel, c_err_str, error_type);
544585
}
545-
}
586+
};
546587
}
547-
Err(err) => {
548-
// Handle the error case
549-
eprintln!("Error executing command: {:?}", err);
550-
std::ptr::null_mut()
588+
});
589+
}
590+
591+
fn get_route(route: Routes, cmd: Option<&Cmd>) -> Option<RoutingInfo> {
592+
use glide_core::command_request::routes::Value;
593+
let route = route.value?;
594+
let get_response_policy = |cmd: Option<&Cmd>| {
595+
cmd.and_then(|cmd| {
596+
cmd.command()
597+
.and_then(|cmd| ResponsePolicy::for_command(&cmd))
598+
})
599+
};
600+
match route {
601+
Value::SimpleRoutes(simple_route) => {
602+
let simple_route = simple_route.enum_value().unwrap();
603+
match simple_route {
604+
SimpleRoutes::AllNodes => Some(RoutingInfo::MultiNode((
605+
MultipleNodeRoutingInfo::AllNodes,
606+
get_response_policy(cmd),
607+
))),
608+
SimpleRoutes::AllPrimaries => Some(RoutingInfo::MultiNode((
609+
MultipleNodeRoutingInfo::AllMasters,
610+
get_response_policy(cmd),
611+
))),
612+
SimpleRoutes::Random => {
613+
Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
614+
}
615+
}
551616
}
617+
Value::SlotKeyRoute(slot_key_route) => Some(RoutingInfo::SingleNode(
618+
SingleNodeRoutingInfo::SpecificNode(Route::new(
619+
redis::cluster_topology::get_slot(slot_key_route.slot_key.as_bytes()),
620+
get_slot_addr(&slot_key_route.slot_type),
621+
)),
622+
)),
623+
Value::SlotIdRoute(slot_id_route) => Some(RoutingInfo::SingleNode(
624+
SingleNodeRoutingInfo::SpecificNode(Route::new(
625+
slot_id_route.slot_id as u16,
626+
get_slot_addr(&slot_id_route.slot_type),
627+
)),
628+
)),
629+
Value::ByAddressRoute(by_address_route) => match u16::try_from(by_address_route.port) {
630+
Ok(port) => Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress {
631+
host: by_address_route.host.to_string(),
632+
port,
633+
})),
634+
Err(_) => {
635+
// TODO: Handle error propagation.
636+
None
637+
}
638+
},
639+
_ => panic!("unknown route type"),
552640
}
553641
}
642+
643+
fn get_slot_addr(slot_type: &protobuf::EnumOrUnknown<SlotTypes>) -> SlotAddr {
644+
slot_type
645+
.enum_value()
646+
.map(|slot_type| match slot_type {
647+
SlotTypes::Primary => SlotAddr::Master,
648+
SlotTypes::Replica => SlotAddr::ReplicaRequired,
649+
})
650+
.expect("Received unexpected slot id type")
651+
}

0 commit comments

Comments
 (0)