Skip to content

Commit ef36cd6

Browse files
committed
perf(native): reduce mem copy in tmq raw_data_t
1 parent 235c6bc commit ef36cd6

File tree

6 files changed

+193
-130
lines changed

6 files changed

+193
-130
lines changed

taos-optin/src/raw/mod.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1774,17 +1774,24 @@ impl RawRes {
17741774
}
17751775

17761776
#[inline]
1777-
pub(crate) fn tmq_get_raw(&self) -> RawData {
1777+
pub(crate) fn tmq_get_raw(&self) -> Result<RawData, RawError> {
17781778
let mut meta = raw_data_t {
17791779
raw: std::ptr::null_mut(),
17801780
raw_len: 0,
17811781
raw_type: 0,
17821782
};
17831783
unsafe {
17841784
let _code = (self.c.tmq.as_ref().unwrap().tmq_get_raw)(self.as_ptr(), &mut meta as _);
1785-
let raw = RawData::from(&meta);
1786-
(self.c.tmq.as_ref().unwrap().tmq_free_raw)(meta);
1787-
raw
1785+
if _code == 0 {
1786+
return Ok(RawData::new(
1787+
meta,
1788+
self.c.tmq.as_ref().unwrap().tmq_free_raw,
1789+
));
1790+
} else {
1791+
let c_str = (self.c.tmq.as_ref().unwrap().tmq_err2str)(tmq_resp_err_t(_code));
1792+
let err = CStr::from_ptr(c_str).to_string_lossy().into_owned();
1793+
Err(RawError::new_with_context(_code, err, "tmq_get_raw error"))
1794+
}
17881795
}
17891796
}
17901797
}

taos-optin/src/tmq/mod.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,8 @@ impl AsyncOnSync for Meta {}
317317

318318
impl IsMeta for Meta {
319319
fn as_raw_meta(&self) -> RawResult<RawMeta> {
320-
Ok(unsafe { std::mem::transmute(self.raw.clone()) })
320+
self.res.tmq_get_raw()
321+
// Ok(unsafe { std::mem::transmute(self.raw.clone()) })
321322
}
322323

323324
fn as_json_meta(&self) -> RawResult<taos_query::common::JsonMeta> {
@@ -328,13 +329,13 @@ impl IsMeta for Meta {
328329
}
329330
impl Meta {
330331
fn new(res: RawRes) -> Self {
331-
let raw = res.tmq_get_raw();
332+
let raw = res.tmq_get_raw().expect("get raw message error");
332333
Self { res, raw }
333334
}
334335

335-
pub fn to_raw(&self) -> raw_data_t {
336-
self.raw.as_raw_data_t()
337-
}
336+
// pub fn to_raw(&self) -> raw_data_t {
337+
// self.raw.as_raw_data_t()
338+
// }
338339

339340
pub fn to_json(&self) -> serde_json::Value {
340341
serde_json::from_slice(self.res.tmq_get_json_meta().as_bytes())
@@ -359,7 +360,7 @@ impl Data {
359360
#[async_trait::async_trait]
360361
impl IsAsyncData for Data {
361362
async fn as_raw_data(&self) -> RawResult<taos_query::common::RawData> {
362-
Ok(self.raw.tmq_get_raw())
363+
self.raw.tmq_get_raw()
363364
}
364365

365366
async fn fetch_raw_block(&self) -> RawResult<Option<RawBlock>> {
@@ -369,7 +370,7 @@ impl IsAsyncData for Data {
369370

370371
impl IsData for Data {
371372
fn as_raw_data(&self) -> RawResult<taos_query::common::RawData> {
372-
Ok(self.raw.tmq_get_raw())
373+
self.raw.tmq_get_raw()
373374
}
374375

375376
fn fetch_raw_block(&self) -> RawResult<Option<RawBlock>> {

taos-query/src/common/raw/data.rs

Lines changed: 120 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ const RAW_PTR_OFFSET: usize = std::mem::size_of::<u32>() + std::mem::size_of::<u
1111
///
1212
/// It can be copy/cloned, but should not use it outbound away a offset lifetime.
1313
#[repr(C)]
14-
#[derive(Debug, Clone)]
14+
#[derive(Debug, Clone, Copy)]
15+
#[allow(non_camel_case_types)]
1516
pub struct raw_data_t {
1617
pub raw: *const c_void,
1718
pub raw_len: u32,
@@ -20,98 +21,141 @@ pub struct raw_data_t {
2021

2122
unsafe impl Send for raw_data_t {}
2223

23-
impl raw_data_t {
24-
pub fn to_bytes(&self) -> Bytes {
25-
let cap = // raw data len
26-
self.raw_len as usize +
27-
// self.raw_len
28-
std::mem::size_of::<u32>() +
29-
// self.raw_type
30-
std::mem::size_of::<u16>();
31-
let mut data = Vec::with_capacity(cap);
32-
33-
// first 4 bytes: raw_len
34-
data.extend(self.raw_len.to_le_bytes());
35-
36-
// next 2 bytes: raw_type
37-
data.extend(self.raw_type.to_le_bytes());
38-
39-
unsafe {
40-
let ptr = data.as_mut_ptr().add(RAW_PTR_OFFSET);
41-
std::ptr::copy_nonoverlapping(self.raw, ptr as _, self.raw_len as _);
42-
data.set_len(cap);
43-
}
44-
Bytes::from(data)
45-
}
24+
// impl raw_data_t {
25+
// pub fn to_bytes(&self) -> Bytes {
26+
// let cap = // raw data len
27+
// self.raw_len as usize +
28+
// // self.raw_len
29+
// std::mem::size_of::<u32>() +
30+
// // self.raw_type
31+
// std::mem::size_of::<u16>();
32+
// let mut data = Vec::with_capacity(cap);
33+
34+
// // first 4 bytes: raw_len
35+
// data.extend(self.raw_len.to_le_bytes());
36+
37+
// // next 2 bytes: raw_type
38+
// data.extend(self.raw_type.to_le_bytes());
39+
40+
// unsafe {
41+
// let ptr = data.as_mut_ptr().add(RAW_PTR_OFFSET);
42+
// std::ptr::copy_nonoverlapping(self.raw, ptr as _, self.raw_len as _);
43+
// data.set_len(cap);
44+
// }
45+
// Bytes::from(data)
46+
// }
47+
// }
48+
49+
/// TMQ message raw data container.
50+
///
51+
/// It's a wrapper for raw data from native library, and will be auto free when drop.
52+
#[derive(Debug)]
53+
pub struct RawData {
54+
free: unsafe extern "C" fn(raw: raw_data_t) -> i32,
55+
raw: raw_data_t,
4656
}
47-
48-
#[derive(Debug, Clone)]
49-
pub struct RawData(Bytes);
50-
5157
unsafe impl Send for RawData {}
5258
unsafe impl Sync for RawData {}
5359

54-
impl From<&raw_data_t> for RawData {
55-
fn from(raw: &raw_data_t) -> Self {
56-
RawData(raw.to_bytes())
60+
impl Drop for RawData {
61+
/// Use native free function to free raw_data_t
62+
fn drop(&mut self) {
63+
unsafe {
64+
(self.free)(self.raw);
65+
}
5766
}
5867
}
5968

60-
impl<T: Into<Bytes>> From<T> for RawData {
61-
fn from(bytes: T) -> Self {
62-
RawData(bytes.into())
69+
impl RawData {
70+
pub fn new(raw: raw_data_t, free: unsafe extern "C" fn(raw: raw_data_t) -> i32) -> Self {
71+
RawData { free, raw }
6372
}
6473
}
6574

75+
// #[derive(Debug, Clone)]
76+
// pub struct RawData(Bytes);
77+
78+
// unsafe impl Send for RawData {}
79+
// unsafe impl Sync for RawData {}
80+
81+
// // impl From<&raw_data_t> for RawData {
82+
// // fn from(raw: &raw_data_t) -> Self {
83+
// // RawData(raw.to_bytes())
84+
// // }
85+
// // }
86+
87+
// impl<T: Into<Bytes>> From<T> for RawData {
88+
// fn from(bytes: T) -> Self {
89+
// RawData(bytes.into())
90+
// }
91+
// }
92+
6693
impl RawData {
67-
pub fn new(raw: Bytes) -> Self {
68-
raw.into()
69-
}
70-
pub fn raw(&self) -> *const c_void {
71-
unsafe { self.0.as_ptr().add(RAW_PTR_OFFSET) as _ }
94+
pub fn raw_ptr(&self) -> *const c_void {
95+
self.raw.raw
7296
}
7397
pub fn raw_len(&self) -> u32 {
74-
unsafe { *(self.0.as_ptr() as *const u32) }
98+
self.raw.raw_len
7599
}
76100
pub fn raw_type(&self) -> u16 {
77-
unsafe { *(self.0.as_ptr().add(std::mem::size_of::<u32>()) as *const u16) }
101+
self.raw.raw_type
102+
}
103+
pub fn raw_slice(&self) -> &[u8] {
104+
unsafe { std::slice::from_raw_parts(self.raw.raw as *const u8, self.raw.raw_len as _) }
78105
}
79106

80107
pub fn as_raw_data_t(&self) -> raw_data_t {
81-
raw_data_t {
82-
raw: self.raw(),
83-
raw_len: self.raw_len(),
84-
raw_type: self.raw_type(),
85-
}
108+
self.raw
86109
}
87110

88111
pub fn as_bytes(&self) -> Cow<Bytes> {
89-
Cow::Borrowed(&self.0)
112+
todo!("replace as_bytes")
90113
}
91114
}
92115

116+
extern "C" fn _rust_free_raw(raw: raw_data_t) -> i32 {
117+
unsafe {
118+
let ptr = raw.raw as *mut u8;
119+
let len = raw.raw_len as usize;
120+
std::alloc::dealloc(
121+
ptr,
122+
std::alloc::Layout::from_size_align(len, 1).expect("Invalid layout"),
123+
);
124+
}
125+
0
126+
}
93127
impl Inlinable for RawData {
94128
fn read_inlined<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
95-
let mut data = Vec::new();
96-
97129
let len = reader.read_u32()?;
98-
data.extend(len.to_le_bytes());
99-
100130
let meta_type = reader.read_u16()?;
101-
data.extend(meta_type.to_le_bytes());
102131

103-
data.resize(data.len() + len as usize, 0);
132+
let layout = std::alloc::Layout::from_size_align(len as _, 1).map_err(|_| {
133+
std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid raw data length")
134+
})?;
135+
let ptr = unsafe { std::alloc::alloc(layout) };
136+
let buf = unsafe { std::slice::from_raw_parts_mut(ptr, len as _) };
104137

105-
let buf = &mut data[RAW_PTR_OFFSET..];
138+
reader.read_exact(buf).inspect_err(|_| unsafe {
139+
// free memory if read failed
140+
std::alloc::dealloc(ptr, layout);
141+
})?;
106142

107-
reader.read_exact(buf)?;
108-
Ok(data.into())
143+
let raw = raw_data_t {
144+
raw: ptr as _,
145+
raw_len: len,
146+
raw_type: meta_type,
147+
};
148+
149+
let message = RawData::new(raw, _rust_free_raw);
150+
151+
Ok(message)
109152
}
110153

111154
fn write_inlined<W: std::io::Write>(&self, wtr: &mut W) -> std::io::Result<usize> {
112-
let bytes = self.as_bytes();
113-
wtr.write_all(&bytes)?;
114-
Ok(bytes.len())
155+
wtr.write_all(self.raw_len().to_le_bytes().as_ref())?;
156+
wtr.write_all(self.raw_type().to_le_bytes().as_ref())?;
157+
wtr.write_all(self.raw_slice())?;
158+
Ok(self.raw_len() as usize + 6)
115159
}
116160
}
117161

@@ -121,29 +165,31 @@ impl crate::util::AsyncInlinable for RawData {
121165
reader: &mut R,
122166
) -> std::io::Result<Self> {
123167
use tokio::io::*;
124-
let mut data = Vec::new();
125-
126168
let len = reader.read_u32_le().await?;
127-
data.extend(len.to_le_bytes());
128-
129169
let meta_type = reader.read_u16_le().await?;
130-
data.extend(meta_type.to_le_bytes());
131-
132-
data.resize(data.len() + len as usize, 0);
133-
134-
let buf = &mut data[RAW_PTR_OFFSET..];
135-
136-
reader.read_exact(buf).await?;
137-
Ok(data.into())
170+
let mut vec: Vec<u8> = Vec::with_capacity(len as usize);
171+
reader.read_exact(&mut vec).await?;
172+
173+
let ptr = Box::into_raw(vec.into_boxed_slice());
174+
let raw = raw_data_t {
175+
raw: ptr as _,
176+
raw_len: len,
177+
raw_type: meta_type,
178+
};
179+
180+
let message = RawData::new(raw, _rust_free_raw);
181+
Ok(message)
138182
}
139183

140184
async fn write_inlined<W: tokio::io::AsyncWrite + Send + Unpin>(
141185
&self,
142186
wtr: &mut W,
143187
) -> std::io::Result<usize> {
144188
use tokio::io::*;
145-
let bytes = self.as_bytes();
146-
wtr.write_all(&bytes).await?;
147-
Ok(bytes.len())
189+
wtr.write_all(self.raw_len().to_le_bytes().as_ref()).await?;
190+
wtr.write_all(self.raw_type().to_le_bytes().as_ref())
191+
.await?;
192+
wtr.write_all(self.raw_slice()).await?;
193+
Ok(self.raw_len() as usize + 6)
148194
}
149195
}

0 commit comments

Comments
 (0)