Skip to content

Commit 43a54a5

Browse files
authored
Introduce NTP support (#100)
Signed-off-by: Guvenc Gulce <[email protected]>
1 parent 1057b4c commit 43a54a5

File tree

15 files changed

+275
-90
lines changed

15 files changed

+275
-90
lines changed

Cargo.lock

Lines changed: 17 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

feos/services/host-service/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ prost-types = { workspace = true }
2424
thiserror = { workspace = true }
2525
hyper = {workspace = true}
2626
hyper-util = { workspace = true }
27-
27+
sntpc = { version = "0.7", features = ["tokio-socket", "utils"] }
28+
chrono = { workspace = true }
29+
libc = { workspace = true }
2830

2931
hyper-rustls = "0.27.2"
3032
http-body-util = "0.1.2"

feos/services/host-service/src/worker/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
pub mod info;
55
pub mod ops;
66
pub mod power;
7+
pub mod time;
78

89
pub use info::{
910
handle_get_cpu_info, handle_get_memory, handle_get_network_info, handle_get_version_info,
1011
handle_hostname,
1112
};
1213
pub use ops::{handle_stream_feos_logs, handle_stream_kernel_logs, handle_upgrade};
1314
pub use power::{handle_reboot, handle_shutdown};
15+
pub use time::TimeSyncWorker;
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use chrono::{DateTime, Local, TimeZone};
5+
use log::{error, info, warn};
6+
use sntpc::{NtpContext, StdTimestampGen};
7+
use std::net::{Ipv6Addr, SocketAddr};
8+
use std::time::Duration;
9+
use tokio::net::UdpSocket;
10+
use tokio::time::sleep;
11+
12+
/// Host name that is resolved and queried when no configured server could be used.
const FALLBACK_NTP_SERVER: &str = "pool.ntp.org";
/// Pause after a successful synchronization (24 hours).
const SYNC_INTERVAL: Duration = Duration::from_secs(24 * 60 * 60);
/// Pause before the next attempt after a failed synchronization (5 minutes).
const RETRY_INTERVAL: Duration = Duration::from_secs(5 * 60);
/// UDP port used when contacting NTP servers.
const NTP_PORT: u16 = 123;
16+
17+
/// Background worker that synchronizes the system clock over NTP.
///
/// The worker owns the list of IPv6 NTP servers it should query first; the
/// rest of this module falls back to a well-known host name when none of
/// them can be used.
#[derive(Debug, Clone)]
pub struct TimeSyncWorker {
    /// IPv6 addresses of the NTP servers to try, in order.
    ntp_servers: Vec<Ipv6Addr>,
}
20+
21+
impl TimeSyncWorker {
22+
pub fn new(ntp_servers: Vec<Ipv6Addr>) -> Self {
23+
Self { ntp_servers }
24+
}
25+
26+
pub async fn run(self) {
27+
info!("TimeSyncWorker: Started.");
28+
29+
// Initial sync
30+
self.perform_sync_loop().await;
31+
}
32+
33+
async fn perform_sync_loop(&self) {
34+
loop {
35+
match self.synchronize_time().await {
36+
Ok(_) => {
37+
info!(
38+
"TimeSyncWorker: Time synchronization successful. Sleeping for 24 hours."
39+
);
40+
sleep(SYNC_INTERVAL).await;
41+
}
42+
Err(e) => {
43+
error!(
44+
"TimeSyncWorker: Time synchronization failed: {e}. Retrying in 5 minutes."
45+
);
46+
sleep(RETRY_INTERVAL).await;
47+
}
48+
}
49+
}
50+
}
51+
52+
async fn synchronize_time(&self) -> Result<(), String> {
53+
let socket = UdpSocket::bind("[::]:0")
54+
.await
55+
.map_err(|e| format!("Failed to bind UDP socket: {e}"))?;
56+
57+
for server_ip in &self.ntp_servers {
58+
info!("TimeSyncWorker: Attempting sync with DHCPv6 server: {server_ip}");
59+
let target = SocketAddr::from((*server_ip, NTP_PORT));
60+
61+
match self.query_ntp_server(&socket, target, server_ip).await {
62+
Ok(_) => return Ok(()),
63+
Err(e) => {
64+
warn!("TimeSyncWorker: Failed to sync with {server_ip}: {e}");
65+
}
66+
}
67+
}
68+
69+
info!("TimeSyncWorker: Attempting sync with fallback server: {FALLBACK_NTP_SERVER}");
70+
71+
match self.resolve_and_sync(&socket, FALLBACK_NTP_SERVER).await {
72+
Ok(_) => Ok(()),
73+
Err(e) => Err(format!("Failed to sync with fallback server: {e}")),
74+
}
75+
}
76+
77+
async fn resolve_and_sync(&self, socket: &UdpSocket, hostname: &str) -> Result<(), String> {
78+
use tokio::net::lookup_host;
79+
80+
let server_with_port = format!("{hostname}:{NTP_PORT}");
81+
let mut addrs = lookup_host(&server_with_port)
82+
.await
83+
.map_err(|e| format!("Failed to resolve {hostname}: {e}"))?;
84+
85+
let target = addrs
86+
.find(|addr| addr.is_ipv6())
87+
.or_else(|| {
88+
warn!("TimeSyncWorker: No IPv6 address found for {hostname}, trying IPv4");
89+
None
90+
})
91+
.ok_or_else(|| format!("No suitable address found for {hostname}"))?;
92+
93+
info!("TimeSyncWorker: Resolved {hostname} to {target}");
94+
95+
let server_ip = match target.ip() {
96+
std::net::IpAddr::V6(ip) => ip,
97+
std::net::IpAddr::V4(_) => {
98+
return Err(format!(
99+
"Only IPv6 addresses are supported, got IPv4 for {hostname}"
100+
));
101+
}
102+
};
103+
104+
self.query_ntp_server(socket, target, &server_ip).await
105+
}
106+
107+
async fn query_ntp_server(
108+
&self,
109+
socket: &UdpSocket,
110+
target: SocketAddr,
111+
server_ip: &Ipv6Addr,
112+
) -> Result<(), String> {
113+
let context = NtpContext::new(StdTimestampGen::default());
114+
115+
let result = sntpc::get_time(target, socket, context)
116+
.await
117+
.map_err(|e| format!("NTP query failed: {e:?}"))?;
118+
119+
let seconds = result.sec();
120+
let sec_fraction = result.sec_fraction();
121+
122+
let nanoseconds = sntpc::fraction_to_nanoseconds(sec_fraction);
123+
let server_time = Local
124+
.timestamp_opt(seconds as i64, nanoseconds)
125+
.single()
126+
.ok_or_else(|| "Failed to convert NTP time to local time".to_string())?;
127+
128+
let offset_sec = result.offset() as f64 / 1_000_000.0;
129+
let delay_sec = result.roundtrip() as f64 / 1_000_000.0;
130+
131+
info!(
132+
"TimeSyncWorker: {} ({:+.6}s offset) +/- {:.6}s delay, server: {} stratum: {}",
133+
server_time.format("%Y-%m-%d %H:%M:%S%.6f"),
134+
offset_sec,
135+
delay_sec,
136+
server_ip,
137+
result.stratum()
138+
);
139+
140+
self.set_system_time(server_time)
141+
.map_err(|e| format!("Failed to set system time: {e}"))?;
142+
143+
Ok(())
144+
}
145+
146+
fn set_system_time(&self, dt: DateTime<Local>) -> Result<(), std::io::Error> {
147+
use libc::{clock_settime, timespec, CLOCK_REALTIME};
148+
149+
let ts = timespec {
150+
tv_sec: dt.timestamp(),
151+
tv_nsec: dt.timestamp_subsec_nanos() as _,
152+
};
153+
154+
let ret = unsafe { clock_settime(CLOCK_REALTIME, &ts) };
155+
156+
if ret != 0 {
157+
return Err(std::io::Error::last_os_error());
158+
}
159+
160+
info!("TimeSyncWorker: System clock updated to: {dt}");
161+
Ok(())
162+
}
163+
}

feos/src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,11 @@ pub async fn run_server(restarted_after_upgrade: bool) -> Result<()> {
3838
warn!("Not running as root! (uid: {})", Uid::current());
3939
}
4040

41+
let mut ntp_servers = Vec::new();
42+
4143
if !restarted_after_upgrade {
4244
if std::process::id() == 1 {
43-
perform_first_boot_initialization().await?;
45+
ntp_servers = perform_first_boot_initialization().await?;
4446
}
4547
} else {
4648
info!("Main: Skipping one-time initialization on restart after upgrade.");
@@ -53,7 +55,7 @@ pub async fn run_server(restarted_after_upgrade: bool) -> Result<()> {
5355
let vm_service = initialize_vm_service(&vm_db_url).await?;
5456
let container_service = initialize_container_service().await?;
5557

56-
let host_service = initialize_host_service(restart_tx.clone(), log_handle);
58+
let host_service = initialize_host_service(restart_tx.clone(), log_handle, ntp_servers);
5759

5860
let image_service = initialize_image_service().await?;
5961
let task_service = initialize_task_service().await?;

feos/src/setup.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ use feos_utils::host::info::is_running_on_vm;
1818
use feos_utils::host::memory::configure_hugepages;
1919
use feos_utils::network::{configure_network_devices, configure_sriov};
2020
use host_service::{
21-
api::HostApiHandler, dispatcher::HostServiceDispatcher, Command as HostCommand, RestartSignal,
21+
api::HostApiHandler, dispatcher::HostServiceDispatcher, worker::TimeSyncWorker,
22+
Command as HostCommand, RestartSignal,
2223
};
2324
use image_service::{
2425
api::ImageApiHandler, dispatcher::ImageServiceDispatcher, filestore::FileStore,
@@ -28,6 +29,7 @@ use log::{error, info, warn};
2829
use nix::libc;
2930
use std::env;
3031
use std::ffi::CString;
32+
use std::net::Ipv6Addr;
3133
use std::os::unix::ffi::OsStringExt;
3234
use std::os::unix::fs::PermissionsExt;
3335
use std::path::Path;
@@ -96,12 +98,19 @@ pub(crate) async fn initialize_container_service(
9698
pub(crate) fn initialize_host_service(
9799
restart_tx: mpsc::Sender<RestartSignal>,
98100
log_handle: feos_utils::feos_logger::LogHandle,
101+
ntp_servers: Vec<Ipv6Addr>,
99102
) -> HostServiceServer<HostApiHandler> {
100103
let (host_tx, host_rx) = mpsc::channel::<HostCommand>(32);
101104
let host_dispatcher = HostServiceDispatcher::new(host_rx, restart_tx, log_handle);
102105
tokio::spawn(async move {
103106
host_dispatcher.run().await;
104107
});
108+
109+
let time_worker = TimeSyncWorker::new(ntp_servers);
110+
tokio::spawn(async move {
111+
time_worker.run().await;
112+
});
113+
105114
let host_api_handler = HostApiHandler::new(host_tx);
106115
let host_service = HostServiceServer::new(host_api_handler);
107116
info!("Main: Host Service is configured.");
@@ -159,7 +168,7 @@ pub(crate) async fn initialize_task_service() -> Result<TaskServiceServer<TaskAp
159168
Ok(task_service)
160169
}
161170

162-
pub(crate) async fn perform_first_boot_initialization() -> Result<()> {
171+
pub(crate) async fn perform_first_boot_initialization() -> Result<Vec<Ipv6Addr>> {
163172
info!("Main: Performing first-boot initialization...");
164173
info!("Main: Mounting virtual filesystems...");
165174
mount_virtual_filesystems();
@@ -175,11 +184,13 @@ pub(crate) async fn perform_first_boot_initialization() -> Result<()> {
175184
});
176185

177186
info!("Main: Configuring network devices...");
178-
if let Some((delegated_prefix, delegated_prefix_length)) = configure_network_devices()
187+
let mut ntp_servers = Vec::new();
188+
if let Some((delegated_prefix, delegated_prefix_length, servers)) = configure_network_devices()
179189
.await
180190
.expect("could not configure network devices")
181191
{
182192
info!("Main: Delegated prefix: {delegated_prefix}/{delegated_prefix_length}");
193+
ntp_servers = servers;
183194
}
184195

185196
if !is_on_vm {
@@ -189,7 +200,7 @@ pub(crate) async fn perform_first_boot_initialization() -> Result<()> {
189200
}
190201
}
191202

192-
Ok(())
203+
Ok(ntp_servers)
193204
}
194205

195206
pub(crate) async fn setup_database() -> Result<String> {

0 commit comments

Comments
 (0)