Skip to content

Commit 3d9ca04

Browse files
authored
[Rust] [Connector] Forwarder/Destination Endpoint implementation (#797)
This PR: - adds destination support framework - adds destination support for both Mqtt Telemetry and Broker State Store (Storage is stubbed, but not implemented) - adds support for setting the destination for a dataset to be either the one specified in the dataset or the default destination specified on the asset - adds reporting config errors for dataset/asset definitions that aren't valid for their destination. Note that some things are guaranteed (ex. if the destination is Mqtt, the topic must be present), but this is for scenarios like the topic not being a valid MQTT topic, which the operator doesn't validate or there not being a default destination or a dataset destination specified. - adds forwarding logic for Mqtt Telemetry and Broker State Store (and reporting errors back if these don't succeed) - adds a broker state store destination on the sample asset so both destinations are tested - updates the sample to work with an AIO deployment instead of the local version (because Schema Registry is required) - adds forwarding to the temporary sample - temporarily fixes a bug in getting certs from the connector config, but this should be fixed better in a future PR Not present in this PR: - full retry strategy, this needs more thought on what should be retried within the base connector and what should be retried within the application. For now, errors are reported, which aligns with our goals for this first release
1 parent 5cc635f commit 3d9ca04

File tree

15 files changed

+691
-174
lines changed

15 files changed

+691
-174
lines changed

rust/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ module_name_repetitions = "allow"
2727

2828
[workspace.dependencies]
2929
bytes = "1.5.0"
30+
chrono = "0.4"
3031
derive_builder = "0.20"
3132
derive-getters = { version = "0.5.0", features = ["auto_copy_getters"] }
3233
log = "0.4.21" # For performance, I believe this should have some filters on it

rust/azure_iot_operations_connector/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ publish = true
1515
azure_iot_operations_protocol = { version = "0.9", path = "../azure_iot_operations_protocol", registry = "aio-sdks" }
1616
azure_iot_operations_services = { version = "0.8", path = "../azure_iot_operations_services", registry = "aio-sdks", features = ["state_store", "schema_registry", "azure_device_registry"] }
1717
azure_iot_operations_mqtt = { version = "0.9", path = "../azure_iot_operations_mqtt", registry = "aio-sdks" }
18+
chrono.workspace = true
1819
derive-getters.workspace = true
1920
jmespath = "0.3.0"
2021
log.workspace = true
Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Instructions to deploy sample Rust connector
22

3-
The sample Rust connector that retrieves ADR definitions is currently under the `examples` folder of this crate.
3+
### The sample Rust connector that retrieves ADR definitions is currently under the `examples` folder of this crate.
44

55
To deploy follow these steps (from the root of the crate):
66

@@ -11,13 +11,19 @@ To deploy follow these steps (from the root of the crate):
1111
5. Deploy a sample device: `kubectl apply -f examples/connector_get_adr_definitions_resources/thermostat-device-definition.yaml`
1212
6. Deploy a sample asset to the device: `kubectl apply -f examples/connector_get_adr_definitions_resources/rest-thermostat-asset-definition.yaml`
1313

14-
The sample Rust connector that uses the base connector to retrieve ADR definitions is currently under the `examples` folder of this crate.
14+
### The sample Rust connector that uses the base connector to retrieve ADR definitions and send transformed data is currently under the `examples` folder of this crate.
1515

16-
To deploy follow these steps (from the root of the crate):
16+
#### Pre-requisites:
17+
1. Have AIO Deployed with necessary features
18+
1. Have an Azure Container Registry instance
1719

20+
To deploy follow these steps (from the root of the crate):
1821
1. Create a binary release of the connector code: `cargo build --release --target-dir target_base_connector_sample --example base_connector_sample`
19-
2. Build the docker container: `docker build -t baseconnector:latest -f examples/base_connector_sample_resources/Dockerfile .`
20-
3. Import into your kubernetes cluster with: `k3d image import baseconnector:latest`
21-
4. Apply the connector template: `kubectl apply -f examples/base_connector_sample_resources/connector_template.yaml`
22-
5. Deploy a sample device: `kubectl apply -f examples/base_connector_sample_resources/thermostat-device-definition.yaml`
23-
6. Deploy a sample asset to the device: `kubectl apply -f examples/base_connector_sample_resources/rest-thermostat-asset-definition.yaml`
22+
1. Build the docker container: `docker build -t baseconnector:latest -f examples/base_connector_sample_resources/Dockerfile .`
23+
1. Tag your docker image `docker tag baseconnector <your ACR name>.azurecr.io/baseconnector`
24+
1. Make sure you're logged into azure cli `az login`
25+
1. Login to your ACR `az acr login --name <your ACR name>`
26+
1. Upload to your ACR instance `docker push <your ACR name>.azurecr.io/baseconnector`
27+
1. Apply the connector template: `kubectl apply -f examples/base_connector_sample_resources/connector_template.yaml`
28+
1. Deploy a sample device: `kubectl apply -f examples/base_connector_sample_resources/thermostat-device-definition.yaml`
29+
1. Deploy a sample asset to the device: `kubectl apply -f examples/base_connector_sample_resources/rest-thermostat-asset-definition.yaml`

rust/azure_iot_operations_connector/examples/base_connector_sample.rs

Lines changed: 57 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,19 @@
1010
//!
1111
//! To deploy and test this example, see instructions in `rust/azure_iot_operations_connector/README.md`
1212
13-
use azure_iot_operations_connector::base_connector::{
14-
BaseConnector,
15-
managed_azure_device_registry::{
16-
AssetClientCreationObservation, DatasetClient, DeviceEndpointClientCreationObservation,
13+
use std::time::Duration;
14+
15+
use azure_iot_operations_connector::{
16+
AdrConfigError, Data,
17+
base_connector::{
18+
BaseConnector,
19+
managed_azure_device_registry::{
20+
AssetClientCreationObservation, DatasetClient, DeviceEndpointClientCreationObservation,
21+
},
1722
},
23+
data_processor::derived_json,
1824
};
1925
use azure_iot_operations_protocol::application::ApplicationContextBuilder;
20-
use azure_iot_operations_services::{azure_device_registry, schema_registry};
2126

2227
#[tokio::main(flavor = "current_thread")]
2328
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -79,9 +84,9 @@ async fn run_program(mut device_creation_observation: DeviceEndpointClientCreati
7984
.name,
8085
unsupported_endpoint_type
8186
);
82-
Err(azure_device_registry::ConfigError {
87+
Err(AdrConfigError {
8388
message: Some("endpoint type is not supported".to_string()),
84-
..azure_device_registry::ConfigError::default()
89+
..AdrConfigError::default()
8590
})
8691
}
8792
};
@@ -115,9 +120,9 @@ async fn run_assets(mut asset_creation_observation: AssetClientCreationObservati
115120
"Asset '{}' not accepted. Manufacturer '{m}' not supported.",
116121
asset_client.asset_ref().name
117122
);
118-
Err(azure_device_registry::ConfigError {
123+
Err(AdrConfigError {
119124
message: Some("asset manufacturer type is not supported".to_string()),
120-
..azure_device_registry::ConfigError::default()
125+
..AdrConfigError::default()
121126
})
122127
}
123128
};
@@ -141,36 +146,54 @@ async fn run_dataset(dataset_client: DatasetClient) {
141146

142147
// now we should update the status of the dataset and report the message schema
143148
dataset_client.report_status(Ok(())).await;
144-
match dataset_client
145-
.report_message_schema(
146-
schema_registry::PutRequestBuilder::default()
147-
.content(
148-
r#"
149-
{
150-
"$schema": "http://json-schema.org/draft-07/schema#",
151-
"type": "object",
152-
"properties": {
153-
"currentTemperature": {
154-
"type": "number"
155-
},
156-
"desiredTemperature": {
157-
"type": "number"
158-
}
159-
}
160-
}
161-
"#,
162-
)
163-
.format(schema_registry::Format::JsonSchemaDraft07)
164-
.build()
165-
.unwrap(),
166-
)
167-
.await
168-
{
149+
150+
let sample_data = mock_received_data(0);
151+
152+
let (_, message_schema) =
153+
derived_json::transform(sample_data, dataset_client.dataset_definition()).unwrap();
154+
match dataset_client.report_message_schema(message_schema).await {
169155
Ok(message_schema_reference) => {
170156
log::info!("Message Schema reported, reference returned: {message_schema_reference:?}");
171157
}
172158
Err(e) => {
173159
log::error!("Error reporting message schema: {e}");
174160
}
175161
}
162+
let mut count = 0;
163+
loop {
164+
tokio::time::sleep(Duration::from_secs(5)).await;
165+
let sample_data = mock_received_data(count);
166+
let (transformed_data, _) =
167+
derived_json::transform(sample_data.clone(), dataset_client.dataset_definition())
168+
.unwrap();
169+
match dataset_client.forward_data(transformed_data).await {
170+
Ok(()) => {
171+
log::info!(
172+
"data {} for {} forwarded",
173+
String::from_utf8(sample_data.payload).unwrap(),
174+
dataset_client.dataset_ref().dataset_name
175+
);
176+
count += 1;
177+
}
178+
Err(e) => log::error!("error forwarding data: {e}"),
179+
}
180+
}
181+
}
182+
183+
#[must_use]
184+
pub fn mock_received_data(count: u32) -> Data {
185+
Data {
186+
// temp and newTemp
187+
payload: format!(
188+
r#"{{
189+
"temp": {count},
190+
"newTemp": {}
191+
}}"#,
192+
count * 2
193+
)
194+
.into(),
195+
content_type: "application/json".to_string(),
196+
custom_user_data: Vec::new(),
197+
timestamp: None,
198+
}
176199
}

rust/azure_iot_operations_connector/examples/base_connector_sample_resources/connector_template.yaml

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ spec:
1919
managedConfigurationSettings:
2020
managedConfigurationType: imageConfiguration
2121
imageConfigurationSettings:
22+
registrySettings:
23+
registrySettingsType: containerRegistry
24+
containerRegistrySettings:
25+
registry: "<your acr>.azurecr.io"
2226
imageName: "baseconnector"
23-
imagePullPolicy: Never # This sample image is built locally, so no need to pull
27+
imagePullPolicy: Always
2428
replicas: 1
2529
tagDigestSettings:
2630
tagDigestType: tag
2731
tag: "latest"
2832
mqttConnectionConfiguration:
29-
host: "aio-broker:1883"
30-
tls:
31-
mode: Disabled
33+
host: "aio-broker:18883"

rust/azure_iot_operations_connector/examples/base_connector_sample_resources/rest-thermostat-asset-definition.yaml

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@ spec:
1010
datasets:
1111
- name: thermostat_status
1212
dataPoints:
13-
- dataSource: /api/thermostat/current
13+
- dataSource: temp
1414
name: currentTemperature
1515
dataPointConfiguration: |-
1616
{
1717
"HttpRequestMethod": "GET",
1818
}
19-
- dataSource: /api/thermostat/desired
19+
- dataSource: newTemp
2020
name: desiredTemperature
2121
dataPointConfiguration: |-
2222
{
@@ -25,8 +25,27 @@ spec:
2525
dataSource: /thermostat
2626
destinations:
2727
- configuration:
28-
topic: /mqtt/machine/status
28+
topic: mqtt/machine/status
2929
target: Mqtt
30+
- name: oven_status
31+
dataPoints:
32+
- dataSource: temp
33+
name: currentTemperature
34+
dataPointConfiguration: |-
35+
{
36+
"HttpRequestMethod": "GET",
37+
}
38+
- dataSource: newTemp
39+
name: desiredTemperature
40+
dataPointConfiguration: |-
41+
{
42+
"HttpRequestMethod": "GET",
43+
}
44+
dataSource: /oven
45+
destinations:
46+
- configuration:
47+
key: oventemp
48+
target: BrokerStateStore
3049
defaultDatasetsConfiguration: |-
3150
{
3251
"samplingInterval": 4000,

0 commit comments

Comments
 (0)