diff --git a/deepola/wake/examples/tpch_polars/q10.rs b/deepola/wake/examples/tpch_polars/q10.rs
index f8ae82f..cc5278e 100644
--- a/deepola/wake/examples/tpch_polars/q10.rs
+++ b/deepola/wake/examples/tpch_polars/q10.rs
@@ -130,6 +130,8 @@ pub fn query(
     sum_accumulator
         .set_group_key(vec![
             "o_custkey".into(),
+        ])
+        .set_group_attributes(vec![
             "c_name".into(),
             "c_acctbal".into(),
             "c_phone".into(),
@@ -147,13 +149,13 @@ pub fn query(
         .appender(MapAppender::new(Box::new(|df: &DataFrame| {
             let cols = vec![
                 "o_custkey",
-                "c_name",
+                "c_name_first",
                 "disc_price_sum",
-                "c_acctbal",
-                "c_phone",
-                "n_name",
-                "c_address",
-                "c_comment",
+                "c_acctbal_first",
+                "c_phone_first",
+                "n_name_first",
+                "c_address_first",
+                "c_comment_first",
             ];
             df.select(cols)
                 .unwrap()
diff --git a/deepola/wake/examples/tpch_polars/utils.rs b/deepola/wake/examples/tpch_polars/utils.rs
index 6043b18..d3c7f57 100644
--- a/deepola/wake/examples/tpch_polars/utils.rs
+++ b/deepola/wake/examples/tpch_polars/utils.rs
@@ -131,6 +131,7 @@ pub fn run_query(
     let mut epoch = 0;
     let start_time = Instant::now();
+    log::warn!("Query {} has {} nodes", _query_no, query_service.nodes().len());
     query_service.run();
     loop {
         let message = output_reader.read();
@@ -144,14 +145,16 @@ pub fn run_query(
             let file_path = results_dir_path.join(format!("{}.csv", epoch));
             save_df_to_csv(&mut data.clone(), &file_path);
             if epoch % 10 == 0 {
-                log::warn!(
-                    "Query Result {} Took: {:.2?}",
-                    epoch + 1,
-                    duration
-                );
                 log::warn!("Saved query result to {:?}", file_path);
             }
         }
+        if epoch % 10 == 0 {
+            log::warn!(
+                "Query Result {} Took: {:.2?}",
+                epoch + 1,
+                duration
+            );
+        }
         query_result_time_ns.push(duration.as_nanos());
         last_df = data.clone();
         epoch = epoch + 1;
diff --git a/deepola/wake/src/channel/channel_group.rs b/deepola/wake/src/channel/channel_group.rs
index 04beecc..cd3fd6e 100644
--- a/deepola/wake/src/channel/channel_group.rs
+++ b/deepola/wake/src/channel/channel_group.rs
@@ -1,6 +1,6 @@
 use std::rc::Rc;
 
-use crate::data::DataMessage;
+use crate::{data::DataMessage, utils::log_event};
 
 use super::single_channel::*;
 
@@ -34,6 +34,7 @@ impl MultiChannelReader {
     /// Read a message from the seq_no-th reader.
     pub fn read(&self, seq_no: usize) -> DataMessage {
+        log_event(&format!("read-message-channel-{}", seq_no), "start");
         let reader = self.reader(seq_no);
         let message = reader.read();
         log::debug!(
             "Reads from (channel: {}). {:?}.",
             reader.channel_id(),
             message
         );
+        log_event(&format!("read-message-channel-{}", seq_no), "end");
         message
     }
 }
@@ -82,10 +84,12 @@ impl MultiChannelBroadcaster {
     /// Broadcast a message to all writers.
     pub fn write(&self, message: DataMessage) {
+        log_event("write-message", "start");
         for w in self.iter() {
             log::debug!("Writes to (channel: {}). {:?}.", w.channel_id(), message);
{:?}.", w.channel_id(), message); w.write(message.clone()) } + log_event("write-message", "end"); } pub fn iter(&self) -> impl Iterator> + '_ { diff --git a/deepola/wake/src/graph/exec_service.rs b/deepola/wake/src/graph/exec_service.rs index c3b614c..5f27b00 100644 --- a/deepola/wake/src/graph/exec_service.rs +++ b/deepola/wake/src/graph/exec_service.rs @@ -1,6 +1,8 @@ use getset::Getters; use std::thread::{self, JoinHandle}; +use crate::utils::log_node_mapping; + use super::node::ExecutionNode; #[derive(Getters)] @@ -34,11 +36,7 @@ impl ExecutionService { node.run(); node }); - log::info!( - "[logging] type=node-thread-map node={} thread={:?}", - node_id, - handle.thread().id() - ); + log_node_mapping(&node_id, handle.thread().id()); self.thread_handles.push(handle); } } diff --git a/deepola/wake/src/graph/node.rs b/deepola/wake/src/graph/node.rs index 79efc81..5cdc259 100644 --- a/deepola/wake/src/graph/node.rs +++ b/deepola/wake/src/graph/node.rs @@ -1,5 +1,6 @@ use crate::data::*; use crate::processor::*; +use crate::utils::{log_event, log_node_edge}; use getset::{Getters, Setters}; use nanoid::nanoid; @@ -80,11 +81,7 @@ impl ExecutionNode { pub fn subscribe_to_node(&self, source_node: &ExecutionNode, for_channel: usize) { let writer = &self.self_writers[for_channel]; source_node.add(writer.clone()); - log::info!( - "[logging] type=node-edge source={} dest={}", - source_node.node_id(), - self.node_id() - ); + log_node_edge(source_node.node_id(), self.node_id()); } /// Processes the data from input stream until we see EOF from all input channels. @@ -114,32 +111,19 @@ impl ExecutionNode { } // Pre-processing (if needed) - log::debug!("Starts Pre-Processing for Node: [{}]", self.node_id()); - let start_time = std::time::Instant::now(); + log_event("pre-process", "start"); self.stream_processor .borrow_mut() .pre_process(input_reader.clone()); - log::info!( - "[logging] type=pre-process node={} duration={} unit=micros", - self.node_id(), - start_time.elapsed().as_micros() - ); - log::debug!("Finished Pre-Processing for Node: [{}]", self.node_id()); + log_event("pre-process", "end"); // Actual data processing - log::debug!("Starts Data Processing for Node: [{}]", self.node_id()); - let start_time = std::time::Instant::now(); + log_event("process-stream", "start"); self.stream_processor() .borrow() .process_stream(input_reader.clone(), output_writer.clone()); - log::info!( - "[logging] type=process-stream node={} duration={} unit=micros", - self.node_id(), - start_time.elapsed().as_micros() - ); - log::debug!("Finished Data Processing for Node: [{}]", self.node_id()); + log_event("process-stream", "end"); - log::debug!("Terminating Node: [{}]", self.node_id()); } pub fn input_reader(&self) -> MultiChannelReader { diff --git a/deepola/wake/src/lib.rs b/deepola/wake/src/lib.rs index 3cc6ce4..626aa00 100644 --- a/deepola/wake/src/lib.rs +++ b/deepola/wake/src/lib.rs @@ -4,6 +4,7 @@ pub mod graph; pub mod inference; pub mod polars_operations; pub mod processor; +pub mod utils; // pub mod forecast; #[cfg(test)] diff --git a/deepola/wake/src/polars_operations/accumulator/agg_accumulator.rs b/deepola/wake/src/polars_operations/accumulator/agg_accumulator.rs index df6a9c0..b7e1123 100644 --- a/deepola/wake/src/polars_operations/accumulator/agg_accumulator.rs +++ b/deepola/wake/src/polars_operations/accumulator/agg_accumulator.rs @@ -25,6 +25,11 @@ pub struct AggAccumulator { #[get = "pub"] group_key: Vec, + /// Group attributes + #[set = "pub"] + #[get = "pub"] + group_attributes: Vec, + /// Used 
diff --git a/deepola/wake/src/polars_operations/accumulator/agg_accumulator.rs b/deepola/wake/src/polars_operations/accumulator/agg_accumulator.rs
index df6a9c0..b7e1123 100644
--- a/deepola/wake/src/polars_operations/accumulator/agg_accumulator.rs
+++ b/deepola/wake/src/polars_operations/accumulator/agg_accumulator.rs
@@ -25,6 +25,11 @@ pub struct AggAccumulator {
     #[get = "pub"]
     group_key: Vec<String>,
 
+    /// Group attributes
+    #[set = "pub"]
+    #[get = "pub"]
+    group_attributes: Vec<String>,
+
     /// Used to specify aggregations for each column
     #[set = "pub"]
     #[get = "pub"]
@@ -57,6 +62,7 @@ impl AggAccumulator {
     pub fn new() -> Self {
         AggAccumulator {
             group_key: vec![],
+            group_attributes: vec![],
             accumulated: RefCell::new(DataFrame::empty()),
             aggregates: vec![],
             add_count_column: false,
@@ -130,6 +136,12 @@ impl AggAccumulator {
                 vec!["sum".into()], // To join counts, need to perform sum
             ))
         }
+        for attribute in self.group_attributes() {
+            acc_aggs.push((
+                format!("{}_{}", attribute, "first"),
+                vec!["first".into()],
+            ))
+        }
         acc_aggs
     }
 
@@ -141,6 +153,12 @@ impl AggAccumulator {
                 vec!["count".into()],
             ))
         }
+        for attribute in self.group_attributes() {
+            aggregates.push((
+                attribute.into(),
+                vec!["first".into()],
+            ))
+        }
         aggregates
     }
 }
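Why q10.rs above now selects `*_first` columns: each grouping attribute is carried through the aggregation with a `first` aggregate, and polars' eager groupby names its outputs `<column>_<agg>`. A minimal illustration of that naming behavior, assuming the older eager groupby API this codebase builds on (the data values are made up):

```rust
use polars::prelude::*;

fn main() {
    // Toy data standing in for the TPC-H columns (values are made up).
    let df = df![
        "o_custkey"  => &[1i64, 1, 2],
        "c_name"     => &["alice", "alice", "bob"],
        "disc_price" => &[10.0, 20.0, 5.0]
    ]
    .unwrap();
    // The eager groupby names outputs `<column>_<agg>`, so carrying a
    // grouping attribute through a `first` aggregate yields `c_name_first`.
    let out = df
        .groupby(["o_custkey"])
        .unwrap()
        .agg(&[("disc_price", &["sum"]), ("c_name", &["first"])])
        .unwrap();
    println!("{:?}", out); // columns: o_custkey, disc_price_sum, c_name_first
}
```

Treating the attributes as `first` aggregates rather than extra group keys keeps the hash-grouping key small (just `o_custkey`) while still carrying the functionally dependent customer columns through to the output.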
diff --git a/deepola/wake/src/polars_operations/hash_join.rs b/deepola/wake/src/polars_operations/hash_join.rs
index da46cf5..debfdc6 100644
--- a/deepola/wake/src/polars_operations/hash_join.rs
+++ b/deepola/wake/src/polars_operations/hash_join.rs
@@ -4,6 +4,7 @@ use polars::prelude::*;
 use crate::data::*;
 use crate::graph::ExecutionNode;
 use crate::processor::StreamProcessor;
+use crate::utils::log_event;
 
 #[derive(Default)]
 pub struct HashJoinBuilder {
@@ -106,37 +107,25 @@ impl HashJoinNode {
 impl StreamProcessor for HashJoinNode {
     fn pre_process(&mut self, input_stream: crate::channel::MultiChannelReader) {
-        let mut start_time = std::time::Instant::now();
         loop {
             let channel_seq = 1;
             let message = input_stream.read(channel_seq);
-            log::info!(
-                "[logging] type=execution thread={:?} action=pre-read time={:?}",
-                std::thread::current().id(),
-                start_time.elapsed().as_micros()
-            );
-            start_time = std::time::Instant::now();
+            log_event("process-message", "start");
             match message.payload() {
                 Payload::EOF => {
+                    log_event("process-message", "end");
+                    break;
+                }
+                Payload::Signal(_) => {
+                    log_event("process-message", "end");
                     break;
                 }
-                Payload::Signal(_) => break,
                 Payload::Some(dblock) => {
                     self.pre_process(dblock.data());
+                    log_event("process-message", "end");
                 }
             }
-            log::info!(
-                "[logging] type=execution thread={:?} action=pre-process time={:?}",
-                std::thread::current().id(),
-                start_time.elapsed().as_micros()
-            );
-            start_time = std::time::Instant::now();
         }
-        log::info!(
-            "[logging] type=execution thread={:?} action=pre-process time={:?}",
-            std::thread::current().id(),
-            start_time.elapsed().as_micros()
-        );
     }
 
     fn process_stream(
         &self,
         input_stream: crate::channel::MultiChannelReader,
         output_stream: crate::channel::MultiChannelBroadcaster,
     ) {
-        let mut start_time = std::time::Instant::now();
         loop {
             let channel_seq = 0;
             let message = input_stream.read(channel_seq);
-            log::info!(
-                "[logging] type=execution thread={:?} action=read time={:?}",
-                std::thread::current().id(),
-                start_time.elapsed().as_micros()
-            );
-            start_time = std::time::Instant::now();
+            log_event("process-message", "start");
             match message.payload() {
                 Payload::EOF => {
                     output_stream.write(message);
+                    log_event("process-message", "end");
+                    break;
+                }
+                Payload::Signal(_) => {
+                    log_event("process-message", "end");
                     break;
                 }
-                Payload::Signal(_) => break,
                 Payload::Some(dblock) => {
                     let output_df = self.process(dblock.data());
                     let output_dblock = DataBlock::new(output_df, dblock.metadata().clone());
                     let output_message = DataMessage::from(output_dblock);
                     output_stream.write(output_message);
+                    log_event("process-message", "end");
                 }
             }
-            log::info!(
-                "[logging] type=execution thread={:?} action=process time={:?}",
-                std::thread::current().id(),
-                start_time.elapsed().as_micros()
-            );
-            start_time = std::time::Instant::now();
         }
-        log::info!(
-            "[logging] type=execution thread={:?} action=process time={:?}",
-            std::thread::current().id(),
-            start_time.elapsed().as_micros()
-        );
     }
 }
diff --git a/deepola/wake/src/polars_operations/merger/mapper_df.rs b/deepola/wake/src/polars_operations/merger/mapper_df.rs
index 35bdf67..5eaa40f 100644
--- a/deepola/wake/src/polars_operations/merger/mapper_df.rs
+++ b/deepola/wake/src/polars_operations/merger/mapper_df.rs
@@ -6,6 +6,7 @@ use crate::data::{
     MetaCell,
     Payload,
 };
+use crate::utils::log_event;
 use crate::{
     channel::{MultiChannelBroadcaster, MultiChannelReader},
     processor::StreamProcessor,
@@ -140,37 +141,36 @@ impl StreamProcessor for MapperDfMerger {
         let channel_left = 0;
         let channel_right = 1;
         loop {
-            let start_time = std::time::Instant::now();
-
             // if both sides don't need any more inputs, we can merge and produce an output.
             if !self.needs_left().borrow() && !self.needs_right().borrow() {
                 // merge will set needs_left or needs_right to true; thus, in the next iteration,
                 // this condition won't be satisfied.
+                log_event("process-message", "start");
                 let output_df = self.merge();
                 let output_metadata = self.current_metadata();
                 let output_dblock = DataBlock::new(output_df, output_metadata);
                 let output_message = DataMessage::from(output_dblock);
                 output_stream.write(output_message);
-                log::info!(
-                    "[logging] type=execution thread={:?} action=process time={:?}",
-                    std::thread::current().id(),
-                    start_time.elapsed().as_micros()
-                );
+                log_event("process-message", "end");
                 continue;
             }
 
             if self.needs_left() {
                 let message = input_stream.read(channel_left);
+                log_event("process-message", "start");
                 match message.payload() {
                     Payload::EOF => {
                         output_stream.write(message);
+                        log_event("process-message", "end");
                         break;
                     }
                     Payload::Signal(_) => {
+                        log_event("process-message", "end");
                         break;
                     }
                     Payload::Some(data_block) => {
                         self.supply_left(data_block);
+                        log_event("process-message", "end");
                         continue;
                     }
                 }
@@ -178,16 +178,20 @@ impl StreamProcessor for MapperDfMerger {
 
             if self.needs_right() {
                 let message = input_stream.read(channel_right);
+                log_event("process-message", "start");
                 match message.payload() {
                     Payload::EOF => {
                         output_stream.write(message);
+                        log_event("process-message", "end");
                         break;
                     }
                     Payload::Signal(_) => {
+                        log_event("process-message", "end");
                         break;
                     }
                     Payload::Some(data_block) => {
                         self.supply_right(data_block);
+                        log_event("process-message", "end");
                         continue;
                     }
                 }
+ log_event("process-message", "start"); let output_df = self.merge(); let output_metadata = self.current_metadata(); let output_dblock = DataBlock::new(output_df, output_metadata); let output_message = DataMessage::from(output_dblock); output_stream.write(output_message); - log::info!( - "[logging] type=execution thread={:?} action=process time={:?}", - std::thread::current().id(), - start_time.elapsed().as_micros() - ); + log_event("process-message", "end"); continue; } if self.needs_left() { let message = input_stream.read(channel_left); + log_event("process-message", "start"); match message.payload() { Payload::EOF => { output_stream.write(message); + log_event("process-message", "end"); break; } Payload::Signal(_) => { + log_event("process-message", "end"); break; } Payload::Some(data_block) => { self.supply_left(data_block); + log_event("process-message", "end"); continue; } } @@ -227,16 +227,20 @@ impl StreamProcessor for SortedDfMerger { if self.needs_right() { let message = input_stream.read(channel_right); + log_event("process-message", "start"); match message.payload() { Payload::EOF => { output_stream.write(message); + log_event("process-message", "end"); break; } Payload::Signal(_) => { + log_event("process-message", "end"); break; } Payload::Some(data_block) => { self.supply_right(data_block); + log_event("process-message", "end"); continue; } } diff --git a/deepola/wake/src/polars_operations/reader/csvreader.rs b/deepola/wake/src/polars_operations/reader/csvreader.rs index dc2027e..c185cd4 100644 --- a/deepola/wake/src/polars_operations/reader/csvreader.rs +++ b/deepola/wake/src/polars_operations/reader/csvreader.rs @@ -3,6 +3,7 @@ use polars::prelude::*; use crate::data::*; use crate::graph::ExecutionNode; use crate::processor::StreamProcessor; +use crate::utils::log_event; pub struct CSVReaderBuilder { delimiter: char, @@ -119,22 +120,20 @@ impl StreamProcessor for CSVReader { input_stream: crate::channel::MultiChannelReader, output_stream: crate::channel::MultiChannelBroadcaster, ) { - let mut start_time = std::time::Instant::now(); loop { let channel_seq = 0; let message = input_stream.read(channel_seq); - log::info!( - "[logging] type=execution thread={:?} action=read time={:?}", - std::thread::current().id(), - start_time.elapsed().as_micros() - ); - start_time = std::time::Instant::now(); + log_event("process-message", "start"); match message.payload() { Payload::EOF => { output_stream.write(message); + log_event("process-message", "end"); + break; + } + Payload::Signal(_) => { + log_event("process-message", "end"); break; } - Payload::Signal(_) => break, Payload::Some(dblock) => { let mut metadata = dblock.metadata().clone(); let mut expected_total_records = @@ -173,20 +172,10 @@ impl StreamProcessor for CSVReader { output_stream.write(output_message); } } + log_event("process-message", "end"); } } - log::info!( - "[logging] type=execution thread={:?} action=process time={:?}", - std::thread::current().id(), - start_time.elapsed().as_micros() - ); - start_time = std::time::Instant::now(); } - log::info!( - "[logging] type=execution thread={:?} action=process time={:?}", - std::thread::current().id(), - start_time.elapsed().as_micros() - ); } } diff --git a/deepola/wake/src/polars_operations/reader/parquetreader.rs b/deepola/wake/src/polars_operations/reader/parquetreader.rs index 641c1de..c809145 100644 --- a/deepola/wake/src/polars_operations/reader/parquetreader.rs +++ b/deepola/wake/src/polars_operations/reader/parquetreader.rs @@ -6,6 +6,7 @@ use 
diff --git a/deepola/wake/src/polars_operations/reader/parquetreader.rs b/deepola/wake/src/polars_operations/reader/parquetreader.rs
index 641c1de..c809145 100644
--- a/deepola/wake/src/polars_operations/reader/parquetreader.rs
+++ b/deepola/wake/src/polars_operations/reader/parquetreader.rs
@@ -6,6 +6,7 @@ use polars::prelude::*;
 use crate::data::*;
 use crate::graph::ExecutionNode;
 use crate::processor::StreamProcessor;
+use crate::utils::log_event;
 
 #[derive(Default)]
 pub struct ParquetReaderBuilder {
@@ -73,22 +74,20 @@ impl StreamProcessor for ParquetReader {
         input_stream: crate::channel::MultiChannelReader,
         output_stream: crate::channel::MultiChannelBroadcaster,
     ) {
-        let mut start_time = std::time::Instant::now();
         loop {
             let channel_seq = 0;
             let message = input_stream.read(channel_seq);
-            log::info!(
-                "[logging] type=execution thread={:?} action=read time={:?}",
-                std::thread::current().id(),
-                start_time.elapsed().as_micros()
-            );
-            start_time = std::time::Instant::now();
+            log_event("process-message", "start");
             match message.payload() {
                 Payload::EOF => {
                     output_stream.write(message);
+                    log_event("process-message", "end");
+                    break;
+                }
+                Payload::Signal(_) => {
+                    log_event("process-message", "end");
                     break;
                 }
-                Payload::Signal(_) => break,
                 Payload::Some(dblock) => {
                     let mut metadata = dblock.metadata().clone();
                     let mut expected_total_records =
                             output_stream.write(output_message);
                         }
                     }
+                    log_event("process-message", "end");
                 }
             }
-            log::info!(
-                "[logging] type=execution thread={:?} action=process time={:?}",
-                std::thread::current().id(),
-                start_time.elapsed().as_micros()
-            );
-            start_time = std::time::Instant::now();
         }
-        log::info!(
-            "[logging] type=execution thread={:?} action=process time={:?}",
-            std::thread::current().id(),
-            start_time.elapsed().as_micros()
-        );
     }
 }
diff --git a/deepola/wake/src/processor/message_processor.rs b/deepola/wake/src/processor/message_processor.rs
index 369a259..2a11f7b 100644
--- a/deepola/wake/src/processor/message_processor.rs
+++ b/deepola/wake/src/processor/message_processor.rs
@@ -9,6 +9,7 @@ use crate::data::{
     MetaCell,
     Payload,
 };
+use crate::utils::log_event;
 
 use super::StreamProcessor;
 
@@ -28,17 +29,11 @@ impl + Send> StreamProcessor for R {
         input_stream: crate::channel::MultiChannelReader,
         output_stream: crate::channel::MultiChannelBroadcaster,
     ) {
-        let mut start_time = std::time::Instant::now();
         let mut last_metadata: Option> = None;
         loop {
             let channel_seq = 0;
             let message = input_stream.read(channel_seq);
-            log::info!(
-                "[logging] type=execution thread={:?} action=read time={:?}",
-                std::thread::current().id(),
-                start_time.elapsed().as_micros()
-            );
-            start_time = std::time::Instant::now();
+            log_event("process-message", "start");
             match message.payload() {
                 Payload::EOF => {
                     if let Some(df_acc) = self.post_process_msg() {
                         let post_process_dblock = DataBlock::new(df_acc, eof_metadata);
                         let post_process_msg = DataMessage::from(post_process_dblock);
-                        output_stream.write(post_process_msg)
+                        output_stream.write(post_process_msg);
                     }
                     output_stream.write(message);
+                    log_event("process-message", "end");
                     break;
                 }
                 Payload::Some(dblock) => {
                         let output_message = DataMessage::from(output_dblock);
                         output_stream.write(output_message);
                     }
+                    log_event("process-message", "end");
                 }
                 Payload::Signal(_) => {
+                    log_event("process-message", "end");
                     break;
                 }
             }
-            log::info!(
-                "[logging] type=execution thread={:?} action=process time={:?}",
-                std::thread::current().id(),
-                start_time.elapsed().as_micros()
-            );
-            start_time = std::time::Instant::now();
         }
-        log::info!(
-            "[logging] type=execution thread={:?} action=process time={:?}",
-            std::thread::current().id(),
-            start_time.elapsed().as_micros()
-        );
     }
 }
@@ -138,8 +125,10 @@ pub trait MessageFractionProcessor: Send {
         loop {
             let channel_seq = 0;
             let message = input_stream.read(channel_seq);
+            log_event("process-message", "start");
             match message.payload() {
                 Payload::EOF => {
+                    log_event("process-message", "end");
                     output_stream.write(message);
                     break;
                 }
                     let output_df = self.process(dblock.data(), fraction);
                     let output_dblock = DataBlock::new(output_df, dblock.metadata().clone());
                     let output_message = DataMessage::from(output_dblock);
+                    log_event("process-message", "end");
                     output_stream.write(output_message);
                 }
                 Payload::Signal(_) => {
+                    log_event("process-message", "end");
                     break;
                 }
             }
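For reference, this is how the rewritten parser below consumes one event line. The sample line is hypothetical (it assumes the `(key,value)` token format sketched for `utils.rs` above); the slicing logic is taken directly from `parse_log_file`:

```python
# Hypothetical sample line, assuming the (key,value) format from the utils.rs sketch.
line = ("[2023-05-01T10:00:00Z INFO wake] [logging] (type,event) (thread,ThreadId(2)) "
        "(task,process-message) (action,start) (timestamp,1682935200000000000)")

# Same slicing as parse_log_file: drop everything before "[logging]",
# split on whitespace, then peel "(key," and "value)" apart.
tokens = line.split("[logging]")[-1].split()
log = {t.split(",")[0][1:]: t.split(",")[1][:-1] for t in tokens}
print(log)
# {'type': 'event', 'thread': 'ThreadId(2)', 'task': 'process-message',
#  'action': 'start', 'timestamp': '1682935200000000000'}
```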
diff --git a/scripts/visualization/process_logs.py b/scripts/visualization/process_logs.py
index abf39f8..47c5260 100644
--- a/scripts/visualization/process_logs.py
+++ b/scripts/visualization/process_logs.py
@@ -1,56 +1,86 @@
+import os
 import sys
 import pandas as pd
 import graphviz
-from dateutil import parser
-from datetime import timedelta
-import re
 
-def parse_log_file(log_file = "output.log"):
-    regex = re.compile(r'\[(.*)\] \[logging\] (.*)')
+def parse_log_file(log_file = "output.log", output_dir = "outputs"):
     log_lines = open(log_file,'r').readlines()
     collect_logs = []
     for line in log_lines:
-        if '[logging]' in line:
-            matches = regex.match(line)
-            event_time = parser.parse(matches.groups()[0].split()[0])
-            tokens = matches.groups()[1].split()
-            log = {'event_time': event_time}
-            for token in tokens:
-                key,value = token.split('=')
-                log[key] = value
+        if "[logging]" in line:
+            events = line.split("[logging]")[-1]
+            key_values = events.split()
+            log = {}
+            for element in key_values:
+                try:
+                    key = element.split(",")[0][1:]
+                    val = element.split(",")[1][:-1]
+                    log[key] = val
+                except Exception as e:
+                    print(element, line)
+                    raise e
             collect_logs.append(log)
     df = pd.DataFrame(collect_logs)
+    df.to_csv(f"{output_dir}/parsed-logs.csv",index=False)
+    print(f"Saved parsed dataframe to {output_dir}/parsed-logs.csv")
     return df
 
-def generate_query_graph(df, query, output_dir = 'outputs/', display=False):
+def generate_node_statistics(df, output_dir = "outputs"):
+    node_statistics = []
+    grouped_df = df.groupby("node")
+    for (node, raw_df) in grouped_df:
+        tasks = raw_df.groupby("task")
+        for (task_type, task_type_df) in tasks:
+            prev_timestamp = 0
+            time_spent_in_task = 0
+            for (idx,event) in task_type_df.iterrows():
+                if event.action == "end":
+                    time_spent_in_task += (event.timestamp - prev_timestamp)
+                else:
+                    prev_timestamp = event.timestamp
+            node_statistics.append({
+                "node": node,
+                "task": task_type,
+                "num_events": len(task_type_df),
+                "time": time_spent_in_task
+            })
+    node_stats_df = pd.DataFrame(node_statistics)
+    node_stats_df.sort_values(["node","task"], ascending=False, inplace=True)
+    node_stats_df.to_csv(f"{output_dir}/node-statistics.csv",index=False)
+    print(f"Saved node statistics to {output_dir}/node-statistics.csv")
+    return node_stats_df
+
+def generate_query_graph(df, output_dir = "outputs", display=False):
     graph_edges = df[df['type'] == 'node-edge']
     g = graphviz.Digraph(name = 'graph', format='png')
     for index,edge in graph_edges.iterrows():
-        source = edge['source']
+        source = edge['src']
         dest = edge['dest']
        g.edge(source, dest)
     if display:
         return g
     else:
         g.render(directory=output_dir).replace('\\', '/')
+        print(f"Saved output graph to {output_dir}/graph.gv.png")
     return g
 
-def generate_execution_query_graph(df, query, output_dir = 'outputs/', display=False):
-    g = generate_query_graph(df, query, output_dir, display)
-    execution_df = get_execution_log_df(df)
-    process_time = execution_df[execution_df['action'] == 'process']
-    nodes_by_process_time = process_time[['node','duration']].groupby(['node']).sum().sort_values('duration', ascending=False).reset_index()
-    maximum_duration = nodes_by_process_time['duration'].max()
-    minimum_duration = nodes_by_process_time['duration'].min()
-    print(nodes_by_process_time)
-    for index,row in nodes_by_process_time.iterrows():
-        relative_duration = min(9,int((10*row['duration'])/maximum_duration))
-        g.node(row['node'], style='filled', fillcolor=f"/oranges9/{1 if relative_duration <= 0 else relative_duration}")
-    if display:
-        return g
-    else:
-        g.render(directory=output_dir).replace('\\','/')
-    return g
+# def generate_execution_query_graph(df, output_dir = 'outputs/', display=False):
+#     g = generate_query_graph(df, output_dir, display)
+#     execution_df = get_execution_log_df(df)
+#     # process_time = execution_df[execution_df['action'] == 'process']
+#     # nodes_by_process_time = process_time[['node','duration']].groupby(['node']).sum().sort_values('duration', ascending=False).reset_index()
+#     # maximum_duration = nodes_by_process_time['duration'].max()
+#     # minimum_duration = nodes_by_process_time['duration'].min()
+#     # print(nodes_by_process_time)
+#     for index,row in nodes_by_process_time.iterrows():
+#         relative_duration = 9
+#         # relative_duration = min(9,int((10*row['duration'])/maximum_duration))
+#         g.node(row['node'], style='filled', fillcolor=f"/oranges9/{1 if relative_duration <= 0 else relative_duration}")
+#     if display:
+#         return g
+#     else:
+#         g.render(directory=output_dir).replace('\\','/')
+#     return g
 
 def get_node_thread_map(df):
     node_thread_map_logs = df[df['type'] == 'node-thread-map']
@@ -63,35 +93,42 @@ def get_execution_log_df(df):
     thread_node_map = get_node_thread_map(df)
-    execution_logs = df[df['type'] == 'execution']
+    execution_logs = df[df['type'] == 'event']
     execution_logs_entries = []
     for _,log in execution_logs.iterrows():
         thread = log['thread']
-        node = thread_node_map[thread]
+        node = thread_node_map.get(thread, "main")
+        task = log['task']
         action = log['action']
-        time = log['time']
-        end_time = log['event_time']
-        start_time = end_time - timedelta(microseconds=float(time))
-        execution_logs_entries.append({
-            'start_time': start_time.timestamp(),
-            'end_time': end_time.timestamp(),
-            'thread': thread,
-            'node': node,
-            'action': action,
-            'duration': float(time)/1000
-        })
+        timestamp = log['timestamp']
+        if "write-message" in task or "read-message" in task:
+            execution_logs_entries.append({
+                'thread': thread,
+                'node': node,
+                'task': task,
+                'action': action,
+                'timestamp': int(timestamp),
+            })
     execution_log_df = pd.DataFrame(execution_logs_entries)
-    min_start_time = execution_log_df['start_time'].min()
-    execution_log_df['start_time'] -= min_start_time
-    execution_log_df['end_time'] -= min_start_time
-    return execution_log_df.sort_values(['start_time', 'end_time'])
+    print(execution_log_df)
+    min_start_time = execution_log_df['timestamp'].min()
+    execution_log_df['timestamp'] -= min_start_time
+    execution_log_df['timestamp'] /= 1e9
+    execution_log_df = execution_log_df.sort_values(['timestamp', 'node', 'action'])
+    execution_log_df = execution_log_df.drop(columns=['thread'])
+    execution_log_df.to_csv(f"{output_dir}/parsed-event-timeline.csv",index=False)
+    print(f"Saved events df to {output_dir}/parsed-event-timeline.csv")
+    return execution_log_df
 
 if __name__ == "__main__":
-    query = sys.argv[1] if len(sys.argv) > 1 else 1
-    scale = int(sys.argv[2]) if len(sys.argv) > 2 else 1
-    partition = int(sys.argv[3]) if len(sys.argv) > 3 else 10
-    log_dir = f'../../deepola/wake/logs'
-    output_dir = f'../../deepola/wake/outputs/queries/{query}/'
-    log_file = f'{log_dir}/scale={scale}/partition={partition}/{query}.log'
-    df = parse_log_file(log_file)
-    g = generate_execution_query_graph(df, query, output_dir)
+    if len(sys.argv) < 3:
+        print("Usage: python3 process_logs.py <log_file> <output_dir>")
+        exit(1)
+    log_file = sys.argv[1]
+    output_dir = sys.argv[2]
+    if not os.path.exists(output_dir):
+        os.makedirs(output_dir, exist_ok=True)
+    df = parse_log_file(log_file, output_dir)
+    g = generate_query_graph(df, output_dir)
+    events_df = get_execution_log_df(df)
+    node_statistics_df = generate_node_statistics(events_df, output_dir)
\ No newline at end of file
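Example invocation of the rewritten script, mirroring its own usage string (the log file name is illustrative; any run that writes the `[logging]` lines to a file works):

```sh
python3 process_logs.py output.log outputs/
```

This writes `parsed-logs.csv`, the rendered query graph (`graph.gv.png`), `parsed-event-timeline.csv`, and `node-statistics.csv` under `outputs/`.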