Skip to content
Open
7 changes: 7 additions & 0 deletions core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ instant_t duration = -1LL;
/** Indicator of whether the keepalive command-line option was given. */
bool keepalive_specified = false;

/** The minimum period of triggered timers, used as an upperbound on the lag for wait_until */
instant_t _min_timer_period = FOREVER;

void* lf_allocate(size_t count, size_t size, struct allocation_record_t** head) {
void* mem = calloc(count, size);
if (mem == NULL)
Expand Down Expand Up @@ -306,6 +309,10 @@ void _lf_pop_events(environment_t* env) {

// If the trigger is a periodic timer, create a new event for its next execution.
if (event->trigger->is_timer && event->trigger->period > 0LL) {
// Update the min timer
if (_min_timer_period > event->trigger->period) {
_min_timer_period = event->trigger->period;
}
// Reschedule the trigger.
lf_schedule_trigger(env, event->trigger, event->trigger->period, NULL);
}
Expand Down
47 changes: 38 additions & 9 deletions core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ void lf_set_present(lf_port_base_t* port) {
/**
* Wait until physical time matches or exceeds the specified logical time,
* unless -fast is given. For decentralized coordination, this function will
* add the STA offset to the wait time.
* add the STA offset to the wait time. For lag control, this function will implement
* an I-controller to regulate the lag introduced by the underlying sleep function.
*
* If an event is put on the event queue during the wait, then the wait is
* interrupted and this function returns false. It also returns false if the
Expand All @@ -222,6 +223,10 @@ void lf_set_present(lf_port_base_t* port) {
*/
bool wait_until(instant_t logical_time, lf_cond_t* condition) {
LF_PRINT_DEBUG("-------- Waiting until physical time matches logical time " PRINTF_TIME, logical_time);
// Control value for the i-controller
static interval_t error_control = NSEC(0);
// Do not incorporate the first lag measurement. It's an outlier due to initilizations
static int flag = false;
interval_t wait_until_time = logical_time;
#ifdef FEDERATED_DECENTRALIZED // Only apply the STA if coordination is decentralized
// Apply the STA to the logical time
Expand All @@ -234,11 +239,19 @@ bool wait_until(instant_t logical_time, lf_cond_t* condition) {
}
#endif
if (!fast) {
// Check whether we actually need to wait, or if we have already passed the timepoint.
interval_t wait_duration = wait_until_time - lf_time_physical();
if (wait_duration < MIN_SLEEP_DURATION) {
LF_PRINT_DEBUG("Wait time " PRINTF_TIME " is less than MIN_SLEEP_DURATION " PRINTF_TIME ". Skipping wait.",
wait_duration, MIN_SLEEP_DURATION);
error_control = LF_MAX(error_control, NSEC(0));
// Subtract the control value from the requested wait_until_time
interval_t wait_until_time_with_adjustment = wait_until_time - error_control;
instant_t now = lf_time_physical();

// If the requested time is already passed, return
// If the adjusted wait time is already passed, but the requested wait time
// didn't come yet, then adjust the control value and busy wait until the requested time.
if (wait_until_time < now) {
return true;
} else if (wait_until_time_with_adjustment < now && wait_until_time > now) {
while (wait_until_time > lf_time_physical())
;
return true;
}

Expand All @@ -247,7 +260,7 @@ bool wait_until(instant_t logical_time, lf_cond_t* condition) {
// returns 0 if it is awakened before the timeout. Hence, we want to run
// it repeatedly until either it returns non-zero or the current
// physical time matches or exceeds the logical time.
if (lf_clock_cond_timedwait(condition, wait_until_time) != LF_TIMEOUT) {
if (lf_clock_cond_timedwait(condition, wait_until_time_with_adjustment) != LF_TIMEOUT) {
LF_PRINT_DEBUG("-------- wait_until interrupted before timeout.");

// Wait did not time out, which means that there
Expand All @@ -259,8 +272,24 @@ bool wait_until(instant_t logical_time, lf_cond_t* condition) {
return false;
} else {
// Reached timeout.
LF_PRINT_DEBUG("-------- Returned from wait, having waited " PRINTF_TIME " ns.", wait_duration);
return true;
LF_PRINT_DEBUG("-------- Returned from wait, having waited until " PRINTF_TIME " ns.", wait_until_time);

// Calculate the lag and update the control value
instant_t lag = lf_time_physical() - wait_until_time;
if (flag && lag <= _min_timer_period) {
error_control = error_control + ((lag * KI_MUL) / KI_DIV);
} else {
flag = true;
}

// Check if the requested time is passed, if not busy wait
if (lf_time_physical() > wait_until_time) {
return true;
} else {
while (wait_until_time > lf_time_physical())
;
return true;
}
}
}
return true;
Expand Down
1 change: 1 addition & 0 deletions include/core/reactor_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ extern const char** default_argv;
extern instant_t duration;
extern bool fast;
extern bool keepalive_specified;
extern instant_t _min_timer_period;

#ifdef FEDERATED_DECENTRALIZED
extern interval_t lf_fed_STA_offset;
Expand Down
6 changes: 6 additions & 0 deletions low_level_platform/api/platform/lf_arduino_support.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,10 @@ typedef void* lf_thread_t;
// Arduinos are embedded platforms with no tty
#define NO_TTY

/* The constants used for I-controller for lag regulation under reactor_threaded.c wait_until function.
The lag gets multiplied by KI_MUL and divided by KI_DIV before incorporated into control value.
Currently no lag control support in this platform. */
#define KI_DIV 1
#define KI_MUL 0

#endif // LF_ARDUINO_SUPPORT_H
5 changes: 5 additions & 0 deletions low_level_platform/api/platform/lf_linux_support.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,9 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#error Linux platform misses clock support
#endif

/* The constants used for I-controller for lag regulation under reactor_threaded.c wait_until function.
The lag gets multiplied by KI_MUL and divided by KI_DIV before incorporated into control value. */
#define KI_DIV 1
#define KI_MUL 1

#endif // LF_LINUX_SUPPORT_H
7 changes: 7 additions & 0 deletions low_level_platform/api/platform/lf_macos_support.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#if !defined LF_SINGLE_THREADED
#include "lf_POSIX_threads_support.h"

/* The constants used for I-controller for lag regulation under reactor_threaded.c wait_until function.
The lag gets multiplied by KI_MUL and divided by KI_DIV before incorporated into control value.
Currently no lag control support in this platform. */
#define KI_DIV 1
#define KI_MUL 0

#endif

#endif // LF_MACOS_SUPPORT_H
6 changes: 6 additions & 0 deletions low_level_platform/api/platform/lf_nrf52_support.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,10 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
typedef void* lf_mutex_t;
typedef void _lf_cond_var_t;

/* The constants used for I-controller for lag regulation under reactor_threaded.c wait_until function.
The lag gets multiplied by KI_MUL and divided by KI_DIV before incorporated into control value.
Currently no lag control support in this platform. */
#define KI_DIV 1
#define KI_MUL 0

#endif // LF_nRF52832_SUPPORT_H
6 changes: 6 additions & 0 deletions low_level_platform/api/platform/lf_rp2040_support.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,10 @@
#define LF_TIME_BUFFER_LENGTH 80
#define _LF_TIMEOUT 1

/* The constants used for I-controller for lag regulation under reactor_threaded.c wait_until function.
The lag gets multiplied by KI_MUL and divided by KI_DIV before incorporated into control value.
Currently no lag control support in this platform. */
#define KI_DIV 1
#define KI_MUL 0

#endif // LF_PICO_SUPPORT_H
6 changes: 6 additions & 0 deletions low_level_platform/api/platform/lf_windows_support.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ typedef struct {
CONDITION_VARIABLE condition;
} lf_cond_t;
typedef HANDLE lf_thread_t;

/* The constants used for I-controller for lag regulation under reactor_threaded.c wait_until function.
The lag gets multiplied by KI_MUL and divided by KI_DIV before incorporated into control value.
Currently no lag control support in this platform. */
#define KI_DIV 1
#define KI_MUL 0
#endif

// Use 64-bit times and 32-bit unsigned microsteps
Expand Down
6 changes: 6 additions & 0 deletions low_level_platform/api/platform/lf_zephyr_support.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ typedef struct {
} lf_cond_t;
typedef struct k_thread* lf_thread_t;

/* The constants used for I-controller for lag regulation under reactor_threaded.c wait_until function.
The lag gets multiplied by KI_MUL and divided by KI_DIV before incorporated into control value.
Currently no lag control support in this platform. */
#define KI_DIV 1
#define KI_MUL 0

#endif // !LF_SINGLE_THREADED

#endif // LF_ZEPHYR_SUPPORT_H