Skip to content

Commit a882f40

Browse files
authored
Update the operator source code (#162)
The Operators source code has been updated to the latest. ## Does this introduce a breaking change? <!-- Mark one with an "x". --> ``` [ ] Yes [ x] No ``` ## Pull Request Type What kind of change does this Pull Request introduce? Changes the operator source in compliant with aziot-operations <!-- Please check the one that applies to this PR using "x". --> ``` [ ] Bugfix [ ] Feature [ x] Code style update (formatting, local variables) [ ] Refactoring (no functional changes, no api changes) [ ] Documentation content changes [ ] Other... Please describe: ``` ## How to Test * Get the code ``` git clone [repo-address] cd [repo-name] git checkout [branch-name] npm install ``` * Test the code <!-- Add steps to run the tests suite and/or manually test --> ``` ``` ## What to Check Verify that the following are valid * ... ## Other Information <!-- Add any other helpful information that may be needed here. -->
1 parent 1d35903 commit a882f40

File tree

19 files changed

+173
-138
lines changed

19 files changed

+173
-138
lines changed

samples/wasm/rust/examples/collection/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ edition = "2021"
77

88
[dependencies]
99
wit-bindgen = "0.22"
10-
wasm_graph_sdk = { version = "=1.1.1", registry = "aio-sdks" }
10+
wasm_graph_sdk = { version = "=1.1.3", registry="aio-wg" }
1111

1212
serde = { version = "1", features = [
1313
"derive",
@@ -19,4 +19,3 @@ serde_json = { version = "1.0", default-features = false, features = [
1919

2020
[lib]
2121
crate-type = ["cdylib"]
22-
path = "src/lib.rs"

samples/wasm/rust/examples/collection/src/lib.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ use wasm_graph_sdk::macros::accumulate_operator;
99
use wasm_graph_sdk::metrics::{self, CounterValue, Label};
1010

1111
#[accumulate_operator]
12-
fn accumulate_sensor_data(staged: DataModel, inputs: Vec<DataModel>) -> Result<DataModel, Error> {
12+
fn accumulate_sensor_data(
13+
staged: DataModel,
14+
inputs: Vec<DataModel>,
15+
) -> Result<DataModel, Error> {
1316
let labels = vec![Label {
1417
key: "module".to_owned(),
1518
value: "module-collection/accumulate".to_owned(),
@@ -23,26 +26,22 @@ fn accumulate_sensor_data(staged: DataModel, inputs: Vec<DataModel>) -> Result<D
2326
);
2427

2528
let DataModel::Message(mut result) = staged else {
26-
panic!("Unexpected input type");
29+
return Err(Error {message: "Unexpected input type.".to_string()});
2730
};
2831

2932
// Extract payload from message to process
3033
let staged_payload = result.payload.read();
3134

3235
// Initialize sensor data
3336
let mut sensor_data = if staged_payload.is_empty() {
34-
logger::log(
35-
Level::Info,
36-
"module-collection/accumulate",
37-
"first initialization",
38-
);
37+
logger::log(Level::Info, "module-collection/accumulate", "first initialization");
3938
Measurement::SensorData(MeasurementSensorData::new())
4039
} else {
4140
logger::log(Level::Info, "module-collection/accumulate", "adding ...");
4241
match serde_json::from_slice(&staged_payload).unwrap() {
4342
Measurement::SensorData(sensor_data) => Measurement::SensorData(sensor_data),
4443
_ => {
45-
panic!("Unexpected type for result.");
44+
return Err(Error {message: "Unexpected type for result.".to_string()});
4645
}
4746
}
4847
};
@@ -57,7 +56,9 @@ fn accumulate_sensor_data(staged: DataModel, inputs: Vec<DataModel>) -> Result<D
5756
// Extract payload from message to process
5857
rtsp.frame.read()
5958
}
60-
DataModel::BufferOrBytes(_) => panic!("Unexpected input type"),
59+
DataModel::BufferOrBytes(_) => {
60+
return Err(Error {message: "Unexpected input type.".to_string()});
61+
}
6162
};
6263

6364
let measurement: Measurement = serde_json::from_slice(&payload).unwrap();
@@ -70,7 +71,7 @@ fn accumulate_sensor_data(staged: DataModel, inputs: Vec<DataModel>) -> Result<D
7071
sensor_data.temperature.push(measurement);
7172
Measurement::SensorData(sensor_data)
7273
}
73-
_ => panic!("Unexpected type for result."),
74+
_ => return Err(Error {message: "Unexpected type for result.".to_string()}),
7475
}
7576
}
7677
Measurement::Humidity(measurement) => {
@@ -81,7 +82,7 @@ fn accumulate_sensor_data(staged: DataModel, inputs: Vec<DataModel>) -> Result<D
8182
sensor_data.humidity.push(measurement);
8283
Measurement::SensorData(sensor_data)
8384
}
84-
_ => panic!("Unexpected type for result."),
85+
_ => return Err(Error {message: "Unexpected type for result.".to_string()}),
8586
}
8687
}
8788
Measurement::Object(measurement) => {
@@ -92,10 +93,10 @@ fn accumulate_sensor_data(staged: DataModel, inputs: Vec<DataModel>) -> Result<D
9293
sensor_data.object.push(measurement);
9394
Measurement::SensorData(sensor_data)
9495
}
95-
_ => panic!("Unexpected type for result."),
96+
_ => return Err(Error {message: "Unexpected type for result.".to_string()}),
9697
}
9798
}
98-
Measurement::SensorData(_) => panic!("Unexpected measurement type."),
99+
Measurement::SensorData(_) => return Err(Error {message: "Unexpected measurement type.".to_string()}),
99100
}
100101
}
101102

samples/wasm/rust/examples/enrichment/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ edition = "2021"
77

88
[dependencies]
99
wit-bindgen = "0.22"
10-
wasm_graph_sdk = { version = "=1.1.1", registry = "aio-sdks" }
10+
wasm_graph_sdk = { version = "=1.1.3", registry="aio-wg" }
1111
serde = { version = "1", default-features = false, features = [
1212
"derive",
1313
] }
@@ -17,4 +17,3 @@ serde_json = { version = "1", default-features = false, features = [
1717

1818
[lib]
1919
crate-type = ["cdylib"]
20-
path = "src/lib.rs"

samples/wasm/rust/examples/enrichment/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ fn overtemp_check(input: DataModel) -> Result<DataModel, Error> {
2525

2626
// Extract message from input
2727
let DataModel::Message(message) = input else {
28-
panic!("Unexpected input type");
28+
return Err(Error {message: "Unexpected input type.".to_string()});
2929
};
3030

3131
// Extract payload from message to process
@@ -56,7 +56,7 @@ fn overtemp_check(input: DataModel) -> Result<DataModel, Error> {
5656
let sensor_data = match payload_json {
5757
Measurement::SensorData(sensor_data) => Measurement::SensorData(sensor_data),
5858
_ => {
59-
panic!("Unexpected type for result.");
59+
return Err(Error {message: "Unexpected type for result.".to_string()});
6060
}
6161
};
6262

@@ -193,7 +193,7 @@ fn overtemp_check(input: DataModel) -> Result<DataModel, Error> {
193193
sensor_data
194194
}
195195
_ => {
196-
panic!("Unexpected type for result.");
196+
return Err(Error {message: "Unexpected type for result.".to_string()});
197197
}
198198
};
199199

samples/wasm/rust/examples/format/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ edition = "2021"
99
[dependencies]
1010
image = { version = "0.24" }
1111
wit-bindgen = "0.22"
12-
wasm_graph_sdk = { version = "=1.1.1", registry = "aio-sdks" }
12+
wasm_graph_sdk = { version = "=1.1.3", registry="aio-wg" }
1313
serde = { version = "1", default-features = false, features = [
1414
"derive",
1515
] }
@@ -19,4 +19,3 @@ serde_json = { version = "1", default-features = false, features = [
1919

2020
[lib]
2121
crate-type = ["cdylib"]
22-
path = "src/lib.rs"

samples/wasm/rust/examples/format/src/lib.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
#![allow(clippy::missing_safety_doc)]
66

77
mod map_format {
8-
use core::panic;
98

109
use wasm_graph_sdk::logger::{self, Level};
1110
use wasm_graph_sdk::macros::map_operator;
@@ -44,19 +43,21 @@ mod map_format {
4443
let _ = metrics::add_to_counter("requests", CounterValue::U64(1), Some(&labels));
4544

4645
let (payload, timestamp) = match input {
47-
DataModel::Message(message) => (message.payload.read(), message.timestamp),
46+
DataModel::Message(message) => {
47+
(message.payload.read(), message.timestamp)
48+
},
4849
DataModel::Snapshot(snapshot) => {
4950
let format = match snapshot.format {
5051
BufferOrString::Buffer(ref s) => String::from_utf8_lossy(&s.read()).to_string(),
51-
BufferOrString::String(_) => panic!("Unexpected format type"),
52+
BufferOrString::String(ref s) => s.clone(),
5253
};
5354
if format == FORMAT && snapshot.width == WIDTH && snapshot.height == HEIGHT {
5455
// If the snapshot is already in the desired format and size, return it directly
5556
return Ok(DataModel::Snapshot(snapshot));
5657
}
5758
(snapshot.frame.read(), snapshot.timestamp)
58-
}
59-
DataModel::BufferOrBytes(_) => panic!("Unexpected input type"),
59+
},
60+
DataModel::BufferOrBytes(_) => return Err(Error {message: "Unexpected input type.".to_string()}),
6061
};
6162

6263
// Extract payload from message to process
@@ -71,13 +72,11 @@ mod map_format {
7172
logger::log(
7273
Level::Info,
7374
"module-format/map",
74-
&format!(
75-
"Unexpected image format or size: expected {} bytes, got {} bytes",
76-
WIDTH * HEIGHT * CELL_LENGTH,
77-
payload.len()
78-
),
75+
&format!("Unexpected image format or size: expected {} bytes, got {} bytes",
76+
WIDTH * HEIGHT * CELL_LENGTH,
77+
payload.len()),
7978
);
80-
panic!("Unexpected image format or size");
79+
return Err(Error {message: "Unexpected image format or size.".to_string()});
8180
}
8281
// If the image format cannot be guessed but the payload size is correct, it
8382
// is possibly already in the desired format. So we will return it directly.

samples/wasm/rust/examples/humidity/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ edition = "2021"
77

88
[dependencies]
99
wit-bindgen = "0.22"
10-
wasm_graph_sdk = { version = "=1.1.1", registry = "aio-sdks" }
10+
wasm_graph_sdk = { version = "=1.1.3", registry="aio-wg" }
1111
serde = { version = "1", default-features = false, features = [
1212
"derive",
1313
] }

samples/wasm/rust/examples/humidity/src/lib.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,23 @@
44
// Generated by `wit_bindgen::generate` expansion.
55
#![allow(clippy::missing_safety_doc)]
66

7-
use core::panic;
8-
97
use wasm_graph_sdk::logger::{self, Level};
108
use wasm_graph_sdk::macros::accumulate_operator;
119
use wasm_graph_sdk::metrics::{self, CounterValue, Label};
1210

1311
#[accumulate_operator]
14-
fn accumulate_humidity(staged: DataModel, inputs: Vec<DataModel>) -> Result<DataModel, Error> {
12+
fn accumulate_humidity(
13+
staged: DataModel,
14+
inputs: Vec<DataModel>,
15+
) -> Result<DataModel, Error> {
1516
let labels = vec![Label {
1617
key: "module".to_owned(),
1718
value: "module-humidity/accumulate".to_owned(),
1819
}];
1920
let _ = metrics::add_to_counter("requests", CounterValue::U64(1), Some(&labels));
2021

2122
let DataModel::Message(mut result) = staged else {
22-
panic!("Unexpected input type");
23+
return Err(Error {message: "Unexpected input type.".to_string()});
2324
};
2425

2526
// Extract payload from message to process
@@ -56,16 +57,12 @@ fn accumulate_humidity(staged: DataModel, inputs: Vec<DataModel>) -> Result<Data
5657
let (ts, topic, payload) = match input {
5758
DataModel::Message(temp) => {
5859
// Extract payload from message to process
59-
(
60-
temp.timestamp.timestamp,
61-
temp.topic,
62-
match &temp.payload {
63-
BufferOrBytes::Buffer(buffer) => buffer.read(),
64-
BufferOrBytes::Bytes(bytes) => bytes.clone(),
65-
},
66-
)
60+
(temp.timestamp.timestamp, temp.topic, match &temp.payload {
61+
BufferOrBytes::Buffer(buffer) => buffer.read(),
62+
BufferOrBytes::Bytes(bytes) => bytes.clone(),
63+
})
6764
}
68-
_ => panic!("Unexpected input type"),
65+
_ => return Err(Error {message: "Unexpected input type.".to_string()}),
6966
};
7067

7168
let measurement: Measurement = serde_json::from_slice(&payload).unwrap();
@@ -94,7 +91,7 @@ fn accumulate_humidity(staged: DataModel, inputs: Vec<DataModel>) -> Result<Data
9491
// Set the topic
9592
result_topic = topic;
9693
}
97-
_ => panic!("Unexpected measurement type."),
94+
_ => return Err(Error { message: "Unexpected measurement type.".to_string() }),
9895
}
9996
}
10097

@@ -106,7 +103,11 @@ fn accumulate_humidity(staged: DataModel, inputs: Vec<DataModel>) -> Result<Data
106103
r#"{{"humidity":{{"count":{count},"max":{max},"min":{min},"average":{avg},"last":{last}}}}}"#,
107104
);
108105

109-
logger::log(Level::Info, "module-humidity/accumulate", &payload_str);
106+
logger::log(
107+
Level::Info,
108+
"module-humidity/accumulate",
109+
&payload_str,
110+
);
110111

111112
let payload = payload_str.as_bytes().to_vec();
112113
result.payload = BufferOrBytes::Bytes(payload);
@@ -116,6 +117,7 @@ fn accumulate_humidity(staged: DataModel, inputs: Vec<DataModel>) -> Result<Data
116117
Ok(DataModel::Message(result))
117118
}
118119

120+
119121
#[derive(Debug, PartialEq, serde::Deserialize, serde::Serialize)]
120122
pub enum Measurement {
121123
#[serde(rename = "temperature")]
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
[package]
22
name = "otel-enrich"
3-
version = "0.1.0"
3+
version = "0.0.1"
44
authors = ["Wasm Graph Devs"]
55
license = "MIT"
66
edition = "2021"
77

88
[dependencies]
99
wit-bindgen = "0.22"
10-
wasm_graph_sdk = { version = "=1.1.1", registry = "aio-sdks" }
10+
wasm_graph_sdk = { version = "=1.1.3", registry="aio-wg" }
11+
serde = { version = "1.0", features = ["derive"] }
12+
serde_json = "1.0"
1113

1214
[lib]
1315
crate-type = ["cdylib"]

samples/wasm/rust/examples/otel-enrich/src/lib.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4-
// Generated by `wit_bindgen::generate` expansion.
54
#![allow(clippy::missing_safety_doc)]
65
mod otel_enrich {
76

@@ -60,12 +59,12 @@ mod otel_enrich {
6059
}];
6160
let _ = metrics::add_to_counter("requests", CounterValue::U64(1), Some(&labels));
6261

62+
// Extract message from input
6363
let DataModel::Message(mut result) = input else {
64-
return Err(Error {
65-
message: "Unexpected input type".to_string(),
66-
});
64+
return Err(Error { message: "Unexpected input type".to_string() });
6765
};
6866

67+
// Get list of keys to enrich from ATTR_KEYS
6968
let Some(attr_keys) = ATTR_KEYS.get() else {
7069
logger::log(
7170
Level::Error,
@@ -81,9 +80,10 @@ mod otel_enrich {
8180
for key in keys {
8281
let key = key.trim();
8382
if key.is_empty() {
84-
continue;
83+
continue; // Skip empty keys
8584
}
8685

86+
// Get value from DSS
8787
let state_store_get = state_store::get(key.as_bytes(), None);
8888
match state_store_get
8989
.as_ref()
@@ -96,8 +96,12 @@ mod otel_enrich {
9696
&format!("Retrieved from state store: key='{key}', value='{value:?}'"),
9797
);
9898
let stringify = String::from_utf8_lossy(value).to_string();
99-
100-
// Add to user properties with "otel/" prefix so we can add as attributes later for OTel
99+
logger::log(
100+
Level::Info,
101+
"module-otel-enrich/map",
102+
&format!("'{key}' found in state store with value '{stringify}'"),
103+
);
104+
// Add the key-value pair to user_properties_vec
101105
user_properties_vec.push((
102106
BufferOrString::String(format!("otel/{key}")),
103107
BufferOrString::String(stringify),
@@ -109,21 +113,28 @@ mod otel_enrich {
109113
"module-otel-enrich/map",
110114
&format!("'{key}' not found in state store, skipping"),
111115
);
116+
continue; // Skip if key not found
112117
}
113118
Err(err) => {
114119
logger::log(
115120
Level::Error,
116121
"module-otel-enrich/map",
117122
&format!("Failed to get value for key '{key}': {err}"),
118123
);
124+
continue; // Skip if error occurs
119125
}
120126
}
121127
}
122128

123-
result.properties = MessageProperties {
124-
user_properties: user_properties_vec,
125-
};
129+
result.topic = BufferOrBytes::Bytes("sensors".as_bytes().to_vec());
130+
result.properties.user_properties = user_properties_vec;
131+
132+
logger::log(
133+
Level::Info,
134+
"module-otel-enrich/map",
135+
&format!("result: {result:?}"),
136+
);
126137

127138
Ok(DataModel::Message(result))
128139
}
129-
}
140+
}

0 commit comments

Comments
 (0)