Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 7 additions & 35 deletions python/lstore/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,6 @@
from .lstore import RQuery, RTable


class ReturnRecord:
def __init__(self, columns: List[int]):
self.columns = columns[4:]

def __str__(self):
return f"Record({self.columns})"


class Query:
def __init__(self, table: RTable):
"""Creates a Query object that can perform different queries on the
Expand Down Expand Up @@ -51,22 +43,8 @@ def select(
Returns a list of Record objects upon success
Returns False if record locked by TPL
Assume that select will never be called on a key that doesn't exist

return [
ReturnRecord(
list(
self.rquery.select(
search_key,
search_key_index,
projected_columns_index,
)
)
)
]
"""
res = self.rquery.select(search_key, search_key_index, projected_columns_index)
# If res is not None, it should be a list of records.
return [ReturnRecord(r) for r in res] if res is not None else []
return self.rquery.select(search_key, search_key_index, projected_columns_index)

def select_version(
self,
Expand All @@ -84,18 +62,12 @@ def select_version(
Returns False if record locked by TPL
Assume that select will never be called on a key that doesn't exist
"""
return [
ReturnRecord(
list(
self.rquery.select_version(
search_key,
search_key_index,
projected_columns_index,
relative_version,
)
)
)
]
return self.rquery.select_version(
search_key,
search_key_index,
projected_columns_index,
relative_version,
)

def update(self, primary_key: int, *columns):
"""Update a record with specified key and columns
Expand Down
90 changes: 73 additions & 17 deletions src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,23 @@ use pyo3::prelude::*;
use std::iter::zip;
use std::sync::atomic::{AtomicBool, Ordering};

#[pyclass]
pub struct RReturnRecord {
#[pyo3(get)]
columns: Vec<Option<i64>>,
}

#[pyclass]
pub struct RQuery {
// pub table: RTable,
pub handle: RTableHandle,
merging: AtomicBool,
}

/// Use the projected vector to decide which columns to set to None
fn filter_projected(column_values: Vec<i64>, projected: Vec<i64>) -> Vec<Option<i64>> {
// TODO: HUGE SPEED IMPROVEMENT AVAILABLE - Don't read columns that will be None anyway!!!

// Add the 4 columns used internally
let mut projected_cols: Vec<i64> = vec![1, 1, 1, 1];
projected_cols.extend(projected.clone());
Expand Down Expand Up @@ -70,6 +79,33 @@ impl RQuery {
search_key: i64,
search_key_index: i64,
projected_columns_index: Vec<i64>,
) -> Option<Vec<Option<RReturnRecord>>> {
let ret = self.internal_select(search_key, search_key_index, projected_columns_index);

let mut out = vec![];

match ret {
Some(rows) => {
for row in rows {
let mut a = row;
// Remove the first 4 columns that are used only internally
a.drain(0..4);
// Return the columns encased in the RReturnRecord struct
out.push(Some(RReturnRecord { columns: a }))
}
}
None => out.push(None),
}

return Some(out);
}

/// Formerly just `select` does a select on the database
fn internal_select(
&mut self,
search_key: i64,
search_key_index: i64,
projected_columns_index: Vec<i64>,
) -> Option<Vec<Vec<Option<i64>>>> {
let table = self.handle.table.read().unwrap();

Expand Down Expand Up @@ -123,7 +159,8 @@ impl RQuery {
}
}

pub fn select_version(
/// Formerly just `select_version` does a select and picks which version of the record
fn internal_select_version(
&mut self,
primary_key: i64,
_search_key_index: i64,
Expand All @@ -138,6 +175,25 @@ impl RQuery {
Some(filter_projected(ret, projected_columns_index))
}

pub fn select_version(
&mut self,
primary_key: i64,
_search_key_index: i64,
projected_columns_index: Vec<i64>,
relative_version: i64,
) -> Option<RReturnRecord> {
match self.internal_select_version(
primary_key,
0,
projected_columns_index,
relative_version,
) {
// Return the columns encased in the RReturnRecord struct
Some(columns) => Some(RReturnRecord { columns }),
None => None,
}
}

pub fn update(&mut self, primary_key: i64, columns: Vec<Option<i64>>) -> bool {
let mut table = self.handle.table.write().unwrap();

Expand Down Expand Up @@ -319,7 +375,7 @@ impl RQuery {
// Select the value of the column before we increment
let cols = vec![1i64; num_cols];

let ret = self.select(primary_key, 0, cols);
let ret = self.internal_select(primary_key, 0, cols);

if let Some(records) = ret {
let record = &records[0];
Expand Down Expand Up @@ -347,7 +403,7 @@ mod tests {
q.insert(vec![1, 2, 3]);

// Use primary_key of 1
let vals = q.select(1, 0, vec![1, 1, 1]);
let vals = q.internal_select(1, 0, vec![1, 1, 1]);
assert_eq!(
vals.unwrap()[0],
vec![
Expand All @@ -373,7 +429,7 @@ mod tests {
// Increment the first user column (column 1)
q.increment(1, 1);

let vals = q.select(1, 0, vec![1, 1, 1]); // Select entire row
let vals = q.internal_select(1, 0, vec![1, 1, 1]); // Select entire row
assert_eq!(
vals.unwrap()[0],
vec![
Expand All @@ -389,7 +445,7 @@ mod tests {

q.increment(1, 1);

let vals2 = q.select(1, 0, vec![1, 1, 1]);
let vals2 = q.internal_select(1, 0, vec![1, 1, 1]);
assert_eq!(
vals2.unwrap()[0],
vec![
Expand All @@ -405,7 +461,7 @@ mod tests {

q.increment(1, 1);

let vals3 = q.select(1, 0, vec![1, 1, 1]);
let vals3 = q.internal_select(1, 0, vec![1, 1, 1]);
assert_eq!(
vals3.unwrap()[0],
vec![
Expand Down Expand Up @@ -455,7 +511,7 @@ mod tests {
q.insert(vec![1, 2, 3]);

// Use primary_key of 1
let vals = q.select(1, 0, vec![1, 1, 1]);
let vals = q.internal_select(1, 0, vec![1, 1, 1]);
assert_eq!(
vals.unwrap()[0],
vec![
Expand All @@ -472,7 +528,7 @@ mod tests {
let success = q.update(1, vec![Some(1), Some(5), Some(6)]);
assert!(success);

let vals2 = q.select(1, 0, vec![1, 1, 1]);
let vals2 = q.internal_select(1, 0, vec![1, 1, 1]);
assert_eq!(
vals2.unwrap()[0],
vec![
Expand Down Expand Up @@ -512,7 +568,7 @@ mod tests {
q.update(1, vec![Some(1), Some(6), Some(7)]);
q.update(1, vec![Some(1), Some(8), Some(9)]);

let vals = q.select(1, 0, vec![1, 1, 1]);
let vals = q.internal_select(1, 0, vec![1, 1, 1]);
assert_eq!(
vals.unwrap()[0],
vec![
Expand All @@ -536,7 +592,7 @@ mod tests {
q.insert(vec![1, 2, 3]);
q.delete(1);

assert_eq!(q.select(1, 0, vec![1, 1, 1]), None);
assert_eq!(q.internal_select(1, 0, vec![1, 1, 1]), None);
}

#[test]
Expand All @@ -554,7 +610,7 @@ mod tests {
q.update(1, vec![Some(1), Some(8), Some(9)]); // Version 3

// Test different versions
let latest = q.select_version(1, 0, vec![1, 1, 1], 0);
let latest = q.internal_select_version(1, 0, vec![1, 1, 1], 0);
assert_eq!(
latest.unwrap(),
vec![
Expand All @@ -568,7 +624,7 @@ mod tests {
]
); // Most recent version

let one_back = q.select_version(1, 0, vec![1, 1, 1], 1);
let one_back = q.internal_select_version(1, 0, vec![1, 1, 1], 1);
assert_eq!(
one_back.unwrap(),
vec![
Expand All @@ -582,7 +638,7 @@ mod tests {
]
); // One version back

let two_back = q.select_version(1, 0, vec![1, 1, 1], 2);
let two_back = q.internal_select_version(1, 0, vec![1, 1, 1], 2);
assert_eq!(
two_back.unwrap(),
vec![
Expand All @@ -596,7 +652,7 @@ mod tests {
]
); // Two versions back

let original = q.select_version(1, 0, vec![1, 1, 1], 3);
let original = q.internal_select_version(1, 0, vec![1, 1, 1], 3);
assert_eq!(
original.unwrap(),
vec![
Expand Down Expand Up @@ -624,7 +680,7 @@ mod tests {
assert!(result.is_none());

// Verify that the original record is still intact
let vals = q.select(1, 0, vec![1, 1, 1]);
let vals = q.internal_select(1, 0, vec![1, 1, 1]);
assert_eq!(
vals.unwrap()[0],
vec![
Expand Down Expand Up @@ -655,7 +711,7 @@ mod tests {

q = RQuery::new(table_ref);

let v = q.select(1, 0, vec![1, 1, 1]);
let v = q.internal_select(1, 0, vec![1, 1, 1]);
assert_eq!(
vec![
Some(3),
Expand Down Expand Up @@ -685,7 +741,7 @@ mod tests {

q = RQuery::new(table_ref.clone());

let v = q.select(1, 0, vec![1, 1, 1]);
let v = q.internal_select(1, 0, vec![1, 1, 1]);
assert_eq!(
vec![
Some(600),
Expand Down