Skip to content

Commit 965e9eb

Browse files
authored
Address remaining TODOs after custom executor implementation (#14)
1 parent 4e4d061 commit 965e9eb

File tree

22 files changed

+197
-220
lines changed

22 files changed

+197
-220
lines changed

vegafusion-core/src/proto/pretransform.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ message ExportUpdate {
127127
string namespace = 1;
128128
string name = 2;
129129
repeated uint32 scope = 3;
130-
tasks.TaskValue value = 4;
130+
tasks.MaterializedTaskValue value = 4;
131131
}
132132

133133
message PreTransformLogicalPlanWarning {

vegafusion-core/src/proto/tasks.proto

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,8 @@ package tasks;
44
import "expression.proto";
55
import "transforms.proto";
66

7-
// ## Task Value
8-
message TaskValue {
9-
oneof data {
10-
/*
11-
* Representation of scalar as single column, single row, record batch in Arrow IPC format
12-
*/
13-
bytes scalar = 1;
14-
15-
/*
16-
* Serialized Arrow record batch in Arrow IPC format
17-
*/
18-
bytes table = 2;
19-
/*
20-
* Serialized DataFusion LogicalPlan in protobuf format
21-
*/
22-
bytes plan = 3;
23-
}
24-
}
25-
267
// ## Materialized Task Value
8+
// Represents a fully materialized (computed) task value, either a scalar or table
279
message MaterializedTaskValue {
2810
oneof data {
2911
/*
@@ -128,7 +110,7 @@ message Task {
128110
Variable variable = 1;
129111
repeated uint32 scope = 2;
130112
oneof task_kind {
131-
TaskValue value = 3;
113+
MaterializedTaskValue value = 3;
132114
DataValuesTask data_values = 4;
133115
DataUrlTask data_url = 5;
134116
DataSourceTask data_source = 6;

vegafusion-core/src/task_graph/graph.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ use std::collections::HashMap;
1111

1212
use crate::task_graph::task_value::TaskValue;
1313

14+
use crate::proto::gen::tasks::materialized_task_value::Data;
1415
use crate::proto::gen::tasks::task::TaskKind;
15-
use crate::proto::gen::tasks::task_value::Data;
16-
use crate::proto::gen::tasks::TaskValue as ProtoTaskValue;
16+
use crate::proto::gen::tasks::MaterializedTaskValue as ProtoMaterializedTaskValue;
1717
use std::convert::TryFrom;
1818
use std::hash::{BuildHasher, Hash, Hasher};
1919

@@ -216,7 +216,6 @@ impl TaskGraph {
216216
match value.data.as_ref().unwrap() {
217217
Data::Scalar(_) => "scalar".hash(&mut hasher),
218218
Data::Table(_) => "data".hash(&mut hasher),
219-
Data::Plan(_) => "plan".hash(&mut hasher),
220219
}
221220
} else {
222221
// Include id_fingerprint of parents in the hash
@@ -303,7 +302,9 @@ impl TaskGraph {
303302
node.task = Some(Task {
304303
variable: node.task().variable.clone(),
305304
scope: node.task().scope.clone(),
306-
task_kind: Some(TaskKind::Value(ProtoTaskValue::try_from(&value)?)),
305+
task_kind: Some(TaskKind::Value(ProtoMaterializedTaskValue::try_from(
306+
&value,
307+
)?)),
307308
tz_config: None,
308309
});
309310

vegafusion-core/src/task_graph/task.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::proto::gen::tasks::{
33
task::TaskKind, DataSourceTask, DataUrlTask, DataValuesTask, NodeValueIndex, Task, TzConfig,
44
Variable,
55
};
6-
use crate::proto::gen::tasks::{SignalTask, TaskValue as ProtoTaskValue};
6+
use crate::proto::gen::tasks::{MaterializedTaskValue as ProtoMaterializedTaskValue, SignalTask};
77
use crate::task_graph::task_value::TaskValue;
88
use std::convert::TryFrom;
99

@@ -33,7 +33,9 @@ impl Task {
3333
Self {
3434
variable: Some(variable),
3535
scope: Vec::from(scope),
36-
task_kind: Some(TaskKind::Value(ProtoTaskValue::try_from(&value).unwrap())),
36+
task_kind: Some(TaskKind::Value(
37+
ProtoMaterializedTaskValue::try_from(&value).unwrap(),
38+
)),
3739
tz_config: None,
3840
}
3941
}

vegafusion-core/src/task_graph/task_value.rs

Lines changed: 11 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
use crate::proto::gen::tasks::materialized_task_value::Data as MaterializedTaskValueData;
2-
use crate::proto::gen::tasks::task_value::Data as TaskValueData;
32
use crate::proto::gen::tasks::ResponseTaskValue;
43
use crate::proto::gen::tasks::{
5-
MaterializedTaskValue as ProtoMaterializedTaskValue, TaskGraphValueResponse,
6-
TaskValue as ProtoTaskValue, Variable,
4+
MaterializedTaskValue as ProtoMaterializedTaskValue, TaskGraphValueResponse, Variable,
75
};
86
use crate::runtime::PlanExecutor;
97
use crate::task_graph::memory::{
@@ -105,30 +103,26 @@ impl MaterializedTaskValue {
105103
}
106104
}
107105

108-
impl TryFrom<&ProtoTaskValue> for TaskValue {
106+
impl TryFrom<&ProtoMaterializedTaskValue> for TaskValue {
109107
type Error = VegaFusionError;
110108

111-
fn try_from(value: &ProtoTaskValue) -> std::result::Result<Self, Self::Error> {
109+
fn try_from(value: &ProtoMaterializedTaskValue) -> std::result::Result<Self, Self::Error> {
112110
match value.data.as_ref().unwrap() {
113-
TaskValueData::Table(value) => Ok(Self::Table(VegaFusionTable::from_ipc_bytes(value)?)),
114-
TaskValueData::Scalar(value) => {
111+
MaterializedTaskValueData::Table(value) => {
112+
Ok(Self::Table(VegaFusionTable::from_ipc_bytes(value)?))
113+
}
114+
MaterializedTaskValueData::Scalar(value) => {
115115
let scalar_table = VegaFusionTable::from_ipc_bytes(value)?;
116116
let scalar_rb = scalar_table.to_record_batch()?;
117117
let scalar_array = scalar_rb.column(0);
118118
let scalar = ScalarValue::try_from_array(scalar_array, 0)?;
119119
Ok(Self::Scalar(scalar))
120120
}
121-
// TODO: we could use datafusion_proto::bytes::logical_plan_from_bytes here, but that
122-
// requires adding datafusion_proto to vegafusion-core deps, as well as passing
123-
// datafusion session (maybe empty one?) to unserialize plan
124-
TaskValueData::Plan(_value) => Err(VegaFusionError::internal(
125-
"Deserialization of Plan TaskValue not yet implemented",
126-
)),
127121
}
128122
}
129123
}
130124

131-
impl TryFrom<&TaskValue> for ProtoTaskValue {
125+
impl TryFrom<&TaskValue> for ProtoMaterializedTaskValue {
132126
type Error = VegaFusionError;
133127

134128
fn try_from(value: &TaskValue) -> std::result::Result<Self, Self::Error> {
@@ -138,17 +132,14 @@ impl TryFrom<&TaskValue> for ProtoTaskValue {
138132
let scalar_rb = RecordBatch::try_from_iter(vec![("value", scalar_array)])?;
139133
let ipc_bytes = VegaFusionTable::from(scalar_rb).to_ipc_bytes()?;
140134
Ok(Self {
141-
data: Some(TaskValueData::Scalar(ipc_bytes)),
135+
data: Some(MaterializedTaskValueData::Scalar(ipc_bytes)),
142136
})
143137
}
144138
TaskValue::Table(table) => Ok(Self {
145-
data: Some(TaskValueData::Table(table.to_ipc_bytes()?)),
139+
data: Some(MaterializedTaskValueData::Table(table.to_ipc_bytes()?)),
146140
}),
147-
// TODO: we could use datafusion_proto::bytes::logical_plan_to_bytes here, but that
148-
// requires adding datafusion_proto to vegafusion-core deps, as well as passing
149-
// datafusion session (maybe empty one?) to unserialize plan
150141
TaskValue::Plan(_) => Err(VegaFusionError::internal(
151-
"Cannot convert Plan TaskValue to protobuf representation",
142+
"TaskValue::Plan cannot be serialized to protobuf. Plans are intermediate values that should be materialized to tables using .to_materialized(plan_executor) before serialization.",
152143
)),
153144
}
154145
}
@@ -174,25 +165,6 @@ impl TryFrom<&MaterializedTaskValue> for ProtoMaterializedTaskValue {
174165
}
175166
}
176167

177-
impl TryFrom<&ProtoMaterializedTaskValue> for TaskValue {
178-
type Error = VegaFusionError;
179-
180-
fn try_from(value: &ProtoMaterializedTaskValue) -> std::result::Result<Self, Self::Error> {
181-
match value.data.as_ref().unwrap() {
182-
MaterializedTaskValueData::Table(value) => {
183-
Ok(Self::Table(VegaFusionTable::from_ipc_bytes(value)?))
184-
}
185-
MaterializedTaskValueData::Scalar(value) => {
186-
let scalar_table = VegaFusionTable::from_ipc_bytes(value)?;
187-
let scalar_rb = scalar_table.to_record_batch()?;
188-
let scalar_array = scalar_rb.column(0);
189-
let scalar = ScalarValue::try_from_array(scalar_array, 0)?;
190-
Ok(Self::Scalar(scalar))
191-
}
192-
}
193-
}
194-
}
195-
196168
impl TaskGraphValueResponse {
197169
pub fn deserialize(self) -> Result<Vec<(Variable, Vec<u32>, TaskValue)>> {
198170
self.response_values

vegafusion-python/tests/test_spark_e2e.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ def spark():
4040
.getOrCreate()
4141
)
4242

43-
# TODO: this is required for properly handling temporal. We need to check if we can work around different
44-
# timezone or if we should require users to setup their Spark sessions to operate in UTC
43+
# This is required for properly handling temporal data. Due to how Spark handles dates (which doesn't match
44+
# the proper SQL standard), we have to set the timezone to UTC. We mention this in our docs for users.
4545
session.sql("SET TIME ZONE 'UTC'")
4646

4747
sales_data_df = session.read.parquet(str(SALES_DATA_PATH))

vegafusion-runtime/src/data/tasks.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ impl TaskCall for DataUrlTask {
118118
let url = match self.url.as_ref().unwrap() {
119119
Url::String(url) => url.clone(),
120120
Url::Expr(expr) => {
121-
let compiled = compile(expr, &config, None)?;
121+
let compiled = compile(expr, &config, None).await?;
122122
let url_scalar = compiled.eval_to_scalar()?;
123123
url_scalar.to_scalar_string()?
124124
}

vegafusion-runtime/src/expression/compiler/array.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ use vegafusion_common::datafusion_common::DFSchema;
55
use vegafusion_core::error::Result;
66
use vegafusion_core::proto::gen::expression::ArrayExpression;
77

8-
pub fn compile_array(
8+
pub async fn compile_array(
99
node: &ArrayExpression,
1010
config: &CompilationConfig,
1111
schema: &DFSchema,
1212
) -> Result<Expr> {
1313
let mut elements: Vec<Expr> = Vec::new();
1414
for el in &node.elements {
15-
let phys_expr = compile(el, config, Some(schema))?;
15+
let phys_expr = compile(el, config, Some(schema)).await?;
1616
elements.push(phys_expr);
1717
}
1818
Ok(make_array(elements))

vegafusion-runtime/src/expression/compiler/binary.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@ use vegafusion_core::arrow::datatypes::DataType;
1212
use vegafusion_core::error::Result;
1313
use vegafusion_core::proto::gen::expression::{BinaryExpression, BinaryOperator};
1414

15-
pub fn compile_binary(
15+
pub async fn compile_binary(
1616
node: &BinaryExpression,
1717
config: &CompilationConfig,
1818
schema: &DFSchema,
1919
) -> Result<Expr> {
2020
// First, compile argument
21-
let lhs = compile(node.left(), config, Some(schema))?;
22-
let rhs = compile(node.right(), config, Some(schema))?;
21+
let lhs = compile(node.left(), config, Some(schema)).await?;
22+
let rhs = compile(node.right(), config, Some(schema)).await?;
2323

2424
let lhs_dtype = data_type(&lhs, schema)?;
2525
let rhs_dtype = data_type(&rhs, schema)?;

vegafusion-runtime/src/expression/compiler/call.rs

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use vegafusion_common::data::table::VegaFusionTable;
4444
use vegafusion_common::datafusion_common::DFSchema;
4545
use vegafusion_common::datatypes::cast_to;
4646
use vegafusion_common::error::{Result, ResultWithContext, VegaFusionError};
47+
use vegafusion_core::data::dataset::VegaFusionDataset;
4748
use vegafusion_core::proto::gen::expression::{
4849
expression, literal, CallExpression, Expression, Literal,
4950
};
@@ -96,15 +97,15 @@ pub enum VegaFusionCallable {
9697
Scale,
9798
}
9899

99-
pub fn compile_scalar_arguments(
100+
pub async fn compile_scalar_arguments(
100101
node: &CallExpression,
101102
config: &CompilationConfig,
102103
schema: &DFSchema,
103104
cast: &Option<DataType>,
104105
) -> Result<Vec<Expr>> {
105106
let mut args: Vec<Expr> = Vec::new();
106107
for arg in &node.arguments {
107-
let compiled_arg = compile(arg, config, Some(schema))?;
108+
let compiled_arg = compile(arg, config, Some(schema)).await?;
108109
let arg_expr = match cast {
109110
None => compiled_arg,
110111
Some(dtype) => cast_to(compiled_arg, dtype, schema)?,
@@ -114,7 +115,7 @@ pub fn compile_scalar_arguments(
114115
Ok(args)
115116
}
116117

117-
pub fn compile_call(
118+
pub async fn compile_call(
118119
node: &CallExpression,
119120
config: &CompilationConfig,
120121
schema: &DFSchema,
@@ -127,10 +128,10 @@ pub fn compile_call(
127128
VegaFusionCallable::Macro(callable) => {
128129
// Apply macro then recursively compile
129130
let new_expr = callable(&node.arguments)?;
130-
compile(&new_expr, config, Some(schema))
131+
compile(&new_expr, config, Some(schema)).await
131132
}
132133
VegaFusionCallable::ScalarUDF { udf, cast } => {
133-
let args = compile_scalar_arguments(node, config, schema, cast)?;
134+
let args = compile_scalar_arguments(node, config, schema, cast).await?;
134135
Ok(Expr::ScalarFunction(expr::ScalarFunction {
135136
func: udf.clone(),
136137
args,
@@ -144,18 +145,19 @@ pub fn compile_call(
144145
..
145146
}) => {
146147
if let Some(dataset) = config.data_scope.get(name) {
147-
if let Some(table) = dataset.as_table() {
148-
let tz_config = config.tz_config.with_context(|| {
149-
"No local timezone info provided".to_string()
150-
})?;
151-
callee(table, &node.arguments[1..], schema, &tz_config)
152-
} else {
153-
// TODO: If a plan executor is available in config, try to materialize here.
154-
Err(VegaFusionError::compilation(format!(
155-
"Dataset {} is not materialized as table",
156-
name
157-
)))
158-
}
148+
let table = match dataset {
149+
VegaFusionDataset::Table { table, .. } => table.clone(),
150+
// Materialize plans on-demand when needed for expression evaluation.
151+
// This allows lazy evaluation of data transformations - plans are only
152+
// executed when the data is actually needed by an expression.
153+
VegaFusionDataset::Plan { plan } => {
154+
config.plan_executor.execute_plan(plan.clone()).await?
155+
}
156+
};
157+
let tz_config = config
158+
.tz_config
159+
.with_context(|| "No local timezone info provided".to_string())?;
160+
callee(&table, &node.arguments[1..], schema, &tz_config)
159161
} else {
160162
Err(VegaFusionError::internal(format!(
161163
"No dataset named {}. Available: {:?}",
@@ -179,11 +181,11 @@ pub fn compile_call(
179181
}
180182
}
181183
VegaFusionCallable::Transform(callable) => {
182-
let args = compile_scalar_arguments(node, config, schema, &None)?;
184+
let args = compile_scalar_arguments(node, config, schema, &None).await?;
183185
callable(&args, schema)
184186
}
185187
VegaFusionCallable::UnaryTransform(callable) => {
186-
let mut args = compile_scalar_arguments(node, config, schema, &None)?;
188+
let mut args = compile_scalar_arguments(node, config, schema, &None).await?;
187189
if args.len() != 1 {
188190
Err(VegaFusionError::internal(format!(
189191
"The {} function requires 1 argument. Received {}",
@@ -195,14 +197,14 @@ pub fn compile_call(
195197
}
196198
}
197199
VegaFusionCallable::LocalTransform(callable) => {
198-
let args = compile_scalar_arguments(node, config, schema, &None)?;
200+
let args = compile_scalar_arguments(node, config, schema, &None).await?;
199201
let tz_config = config
200202
.tz_config
201203
.with_context(|| "No local timezone info provided".to_string())?;
202204
callable(&tz_config, &args, schema)
203205
}
204206
VegaFusionCallable::UtcTransform(callable) => {
205-
let args = compile_scalar_arguments(node, config, schema, &None)?;
207+
let args = compile_scalar_arguments(node, config, schema, &None).await?;
206208
let tz_config = RuntimeTzConfig {
207209
local_tz: chrono_tz::UTC,
208210
default_input_tz: chrono_tz::UTC,

0 commit comments

Comments (0)