Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions deepola/wake/examples/tpch_polars/q10.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ pub fn query(
sum_accumulator
.set_group_key(vec![
"o_custkey".into(),
])
.set_group_attributes(vec![
"c_name".into(),
"c_acctbal".into(),
"c_phone".into(),
Expand All @@ -147,13 +149,13 @@ pub fn query(
.appender(MapAppender::new(Box::new(|df: &DataFrame| {
let cols = vec![
"o_custkey",
"c_name",
"c_name_first",
"disc_price_sum",
"c_acctbal",
"c_phone",
"n_name",
"c_address",
"c_comment",
"c_acctbal_first",
"c_phone_first",
"n_name_first",
"c_address_first",
"c_comment_first",
];
df.select(cols)
.unwrap()
Expand Down
13 changes: 8 additions & 5 deletions deepola/wake/examples/tpch_polars/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ pub fn run_query(
let mut epoch = 0;

let start_time = Instant::now();
log::warn!("Query {} has {} nodes", _query_no, query_service.nodes().len());
query_service.run();
loop {
let message = output_reader.read();
Expand All @@ -144,14 +145,16 @@ pub fn run_query(
let file_path = results_dir_path.join(format!("{}.csv", epoch));
save_df_to_csv(&mut data.clone(), &file_path);
if epoch % 10 == 0 {
log::warn!(
"Query Result {} Took: {:.2?}",
epoch + 1,
duration
);
log::warn!("Saved query result to {:?}", file_path);
}
}
if epoch % 10 == 0 {
log::warn!(
"Query Result {} Took: {:.2?}",
epoch + 1,
duration
);
}
query_result_time_ns.push(duration.as_nanos());
last_df = data.clone();
epoch = epoch + 1;
Expand Down
6 changes: 5 additions & 1 deletion deepola/wake/src/channel/channel_group.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::rc::Rc;

use crate::data::DataMessage;
use crate::{data::DataMessage, utils::log_event};

use super::single_channel::*;

Expand Down Expand Up @@ -34,13 +34,15 @@ impl<T: Send> MultiChannelReader<T> {

/// Read a message from the seq_no-th reader.
pub fn read(&self, seq_no: usize) -> DataMessage<T> {
log_event(&format!("read-message-channel-{}",seq_no), "start");
let reader = self.reader(seq_no);
let message = reader.read();
log::debug!(
"Read from (channel: {}). {:?}.",
reader.channel_id(),
message
);
log_event(&format!("read-message-channel-{}",seq_no), "end");
message
}
}
Expand Down Expand Up @@ -82,10 +84,12 @@ impl<T: Send> MultiChannelBroadcaster<T> {

/// Broadcast a message to all writers.
pub fn write(&self, message: DataMessage<T>) {
log_event("write-message", "start");
for w in self.iter() {
log::debug!("Writes to (channel: {}). {:?}.", w.channel_id(), message);
w.write(message.clone())
}
log_event("write-message", "end");
}

pub fn iter(&self) -> impl Iterator<Item = &ChannelWriter<T>> + '_ {
Expand Down
8 changes: 3 additions & 5 deletions deepola/wake/src/graph/exec_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use getset::Getters;
use std::thread::{self, JoinHandle};

use crate::utils::log_node_mapping;

use super::node::ExecutionNode;

#[derive(Getters)]
Expand Down Expand Up @@ -34,11 +36,7 @@ impl<T: Send + 'static> ExecutionService<T> {
node.run();
node
});
log::info!(
"[logging] type=node-thread-map node={} thread={:?}",
node_id,
handle.thread().id()
);
log_node_mapping(&node_id, handle.thread().id());
self.thread_handles.push(handle);
}
}
Expand Down
28 changes: 6 additions & 22 deletions deepola/wake/src/graph/node.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::data::*;
use crate::processor::*;
use crate::utils::{log_event, log_node_edge};

use getset::{Getters, Setters};
use nanoid::nanoid;
Expand Down Expand Up @@ -80,11 +81,7 @@ impl<T: Send + 'static> ExecutionNode<T> {
pub fn subscribe_to_node(&self, source_node: &ExecutionNode<T>, for_channel: usize) {
let writer = &self.self_writers[for_channel];
source_node.add(writer.clone());
log::info!(
"[logging] type=node-edge source={} dest={}",
source_node.node_id(),
self.node_id()
);
log_node_edge(source_node.node_id(), self.node_id());
}

/// Processes the data from input stream until we see EOF from all input channels.
Expand Down Expand Up @@ -114,32 +111,19 @@ impl<T: Send + 'static> ExecutionNode<T> {
}

// Pre-processing (if needed)
log::debug!("Starts Pre-Processing for Node: [{}]", self.node_id());
let start_time = std::time::Instant::now();
log_event("pre-process", "start");
self.stream_processor
.borrow_mut()
.pre_process(input_reader.clone());
log::info!(
"[logging] type=pre-process node={} duration={} unit=micros",
self.node_id(),
start_time.elapsed().as_micros()
);
log::debug!("Finished Pre-Processing for Node: [{}]", self.node_id());
log_event("pre-process", "end");

// Actual data processing
log::debug!("Starts Data Processing for Node: [{}]", self.node_id());
let start_time = std::time::Instant::now();
log_event("process-stream", "start");
self.stream_processor()
.borrow()
.process_stream(input_reader.clone(), output_writer.clone());
log::info!(
"[logging] type=process-stream node={} duration={} unit=micros",
self.node_id(),
start_time.elapsed().as_micros()
);
log::debug!("Finished Data Processing for Node: [{}]", self.node_id());
log_event("process-stream", "end");

log::debug!("Terminating Node: [{}]", self.node_id());
}

pub fn input_reader(&self) -> MultiChannelReader<T> {
Expand Down
1 change: 1 addition & 0 deletions deepola/wake/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod graph;
pub mod inference;
pub mod polars_operations;
pub mod processor;
pub mod utils;
// pub mod forecast;

#[cfg(test)]
Expand Down
18 changes: 18 additions & 0 deletions deepola/wake/src/polars_operations/accumulator/agg_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ pub struct AggAccumulator {
#[get = "pub"]
group_key: Vec<String>,

/// Group attributes
#[set = "pub"]
#[get = "pub"]
group_attributes: Vec<String>,

/// Used to specify aggregations for each column
#[set = "pub"]
#[get = "pub"]
Expand Down Expand Up @@ -57,6 +62,7 @@ impl AggAccumulator {
pub fn new() -> Self {
AggAccumulator {
group_key: vec![],
group_attributes: vec![],
accumulated: RefCell::new(DataFrame::empty()),
aggregates: vec![],
add_count_column: false,
Expand Down Expand Up @@ -130,6 +136,12 @@ impl AggAccumulator {
vec!["sum".into()], // To join counts, need to perform sum
))
}
for attribute in self.group_attributes() {
acc_aggs.push((
format!("{}_{}", attribute, "first"),
vec!["first".into()],
))
}
acc_aggs
}

Expand All @@ -141,6 +153,12 @@ impl AggAccumulator {
vec!["count".into()],
))
}
for attribute in self.group_attributes() {
aggregates.push((
attribute.into(),
vec!["first".into()],
))
}
aggregates
}
}
Expand Down
53 changes: 15 additions & 38 deletions deepola/wake/src/polars_operations/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use polars::prelude::*;
use crate::data::*;
use crate::graph::ExecutionNode;
use crate::processor::StreamProcessor;
use crate::utils::log_event;

#[derive(Default)]
pub struct HashJoinBuilder {
Expand Down Expand Up @@ -106,79 +107,55 @@ impl HashJoinNode {

impl StreamProcessor<DataFrame> for HashJoinNode {
fn pre_process(&mut self, input_stream: crate::channel::MultiChannelReader<DataFrame>) {
let mut start_time = std::time::Instant::now();
loop {
let channel_seq = 1;
let message = input_stream.read(channel_seq);
log::info!(
"[logging] type=execution thread={:?} action=pre-read time={:?}",
std::thread::current().id(),
start_time.elapsed().as_micros()
);
start_time = std::time::Instant::now();
log_event("process-message", "start");
match message.payload() {
Payload::EOF => {
log_event("process-message", "end");
break;
}
Payload::Signal(_) => {
log_event("process-message", "end");
break;
}
Payload::Signal(_) => break,
Payload::Some(dblock) => {
self.pre_process(dblock.data());
log_event("process-message", "end");
}
}
log::info!(
"[logging] type=execution thread={:?} action=pre-process time={:?}",
std::thread::current().id(),
start_time.elapsed().as_micros()
);
start_time = std::time::Instant::now();
}
log::info!(
"[logging] type=execution thread={:?} action=pre-process time={:?}",
std::thread::current().id(),
start_time.elapsed().as_micros()
);
}

fn process_stream(
&self,
input_stream: crate::channel::MultiChannelReader<DataFrame>,
output_stream: crate::channel::MultiChannelBroadcaster<DataFrame>,
) {
let mut start_time = std::time::Instant::now();
loop {
let channel_seq = 0;
let message = input_stream.read(channel_seq);
log::info!(
"[logging] type=execution thread={:?} action=read time={:?}",
std::thread::current().id(),
start_time.elapsed().as_micros()
);
start_time = std::time::Instant::now();
log_event("process-message", "start");
match message.payload() {
Payload::EOF => {
output_stream.write(message);
log_event("process-message", "end");
break;
}
Payload::Signal(_) => {
log_event("process-message", "end");
break;
}
Payload::Signal(_) => break,
Payload::Some(dblock) => {
let output_df = self.process(dblock.data());
let output_dblock = DataBlock::new(output_df, dblock.metadata().clone());
let output_message = DataMessage::from(output_dblock);
output_stream.write(output_message);
log_event("process-message", "end");
}
}
log::info!(
"[logging] type=execution thread={:?} action=process time={:?}",
std::thread::current().id(),
start_time.elapsed().as_micros()
);
start_time = std::time::Instant::now();
}
log::info!(
"[logging] type=execution thread={:?} action=process time={:?}",
std::thread::current().id(),
start_time.elapsed().as_micros()
);
}
}

Expand Down
Loading