Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions crates/common/tedge_config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@ impl TEdgeConfig {
config_location.load().await
}

pub fn load_sync(config_dir: impl AsRef<StdPath>) -> Result<Self, TEdgeConfigError> {
let config_location = TEdgeConfigLocation::from_custom_root(config_dir.as_ref());
config_location.load_sync()
}

pub async fn update_toml(
self,
update: &impl Fn(&mut TEdgeConfigDto, &TEdgeConfigReader) -> ConfigSettingResult<()>,
Expand Down
211 changes: 111 additions & 100 deletions crates/common/tedge_config/src/tedge_toml/tedge_config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod version;
use futures::Stream;
use reqwest::NoProxy;
use tokio::fs::DirEntry;
use serde::Deserialize;
use version::TEdgeTomlVersion;

mod append_remove;
Expand Down Expand Up @@ -31,9 +31,11 @@ use crate::models::AbsolutePath;
use crate::tedge_toml::mapper_config::AwsMapperSpecificConfig;
use crate::tedge_toml::mapper_config::AzMapperSpecificConfig;
use crate::tedge_toml::mapper_config::C8yMapperSpecificConfig;
use crate::tedge_toml::mapper_config::HasPath as _;
use crate::tedge_toml::mapper_config::HasUrl;
use crate::tedge_toml::mapper_config::MapperConfigError;
use crate::tedge_toml::mapper_config::SpecialisedCloudConfig;
use crate::ConfigDecision;
use anyhow::anyhow;
use anyhow::Context;
use camino::Utf8Path;
Expand All @@ -55,6 +57,7 @@ use reqwest::Certificate;
use std::borrow::Borrow;
use std::borrow::Cow;
use std::collections::HashMap;
use std::io::ErrorKind;
use std::io::Read;
use std::iter::Iterator;
use std::net::IpAddr;
Expand Down Expand Up @@ -90,11 +93,13 @@ impl<T> OptionalConfigError<T> for OptionalConfig<T> {
}
}

type AnyMap = anymap3::Map<dyn std::any::Any + Send + Sync + 'static>;

pub struct TEdgeConfig {
dto: TEdgeConfigDto,
reader: TEdgeConfigReader,
location: TEdgeConfigLocation,
cached_mapper_configs:
tokio::sync::Mutex<anymap3::Map<dyn std::any::Any + Send + Sync + 'static>>,
cached_mapper_configs: Arc<tokio::sync::Mutex<AnyMap>>,
}

impl std::ops::Deref for TEdgeConfig {
Expand All @@ -105,30 +110,86 @@ impl std::ops::Deref for TEdgeConfig {
}
}

/// Decision about which configuration source to use
pub enum ConfigDecision {
/// Load from new format file at the given path
LoadNew { path: Utf8PathBuf },
/// Load from tedge.toml via compatibility layer
LoadLegacy,
/// Configuration file not found
NotFound { path: Utf8PathBuf },
/// Permission error accessing mapper config directory
PermissionError {
mapper_config_dir: Utf8PathBuf,
error: std::io::Error,
},
async fn read_file_if_exists(path: &Utf8Path) -> anyhow::Result<Option<String>> {
match tokio::fs::read_to_string(path).await {
Ok(contents) => Ok(Some(contents)),
Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
Err(e) => Err(e).context(format!("failed to read mapper configuration from {path}")),
}
}

impl TEdgeConfigDto {
async fn populate_single_mapper<T: SpecialisedCloudConfig>(
dto: &mut MultiDto<T::CloudDto>,
location: &TEdgeConfigLocation,
) -> anyhow::Result<()> {
use futures::StreamExt;
use futures::TryStreamExt;

let mappers_dir = location.tedge_config_root_path().join("mappers");
let all_profiles = location.mapper_config_profiles::<T>().await;
let ty = T::expected_cloud_type();
match all_profiles {
Some(profiles) => {
let toml_path = mappers_dir.join(format!("{ty}.toml"));
let default_profile_toml = read_file_if_exists(&toml_path).await?;
let mut default_profile_config: T::CloudDto = default_profile_toml.map_or_else(
|| Ok(<_>::default()),
|toml| {
toml::from_str(&toml).with_context(|| {
format!("failed to deserialise mapper config in {toml_path}")
})
},
)?;
default_profile_config.set_path(toml_path);
dto.non_profile = default_profile_config;

dto.profiles = profiles
.filter_map(|profile| futures::future::ready(profile))
.then(|profile| async {
let toml_path = mappers_dir.join(format!("{ty}.d/{profile}.toml"));
let profile_toml = tokio::fs::read_to_string(&toml_path).await?;
let mut profiled_config: T::CloudDto = toml::from_str(&profile_toml)
.context("failed to deserialise mapper config")?;
profiled_config.set_path(toml_path);
Ok::<_, anyhow::Error>((profile, profiled_config))
})
.try_collect()
.await?;
}
None => (),
}
Ok(())
}

pub(crate) async fn populate_mapper_configs(
&mut self,
location: &TEdgeConfigLocation,
) -> anyhow::Result<()> {
Self::populate_single_mapper::<C8yMapperSpecificConfig>(&mut self.c8y, location).await?;
Self::populate_single_mapper::<AzMapperSpecificConfig>(&mut self.az, location).await?;
Self::populate_single_mapper::<AwsMapperSpecificConfig>(&mut self.aws, location).await?;
Ok(())
}
}

impl TEdgeConfig {
pub(crate) fn from_dto(dto: &TEdgeConfigDto, location: TEdgeConfigLocation) -> Self {
pub(crate) fn from_dto(dto: TEdgeConfigDto, location: TEdgeConfigLocation) -> Self {
Self {
reader: TEdgeConfigReader::from_dto(dto, &location),
reader: TEdgeConfigReader::from_dto(&dto, &location),
dto,
location,
cached_mapper_configs: <_>::default(),
}
}

pub async fn decide_config_source<T>(&self, profile: Option<&ProfileName>) -> ConfigDecision
where
T: ExpectedCloudType,
{
self.location.decide_config_source::<T>(profile).await
}

pub(crate) fn location(&self) -> &TEdgeConfigLocation {
&self.location
}
Expand All @@ -137,49 +198,6 @@ impl TEdgeConfig {
self.location.tedge_config_root_path()
}

/// Decide which configuration source to use for a given cloud and profile
///
/// This function centralizes the decision logic for mapper configuration precedence:
/// 1. New format (`mappers/[cloud].toml` or `mappers/[cloud].d/[profile].toml`) takes precedence
/// 2. If new format exists for some profiles but not the requested one, returns NotFound
/// 3. If the config directory is inaccessible due to a permissions error, return Error
/// 4. If no new format exists at all, fall back to legacy tedge.toml format
pub async fn decide_config_source<T>(&self, profile: Option<&ProfileName>) -> ConfigDecision
where
T: ExpectedCloudType,
{
use tokio::fs::try_exists;

let mapper_config_dir = self.mappers_config_dir();
let ty = T::expected_cloud_type().to_string();

let filename = profile.map_or_else(|| format!("{ty}.toml"), |p| format!("{ty}.d/{p}.toml"));
let path = mapper_config_dir.join(&filename);

let default_profile_path = mapper_config_dir.join(format!("{ty}.toml"));
let profile_dir_path = mapper_config_dir.join(format!("{ty}.d"));

match (
try_exists(&default_profile_path).await,
try_exists(&profile_dir_path).await,
try_exists(&path).await,
) {
// The specific config we're looking for exists
(_, _, Ok(true)) => ConfigDecision::LoadNew { path },

// New format configs exist for this cloud, but not the specific profile requested
(Ok(true), _, _) | (_, Ok(true), _) => ConfigDecision::NotFound { path },

// No new format configs exist for this cloud, use legacy
(Ok(false), Ok(false), _) => ConfigDecision::LoadLegacy,

// Permission error accessing mapper config directory
(Err(err), _, _) | (_, Err(err), _) => ConfigDecision::PermissionError {
mapper_config_dir,
error: err,
},
}
}
pub async fn c8y_mapper_config(
&self,
profile: &Option<impl Borrow<ProfileName>>,
Expand All @@ -206,7 +224,11 @@ impl TEdgeConfig {
let profile = profile.as_ref().map(|p| p.borrow().to_owned());
let ty = T::expected_cloud_type().to_string();

match self.decide_config_source::<T>(profile.as_ref()).await {
match self
.location
.decide_config_source::<T>(profile.as_ref())
.await
{
ConfigDecision::LoadNew { path } => {
// Check for config conflict: both new format and legacy exist
if self.has_legacy_config::<T>(profile.as_ref()) {
Expand All @@ -231,6 +253,7 @@ impl TEdgeConfig {
let map = load_mapper_config::<T>(
&AbsolutePath::try_new(path.as_str()).unwrap(),
self,
profile.as_deref(),
)
.await?;

Expand All @@ -252,10 +275,6 @@ impl TEdgeConfig {
}
}

pub fn mappers_config_dir(&self) -> Utf8PathBuf {
self.root_dir().join("mappers")
}

/// Check if a legacy tedge.toml configuration exists for the given cloud
/// type and profile by checking if the URL is configured
///
Expand Down Expand Up @@ -289,7 +308,7 @@ impl TEdgeConfig {
}

pub fn profiled_config_directories(&self) -> impl Iterator<Item = Utf8PathBuf> + use<'_> {
CloudType::iter().map(|ty| self.mappers_config_dir().join(format!("{ty}.d")))
CloudType::iter().map(|ty| self.location.mappers_config_dir().join(format!("{ty}.d")))
}

async fn all_profiles<T>(
Expand All @@ -298,37 +317,10 @@ impl TEdgeConfig {
where
T: ExpectedCloudType,
{
use futures::future::ready;
use futures::StreamExt;

fn file_name_string(entry: tokio::io::Result<DirEntry>) -> Option<String> {
entry.ok()?.file_name().into_string().ok()
}

fn profile_name_from_filename(filename: &str) -> Option<ProfileName> {
ProfileName::try_from(filename.strip_suffix(".toml")?.to_owned()).ok()
}

let ty = T::expected_cloud_type();

match self.decide_config_source::<T>(None).await {
ConfigDecision::LoadNew { .. }
| ConfigDecision::NotFound { .. }
| ConfigDecision::PermissionError { .. } => {
let default_profile = futures::stream::once(ready(None));
match tokio::fs::read_dir(self.mappers_config_dir().join(format!("{ty}.d"))).await {
Ok(profile_dir) => Box::new(
default_profile.chain(
tokio_stream::wrappers::ReadDirStream::new(profile_dir)
.filter_map(|entry| ready(file_name_string(entry)))
.filter_map(|s| ready(profile_name_from_filename(&s)))
.map(Some),
),
),
Err(_) => Box::new(default_profile),
}
}
ConfigDecision::LoadLegacy => match ty {
if let Some(migrated_profiles) = self.location.mapper_config_profiles::<T>().await {
migrated_profiles
} else {
match T::expected_cloud_type() {
CloudType::C8y => Box::new(futures::stream::iter(
self.c8y.keys().map(|p| p.map(<_>::to_owned)),
)),
Expand All @@ -338,7 +330,7 @@ impl TEdgeConfig {
CloudType::Aws => Box::new(futures::stream::iter(
self.aws.keys().map(|p| p.map(<_>::to_owned)),
)),
},
}
}
}

Expand Down Expand Up @@ -600,6 +592,12 @@ pub static READABLE_KEYS: Lazy<Vec<(Cow<'static, str>, doku::Type)>> = Lazy::new
struct_field_paths(None, &fields)
});

#[derive(Debug, Clone, PartialEq, Eq, Deserialize, doku::Document, serde::Serialize)]
pub enum MapperConfigLocation {
TedgeToml,
SeparateFile(#[doku(as = "String")] camino::Utf8PathBuf),
}

define_tedge_config! {
#[tedge_config(reader(skip))]
config: {
Expand Down Expand Up @@ -727,6 +725,10 @@ define_tedge_config! {

#[tedge_config(multi, reader(private))]
c8y: {
#[tedge_config(reader(skip))]
#[serde(skip)]
read_from: Utf8PathBuf,

/// Endpoint URL of Cumulocity tenant
#[tedge_config(example = "your-tenant.cumulocity.com")]
// Config consumers should use `c8y.http`/`c8y.mqtt` as appropriate, hence this field is private
Expand Down Expand Up @@ -967,6 +969,10 @@ define_tedge_config! {
#[tedge_config(multi)]
#[tedge_config(reader(private))]
az: {
#[tedge_config(reader(skip))]
#[serde(skip)]
read_from: Utf8PathBuf,

/// Endpoint URL of Azure IoT tenant
#[tedge_config(example = "myazure.azure-devices.net")]
url: ConnectUrl,
Expand Down Expand Up @@ -1054,6 +1060,10 @@ define_tedge_config! {
#[tedge_config(multi)]
#[tedge_config(reader(private))]
aws: {
#[tedge_config(reader(skip))]
#[serde(skip)]
read_from: Utf8PathBuf,

/// Endpoint URL of AWS IoT tenant
#[tedge_config(example = "your-endpoint.amazonaws.com")]
url: ConnectUrl,
Expand Down Expand Up @@ -2113,7 +2123,8 @@ mod tests {
let error = tedge_config
.mapper_config::<C8yMapperSpecificConfig>(&profile_name(None))
.await
.unwrap_err();
.err()
.unwrap();
assert_eq!(
format!("{error:#}"),
format!(
Expand Down
Loading
Loading