Skip to content

Commit 940c057

Browse files
authored
Add comprehensive execution tracing (#177)
Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai>
1 parent 18dc28a commit 940c057

File tree

135 files changed

+7620
-4721
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

135 files changed

+7620
-4721
lines changed

.github/workflows/ci_linux.yaml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ jobs:
2525
steps:
2626
- uses: actions/checkout@v3
2727

28+
- name: Update apt index
29+
run: sudo apt-get update
30+
2831
- name: Get apt dependencies
2932
run: sudo apt-get install protobuf-compiler
3033

@@ -41,7 +44,7 @@ jobs:
4144

4245
- name: Test default features
4346
run: |
44-
cargo test --workspace
47+
cargo test
4548
4649
- name: Test single_threaded_async
4750
run: |
@@ -51,12 +54,12 @@ jobs:
5154
- name: Test diagram
5255
run: |
5356
rm -rf target/debug/build
54-
cargo test --workspace -F=diagram
57+
cargo test -F=diagram
5558
5659
- name: Test trace
5760
run: |
5861
rm -rf target/debug/build
59-
cargo test --workspace -F=trace
62+
cargo test -F=trace
6063
6164
- name: Test all capabilities combined
6265
run: |

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "crossflow"
3-
version = "0.0.6"
3+
version = "0.0.7"
44
edition = "2024"
55
authors = ["Grey <mxgrey@intrinsic.ai>"]
66
license = "Apache-2.0"
@@ -64,7 +64,7 @@ zenoh-ext = "1.5.1"
6464

6565
[dependencies]
6666
anyhow = { workspace = true }
67-
crossflow_derive = { path = "macros", version = "0.0.6" }
67+
crossflow_derive = { path = "macros", version = "0.0.7" }
6868
bevy_ecs = { workspace = true }
6969
bevy_utils = { workspace = true }
7070
bevy_derive = { workspace = true }

assets/figures/conditional-unreachability-cancel.svg

Lines changed: 1 addition & 0 deletions
Loading

diagram-editor/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "crossflow_diagram_editor"
3-
version = "0.0.6"
3+
version = "0.0.7"
44
edition = "2024"
55
authors = ["Teo Koon Peng <koonpeng@intrinsic.ai>"]
66
license = "Apache-2.0"
@@ -29,7 +29,7 @@ path = "server/lib.rs"
2929
axum = { workspace = true, features = ["json"], default-features = false }
3030
bevy_app = { workspace = true }
3131
bevy_ecs = { workspace = true }
32-
crossflow = { version = "0.0.6", path = "..", features = ["diagram", "trace"] }
32+
crossflow = { version = "0.0.7", path = "..", features = ["diagram", "trace"] }
3333
clap = { workspace = true, features = ["derive"], optional = true }
3434
flate2 = { version = "1.1.1", optional = true }
3535
futures-util = { workspace = true }

diagram-editor/server/api/executor.rs

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use axum::{
1212
routing::{self},
1313
};
1414
use bevy_ecs::{prelude::Entity, schedule::IntoScheduleConfigs};
15-
use crossflow::{Diagram, DiagramElementRegistry, OperationStarted, Outcome, RequestExt, trace};
15+
use crossflow::{Diagram, DiagramElementRegistry, Outcome, RequestExt, TracedEvent, trace};
1616
use serde::{Deserialize, Serialize};
1717
use std::{
1818
error::Error,
@@ -35,7 +35,7 @@ type WorkflowResponseResult =
3535
Result<(Outcome<serde_json::Value>, Entity), Box<dyn Error + Send + Sync>>;
3636
type WorkflowResponseSender = tokio::sync::oneshot::Sender<WorkflowResponseResult>;
3737

38-
type WorkflowFeedback = OperationStarted;
38+
type WorkflowFeedback = TracedEvent;
3939

4040
#[derive(bevy_ecs::component::Component)]
4141
struct FeedbackSender(tokio::sync::broadcast::Sender<WorkflowFeedback>);
@@ -320,26 +320,13 @@ fn execute_requests(
320320
}
321321

322322
fn debug_feedback(
323-
mut op_started: bevy_ecs::event::EventReader<trace::OperationStarted>,
324-
feedback_query: bevy_ecs::system::Query<&FeedbackSender>,
323+
mut op_started: bevy_ecs::event::EventReader<trace::TracedEvent>,
324+
feedback_query: bevy_ecs::system::Query<(Entity, &FeedbackSender)>,
325325
) {
326326
for ev in op_started.read() {
327-
// not sure if it is working as intended, but the root session is in the 2nd last
328-
// item, not the last as described in `session_stack` docs.
329-
let session = match ev.session_stack.iter().rev().skip(1).next() {
330-
Some(session) => session,
331-
None => {
332-
continue;
333-
}
334-
};
335-
match feedback_query.get(*session) {
336-
Ok(feedback_tx) => {
337-
if let Err(e) = feedback_tx.0.send(ev.clone()) {
338-
error!("{}", e);
339-
}
340-
}
341-
Err(_) => {
342-
// the session has no feedback channel
327+
for (session, channel) in &feedback_query {
328+
if ev.event.is_for_session(session) {
329+
let _ = channel.0.send(ev.clone());
343330
}
344331
}
345332
}

diagram-editor/wasm/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "crossflow_diagram_editor_wasm"
3-
version = "0.0.6"
3+
version = "0.0.7"
44
edition = "2024"
55
authors = ["Teo Koon Peng <koonpeng@intrinsic.ai>"]
66
license = "Apache-2.0"
@@ -27,11 +27,11 @@ categories = [
2727
[dependencies]
2828
axum = { workspace = true, features = ["json"], default-features = false }
2929
bevy_app = { workspace = true }
30-
crossflow = { version = "0.0.6", path = "../..", features = [
30+
crossflow = { version = "0.0.7", path = "../..", features = [
3131
"diagram",
3232
"single_threaded_async",
3333
] }
34-
crossflow_diagram_editor = { version = "0.0.6", path = "..", default-features = false }
34+
crossflow_diagram_editor = { version = "0.0.7", path = "..", default-features = false }
3535
mime_guess = { workspace = true }
3636
serde_json = { workspace = true }
3737
serde-wasm-bindgen = "0.6.5"

diagram.schema.json

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -105,21 +105,6 @@
105105
"buffers"
106106
]
107107
},
108-
"BufferIdentifier": {
109-
"description": "Uniquely identify a buffer within a buffer map, either by name or by an\nindex value.",
110-
"anyOf": [
111-
{
112-
"description": "Identify a buffer by name",
113-
"type": "string"
114-
},
115-
{
116-
"description": "Identify a buffer by an index value",
117-
"type": "integer",
118-
"format": "uint",
119-
"minimum": 0
120-
}
121-
]
122-
},
123108
"BufferSchema": {
124109
"description": "Create a [`Buffer`][1] which can be used to store and pull data within\na scope.\n\nBy default the [`BufferSettings`][2] will keep the single last message\npushed to the buffer. You can change that with the optional `settings`\nproperty.\n\nUse the `\"serialize\": true` option to serialize the messages into\n[`JsonMessage`] before they are inserted into the buffer. This\nallows any serializable message type to be pushed into the buffer. If\nleft unspecified, the buffer will store the specific data type that gets\npushed into it. If the buffer inputs are not being serialized, then all\nincoming messages being pushed into the buffer must have the same type.\n\n[1]: crate::Buffer\n[2]: crate::BufferSettings\n\n# Examples\n```\n# crossflow::Diagram::from_json_str(r#\"\n{\n \"version\": \"0.1.0\",\n \"start\": \"fork_clone\",\n \"ops\": {\n \"fork_clone\": {\n \"type\": \"fork_clone\",\n \"next\": [\"num_output\", \"string_output\", \"all_num_buffer\", \"serialized_num_buffer\"]\n },\n \"num_output\": {\n \"type\": \"node\",\n \"builder\": \"num_output\",\n \"next\": \"buffer_access\"\n },\n \"string_output\": {\n \"type\": \"node\",\n \"builder\": \"string_output\",\n \"next\": \"string_buffer\"\n },\n \"string_buffer\": {\n \"type\": \"buffer\",\n \"settings\": {\n \"retention\": { \"keep_last\": 10 }\n }\n },\n \"all_num_buffer\": {\n \"type\": \"buffer\",\n \"settings\": {\n \"retention\": \"keep_all\"\n }\n },\n \"serialized_num_buffer\": {\n \"type\": \"buffer\",\n \"serialize\": true\n },\n \"buffer_access\": {\n \"type\": \"buffer_access\",\n \"buffers\": [\"string_buffer\"],\n \"next\": \"with_buffer_access\"\n },\n \"with_buffer_access\": {\n \"type\": \"node\",\n \"builder\": \"with_buffer_access\",\n \"next\": { \"builtin\": \"terminate\" }\n }\n }\n}\n# \"#)?;\n# Ok::<_, serde_json::Error>(())\n```",
125110
"type": "object",
@@ -461,6 +446,21 @@
461446
"err"
462447
]
463448
},
449+
"IdentifierRef": {
450+
"description": "Uniquely identify something by a borrowed name or index.",
451+
"anyOf": [
452+
{
453+
"description": "Identify by a name",
454+
"type": "string"
455+
},
456+
{
457+
"description": "Identify by an index value",
458+
"type": "integer",
459+
"format": "uint",
460+
"minimum": 0
461+
}
462+
]
463+
},
464464
"InputExample": {
465465
"type": "object",
466466
"properties": {
@@ -505,7 +505,7 @@
505505
"description": "List of the keys in the `buffers` dictionary whose value should be cloned\ninstead of removed from the buffer (pulled) when the join occurs. Cloning\nthe value will leave the buffer unchanged after the join operation takes\nplace.",
506506
"type": "array",
507507
"items": {
508-
"$ref": "#/$defs/BufferIdentifier"
508+
"$ref": "#/$defs/IdentifierRef"
509509
}
510510
},
511511
"display_text": {
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
[package]
22
name = "calculator"
3-
version = "0.0.6"
3+
version = "0.0.7"
44
edition = "2024"
55

66
[dependencies]
7-
crossflow_diagram_editor = { path = "../../../diagram-editor", features = ["basic_executor"] }
8-
calculator_ops_catalog = { path = "../calculator_ops_catalog" }
7+
crossflow_diagram_editor = { path = "../../../diagram-editor", features = ["basic_executor"] , version = "0.0.7" }
8+
calculator_ops_catalog = { path = "../calculator_ops_catalog" , version = "0.0.7" }
99

1010
[dev-dependencies]
1111
assert_cmd = "2.1.2"

examples/diagram/calculator_ops_catalog/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
[package]
22
name = "calculator_ops_catalog"
3-
version = "0.0.6"
3+
version = "0.0.7"
44
edition = "2024"
55
description = "catalog of reusable calculator operations"
66

77
[dependencies]
88
bevy_app = { workspace = true }
9-
crossflow = { version = "0.0.6", path = "../../..", features = ["diagram"] }
9+
crossflow = { version = "0.0.7", path = "../../..", features = ["diagram"] }
1010
bevy_time = { workspace = true }
1111
serde = { workspace = true }
1212
serde_json = { workspace = true }

examples/diagram/calculator_ops_catalog/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::fmt::Write;
22

33
use crossflow::{
4-
AsyncMap, ConfigExample, DiagramElementRegistry, JsonMessage, NodeBuilderOptions, StreamPack,
4+
Async, ConfigExample, DiagramElementRegistry, JsonMessage, NodeBuilderOptions, StreamPack,
55
};
66
use schemars::JsonSchema;
77
use serde::{Deserialize, Serialize};
@@ -321,7 +321,7 @@ pub fn register(registry: &mut DiagramElementRegistry) {
321321
.with_config_examples(fibonacci_examples),
322322
|builder, config: Option<u64>| {
323323
builder.create_map(
324-
move |input: AsyncMap<JsonMessage, FibonacciStream>| async move {
324+
move |input: Async<JsonMessage, FibonacciStream>| async move {
325325
let order = if let Some(order) = config {
326326
order
327327
} else if let JsonMessage::Number(number) = &input.request {

0 commit comments

Comments
 (0)