Skip to content

Commit b6fbaae

Browse files
rootroot
authored andcommitted
Optimize code structure(version 1.3).
Signed-off-by: root <[email protected]>
1 parent b334a08 commit b6fbaae

File tree

6 files changed

+237
-151
lines changed

6 files changed

+237
-151
lines changed

src/engine.rs

Lines changed: 16 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -341,73 +341,10 @@ where
341341
start.elapsed().as_micros()
342342
);
343343

344-
let mut new_block_flags: Vec<bool> = Vec::with_capacity(length);
345-
let mut block_sum = 0;
346-
for (t, i) in ents_idx.iter().enumerate() {
347-
if match t {
348-
0 => true,
349-
_ => ents_idx[t - 1].entries.unwrap() != ents_idx[t].entries.unwrap(),
350-
} {
351-
block_sum += 1;
352-
new_block_flags.push(true);
353-
} else {
354-
new_block_flags.push(false);
355-
}
356-
}
357-
358-
let mut ctx = AioContext::new(block_sum);
359-
for (seq, i) in ents_idx.iter().enumerate() {
360-
if new_block_flags[seq] {
361-
submit_read_request_to_file(
362-
self.pipe_log.as_ref(),
363-
seq,
364-
&mut ctx,
365-
i.entries.unwrap(),
366-
)
367-
.unwrap();
368-
}
369-
}
370-
println!(
371-
"[fetch_entries_to_aio] (stage2) time cost: {:?} us",
372-
start.elapsed().as_micros()
373-
);
344+
self.pipe_log
345+
.async_entry_read::<M>(&mut ents_idx, vec)
346+
.unwrap();
374347

375-
let mut seq = 0;
376-
let mut decode_buf = vec![];
377-
378-
for (t, i) in ents_idx.iter().enumerate() {
379-
decode_buf = match t {
380-
0 => {
381-
ctx.single_wait(0).unwrap();
382-
LogBatch::decode_entries_block(
383-
&ctx.data(0),
384-
ents_idx[0].entries.unwrap(),
385-
ents_idx[0].compression_type,
386-
)
387-
.unwrap()
388-
}
389-
_ => match new_block_flags[t] {
390-
true => {
391-
seq += 1;
392-
ctx.single_wait(seq).unwrap();
393-
LogBatch::decode_entries_block(
394-
&ctx.data(seq),
395-
i.entries.unwrap(),
396-
i.compression_type,
397-
)
398-
.unwrap()
399-
}
400-
false => decode_buf,
401-
},
402-
};
403-
vec.push(
404-
parse_from_bytes::<M>(
405-
&mut decode_buf
406-
[(i.entry_offset) as usize..(i.entry_offset + i.entry_len) as usize],
407-
)
408-
.unwrap(),
409-
);
410-
}
411348
ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);
412349
println!(
413350
"[fetch_entries_to_aio] (end) time cost: {:?} us",
@@ -692,19 +629,6 @@ where
692629
})
693630
}
694631

695-
pub(crate) fn submit_read_request_to_file<P>(
696-
pipe_log: &P,
697-
seq: usize,
698-
ctx: &mut AioContext,
699-
handle: FileBlockHandle,
700-
) -> Result<()>
701-
where
702-
P: PipeLog,
703-
{
704-
pipe_log.read_bytes_aio(seq, ctx, handle).unwrap();
705-
Ok(())
706-
}
707-
708632
#[cfg(test)]
709633
mod tests {
710634
use super::*;
@@ -910,7 +834,7 @@ mod tests {
910834

911835
#[test]
912836
fn test_async_read() {
913-
let normal_batch_size = 8192;
837+
let normal_batch_size = 4096;
914838
let compressed_batch_size = 5120;
915839
for &entry_size in &[normal_batch_size] {
916840
if entry_size == normal_batch_size {
@@ -2162,11 +2086,15 @@ mod tests {
21622086
type Handle = <ObfuscatedFileSystem as FileSystem>::Handle;
21632087
type Reader = <ObfuscatedFileSystem as FileSystem>::Reader;
21642088
type Writer = <ObfuscatedFileSystem as FileSystem>::Writer;
2089+
type AsyncIoContext = AioContext;
21652090

2166-
// fn read_aio(&self, ctx: &mut AioContext, offset: u64) -> std::io::Result<()>
2167-
// { // todo!()
2168-
// Ok(())
2169-
// }
2091+
fn new_async_reader(
2092+
&self,
2093+
handle: Arc<Self::Handle>,
2094+
ctx: &mut Self::AsyncIoContext,
2095+
) -> std::io::Result<()> {
2096+
ctx.new_reader(handle)
2097+
}
21702098

21712099
fn create<P: AsRef<Path>>(&self, path: P) -> std::io::Result<Self::Handle> {
21722100
let handle = self.inner.create(&path)?;
@@ -2226,6 +2154,10 @@ mod tests {
22262154
fn new_writer(&self, h: Arc<Self::Handle>) -> std::io::Result<Self::Writer> {
22272155
self.inner.new_writer(h)
22282156
}
2157+
2158+
fn new_async_io_context(&self, block_sum: usize) -> std::io::Result<Self::AsyncIoContext> {
2159+
todo!()
2160+
}
22292161
}
22302162

22312163
#[test]

src/env/default.rs

Lines changed: 51 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -102,14 +102,13 @@ impl LogFd {
102102
Ok(readed)
103103
}
104104

105-
pub fn read_aio(&self, seq: usize, ctx: &mut AioContext, offset: u64) {
105+
pub fn read_aio(&self, aior: &mut aiocb, len: usize, pbuf: *mut u8, offset: u64) {
106106
unsafe {
107-
let aior = &mut ctx.aio_vec[seq];
108107
aior.aio_fildes = self.0;
109108
aior.aio_reqprio = 0;
110109
aior.aio_sigevent = SigEvent::new(SigevNotify::SigevNone).sigevent();
111-
aior.aio_nbytes = ctx.buf_vec[seq].len() as usize;
112-
aior.aio_buf = ctx.buf_vec[seq].as_mut_ptr() as *mut c_void;
110+
aior.aio_nbytes = len;
111+
aior.aio_buf = pbuf as *mut c_void;
113112
aior.aio_lio_opcode = libc::LIO_READ;
114113
aior.aio_offset = offset as off_t;
115114
libc::aio_read(aior);
@@ -277,25 +276,52 @@ impl WriteExt for LogFile {
277276
}
278277

279278
pub struct AioContext {
279+
inner: Option<Arc<LogFd>>,
280+
offset: u64,
281+
index: usize,
280282
aio_vec: Vec<aiocb>,
281283
pub(crate) buf_vec: Vec<Vec<u8>>,
282284
}
283285
impl AioContext {
284286
pub fn new(block_sum: usize) -> Self {
285-
let mut vec = vec![];
287+
let mut aio_vec = vec![];
288+
let mut buf_vec = vec![];
286289
unsafe {
287290
for i in 0..block_sum {
288-
vec.push(mem::zeroed::<libc::aiocb>());
291+
aio_vec.push(mem::zeroed::<libc::aiocb>());
289292
}
290293
}
291294
Self {
292-
aio_vec: vec,
293-
buf_vec: vec![],
295+
inner: None,
296+
offset: 0,
297+
index: 0,
298+
aio_vec,
299+
buf_vec,
294300
}
295301
}
302+
303+
pub fn new_reader(&mut self, fd: Arc<LogFd>) -> IoResult<()> {
304+
self.inner = Some(fd);
305+
Ok(())
306+
}
296307
}
297308

298309
impl AsyncContext for AioContext {
310+
fn wait(&mut self) -> IoResult<usize> {
311+
let mut total = 0;
312+
for seq in 0..self.aio_vec.len() {
313+
match self.single_wait(seq) {
314+
Ok(len) => total += 1,
315+
Err(e) => return Err(e),
316+
}
317+
}
318+
Ok(total as usize)
319+
}
320+
321+
fn data(&self, seq: usize) -> Vec<u8> {
322+
self.buf_vec[seq].to_vec()
323+
}
324+
299325
fn single_wait(&mut self, seq: usize) -> IoResult<usize> {
300326
let buf_len = self.buf_vec[seq].len();
301327
unsafe {
@@ -312,19 +338,19 @@ impl AsyncContext for AioContext {
312338
}
313339
}
314340

315-
fn wait(&mut self) -> IoResult<usize> {
316-
let mut total = 0;
317-
for seq in 0..self.aio_vec.len() {
318-
match self.single_wait(seq) {
319-
Ok(len) => total += 1,
320-
Err(e) => return Err(e),
321-
}
322-
}
323-
Ok(total as usize)
324-
}
341+
fn submit_read_req(&mut self, buf: Vec<u8>, offset: u64) -> IoResult<()> {
342+
let seq = self.index;
343+
self.index += 1;
344+
self.buf_vec.push(buf);
325345

326-
fn data(&self, seq: usize) -> Vec<u8> {
327-
self.buf_vec[seq].to_vec()
346+
self.inner.as_ref().unwrap().read_aio(
347+
&mut self.aio_vec[seq],
348+
self.buf_vec[seq].len(),
349+
self.buf_vec[seq].as_mut_ptr(),
350+
offset,
351+
);
352+
353+
Ok(())
328354
}
329355
}
330356

@@ -334,16 +360,14 @@ impl FileSystem for DefaultFileSystem {
334360
type Handle = LogFd;
335361
type Reader = LogFile;
336362
type Writer = LogFile;
363+
type AsyncIoContext = AioContext;
337364

338-
fn read_aio(
365+
fn new_async_reader(
339366
&self,
340367
handle: Arc<Self::Handle>,
341-
seq: usize,
342-
ctx: &mut AioContext,
343-
offset: u64,
368+
ctx: &mut Self::AsyncIoContext,
344369
) -> IoResult<()> {
345-
handle.read_aio(seq, ctx, offset);
346-
Ok(())
370+
ctx.new_reader(handle)
347371
}
348372

349373
fn create<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> {
@@ -370,7 +394,7 @@ impl FileSystem for DefaultFileSystem {
370394
Ok(LogFile::new(handle))
371395
}
372396

373-
fn new_async_context(&self, block_sum: usize) -> IoResult<AioContext> {
397+
fn new_async_io_context(&self, block_sum: usize) -> IoResult<Self::AsyncIoContext> {
374398
Ok(AioContext::new(block_sum))
375399
}
376400
}

src/env/mod.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,13 @@ pub trait FileSystem: Send + Sync {
1717
type Handle: Send + Sync + Handle;
1818
type Reader: Seek + Read + Send;
1919
type Writer: Seek + Write + Send + WriteExt;
20+
type AsyncIoContext: AsyncContext;
2021

21-
fn read_aio(
22+
fn new_async_reader(
2223
&self,
2324
handle: Arc<Self::Handle>,
24-
seq: usize,
25-
ctx: &mut AioContext,
26-
offset: u64,
27-
) -> Result<()> {
28-
Ok(())
29-
}
25+
ctx: &mut Self::AsyncIoContext,
26+
) -> Result<()>;
3027

3128
fn create<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle>;
3229

@@ -62,9 +59,7 @@ pub trait FileSystem: Send + Sync {
6259

6360
fn new_writer(&self, handle: Arc<Self::Handle>) -> Result<Self::Writer>;
6461

65-
fn new_async_context(&self, block_sum: usize) -> Result<AioContext> {
66-
Ok(AioContext::new(block_sum))
67-
}
62+
fn new_async_io_context(&self, block_sum: usize) -> Result<Self::AsyncIoContext>;
6863
}
6964

7065
pub trait Handle {
@@ -84,6 +79,8 @@ pub trait WriteExt {
8479

8580
pub trait AsyncContext {
8681
fn wait(&mut self) -> Result<usize>;
87-
fn single_wait(&mut self, seq: usize) -> Result<usize>;
8882
fn data(&self, seq: usize) -> Vec<u8>;
83+
fn single_wait(&mut self, seq: usize) -> Result<usize>;
84+
85+
fn submit_read_req(&mut self, buf: Vec<u8>, offset: u64) -> Result<()>;
8986
}

src/env/obfuscated.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,15 @@ impl FileSystem for ObfuscatedFileSystem {
9191
type Handle = <DefaultFileSystem as FileSystem>::Handle;
9292
type Reader = ObfuscatedReader;
9393
type Writer = ObfuscatedWriter;
94+
type AsyncIoContext = AioContext;
95+
96+
fn new_async_reader(
97+
&self,
98+
handle: Arc<Self::Handle>,
99+
ctx: &mut Self::AsyncIoContext,
100+
) -> IoResult<()> {
101+
ctx.new_reader(handle)
102+
}
94103

95104
fn create<P: AsRef<Path>>(&self, path: P) -> IoResult<Self::Handle> {
96105
let r = self.inner.create(path);
@@ -129,4 +138,8 @@ impl FileSystem for ObfuscatedFileSystem {
129138
fn new_writer(&self, handle: Arc<Self::Handle>) -> IoResult<Self::Writer> {
130139
Ok(ObfuscatedWriter(self.inner.new_writer(handle)?))
131140
}
141+
142+
fn new_async_io_context(&self, block_sum: usize) -> IoResult<Self::AsyncIoContext> {
143+
Ok(AioContext::new(block_sum))
144+
}
132145
}

0 commit comments

Comments
 (0)