
Commit 8442a07

Properly call from_pandas in pyarrow (vega#560)
* Properly call from_pandas in pyarrow
* bump rustc
* Fix warnings

Co-authored-by: Jon Mease <[email protected]>
1 parent: 8e0f7e2 · commit: 8442a07

19 files changed (+172, -130 lines)

pixi.lock

Lines changed: 70 additions & 70 deletions
Some generated files are not rendered by default.

pixi.toml

Lines changed: 1 addition & 1 deletion
@@ -131,7 +131,7 @@ jupytext = "1.15.0.*"
 openjdk = "20.0.0.*"
 minio-server = "2023.9.23.3.47.50.*"
 minio = "7.1.17.*"
-rust = "1.80.*"
+rust = "1.84.*"
 taplo = ">=0.9.3,<0.10"
 ruff = ">=0.6.9,<0.7"
 mypy = ">=1.11.2,<2"

vegafusion-core/Cargo.toml

Lines changed: 3 additions & 0 deletions
@@ -5,6 +5,9 @@ edition = "2021"
 version = "2.0.1"
 description = "Core components required by multiple VegaFusion crates, with WASM compatibility"
 
+[lints.clippy]
+module_inception = "allow"
+
 [features]
 tonic_support = ["tonic", "tonic-build"]
 py = ["pyo3", "vegafusion-common/py"]
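
For context on the new [lints.clippy] table: module_inception is the clippy lint that fires when a module declares a submodule with the same name (for example a foo/foo.rs layout), and the Cargo.toml entry above allows it crate-wide. A minimal sketch of the pattern the lint objects to; the module names and function are illustrative, not this crate's actual layout:

// Illustrative sketch (not this crate's layout): clippy::module_inception
// warns when a module contains a submodule with the same name.
mod expression {
    pub mod expression {
        // clippy: "module has the same name as its containing module"
        pub fn describe() -> &'static str {
            "nested module shares its parent's name"
        }
    }
}

fn main() {
    // Callers see the doubled path the lint objects to; the Cargo.toml entry
    // above sets module_inception = "allow" to accept this crate-wide.
    println!("{}", expression::expression::describe());
}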

vegafusion-core/src/chart_state.rs

Lines changed: 44 additions & 38 deletions
@@ -133,46 +133,53 @@ impl ChartState {
         runtime: &dyn VegaFusionRuntimeTrait,
         updates: Vec<ExportUpdateJSON>,
     ) -> Result<Vec<ExportUpdateJSON>> {
-        let mut task_graph = self.task_graph.lock().map_err(|err| {
-            VegaFusionError::internal(format!("Failed to acquire task graph lock: {:?}", err))
-        })?;
-        let server_to_client = self.server_to_client_value_indices.clone();
-        let mut indices: Vec<NodeValueIndex> = Vec::new();
-        for export_update in &updates {
-            let var = match export_update.namespace {
-                ExportUpdateNamespace::Signal => Variable::new_signal(&export_update.name),
-                ExportUpdateNamespace::Data => Variable::new_data(&export_update.name),
-            };
-            let scoped_var: ScopedVariable = (var, export_update.scope.clone());
-            let node_value_index = *self
-                .task_graph_mapping
-                .get(&scoped_var)
-                .with_context(|| format!("No task graph node found for {scoped_var:?}"))?;
-
-            let value = match export_update.namespace {
-                ExportUpdateNamespace::Signal => {
-                    TaskValue::Scalar(ScalarValue::from_json(&export_update.value)?)
-                }
-                ExportUpdateNamespace::Data => {
-                    TaskValue::Table(VegaFusionTable::from_json(&export_update.value)?)
-                }
-            };
-
-            indices.extend(task_graph.update_value(node_value_index.node_index as usize, value)?);
-        }
-
-        // Filter to update nodes in the comm plan
-        let indices: Vec<_> = indices
-            .iter()
-            .filter(|&node| server_to_client.contains(node))
-            .cloned()
-            .collect();
+        // Scope the mutex guard to ensure it's dropped before the async call
+        let (indices, cloned_task_graph) = {
+            let mut task_graph = self.task_graph.lock().map_err(|err| {
+                VegaFusionError::internal(format!("Failed to acquire task graph lock: {:?}", err))
+            })?;
+            let server_to_client = self.server_to_client_value_indices.clone();
+            let mut indices: Vec<NodeValueIndex> = Vec::new();
+
+            for export_update in &updates {
+                let var = match export_update.namespace {
+                    ExportUpdateNamespace::Signal => Variable::new_signal(&export_update.name),
+                    ExportUpdateNamespace::Data => Variable::new_data(&export_update.name),
+                };
+                let scoped_var: ScopedVariable = (var, export_update.scope.clone());
+                let node_value_index = *self
+                    .task_graph_mapping
+                    .get(&scoped_var)
+                    .with_context(|| format!("No task graph node found for {scoped_var:?}"))?;
+
+                let value = match export_update.namespace {
+                    ExportUpdateNamespace::Signal => {
+                        TaskValue::Scalar(ScalarValue::from_json(&export_update.value)?)
+                    }
+                    ExportUpdateNamespace::Data => {
+                        TaskValue::Table(VegaFusionTable::from_json(&export_update.value)?)
+                    }
+                };
+
+                indices
+                    .extend(task_graph.update_value(node_value_index.node_index as usize, value)?);
+            }
+
+            // Filter to update nodes in the comm plan
+            let indices: Vec<_> = indices
+                .iter()
+                .filter(|&node| server_to_client.contains(node))
+                .cloned()
+                .collect();
 
-        let cloned_task_graph = task_graph.clone();
+            // Clone the task graph while we still have the lock
+            let cloned_task_graph = task_graph.clone();
 
-        // Drop the MutexGuard before await call to avoid warning
-        drop(task_graph);
+            // Return both values we need
+            (indices, cloned_task_graph)
+        }; // MutexGuard is dropped here
 
+        // Now we can safely make the async call
         let response_task_values = runtime
             .query_request(
                 Arc::new(cloned_task_graph),

@@ -185,7 +192,6 @@ impl ChartState {
             .into_iter()
             .map(|response_value| {
                 let variable = response_value.variable;
-
                 let scope = response_value.scope;
                 let value = response_value.value;
 
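
The restructuring above addresses a common async Rust pitfall: a std::sync::MutexGuard held across an .await can make the generated future non-Send and triggers lints such as clippy's await_holding_lock, which is why the guard is now confined to a block that ends before the async call and only owned values escape it. A minimal sketch of the same pattern; TaskGraph, query, and update below are stand-ins, not the crate's real API:

use std::sync::{Arc, Mutex};

// Stand-in types for illustration only.
#[derive(Clone, Default)]
struct TaskGraph {
    values: Vec<i64>,
}

async fn query(graph: TaskGraph) -> usize {
    graph.values.len()
}

async fn update(shared: Arc<Mutex<TaskGraph>>, new_value: i64) -> usize {
    // Scope the std::sync::MutexGuard so it is dropped before the .await:
    // holding it across an await point would make this future non-Send.
    let cloned_graph = {
        let mut graph = shared.lock().expect("task graph lock poisoned");
        graph.values.push(new_value);
        graph.clone()
    }; // guard dropped here, before the async call

    // Only the owned clone crosses the await point.
    query(cloned_graph).await
}

// Compile-time check: this only type-checks because no guard is live at the await.
fn assert_send<T: Send>(_: &T) {}

fn main() {
    let shared = Arc::new(Mutex::new(TaskGraph::default()));
    let fut = update(shared, 1);
    assert_send(&fut);
    println!("the update future is Send; the guard never crosses an await point");
}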

vegafusion-core/src/expression/lexer.rs

Lines changed: 1 addition & 1 deletion
@@ -92,7 +92,7 @@ struct Tokenizer<'a> {
     full_text: &'a str,
 }
 
-impl<'a> Tokenizer<'a> {
+impl Tokenizer<'_> {
     fn new(src: &str) -> Tokenizer {
         Tokenizer {
             current_index: 0,

vegafusion-core/src/expression/visitors.rs

Lines changed: 1 addition & 1 deletion
@@ -312,7 +312,7 @@ impl<'a> DatasetsColumnUsageVisitor<'a> {
     }
 }
 
-impl<'a> ExpressionVisitor for DatasetsColumnUsageVisitor<'a> {
+impl ExpressionVisitor for DatasetsColumnUsageVisitor<'_> {
     fn visit_member(&mut self, node: &MemberExpression) {
         if let (Some(datum_var), Some(object), Some(property)) =
             (&self.datum_var, &node.object, &node.property)
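
The lexer.rs and visitors.rs hunks are the same mechanical fix: when nothing inside an impl block needs to name the lifetime parameter, impl<'a> Foo<'a> can be written as impl Foo<'_>, which newer toolchains suggest via lints such as clippy's needless_lifetimes; this is presumably part of the "Fix warnings" item in the commit message. A small sketch of the equivalence; the method below is illustrative and not part of the crate:

// Illustrative only: same struct shape as the lexer.rs context above,
// with a made-up method.
struct Tokenizer<'a> {
    full_text: &'a str,
}

// Before: impl<'a> Tokenizer<'a> { ... } declares a lifetime the body never
// needs to name. After: the anonymous lifetime '_ says "some borrow lives
// here" without naming it, and the two forms are equivalent.
impl Tokenizer<'_> {
    fn remaining_len(&self) -> usize {
        self.full_text.len()
    }
}

fn main() {
    let tokenizer = Tokenizer { full_text: "datum.x + 1" };
    println!("{}", tokenizer.remaining_len());
}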
