Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 69 additions & 100 deletions crates/common/tedge_config/src/tedge_toml/tedge_config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
mod version;
use futures::Stream;
use reqwest::NoProxy;
use tokio::fs::DirEntry;
use version::TEdgeTomlVersion;

mod append_remove;
Expand All @@ -27,6 +26,7 @@ use super::models::TopicPrefix;
use super::models::HTTPS_PORT;
use super::models::MQTT_TLS_PORT;
use super::tedge_config_location::TEdgeConfigLocation;
use crate::ConfigDecision;
use crate::models::AbsolutePath;
use crate::tedge_toml::mapper_config::AwsMapperSpecificConfig;
use crate::tedge_toml::mapper_config::AzMapperSpecificConfig;
Expand Down Expand Up @@ -90,11 +90,13 @@ impl<T> OptionalConfigError<T> for OptionalConfig<T> {
}
}

type AnyMap = anymap3::Map<dyn std::any::Any + Send + Sync + 'static>;

pub struct TEdgeConfig {
dto: TEdgeConfigDto,
reader: TEdgeConfigReader,
location: TEdgeConfigLocation,
cached_mapper_configs:
tokio::sync::Mutex<anymap3::Map<dyn std::any::Any + Send + Sync + 'static>>,
cached_mapper_configs: Arc<tokio::sync::Mutex<AnyMap>>,
}

impl std::ops::Deref for TEdgeConfig {
Expand All @@ -105,30 +107,46 @@ impl std::ops::Deref for TEdgeConfig {
}
}

/// Decision about which configuration source to use
pub enum ConfigDecision {
/// Load from new format file at the given path
LoadNew { path: Utf8PathBuf },
/// Load from tedge.toml via compatibility layer
LoadLegacy,
/// Configuration file not found
NotFound { path: Utf8PathBuf },
/// Permission error accessing mapper config directory
PermissionError {
mapper_config_dir: Utf8PathBuf,
error: std::io::Error,
},
impl TEdgeConfigDto {
pub(crate) async fn populate_mapper_configs(&mut self, location: &TEdgeConfigLocation) -> anyhow::Result<()> {
use futures::StreamExt;
use futures::TryStreamExt;
let mappers_dir = location.tedge_config_root_path().join("mappers");
let all_profiles = location.mapper_config_profiles::<C8yMapperSpecificConfig>().await;
match all_profiles {
Some(profiles) => {
// TODO don't fail on non-existence of file
let default_profile_toml = tokio::fs::read_to_string(mappers_dir.join("c8y.toml")).await.unwrap();
// TODO quote path in error messages
let c8y_config: TEdgeConfigDtoC8y = toml::from_str(&default_profile_toml).context("failed to deserialise mapper config")?;
self.c8y.non_profile = c8y_config;

self.c8y.profiles = profiles.filter_map(|profile| futures::future::ready(profile)).then(|profile| async {
let profile_toml = tokio::fs::read_to_string(mappers_dir.join("c8y.d").join(format!("{profile}.toml"))).await?;
let c8y_config: TEdgeConfigDtoC8y = toml::from_str(&profile_toml).context("failed to deserialise mapper config")?;
Ok::<_, anyhow::Error>((profile, c8y_config))
}).try_collect().await?;
}
None => ()
}
Ok(())
}
}

impl TEdgeConfig {
pub(crate) fn from_dto(dto: &TEdgeConfigDto, location: TEdgeConfigLocation) -> Self {
pub(crate) fn from_dto(dto: TEdgeConfigDto, location: TEdgeConfigLocation) -> Self {
Self {
reader: TEdgeConfigReader::from_dto(dto, &location),
reader: TEdgeConfigReader::from_dto(&dto, &location),
dto,
location,
cached_mapper_configs: <_>::default(),
}
}

pub async fn decide_config_source<T>(&self, profile: Option<&ProfileName>) -> ConfigDecision where T: ExpectedCloudType{
self.location.decide_config_source::<T>(profile).await
}

pub(crate) fn location(&self) -> &TEdgeConfigLocation {
&self.location
}
Expand All @@ -137,49 +155,6 @@ impl TEdgeConfig {
self.location.tedge_config_root_path()
}

/// Decide which configuration source to use for a given cloud and profile
///
/// This function centralizes the decision logic for mapper configuration precedence:
/// 1. New format (`mappers/[cloud].toml` or `mappers/[cloud].d/[profile].toml`) takes precedence
/// 2. If new format exists for some profiles but not the requested one, returns NotFound
/// 3. If the config directory is inaccessible due to a permissions error, return Error
/// 4. If no new format exists at all, fall back to legacy tedge.toml format
pub async fn decide_config_source<T>(&self, profile: Option<&ProfileName>) -> ConfigDecision
where
T: ExpectedCloudType,
{
use tokio::fs::try_exists;

let mapper_config_dir = self.mappers_config_dir();
let ty = T::expected_cloud_type().to_string();

let filename = profile.map_or_else(|| format!("{ty}.toml"), |p| format!("{ty}.d/{p}.toml"));
let path = mapper_config_dir.join(&filename);

let default_profile_path = mapper_config_dir.join(format!("{ty}.toml"));
let profile_dir_path = mapper_config_dir.join(format!("{ty}.d"));

match (
try_exists(&default_profile_path).await,
try_exists(&profile_dir_path).await,
try_exists(&path).await,
) {
// The specific config we're looking for exists
(_, _, Ok(true)) => ConfigDecision::LoadNew { path },

// New format configs exist for this cloud, but not the specific profile requested
(Ok(true), _, _) | (_, Ok(true), _) => ConfigDecision::NotFound { path },

// No new format configs exist for this cloud, use legacy
(Ok(false), Ok(false), _) => ConfigDecision::LoadLegacy,

// Permission error accessing mapper config directory
(Err(err), _, _) | (_, Err(err), _) => ConfigDecision::PermissionError {
mapper_config_dir,
error: err,
},
}
}
pub async fn c8y_mapper_config(
&self,
profile: &Option<impl Borrow<ProfileName>>,
Expand All @@ -199,14 +174,37 @@ impl TEdgeConfig {
self.mapper_config(profile).await
}

// pub async fn cloud_config_reader<T: SpecialisedCloudConfig>(
// &self,
// profile: &Option<impl Borrow<ProfileName>>,
// ) -> anyhow::Result<T::CloudConfigReader> {
// let profile = profile.as_ref().map(|p| p.borrow().to_owned());
// let ty = T::expected_cloud_type().to_string();

// match self.decide_config_source::<T>(profile.as_ref()).await {
// ConfigDecision::LoadNew { path } => {
// let toml = tokio::fs::read_to_string(&path).await.with_context(|| format!("failed to read mapper configuration at {path}"))?;
// T::read_tedge_config(&toml, self, profile.as_deref()).map(<_>::into)
// },
// ConfigDecision::NotFound { path } => {
// Err(anyhow!("mapper configuration file {path} doesn't exist"))
// }
// ConfigDecision::LoadLegacy => match T::expected_cloud_type() {
// CloudType::Aws => self.aws.try_get(profile.as_ref()).map(Cow::Borrowed),
// CloudType::Az => self.az.try_get(profile.as_ref()).map(Cow::Borrowed),
// CloudType::C8y => self.c8y.try_get(profile.as_ref()).map(Cow::Borrowed),
// },
// }
// }

pub async fn mapper_config<T: SpecialisedCloudConfig>(
&self,
profile: &Option<impl Borrow<ProfileName>>,
) -> anyhow::Result<Arc<MapperConfig<T>>> {
let profile = profile.as_ref().map(|p| p.borrow().to_owned());
let ty = T::expected_cloud_type().to_string();

match self.decide_config_source::<T>(profile.as_ref()).await {
match self.location.decide_config_source::<T>(profile.as_ref()).await {
ConfigDecision::LoadNew { path } => {
// Check for config conflict: both new format and legacy exist
if self.has_legacy_config::<T>(profile.as_ref()) {
Expand All @@ -231,6 +229,7 @@ impl TEdgeConfig {
let map = load_mapper_config::<T>(
&AbsolutePath::try_new(path.as_str()).unwrap(),
self,
profile.as_deref(),
)
.await?;

Expand All @@ -252,10 +251,6 @@ impl TEdgeConfig {
}
}

pub fn mappers_config_dir(&self) -> Utf8PathBuf {
self.root_dir().join("mappers")
}

/// Check if a legacy tedge.toml configuration exists for the given cloud
/// type and profile by checking if the URL is configured
///
Expand Down Expand Up @@ -289,7 +284,7 @@ impl TEdgeConfig {
}

pub fn profiled_config_directories(&self) -> impl Iterator<Item = Utf8PathBuf> + use<'_> {
CloudType::iter().map(|ty| self.mappers_config_dir().join(format!("{ty}.d")))
CloudType::iter().map(|ty| self.location.mappers_config_dir().join(format!("{ty}.d")))
}

async fn all_profiles<T>(
Expand All @@ -298,37 +293,10 @@ impl TEdgeConfig {
where
T: ExpectedCloudType,
{
use futures::future::ready;
use futures::StreamExt;

fn file_name_string(entry: tokio::io::Result<DirEntry>) -> Option<String> {
entry.ok()?.file_name().into_string().ok()
}

fn profile_name_from_filename(filename: &str) -> Option<ProfileName> {
ProfileName::try_from(filename.strip_suffix(".toml")?.to_owned()).ok()
}

let ty = T::expected_cloud_type();

match self.decide_config_source::<T>(None).await {
ConfigDecision::LoadNew { .. }
| ConfigDecision::NotFound { .. }
| ConfigDecision::PermissionError { .. } => {
let default_profile = futures::stream::once(ready(None));
match tokio::fs::read_dir(self.mappers_config_dir().join(format!("{ty}.d"))).await {
Ok(profile_dir) => Box::new(
default_profile.chain(
tokio_stream::wrappers::ReadDirStream::new(profile_dir)
.filter_map(|entry| ready(file_name_string(entry)))
.filter_map(|s| ready(profile_name_from_filename(&s)))
.map(Some),
),
),
Err(_) => Box::new(default_profile),
}
}
ConfigDecision::LoadLegacy => match ty {
if let Some(migrated_profiles) = self.location.mapper_config_profiles::<T>().await {
migrated_profiles
} else {
match T::expected_cloud_type() {
CloudType::C8y => Box::new(futures::stream::iter(
self.c8y.keys().map(|p| p.map(<_>::to_owned)),
)),
Expand All @@ -338,7 +306,7 @@ impl TEdgeConfig {
CloudType::Aws => Box::new(futures::stream::iter(
self.aws.keys().map(|p| p.map(<_>::to_owned)),
)),
},
}
}
}

Expand Down Expand Up @@ -2113,7 +2081,8 @@ mod tests {
let error = tedge_config
.mapper_config::<C8yMapperSpecificConfig>(&profile_name(None))
.await
.unwrap_err();
.err()
.unwrap();
assert_eq!(
format!("{error:#}"),
format!(
Expand Down
Loading
Loading