Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[env]
RUST_LOG = "trace"
MQTT_HOSTNAME = "localhost"
MQTT_PORT = "1883"
15 changes: 11 additions & 4 deletions components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@
#******************************************************************************/

[workspace]
members = ["horn-client", "horn-proto", "horn-service-kuksa", "software-horn"]
members = [
"horn-client",
"horn-proto",
"horn-service-kuksa",
"software-horn",
"vehicle-gateway",
]
resolver = "2"

[workspace.package]
Expand All @@ -29,6 +35,7 @@ horn-proto = { path = "horn-proto" }
log = { version = "0.4.22" }
env_logger = { version = "0.11.5" }
protobuf = { version = "3.5.0" }
tokio = { version = "1.40.0" }
up-rust = { version = "0.2.0" }
up-transport-zenoh = { version = "0.3.0" }
tokio = { version = "1.40.0", features = ["rt", "macros"] }
up-rust = { version = "0.4.0" }
up-transport-zenoh = { version = "0.5.0" }
up-transport-mqtt5 = { git = "https://github.com/eclipse-uprotocol/up-client-mqtt5-rust", branch = "main" }
33 changes: 33 additions & 0 deletions components/Dockerfile.vehicle-gateway
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Build stage
FROM rust:1.82-bullseye AS builder

# Install system dependencies
RUN apt-get update && apt-get install -y \
pkg-config \
libssl-dev \
&& rm -rf /var/lib/apt/lists/*


# Copy both crates
COPY . /usr/src/components

# Create workspace directory
WORKDIR /usr/src/components

# Build dependencies - this is the caching Docker layer for all dependencies
RUN cargo build --package vehicle-gateway --release

# Final stage
FROM debian:bullseye-slim

# Install runtime dependencies
RUN apt-get update && apt-get install -y \
libssl1.1 \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*

# Copy the compiled binary from builder
COPY --from=builder /usr/src/components/target/release/vehicle-gateway /usr/local/bin/

# Set the binary as the entrypoint
ENTRYPOINT ["vehicle-gateway"]
3 changes: 2 additions & 1 deletion components/horn-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ clap = { workspace = true }
env_logger = { workspace = true }
horn-proto = { workspace = true }
log = { workspace = true }
paho-mqtt = "0.13.1"
protobuf = { workspace = true }
tokio = { workspace = true }
up-rust = { workspace = true }
up-transport-zenoh = { workspace = true }
up-transport-mqtt5 = { workspace = true }
182 changes: 64 additions & 118 deletions components/horn-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,48 +11,57 @@
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/

use clap::Parser;
use env_logger::Env;
use log::error;
use log::info;
use std::path::PathBuf;
use std::sync::Arc;
use up_rust::communication::{CallOptions, InMemoryRpcClient, RpcClient, UPayload};
use up_transport_zenoh::zenoh_config;
use up_transport_zenoh::UPTransportZenoh;
use up_rust::{
communication::{CallOptions, InMemoryRpcClient, RpcClient, UPayload},
StaticUriProvider, UTransport,
};
use up_transport_mqtt5::{Mqtt5Transport, MqttClientOptions, TransportMode};

use horn_proto::horn_service::{ActivateHornRequest, ActivateHornResponse, DeactivateHornRequest};
use horn_proto::horn_topics::{HornCycle, HornMode, HornSequence};

const HORN_SERVICE_AUTHORITY_NAME: &str = "horn-service-kuksa";
// see https://github.com/COVESA/uservices/blob/main/src/main/proto/vehicle/body/horn/v1/horn_service.proto
const HORN_SERVICE_ENTITY_ID: u32 = 28;
const ACTIVATE_HORN_RESOURCE_ID: u16 = 0x0001;
const DEACTIVATE_HORN_RESOURCE_ID: u16 = 0x0002;
const VEHICLE_AUTHORITY: &str = "WAUWAUGRRWAUWAU";
const HORN_SERVICE_ENTITY_ID: u32 = 0x0003;
const ACTIVATE_HORN_RESOURCE_ID: u16 = 1;
const DEACTIVATE_HORN_RESOURCE_ID: u16 = 2;

const APP_AUTHORITY: &str = "cloud";
const APP_UE_ID: u32 = 0x0001;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let args = Args::parse();

info!("Starting the client for the COVESA Horn service over uProtocol");

let transport = UPTransportZenoh::new(args.get_zenoh_config()?, "//horn_client/1/1/0")
.await
.map(Arc::new)?;
let mut mqtt_options = MqttClientOptions::default();
mqtt_options.broker_uri =
std::env::var("MQTT_HOSTNAME").unwrap() + ":" + &std::env::var("MQTT_PORT").unwrap();

let mqtt5_transport = Mqtt5Transport::new(
TransportMode::InVehicle,
mqtt_options,
APP_AUTHORITY.to_string(),
)
.await?;
mqtt5_transport.connect().await?;

// The Zenoh transport happens to implement the
// traits for UTransport and LocalUriProvider,
// which is why it is used twice here.
let rpc_client = InMemoryRpcClient::new(transport.clone(), transport).await?;
let transport: Arc<dyn UTransport> = Arc::new(mqtt5_transport);
let transport_uuri = Arc::new(StaticUriProvider::new(APP_AUTHORITY, APP_UE_ID, 1));

let rpc_client = InMemoryRpcClient::new(transport, transport_uuri).await?;
let activate_horn_uri = up_rust::UUri::try_from_parts(
HORN_SERVICE_AUTHORITY_NAME,
VEHICLE_AUTHORITY,
HORN_SERVICE_ENTITY_ID,
1,
ACTIVATE_HORN_RESOURCE_ID,
)?;
let deactivate_horn_uri = up_rust::UUri::try_from_parts(
HORN_SERVICE_AUTHORITY_NAME,
VEHICLE_AUTHORITY,
HORN_SERVICE_ENTITY_ID,
1,
DEACTIVATE_HORN_RESOURCE_ID,
Expand Down Expand Up @@ -86,108 +95,45 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}],
..Default::default()
};

let payload = UPayload::try_from_protobuf(horn_request)?;
match rpc_client
.invoke_method(
activate_horn_uri.clone(),
CallOptions::for_rpc_request(1_000, None, None, None),
Some(payload),
)
.await
{
Ok(Some(payload)) => {
let response = payload.extract_protobuf::<ActivateHornResponse>()?;
info!("Activate Horn returned message: {}", response);
}
Ok(None) => info!("The activate horn request returned an empty response"),
Err(e) => error!("The activate horn request returned the error: {:?}", e),
}

// Wait before deactivating the horn
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;

let deactivate_request = DeactivateHornRequest::default();

let deactivate_payload = UPayload::try_from_protobuf(deactivate_request)?;

match rpc_client
.invoke_method(
deactivate_horn_uri.clone(),
CallOptions::for_rpc_request(1_000, None, None, None),
Some(deactivate_payload),
)
.await
{
Ok(Some(_)) => {
info!("The deactivate horn request returned successfully");
}
Ok(None) => error!("The deactivate horn request returned an empty response"),
Err(e) => error!("The deactivate horn request returned the error: {:?}", e),
}
let horn_request = ActivateHornRequest {
mode: HornMode::HM_CONTINUOUS.into(),
command: vec![HornSequence {
horn_cycles: vec![],
..Default::default()
}],
..Default::default()
};

let payload = UPayload::try_from_protobuf(horn_request)?;
match rpc_client
.invoke_method(
activate_horn_uri.clone(),
CallOptions::for_rpc_request(1_000, None, None, None),
Some(payload),
)
.await
{
Ok(Some(payload)) => {
let value = payload.extract_protobuf::<ActivateHornResponse>()?;
info!(
"Activate Horn returned message: {}",
value.status.unwrap().code
);
loop {
let payload = UPayload::try_from_protobuf(horn_request.clone())?;

match rpc_client
.invoke_method(
activate_horn_uri.clone(),
CallOptions::for_rpc_request(1_000, None, None, None),
Some(payload),
)
.await
{
Ok(Some(payload)) => {
let response = payload.extract_protobuf::<ActivateHornResponse>()?;
info!("Activate Horn returned message: {}", response);
}
Ok(None) => info!("The activate horn request returned an empty response"),
Err(e) => error!("The activate horn request returned the error: {:?}", e),
}
Ok(None) => error!("The activate horn request returned an empty response"),
Err(e) => error!("The activate horn request returned the error: {:?}", e),
}

// Wait before deactivating the horn
tokio::time::sleep(std::time::Duration::from_millis(4000)).await;

let deactivate_request = DeactivateHornRequest::default();

let deactivate_payload = UPayload::try_from_protobuf(deactivate_request)?;

match rpc_client
.invoke_method(
deactivate_horn_uri.clone(),
CallOptions::for_rpc_request(1_000, None, None, None),
Some(deactivate_payload),
)
.await
{
Ok(Some(_)) => {
info!("The deactivate horn request returned successfully");
// Wait before deactivating the horn
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;

let deactivate_payload = UPayload::try_from_protobuf(deactivate_request.clone())?;

match rpc_client
.invoke_method(
deactivate_horn_uri.clone(),
CallOptions::for_rpc_request(1_000, None, None, None),
Some(deactivate_payload),
)
.await
{
Ok(Some(_)) => {
info!("The deactivate horn request returned successfully");
}
Ok(None) => error!("The deactivate horn request returned an empty response"),
Err(e) => error!("The deactivate horn request returned the error: {:?}", e),
}
Ok(None) => error!("The deactivate horn request returned an empty response"),
Err(e) => error!("The deactivate horn request returned the error: {:?}", e),
}
Ok(())
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
pub struct Args {
#[arg(short, long, default_value = "zenoh-config.json5")]
/// A Zenoh configuration file.
config: PathBuf,
}

impl Args {
pub fn get_zenoh_config(&self) -> Result<zenoh_config::Config, Box<dyn std::error::Error>> {
// Load the config from file path
zenoh_config::Config::from_file(&self.config).map_err(|e| e as Box<dyn std::error::Error>)
}
}
20 changes: 14 additions & 6 deletions components/horn-service-kuksa/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@ use clap::Parser;
use env_logger::Env;
use log::info;
use std::sync::Arc;
use up_rust::communication::{InMemoryRpcServer, RpcServer};
use up_rust::{
communication::{InMemoryRpcServer, RpcServer},
StaticUriProvider, UTransport, UUri,
};
use up_transport_zenoh::UPTransportZenoh;

mod config;
mod connections;
mod request_handler;
mod request_processor;

const SERVICE_AUTHORITY: &str = "hcp5";
const SERVICE_UE_ID: u32 = 0x0003;
const ACTIVATE_HORN_METHOD_ID: u16 = 0x0001;
const DEACTIVATE_HORN_METHOD_ID: u16 = 0x0002;

Expand All @@ -42,12 +47,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tokio::spawn(connections::send_to_terminal(rx_kuksa));
}

let zenoh_config = args.get_zenoh_config()?;
UPTransportZenoh::try_init_log_from_env();
let transport = UPTransportZenoh::new(zenoh_config, "//horn-service-kuksa/1C/1/0")
.await
.map(Arc::new)?;
let rpc_server = InMemoryRpcServer::new(transport.clone(), transport);

let server_uuri = UUri::try_from_parts(SERVICE_AUTHORITY, SERVICE_UE_ID, 1, 0)?;
let transport: Arc<dyn UTransport> =
UPTransportZenoh::new(args.get_zenoh_config()?, server_uuri)
.await
.map(Arc::new)?;
let transport_uuri = Arc::new(StaticUriProvider::new(SERVICE_AUTHORITY, SERVICE_UE_ID, 1));
let rpc_server = InMemoryRpcServer::new(transport.clone(), transport_uuri);

let (tx_sequence, rx_sequence) = tokio::sync::mpsc::channel(4);
tokio::spawn(request_processor::receive_requests(
Expand Down
7 changes: 6 additions & 1 deletion components/horn-service-kuksa/src/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use horn_proto::horn_service::{
use horn_proto::status::Status;
use log::info;
use protobuf::MessageField;
use up_rust::communication::{RequestHandler, ServiceInvocationError, UPayload};
use up_rust::{
communication::{RequestHandler, ServiceInvocationError, UPayload},
UAttributes,
};

pub(crate) struct ActivateHorn {
tx_sequence_channel: tokio::sync::mpsc::Sender<Option<ActivateHornRequest>>,
Expand All @@ -38,6 +41,7 @@ impl RequestHandler for ActivateHorn {
async fn handle_request(
&self,
_resource_id: u16,
_message_attributes: &UAttributes,
request_payload: Option<UPayload>,
) -> Result<Option<UPayload>, ServiceInvocationError> {
info!("Handle new request to apply horn sequence");
Expand Down Expand Up @@ -76,6 +80,7 @@ impl RequestHandler for DeactivateHorn {
async fn handle_request(
&self,
_resource_id: u16,
_message_attributes: &UAttributes,
request_payload: Option<UPayload>,
) -> Result<Option<UPayload>, ServiceInvocationError> {
info!("Handle new deactivation request for the horn.");
Expand Down
16 changes: 16 additions & 0 deletions components/vehicle-gateway/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "vehicle-gateway"
version = "0.1.0"
edition = "2021"

[dependencies]
up-transport-zenoh = { workspace = true }
env_logger = { workspace = true }
log = { workspace = true }
tokio = { workspace = true }
protobuf = { workspace = true }
up-rust = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
horn-proto = { workspace = true }
Loading