Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion feos/services/host-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ prost-types = { workspace = true }
thiserror = { workspace = true }
hyper = {workspace = true}
hyper-util = { workspace = true }

sntpc = { version = "0.7", features = ["tokio-socket", "utils"] }
chrono = { workspace = true }
libc = { workspace = true }

hyper-rustls = "0.27.2"
http-body-util = "0.1.2"
Expand Down
2 changes: 2 additions & 0 deletions feos/services/host-service/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
pub mod info;
pub mod ops;
pub mod power;
pub mod time;

pub use info::{
handle_get_cpu_info, handle_get_memory, handle_get_network_info, handle_get_version_info,
handle_hostname,
};
pub use ops::{handle_stream_feos_logs, handle_stream_kernel_logs, handle_upgrade};
pub use power::{handle_reboot, handle_shutdown};
pub use time::TimeSyncWorker;
163 changes: 163 additions & 0 deletions feos/services/host-service/src/worker/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors
// SPDX-License-Identifier: Apache-2.0

use chrono::{DateTime, Local, TimeZone};
use log::{error, info, warn};
use sntpc::{NtpContext, StdTimestampGen};
use std::net::{Ipv6Addr, SocketAddr};
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::time::sleep;

const FALLBACK_NTP_SERVER: &str = "pool.ntp.org";
const SYNC_INTERVAL: Duration = Duration::from_secs(86400); // 24 Hours
const RETRY_INTERVAL: Duration = Duration::from_secs(300); // 5 Minutes
const NTP_PORT: u16 = 123;

pub struct TimeSyncWorker {
ntp_servers: Vec<Ipv6Addr>,
}

impl TimeSyncWorker {
pub fn new(ntp_servers: Vec<Ipv6Addr>) -> Self {
Self { ntp_servers }
}

pub async fn run(self) {
info!("TimeSyncWorker: Started.");

// Initial sync
self.perform_sync_loop().await;
}

async fn perform_sync_loop(&self) {
loop {
match self.synchronize_time().await {
Ok(_) => {
info!(
"TimeSyncWorker: Time synchronization successful. Sleeping for 24 hours."
);
sleep(SYNC_INTERVAL).await;
}
Err(e) => {
error!(
"TimeSyncWorker: Time synchronization failed: {e}. Retrying in 5 minutes."
);
sleep(RETRY_INTERVAL).await;
}
}
}
}

async fn synchronize_time(&self) -> Result<(), String> {
let socket = UdpSocket::bind("[::]:0")
.await
.map_err(|e| format!("Failed to bind UDP socket: {e}"))?;

for server_ip in &self.ntp_servers {
info!("TimeSyncWorker: Attempting sync with DHCPv6 server: {server_ip}");
let target = SocketAddr::from((*server_ip, NTP_PORT));

match self.query_ntp_server(&socket, target, server_ip).await {
Ok(_) => return Ok(()),
Err(e) => {
warn!("TimeSyncWorker: Failed to sync with {server_ip}: {e}");
}
}
}

info!("TimeSyncWorker: Attempting sync with fallback server: {FALLBACK_NTP_SERVER}");

match self.resolve_and_sync(&socket, FALLBACK_NTP_SERVER).await {
Ok(_) => Ok(()),
Err(e) => Err(format!("Failed to sync with fallback server: {e}")),
}
}

async fn resolve_and_sync(&self, socket: &UdpSocket, hostname: &str) -> Result<(), String> {
use tokio::net::lookup_host;

let server_with_port = format!("{hostname}:{NTP_PORT}");
let mut addrs = lookup_host(&server_with_port)
.await
.map_err(|e| format!("Failed to resolve {hostname}: {e}"))?;

let target = addrs
.find(|addr| addr.is_ipv6())
.or_else(|| {
warn!("TimeSyncWorker: No IPv6 address found for {hostname}, trying IPv4");
None
})
.ok_or_else(|| format!("No suitable address found for {hostname}"))?;

info!("TimeSyncWorker: Resolved {hostname} to {target}");

let server_ip = match target.ip() {
std::net::IpAddr::V6(ip) => ip,
std::net::IpAddr::V4(_) => {
return Err(format!(
"Only IPv6 addresses are supported, got IPv4 for {hostname}"
));
}
};

self.query_ntp_server(socket, target, &server_ip).await
}

async fn query_ntp_server(
&self,
socket: &UdpSocket,
target: SocketAddr,
server_ip: &Ipv6Addr,
) -> Result<(), String> {
let context = NtpContext::new(StdTimestampGen::default());

let result = sntpc::get_time(target, socket, context)
.await
.map_err(|e| format!("NTP query failed: {e:?}"))?;

let seconds = result.sec();
let sec_fraction = result.sec_fraction();

let nanoseconds = sntpc::fraction_to_nanoseconds(sec_fraction);
let server_time = Local
.timestamp_opt(seconds as i64, nanoseconds)
.single()
.ok_or_else(|| "Failed to convert NTP time to local time".to_string())?;

let offset_sec = result.offset() as f64 / 1_000_000.0;
let delay_sec = result.roundtrip() as f64 / 1_000_000.0;

info!(
"TimeSyncWorker: {} ({:+.6}s offset) +/- {:.6}s delay, server: {} stratum: {}",
server_time.format("%Y-%m-%d %H:%M:%S%.6f"),
offset_sec,
delay_sec,
server_ip,
result.stratum()
);

self.set_system_time(server_time)
.map_err(|e| format!("Failed to set system time: {e}"))?;

Ok(())
}

fn set_system_time(&self, dt: DateTime<Local>) -> Result<(), std::io::Error> {
use libc::{clock_settime, timespec, CLOCK_REALTIME};

let ts = timespec {
tv_sec: dt.timestamp(),
tv_nsec: dt.timestamp_subsec_nanos() as _,
};

let ret = unsafe { clock_settime(CLOCK_REALTIME, &ts) };

if ret != 0 {
return Err(std::io::Error::last_os_error());
}

info!("TimeSyncWorker: System clock updated to: {dt}");
Ok(())
}
}
6 changes: 4 additions & 2 deletions feos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ pub async fn run_server(restarted_after_upgrade: bool) -> Result<()> {
warn!("Not running as root! (uid: {})", Uid::current());
}

let mut ntp_servers = Vec::new();

if !restarted_after_upgrade {
if std::process::id() == 1 {
perform_first_boot_initialization().await?;
ntp_servers = perform_first_boot_initialization().await?;
}
} else {
info!("Main: Skipping one-time initialization on restart after upgrade.");
Expand All @@ -53,7 +55,7 @@ pub async fn run_server(restarted_after_upgrade: bool) -> Result<()> {
let vm_service = initialize_vm_service(&vm_db_url).await?;
let container_service = initialize_container_service().await?;

let host_service = initialize_host_service(restart_tx.clone(), log_handle);
let host_service = initialize_host_service(restart_tx.clone(), log_handle, ntp_servers);

let image_service = initialize_image_service().await?;
let task_service = initialize_task_service().await?;
Expand Down
19 changes: 15 additions & 4 deletions feos/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use feos_utils::host::info::is_running_on_vm;
use feos_utils::host::memory::configure_hugepages;
use feos_utils::network::{configure_network_devices, configure_sriov};
use host_service::{
api::HostApiHandler, dispatcher::HostServiceDispatcher, Command as HostCommand, RestartSignal,
api::HostApiHandler, dispatcher::HostServiceDispatcher, worker::TimeSyncWorker,
Command as HostCommand, RestartSignal,
};
use image_service::{
api::ImageApiHandler, dispatcher::ImageServiceDispatcher, filestore::FileStore,
Expand All @@ -28,6 +29,7 @@ use log::{error, info, warn};
use nix::libc;
use std::env;
use std::ffi::CString;
use std::net::Ipv6Addr;
use std::os::unix::ffi::OsStringExt;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
Expand Down Expand Up @@ -96,12 +98,19 @@ pub(crate) async fn initialize_container_service(
pub(crate) fn initialize_host_service(
restart_tx: mpsc::Sender<RestartSignal>,
log_handle: feos_utils::feos_logger::LogHandle,
ntp_servers: Vec<Ipv6Addr>,
) -> HostServiceServer<HostApiHandler> {
let (host_tx, host_rx) = mpsc::channel::<HostCommand>(32);
let host_dispatcher = HostServiceDispatcher::new(host_rx, restart_tx, log_handle);
tokio::spawn(async move {
host_dispatcher.run().await;
});

let time_worker = TimeSyncWorker::new(ntp_servers);
tokio::spawn(async move {
time_worker.run().await;
});

let host_api_handler = HostApiHandler::new(host_tx);
let host_service = HostServiceServer::new(host_api_handler);
info!("Main: Host Service is configured.");
Expand Down Expand Up @@ -159,7 +168,7 @@ pub(crate) async fn initialize_task_service() -> Result<TaskServiceServer<TaskAp
Ok(task_service)
}

pub(crate) async fn perform_first_boot_initialization() -> Result<()> {
pub(crate) async fn perform_first_boot_initialization() -> Result<Vec<Ipv6Addr>> {
info!("Main: Performing first-boot initialization...");
info!("Main: Mounting virtual filesystems...");
mount_virtual_filesystems();
Expand All @@ -175,11 +184,13 @@ pub(crate) async fn perform_first_boot_initialization() -> Result<()> {
});

info!("Main: Configuring network devices...");
if let Some((delegated_prefix, delegated_prefix_length)) = configure_network_devices()
let mut ntp_servers = Vec::new();
if let Some((delegated_prefix, delegated_prefix_length, servers)) = configure_network_devices()
.await
.expect("could not configure network devices")
{
info!("Main: Delegated prefix: {delegated_prefix}/{delegated_prefix_length}");
ntp_servers = servers;
}

if !is_on_vm {
Expand All @@ -189,7 +200,7 @@ pub(crate) async fn perform_first_boot_initialization() -> Result<()> {
}
}

Ok(())
Ok(ntp_servers)
}

pub(crate) async fn setup_database() -> Result<String> {
Expand Down
Loading
Loading