diff --git a/engine/engine.go b/engine/engine.go index a1d37a7c264..ee1d5ee53cc 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -333,11 +333,28 @@ func (bot *Engine) Start() error { if bot.Settings.EnableNTPClient { if bot.Config.NTPClient.Level == 0 { - responseMessage, err := bot.Config.SetNTPCheck(os.Stdin) + // Check NTP config and ensure pools are configured + bot.Config.CheckNTPConfig() + + // Perform actual NTP check before prompting user + offset, err := CheckNTPOffset(context.Background(), bot.Config.NTPClient.Pool) if err != nil { - return fmt.Errorf("unable to set NTP check: %w", err) + gctlog.Warnf(gctlog.TimeMgr, "Unable to check NTP time: %v", err) + } else { + // Prompt user if time is actually out of sync + allowedDiff := *bot.Config.NTPClient.AllowedDifference + allowedNegDiff := -*bot.Config.NTPClient.AllowedNegativeDifference + if offset > allowedDiff || offset < allowedNegDiff { + gctlog.Warnf(gctlog.TimeMgr, "System time offset detected: %v (allowed: +%v / %v)", offset, allowedDiff, allowedNegDiff) + responseMessage, err := bot.Config.SetNTPCheck(os.Stdin) + if err != nil { + return fmt.Errorf("unable to set NTP check: %w", err) + } + gctlog.Infoln(gctlog.TimeMgr, responseMessage) + } else { + gctlog.Debugf(gctlog.TimeMgr, "System time is in sync (offset: %v)", offset) + } } - gctlog.Infoln(gctlog.TimeMgr, responseMessage) } if n, err := setupNTPManager(&bot.Config.NTPClient, *bot.Config.Logging.Enabled); err != nil { gctlog.Errorf(gctlog.Global, "NTP manager unable to start: %s", err) diff --git a/engine/ntp_manager.go b/engine/ntp_manager.go index 66c54e4081a..deb9f07ae7c 100644 --- a/engine/ntp_manager.go +++ b/engine/ntp_manager.go @@ -1,7 +1,9 @@ package engine import ( + "context" "encoding/binary" + "errors" "fmt" "net" "sync/atomic" @@ -11,6 +13,90 @@ import ( "github.com/thrasher-corp/gocryptotrader/log" ) +const ( + ntpEpochOffset = 2208988800 + ntpDialTimeout = 5 * time.Second + ntpReadWriteTimeout = 5 * time.Second +) + +// errNoValidNTPServer is returned when no valid NTP server could be reached +var errNoValidNTPServer = errors.New("no valid NTP server could be reached") + +// CheckNTPOffset performs a one-time NTP check and returns the time offset. +// This can be called before the NTP manager is started to verify time sync. +// It uses the RFC 5905 formula: offset = ((T2-T1) + (T3-T4)) / 2 +func CheckNTPOffset(ctx context.Context, pools []string) (time.Duration, error) { + if len(pools) == 0 { + return 0, errors.New("no NTP pools configured") + } + + dialer := &net.Dialer{ + Timeout: ntpDialTimeout, + } + + for i := range pools { + conn, err := dialer.DialContext(ctx, "udp", pools[i]) + if err != nil { + log.Warnf(log.TimeMgr, "NTP check: Unable to connect to %v, attempting next", pools[i]) + continue + } + + if err = conn.SetDeadline(time.Now().Add(ntpReadWriteTimeout)); err != nil { + log.Warnf(log.TimeMgr, "NTP check: Unable to set deadline on %v. Error %s. Attempting next\n", pools[i], err) + if err = conn.Close(); err != nil { + log.Errorln(log.TimeMgr, err) + } + continue + } + + // T1: Record time before sending request (origin timestamp) + t1 := time.Now() + + req := &ntpPacket{Settings: 0x1B} + if err = binary.Write(conn, binary.BigEndian, req); err != nil { + log.Warnf(log.TimeMgr, "NTP check: Unable to write to %v. Error %s. Attempting next\n", pools[i], err) + if err = conn.Close(); err != nil { + log.Errorln(log.TimeMgr, err) + } + continue + } + + rsp := &ntpPacket{} + if err = binary.Read(conn, binary.BigEndian, rsp); err != nil { + log.Warnf(log.TimeMgr, "NTP check: Unable to read from %v. Error: %s. Attempting next\n", pools[i], err) + if err = conn.Close(); err != nil { + log.Errorln(log.TimeMgr, err) + } + continue + } + + // T4: Record time after receiving response (Destination timestamp) + t4 := time.Now() + + if err = conn.Close(); err != nil { + log.Errorln(log.TimeMgr, err) + } + + // T2: Server receive timestamp (when server received our request) + t2 := ntpTimestampToTime(rsp.RxTimeSec, rsp.RxTimeFrac) + // T3: Server transmit timestamp (when server sent our response) + t3 := ntpTimestampToTime(rsp.TxTimeSec, rsp.TxTimeFrac) + + // RFC 5905 offset calculation: ((T2-T1) + (T3-T4)) / 2 + // This formula cancels out the network round-trip time + offset := (t2.Sub(t1) + t3.Sub(t4)) / 2 + return offset, nil + } + return 0, errNoValidNTPServer +} + +// ntpTimestampToTime converts timestamp (seconds and fractional) to time.Time +func ntpTimestampToTime(seconds, fractional uint32) time.Time { + unixSeconds := int64(seconds) - ntpEpochOffset + nanos := (int64(fractional) * 1.e9) >> 32 + return time.Unix(unixSeconds, nanos) +} + // setupNTPManager creates a new NTP manager func setupNTPManager(cfg *config.NTPClientConfig, loggingEnabled bool) (*ntpManager, error) { if cfg == nil { @@ -53,7 +139,7 @@ func (m *ntpManager) Start() error { // the default retry limits before giving up check: for i := range m.retryLimit { - err := m.processTime() + err := m.processTime(context.Background()) switch err { case nil: break check @@ -107,7 +193,7 @@ func (m *ntpManager) run() { case <-m.shutdown: return case <-t.C: - err := m.processTime() + err := m.processTime(context.Background()) if err != nil { log.Errorln(log.TimeMgr, err) } @@ -123,83 +209,92 @@ func (m *ntpManager) FetchNTPTime() (time.Time, error) { if atomic.LoadInt32(&m.started) == 0 { return time.Time{}, fmt.Errorf("NTP manager %w", ErrSubSystemNotStarted) } - return m.checkTimeInPools(), nil + offset, err := m.getTimeOffset(context.Background()) + if err != nil { + return time.Time{}, err + } + return time.Now().Add(offset), nil } -// processTime determines the difference between system time and NTP time -// to discover discrepancies -func (m *ntpManager) processTime() error { +// processTime determines the difference between system time and NTP time to discover discrepancies +func (m *ntpManager) processTime(ctx context.Context) error { if atomic.LoadInt32(&m.started) == 0 { return fmt.Errorf("NTP manager %w", ErrSubSystemNotStarted) } - NTPTime, err := m.FetchNTPTime() + offset, err := m.getTimeOffset(ctx) if err != nil { return err } - currentTime := time.Now() - diff := NTPTime.Sub(currentTime) configNTPTime := m.allowedDifference negDiff := m.allowedNegativeDifference configNTPNegativeTime := -negDiff - if diff > configNTPTime || diff < configNTPNegativeTime { - log.Warnf(log.TimeMgr, "NTP manager: Time out of sync (NTP): %v | (time.Now()): %v | (Difference): %v | (Allowed): +%v / %v\n", - NTPTime, - currentTime, - diff, - configNTPTime, - configNTPNegativeTime) + if offset > configNTPTime || offset < configNTPNegativeTime { + log.Warnf(log.TimeMgr, "NTP manager: Time out of sync (Offset): %v | (Allowed) +%v / %v\n", offset, configNTPTime, configNTPNegativeTime) } return nil } -// checkTimeInPools returns local based on ntp servers provided timestamp -// if no server can be reached will return local time in UTC() -func (m *ntpManager) checkTimeInPools() time.Time { +// getTimeOffset queries NTP servers and returns the calculated time offset +// using the RFC5905 formula: offset = ((T2-T1) + (T3-T4)) / 2 +// This properly accounts for network round-trip time +func (m *ntpManager) getTimeOffset(ctx context.Context) (time.Duration, error) { + dialer := &net.Dialer{ + Timeout: ntpDialTimeout, + } + for i := range m.pools { - con, err := net.DialTimeout("udp", m.pools[i], 5*time.Second) //nolint:noctx // TODO: #2006 Use (*net.Dialer).DialContext with (*net.Dialer).Timeout + conn, err := dialer.DialContext(ctx, "udp", m.pools[i]) if err != nil { - log.Warnf(log.TimeMgr, "Unable to connect to hosts %v attempting next", m.pools[i]) + log.Warnf(log.TimeMgr, "Unable to connect to hosts %v attempting to next", m.pools[i]) continue } - if err = con.SetDeadline(time.Now().Add(5 * time.Second)); err != nil { - log.Warnf(log.TimeMgr, "Unable to SetDeadline. Error: %s\n", err) - err = con.Close() - if err != nil { + if err = conn.SetDeadline(time.Now().Add(ntpReadWriteTimeout)); err != nil { + log.Warnf(log.TimeMgr, "Unable to set deadline on hosts %v. Error %s. attempting to next\n", m.pools[i], err) + if err = conn.Close(); err != nil { log.Errorln(log.TimeMgr, err) } continue } + // T1: Record time before sending request (origin timestamp) + t1 := time.Now() + req := &ntpPacket{Settings: 0x1B} - if err = binary.Write(con, binary.BigEndian, req); err != nil { - log.Warnf(log.TimeMgr, "Unable to write. Error: %s\n", err) - err = con.Close() - if err != nil { + if err = binary.Write(conn, binary.BigEndian, req); err != nil { + log.Warnf(log.TimeMgr, "Unable to write to hosts %v. Error %s. Attempting to next\n", m.pools[i], err) + if err = conn.Close(); err != nil { log.Errorln(log.TimeMgr, err) } continue } rsp := &ntpPacket{} - if err = binary.Read(con, binary.BigEndian, rsp); err != nil { - log.Warnf(log.TimeMgr, "Unable to read. Error: %s\n", err) - err = con.Close() - if err != nil { + if err = binary.Read(conn, binary.BigEndian, rsp); err != nil { + log.Warnf(log.TimeMgr, "Unable to read from hosts %v. Error: %s. Attempting to next\n", m.pools[i], err) + if err = conn.Close(); err != nil { log.Errorln(log.TimeMgr, err) } continue } - secs := float64(rsp.TxTimeSec) - 2208988800 - nanos := (int64(rsp.TxTimeFrac) * 1e9) >> 32 + // T4L Record time after receiving response (Destination timestamp) + t4 := time.Now() - err = con.Close() - if err != nil { + if err = conn.Close(); err != nil { log.Errorln(log.TimeMgr, err) } - return time.Unix(int64(secs), nanos) + + // T2: Server receive timestamp (when server received our request) + t2 := ntpTimestampToTime(rsp.RxTimeSec, rsp.RxTimeFrac) + // T3: Server transmit timestamp (when server sent our response) + t3 := ntpTimestampToTime(rsp.TxTimeSec, rsp.TxTimeFrac) + + // RFC 5905 offset calculation: ((T2-T1) + (T3-T4)) / 2 + // This formula cancels out the network round-trip time + offset := (t2.Sub(t1) + t3.Sub(t4)) / 2 + return offset, nil } - log.Warnln(log.TimeMgr, "No valid NTP servers found, using current system time") - return time.Now().UTC() + log.Warnln(log.TimeMgr, "No valid NTP servers found") + return 0, errNoValidNTPServer } diff --git a/engine/ntp_manager_test.go b/engine/ntp_manager_test.go index 6c01a3be47a..fabd0aa698f 100644 --- a/engine/ntp_manager_test.go +++ b/engine/ntp_manager_test.go @@ -1,6 +1,7 @@ package engine import ( + "context" "testing" "time" @@ -65,6 +66,7 @@ func TestNTPManagerStart(t *testing.T) { cfg := &config.NTPClientConfig{ AllowedDifference: &sec, AllowedNegativeDifference: &sec, + Pool: []string{"0.pool.ntp.org:123"}, } m, err = setupNTPManager(cfg, true) assert.NoError(t, err) @@ -114,6 +116,7 @@ func TestFetchNTPTime(t *testing.T) { AllowedDifference: &sec, AllowedNegativeDifference: &sec, Level: 1, + Pool: []string{"0.pool.ntp.org:123"}, } m, err = setupNTPManager(cfg, true) assert.NoError(t, err) @@ -130,14 +133,6 @@ func TestFetchNTPTime(t *testing.T) { if tt.IsZero() { t.Error("expected time") } - - m.pools = []string{"0.pool.ntp.org:123"} - tt, err = m.FetchNTPTime() - assert.NoError(t, err) - - if tt.IsZero() { - t.Error("expected time") - } } func TestProcessTime(t *testing.T) { @@ -151,17 +146,17 @@ func TestProcessTime(t *testing.T) { m, err := setupNTPManager(cfg, true) assert.NoError(t, err) - err = m.processTime() + err = m.processTime(context.Background()) assert.ErrorIs(t, err, ErrSubSystemNotStarted) err = m.Start() assert.NoError(t, err) - err = m.processTime() + err = m.processTime(context.Background()) assert.NoError(t, err) m.allowedDifference = time.Duration(1) m.allowedNegativeDifference = time.Duration(1) - err = m.processTime() + err = m.processTime(context.Background()) assert.NoError(t, err) }