diff --git a/Cargo.toml b/Cargo.toml
index 4545889..28cb740 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,6 +28,10 @@ harness = false
 name = "query_benchmarks"
 harness = false
 
+[[bench]]
+name = "merge_benchmarks"
+harness = false
+
 [profile.bench]
 debug = true
 
diff --git a/benches/merge_benchmarks.rs b/benches/merge_benchmarks.rs
new file mode 100644
index 0000000..2434a46
--- /dev/null
+++ b/benches/merge_benchmarks.rs
@@ -0,0 +1,125 @@
+use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
+use redoxql::pagerange::PageRange;
+use redoxql::record::{Record, RecordAddress};
+use redoxql::table::PageDirectory;
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+
+fn setup_benchmark_data(
+    num_records: usize,
+    num_cols: i64,
+) -> (PageRange, Arc<Mutex<PageDirectory>>) {
+    let page_range = PageRange::new(num_cols);
+    let page_directory = Arc::new(Mutex::new(PageDirectory::new()));
+
+    {
+        let mut pd = page_directory.lock().unwrap();
+
+        // We usually have fewer base records (30 seems realistic)
+        for i in 0..30 {
+            let rid = i as i64;
+            let mut addresses = Vec::new();
+
+            addresses.push(RecordAddress {
+                page: page_range.base_container.rid_page(),
+                offset: i as i64,
+            });
+
+            let record = Record {
+                rid,
+                addresses: Arc::new(Mutex::new(addresses)),
+            };
+
+            pd.directory.insert(rid, record);
+        }
+
+        for i in 0..num_records {
+            let rid = (1000 + i) as i64;
+            let mut addresses = Vec::new();
+
+            addresses.push(RecordAddress {
+                page: page_range.base_container.rid_page(),
+                offset: (i % 100) as i64,
+            });
+
+            let record = Record {
+                rid,
+                addresses: Arc::new(Mutex::new(addresses)),
+            };
+
+            pd.directory.insert(rid, record);
+        }
+    }
+
+    {
+        let tail_rid_page = page_range.tail_container.rid_page();
+        let mut rid_guard = tail_rid_page.lock().unwrap();
+        rid_guard.data.resize(num_records, 0);
+        for i in 0..num_records {
+            rid_guard.data[i] = (1000 + i) as i64;
+        }
+    }
+
+    (page_range, page_directory)
+}
+
+fn benchmark_merge(c: &mut Criterion) {
+    let mut group = c.benchmark_group("PageRange_Merge");
+
+    for size in [100, 1_000, 10_000, 100_000].iter() {
+        group.bench_with_input(BenchmarkId::new("Original", size), size, |b, &size| {
+            b.iter_with_setup(
+                || setup_benchmark_data(size as usize, 5),
+                |(mut page_range, pd)| {
+                    black_box(page_range.merge(pd));
+                },
+            );
+        });
+
+        group.bench_with_input(BenchmarkId::new("Optimized", size), size, |b, &size| {
+            b.iter_with_setup(
+                || setup_benchmark_data(size as usize, 5),
+                |(mut page_range, pd)| {
+                    black_box(page_range.optimized_merge(pd));
+                },
+            );
+        });
+    }
+
+    group.finish();
+}
+
+fn benchmark_column_counts(c: &mut Criterion) {
+    let mut group = c.benchmark_group("PageRange_ColumnCounts");
+
+    for cols in [3, 5].iter() {
+        group.bench_with_input(BenchmarkId::new("Original", cols), cols, |b, &cols| {
+            b.iter_with_setup(
+                || setup_benchmark_data(100, cols),
+                |(mut page_range, pd)| {
+                    black_box(page_range.merge(pd));
+                },
+            );
+        });
+
+        group.bench_with_input(BenchmarkId::new("Optimized", cols), cols, |b, &cols| {
+            b.iter_with_setup(
+                || setup_benchmark_data(100, cols),
+                |(mut page_range, pd)| {
+                    black_box(page_range.optimized_merge(pd));
+                },
+            );
+        });
+    }
+
+    group.finish();
+}
+
+criterion_group! {
+    name = benches;
+    config = Criterion::default()
+        .measurement_time(Duration::from_secs(10))
+        .sample_size(10);
+    targets = benchmark_merge, benchmark_column_counts
+}
+criterion_main!(benches);
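With the [[bench]] target registered above, the new suite can be run on its own (standard Cargo and Criterion usage, nothing project-specific assumed):

    cargo bench --bench merge_benchmarks

Note that harness = false is what lets Criterion supply its own main() in place of the default libtest bench harness; without it the criterion_group!/criterion_main! pair never runs.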
diff --git a/python/benchmarks/scaling_tester.py b/python/benchmarks/scaling_tester.py
index 1da4429..77bd3e4 100644
--- a/python/benchmarks/scaling_tester.py
+++ b/python/benchmarks/scaling_tester.py
@@ -16,6 +16,8 @@ def run_inserts(amount: int):
 
 def run_updates(amount: int):
     db = Database()
+    # TODO: Make sure this doesn't save data between runs.
+    # I don't think it does, but I need to check (see the sketch below)
     grades_table = db.create_table("Grades", 5, 0)
     query = Query(grades_table)
 
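A quick in-process way to settle that TODO: insert through one Database instance, then confirm the row is invisible to a second, freshly constructed one. The insert/select calls below assume the standard Query API this benchmark suite uses elsewhere; the helper name is hypothetical:

    # Hypothetical check: a row inserted via one Database() must not be
    # visible through a second, freshly constructed Database().
    def check_no_persistence():
        db1 = Database()
        q1 = Query(db1.create_table("Grades", 5, 0))
        q1.insert(1, 2, 3, 4, 5)

        db2 = Database()
        q2 = Query(db2.create_table("Grades", 5, 0))
        # select(key, key_column, projection); empty/falsy means nothing leaked
        assert not q2.select(1, 0, [1, 1, 1, 1, 1])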
diff --git a/src/pagerange.rs b/src/pagerange.rs
index c696046..2158952 100644
--- a/src/pagerange.rs
+++ b/src/pagerange.rs
@@ -46,14 +46,215 @@ impl PageRange {
         Some(self.base_container.read_record(record))
     }
 
-    /// Merge the two containers in a separate thread
     pub fn merge(&mut self, page_directory: Arc<Mutex<PageDirectory>>) {
+        // println!("Merge: Pre-starting merge operation in a separate thread");
+
+        let base_container = self.base_container.clone();
+        let tail_container = self.tail_container.clone();
+        let thread_pd = page_directory.clone();
+
+        // println!("Merge: Starting merge operation in a separate thread");
+
+        let handle = thread::spawn(move || {
+            // println!("Thread: Merge thread started");
+            let mut new_records: Vec<Record> = Vec::new();
+            let mut seen_rids: HashSet<i64> = HashSet::new();
+
+            // println!("Thread: Locking tail_container.rid_page()");
+            let tail_rid_page = tail_container.rid_page();
+            let tail_rid_data = {
+                let rid_guard = tail_rid_page.lock().unwrap();
+                // println!("Thread: Acquired tail_container.rid_page lock, data length: {}", rid_guard.data.len());
+                rid_guard.data.clone()
+            };
+
+            if tail_rid_data.is_empty() {
+                // println!("Thread: tail_rid_data is empty, returning early");
+                return (base_container, Vec::new());
+            }
+
+            let mut tail_rids_to_process = tail_rid_data;
+            tail_rids_to_process.reverse();
+            // println!("Thread: Reversed tail_rids_to_process");
+
+            let mut new_base = base_container.deep_copy();
+            let last_tail_rid = tail_rids_to_process[0];
+
+            for tail_rid in tail_rids_to_process {
+                // println!("Thread: Processing tail_rid: {}", tail_rid);
+
+                // Check if we've seen all rids using new_base's RID page.
+                {
+                    let new_base_rid_page = new_base.rid_page();
+                    let rid_guard = new_base_rid_page.lock().unwrap();
+                    // println!("Thread: new_base.rid_page num_records: {}", rid_guard.num_records);
+                    if seen_rids.len() >= rid_guard.num_records as usize {
+                        // println!("Thread: Seen all rids, breaking loop");
+                        break;
+                    }
+                }
+
+                // println!("Thread: Locking page_directory to get tail_record for tail_rid: {}", tail_rid);
+                let tail_record = {
+                    let pd_guard = thread_pd.lock().unwrap();
+                    // println!("Thread: Acquired page_directory lock for tail_rid: {}", tail_rid);
+                    pd_guard.directory.get(&tail_rid).unwrap().clone()
+                };
+                // println!("Thread: Got tail_record for tail_rid: {}", tail_rid);
+
+                let base_rid_address = tail_record.base_rid();
+                let base_rid = {
+                    // println!("Thread: Locking base_rid_address.page for base_rid");
+                    let base_rid_page = base_rid_address.page;
+                    let page_guard = base_rid_page.lock().unwrap();
+                    let brid = page_guard.data[base_rid_address.offset as usize];
+                    // println!("Thread: Retrieved base_rid: {}", brid);
+                    brid
+                };
+
+                if !seen_rids.contains(&base_rid) {
+                    let offset = new_base.find_rid_offset(base_rid);
+                    // println!("Thread: Found offset {} for base_rid: {}", offset, base_rid);
+
+                    {
+                        // println!("Thread: Locking new_base.schema_encoding_page");
+                        let schema_page = new_base.schema_encoding_page();
+                        let mut schema_guard = schema_page.lock().unwrap();
+                        schema_guard.data[offset] = 0;
+                        // println!("Thread: Updated schema_encoding_page at offset {}", offset);
+                    }
+                    {
+                        // println!("Thread: Locking new_base.indirection_page");
+                        let indirection_page = new_base.indirection_page();
+                        let mut indir_guard = indirection_page.lock().unwrap();
+                        indir_guard.data[offset] = base_rid;
+                        // println!("Thread: Updated indirection_page at offset {} with base_rid {}", offset, base_rid);
+                    }
+
+                    // println!("Thread: Creating new record for base_rid: {}", base_rid);
+                    let new_record = Record {
+                        rid: base_rid,
+                        addresses: Arc::new(Mutex::new(Vec::new())),
+                    };
+
+                    {
+                        let new_rid_page = new_base.rid_page();
+                        let mut addr_guard = new_record.addresses.lock().unwrap();
+                        addr_guard.push(RecordAddress {
+                            page: new_rid_page,
+                            offset: offset as i64,
+                        });
+                        // println!("Thread: Pushed RID address for record {}", base_rid);
+                    }
+                    {
+                        let schema_page = new_base.schema_encoding_page();
+                        let mut addr_guard = new_record.addresses.lock().unwrap();
+                        addr_guard.push(RecordAddress {
+                            page: schema_page,
+                            offset: offset as i64,
+                        });
+                        // println!("Thread: Pushed schema_encoding address for record {}", base_rid);
+                    }
+                    {
+                        let indirection_page = new_base.indirection_page();
+                        let mut addr_guard = new_record.addresses.lock().unwrap();
+                        addr_guard.push(RecordAddress {
+                            page: indirection_page,
+                            offset: offset as i64,
+                        });
+                        // println!("Thread: Pushed indirection address for record {}", base_rid);
+                    }
+                    {
+                        let base_rid_page = new_base.base_rid_page();
+                        let mut addr_guard = new_record.addresses.lock().unwrap();
+                        addr_guard.push(RecordAddress {
+                            page: base_rid_page,
+                            offset: offset as i64,
+                        });
+                        // println!("Thread: Pushed base_rid address for record {}", base_rid);
+                    }
+
+                    for i in 0..tail_record.columns().len() {
+                        // println!("Thread: Processing column {} for tail_record with base_rid: {}", i, base_rid);
+                        let tail_col_page = tail_record.columns()[i].page.clone();
+                        let col_value = {
+                            let col_guard = tail_col_page.lock().unwrap();
+                            let val = col_guard.data[tail_record.columns()[i].offset as usize];
+                            // println!("Thread: Got column value {} for column {}", val, i);
+                            val
+                        };
+
+                        {
+                            let new_col_page = new_base.column_page(i as i64);
+                            let mut new_col_guard = new_col_page.lock().unwrap();
+                            new_col_guard.data[offset] = col_value;
+                            // println!("Thread: Updated new_base column {} at offset {} with value {}", i, offset, col_value);
+                        }
+                        {
+                            let new_col_page = new_base.column_page(i as i64);
+                            let mut addr_guard = new_record.addresses.lock().unwrap();
+                            addr_guard.push(RecordAddress {
+                                page: new_col_page,
+                                offset: offset as i64,
+                            });
+                            // println!("Thread: Pushed column address for column {} for record {}", i, base_rid);
+                        }
+                    }
+
+                    new_records.push(new_record);
+                    // println!("Thread: New record for base_rid {} added", base_rid);
+                    seen_rids.insert(base_rid);
+                }
+            }
+            new_base.tail_page_sequence = last_tail_rid;
+            // println!("Thread: Set new_base.tail_page_sequence to {}", last_tail_rid);
+            // println!("Thread: Merge thread complete, returning new_base and new_records");
+            (new_base, new_records)
+        });
+
+        let (new_base_container, new_records) = handle.join().unwrap();
+        // println!("Main: Merge thread joined successfully");
+
+        self.base_container = new_base_container;
+        // println!("Main: Updated self.base_container");
+
+        for record in new_records {
+            // println!("Main: Processing record with rid: {}", record.rid);
+            let mut pd_guard = page_directory.lock().unwrap();
+            let current_record = pd_guard.directory.get(&record.rid).unwrap().clone();
+
+            let current_indir_val = {
+                // println!("Main: Locking current_record.indirection().page for record {}", record.rid);
+                let indirection_page = current_record.indirection().page.clone();
+                let indir_guard = indirection_page.lock().unwrap();
+                let val = indir_guard.data[current_record.indirection().offset as usize];
+                // println!("Main: Current indirection value for record {} is {}", record.rid, val);
+                val
+            };
+
+            if current_indir_val > self.base_container.tail_page_sequence {
+                // println!("Main: Updating record {} indirection with value {}", record.rid, current_indir_val);
+                let record_indirection_page = record.indirection().page.clone();
+                let mut rec_indir_guard = record_indirection_page.lock().unwrap();
+                rec_indir_guard.data[record.indirection().offset as usize] = current_indir_val;
+            }
+            let rid = record.rid;
+            pd_guard.directory.insert(rid, record);
+            // println!("Main: Inserted record {} into page_directory", rid);
+        }
+    }
+
+    pub fn optimized_merge(&mut self, page_directory: Arc<Mutex<PageDirectory>>) {
         info!("Starting merge!");
         // println!("Merge: Pre-starting merge operation in a separate thread");
 
         let base_container = self.base_container.clone();
         let tail_container = self.tail_container.clone();
+
+        info!("Finished first clone of base_container and tail_container.");
+
         let thread_pd = page_directory.clone();
 
         // println!("Merge: Starting merge operation in a separate thread");
@@ -81,9 +282,14 @@ impl PageRange {
             tail_rids_to_process.reverse();
             // println!("Thread: Reversed tail_rids_to_process");
 
-            let mut new_base = base_container.deep_copy();
+            let mut new_base = base_container.clone();
             let last_tail_rid = tail_rids_to_process[0];
 
+            info!(
+                "Starting loop over tail_rids_to_process with len = {}.",
+                tail_rids_to_process.len()
+            );
+
             for tail_rid in tail_rids_to_process {
                 // println!("Thread: Processing tail_rid: {}", tail_rid);
 
@@ -98,11 +304,11 @@ impl PageRange {
                     }
                 }
 
+                let pd_guard = thread_pd.lock().unwrap();
                 // println!("Thread: Locking page_directory to get tail_record for tail_rid: {}", tail_rid);
                 let tail_record = {
-                    let pd_guard = thread_pd.lock().unwrap();
{}", tail_rid); - pd_guard.directory.get(&tail_rid).unwrap().clone() + pd_guard.directory.get(&tail_rid).unwrap() }; // println!("Thread: Got tail_record for tail_rid: {}", tail_rid); @@ -219,12 +425,22 @@ impl PageRange { let (new_base_container, new_records) = handle.join().unwrap(); // println!("Main: Merge thread joined successfully"); + // TODO: ~~WOW! The code gets to here really really quickly~~ + // ~~The speed issue is below this section~~ + // Unfortunately, I was wrong, and I didn't see the handle.join() that waits until the + // above loop is done. If the code in the thread is the problem, I wonder if we can use 1 + // thread per column? This would definitely speed things up if we can. + info!("Finished first loop section."); + self.base_container = new_base_container; // println!("Main: Updated self.base_container"); + let mut pd_guard = page_directory.lock().unwrap(); + + info!("About to get from and insert into page_directory {} times. Which are both O(log n). As well as reading and writing O(n) pages.", new_records.len()); + for record in new_records { // println!("Main: Processing record with rid: {}", record.rid); - let mut pd_guard = page_directory.lock().unwrap(); let current_record = pd_guard.directory.get(&record.rid).unwrap(); let current_indir_val = {