Skip to content

Commit ff00a0f

Browse files
authored
add Rust Wasm operator viconverter (#145)
## Purpose <!-- Describe the intention of the changes being proposed. What problem does it solve or functionality does it add? --> * Add a Wasm operator example to transform and flatten video indexer insights to specific format that can be further processed and stored by Azure Event Hub and Fabric Lakehouse. ## Does this introduce a breaking change? <!-- Mark one with an "x". --> ``` [ ] Yes [x] No ``` ## Pull Request Type What kind of change does this Pull Request introduce? <!-- Please check the one that applies to this PR using "x". --> ``` [ ] Bugfix [ ] Feature [ ] Code style update (formatting, local variables) [ ] Refactoring (no functional changes, no api changes) [x] Documentation content changes [x] Other... Please describe: Add a new Wasm operator example ``` ## How to Test * Get the code ``` git clone https://github.com/Azure-Samples/explore-iot-operations.git ``` * Test the code <!-- Add steps to run the tests suite and/or manually test --> ``` cd explore-iot-operations.git/samples/wasm/rust/examples/viconverter docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operations/rust-wasm-builder --app-name viconverter ``` ## What to Check Verify that the following are valid * viconverter.wasm is built successfully. ``` Build complete! Output: bin/x86_64/release/viconverter.wasm ``` ## Other Information <!-- Add any other helpful information that may be needed here. -->
1 parent 2b94b4f commit ff00a0f

File tree

4 files changed

+165
-0
lines changed

4 files changed

+165
-0
lines changed

samples/wasm/rust/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ The `examples/` directory contains complete examples:
7676
- **collection**: Data collection and batching
7777
- **enrichment**: Data enrichment and annotation
7878
- **window**: Time-based windowing operations
79+
- **viconverter**: Azure AI Video Indexer insights conversion
7980

8081
## Building Operators
8182

samples/wasm/rust/examples/README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,17 @@ docker run --rm -v "$(pwd):/workspace" ghcr.io/azure-samples/explore-iot-operati
9595
- Time-based data processing
9696
- Key Features: Configurable delay intervals, preserves data ordering
9797

98+
### Azure AI Video Indexer insights processing
99+
100+
#### viconverter - insights conversion processing
101+
- Path: `viconverter/`
102+
- Operator Type: Map
103+
- Purpose: transform and flatten video indexer insights to specific format that can be further processed and stored by Azure Event Hub and Fabric Lakehouse.
104+
- Use Cases:
105+
- Computer vision pipelines
106+
- Image processing
107+
- surveillance systems
108+
98109
## Architecture Patterns
99110

100111
### Operator Type Implementations
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
[package]
2+
name = "viconverter"
3+
version = "0.2.0"
4+
authors = ["TinyKube Devs"]
5+
license = "MIT"
6+
edition = "2021"
7+
8+
[dependencies]
9+
wit-bindgen = "0.22"
10+
tinykube_wasm_sdk = { version = "0.2.0", registry="azure-vscode-tinykube" }
11+
serde = { version = "1", default-features = false, features = [
12+
"derive",
13+
] }
14+
serde_json = { version = "1", default-features = false, features = [
15+
"alloc", # "serde_json requires that either `std` (default) or `alloc` feature is enabled"
16+
] }
17+
jsonschema = { version = "0.8", default-features = false}
18+
once_cell = "1.17.1"
19+
20+
[lib]
21+
crate-type = ["cdylib"]
22+
path = "src/lib.rs"
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
// Generated by `wit_bindgen::generate` expansion.
5+
#![allow(clippy::missing_safety_doc)]
6+
7+
use serde::{Deserialize, Serialize};
8+
use tinykube_wasm_sdk::logger::{self, Level};
9+
use tinykube_wasm_sdk::macros::map_operator;
10+
11+
/// Minimal view of the input: we only deserialize what we need.
12+
#[derive(Debug, Deserialize)]
13+
#[serde(rename_all = "camelCase")]
14+
struct Input {
15+
id: usize,
16+
height: u32,
17+
width: u32,
18+
#[serde(default)]
19+
detections: Vec<Detection>,
20+
}
21+
22+
#[derive(Debug, Deserialize)]
23+
#[serde(rename_all = "camelCase")]
24+
struct Detection {
25+
#[serde(rename = "Id")]
26+
id: String,
27+
insight_name: String,
28+
model_type: String,
29+
instances: Vec<Instance>,
30+
}
31+
32+
#[derive(Debug, Deserialize)]
33+
#[serde(rename_all = "camelCase")]
34+
struct Instance {
35+
x: f32,
36+
y: f32,
37+
width: f32,
38+
height: f32,
39+
start: f64,
40+
end: f64,
41+
start_time: String,
42+
end_time: String,
43+
confidence: f32,
44+
}
45+
46+
/// The flattened row you can store in your DB.
47+
#[derive(Debug, Serialize)]
48+
#[serde(rename_all = "snake_case")]
49+
struct DetectionRow {
50+
// Root (frame) context
51+
id: usize,
52+
frame_width: u32,
53+
frame_height: u32,
54+
55+
// Parent detection metadata
56+
detection_id: String,
57+
insight_name: String,
58+
model_type: String,
59+
60+
// Instance fields
61+
x: f32,
62+
y: f32,
63+
width: f32,
64+
height: f32,
65+
start: f64,
66+
end: f64,
67+
start_time: String,
68+
end_time: String,
69+
confidence: f32,
70+
}
71+
72+
fn map_init(_configuration: ModuleConfiguration) -> bool {
73+
// Add code here to process the module init properties and module schemas from the configuration
74+
75+
true
76+
}
77+
78+
#[map_operator(init = "map_init")]
79+
fn map(input: DataModel) -> DataModel {
80+
logger::log(Level::Info, "viconverter/map", "Get a map request");
81+
// create result object from input
82+
let DataModel::Message(mut result) = input else {
83+
panic!("Unexpected input type");
84+
};
85+
86+
// Extract payload from message to process
87+
let staged_payload = match &result.payload {
88+
BufferOrBytes::Buffer(buffer) => buffer.read(),
89+
BufferOrBytes::Bytes(bytes) => bytes.clone(),
90+
};
91+
92+
let payload_str = std::str::from_utf8(&staged_payload).unwrap();
93+
let Input {
94+
id,
95+
height,
96+
width,
97+
detections,
98+
} = serde_json::from_str(payload_str).unwrap();
99+
100+
let total_instances = detections.iter().map(|d| d.instances.len()).sum();
101+
let mut rows = Vec::with_capacity(total_instances);
102+
103+
for det in detections {
104+
for inst in det.instances {
105+
rows.push(DetectionRow {
106+
id,
107+
frame_width: width,
108+
frame_height: height,
109+
110+
detection_id: det.id.clone(),
111+
insight_name: det.insight_name.clone(),
112+
model_type: det.model_type.clone(),
113+
114+
x: inst.x,
115+
y: inst.y,
116+
width: inst.width,
117+
height: inst.height,
118+
start: inst.start,
119+
end: inst.end,
120+
start_time: inst.start_time,
121+
end_time: inst.end_time,
122+
confidence: inst.confidence,
123+
});
124+
}
125+
}
126+
127+
let serialized_rows = serde_json::to_vec(&rows).unwrap();
128+
result.payload = BufferOrBytes::Bytes(serialized_rows);
129+
// return result
130+
DataModel::Message(result)
131+
}

0 commit comments

Comments
 (0)