diff --git a/core/lf_token.c b/core/lf_token.c index b97a3b366..c71127c01 100644 --- a/core/lf_token.c +++ b/core/lf_token.c @@ -235,7 +235,7 @@ lf_token_t* _lf_new_token(token_type_t* type, void* value, size_t length) { result->ref_count = 0; return result; } - + lf_token_t* _lf_get_token(token_template_t* tmplt) { LF_CRITICAL_SECTION_ENTER(GLOBAL_ENVIRONMENT); if (tmplt->token != NULL && tmplt->token->ref_count == 1) { diff --git a/core/lf_utils.cmake b/core/lf_utils.cmake index 5b6b35921..bb4e81eb7 100644 --- a/core/lf_utils.cmake +++ b/core/lf_utils.cmake @@ -1,6 +1,6 @@ function(lf_enable_compiler_warnings target) if(${CMAKE_SYSTEM_NAME} STREQUAL "Linux") - target_compile_options(${target} PRIVATE -Wall -Wextra -Wpedantic -Werror) + target_compile_options(${target} PRIVATE -Wall -Wextra -Wpedantic) elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Windows") target_compile_options(${target} PRIVATE /W4) elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin") diff --git a/core/threaded/CMakeLists.txt b/core/threaded/CMakeLists.txt index 31fce534a..cb392a19e 100644 --- a/core/threaded/CMakeLists.txt +++ b/core/threaded/CMakeLists.txt @@ -2,6 +2,7 @@ set( THREADED_SOURCES reactor_threaded.c scheduler_adaptive.c + scheduler_static.c scheduler_GEDF_NP.c scheduler_NP.c scheduler_sync_tag_advance.c diff --git a/core/threaded/scheduler_static.c b/core/threaded/scheduler_static.c new file mode 100644 index 000000000..5331ca659 --- /dev/null +++ b/core/threaded/scheduler_static.c @@ -0,0 +1,548 @@ +#if !defined(LF_SINGLE_THREADED) + +/** + * A static scheduler for the threaded runtime of the C target of Lingua Franca. + */ +#include +#include "lf_types.h" +#if defined SCHEDULER && SCHEDULER == SCHED_STATIC +#ifndef NUMBER_OF_WORKERS +#define NUMBER_OF_WORKERS 1 +#endif // NUMBER_OF_WORKERS + +#include + +#include "platform.h" +#include "reactor_common.h" +#include "scheduler_instance.h" +#include "scheduler_static_functions.h" +#include "scheduler_sync_tag_advance.h" +#include "scheduler.h" +#include "tag.h" +#include "tracepoint.h" +#include "util.h" + +#ifndef TRACE_ALL_INSTRUCTIONS +#define TRACE_ALL_INSTRUCTIONS false +#endif +#define SPIN_WAIT_THRESHOLD SEC(1) + +/////////////////// External Variables ///////////////////////// +// Global variable defined in tag.c: +extern instant_t start_time; + +// Global variables defined in schedule.c: +extern const inst_t* static_schedules[]; +extern reg_t timeout; +extern const size_t num_counters; +extern reg_t time_offset; +extern reg_t offset_inc; +extern const uint64_t zero; +extern volatile uint32_t counters[]; +extern volatile reg_t return_addr[]; +extern volatile reg_t binary_sema[]; + +/////////////////// Scheduler Private API ///////////////////////// + +/** + * @brief The implementation of the ADD instruction + */ +void execute_inst_ADD(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop) { + char* op_str = "ADD"; + LF_PRINT_DEBUG("*** Worker %zu executing instruction: [Line %zu] %s %" PRIu64 " %" PRIu64 " %" PRIu64, worker_number, *pc, op_str, op1.imm, op2.imm, op3.imm); +#if TRACE_ALL_INSTRUCTIONS + int pc_orig = (int) *pc; + tracepoint_static_scheduler_ADD_starts(worker_number, pc_orig); +#endif + reg_t *dst = op1.reg; + reg_t *src = op2.reg; + reg_t *src2 = op3.reg; + *dst = *src + *src2; + *pc += 1; // Increment pc. +#if TRACE_ALL_INSTRUCTIONS + tracepoint_static_scheduler_ADD_ends(worker_number, pc_orig); +#endif +} + +/** + * @brief The implementation of the ADDI instruction + */ +void execute_inst_ADDI(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop) { + char* op_str = "ADDI"; + LF_PRINT_DEBUG("*** Worker %zu executing instruction: [Line %zu] %s %" PRIu64 " %" PRIu64 " %" PRIu64, worker_number, *pc, op_str, op1.imm, op2.imm, op3.imm); +#if TRACE_ALL_INSTRUCTIONS + int pc_orig = (int) *pc; + tracepoint_static_scheduler_ADDI_starts(worker_number, pc_orig); +#endif + reg_t *dst = op1.reg; + reg_t *src = op2.reg; + // FIXME: Will there be problems if instant_t adds reg_t? + *dst = *src + op3.imm; + *pc += 1; // Increment pc. +#if TRACE_ALL_INSTRUCTIONS + tracepoint_static_scheduler_ADDI_ends(worker_number, pc_orig); +#endif +} + +/** + * @brief The implementation of the ADV instruction + */ +void execute_inst_ADV(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop) { + char* op_str = "ADV"; + LF_PRINT_DEBUG("*** Worker %zu executing instruction: [Line %zu] %s %" PRIu64 " %" PRIu64 " %" PRIu64, worker_number, *pc, op_str, op1.imm, op2.imm, op3.imm); +#if TRACE_ALL_INSTRUCTIONS + int pc_orig = (int) *pc; + tracepoint_static_scheduler_ADV_starts(worker_number, pc_orig); +#endif + self_base_t *reactor = (self_base_t*) op1.reg; + reg_t *base = op2.reg; + reg_t *inc = op3.reg; + reactor->tag.time = *base + *inc; + reactor->tag.microstep = 0; + + // Reset all "is_present" fields of the output ports of the reactor + // Doing this here has the major implication that ADV has to execute AFTER + // all downstream reactions have finished, since it is modifying state that is + // visible to those reactions. + for (int i = 0; i < reactor->num_output_ports; i++) { + reactor->output_ports[i]->is_present = false; + } + + *pc += 1; // Increment pc. +#if TRACE_ALL_INSTRUCTIONS + tracepoint_static_scheduler_ADV_ends(worker_number, pc_orig); +#endif +} + +/** + * @brief The implementation of the ADVI instruction + */ +void execute_inst_ADVI(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop) { + char* op_str = "ADVI"; + LF_PRINT_DEBUG("*** Worker %zu executing instruction: [Line %zu] %s (reactor %p) %" PRIu64 " %" PRIu64, worker_number, *pc, op_str, op1.reg, *(op2.reg), op3.imm); +#if TRACE_ALL_INSTRUCTIONS + int pc_orig = (int) *pc; + tracepoint_static_scheduler_ADVI_starts(worker_number, pc_orig); +#endif + self_base_t *reactor = (self_base_t*) op1.reg; + reg_t *base = op2.reg; + reactor->tag.time = *base + op3.imm; + reactor->tag.microstep = 0; + + // Reset all "is_present" fields of the output ports of the reactor + // Doing this here has the major implication that ADVI has to execute AFTER + // all downstream reactions have finished, since it is modifying state that is + // visible to those reactions. + for (int i = 0; i < reactor->num_output_ports; i++) { + reactor->output_ports[i]->is_present = false; + } + + *pc += 1; // Increment pc. +#if TRACE_ALL_INSTRUCTIONS + tracepoint_static_scheduler_ADVI_ends(worker_number, pc_orig); +#endif +} + +/** + * @brief The implementation of the BEQ instruction + */ +void execute_inst_BEQ(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop) { + char* op_str = "BEQ"; + LF_PRINT_DEBUG("*** Worker %zu executing instruction: [Line %zu] %s %" PRIu64 " %" PRIu64 " %" PRIu64, worker_number, *pc, op_str, op1.imm, op2.imm, op3.imm); +#if TRACE_ALL_INSTRUCTIONS + int pc_orig = (int) *pc; + tracepoint_static_scheduler_BEQ_starts(worker_number, pc_orig); +#endif + reg_t *_op1 = op1.reg; + reg_t *_op2 = op2.reg; + // These NULL checks allow _op1 and _op2 to be uninitialized in the static + // schedule, which can save a few lines in the schedule. But it is debatable + // whether this is good practice. + if (debug) { + lf_print("DEBUG: _op1 = %p, _op2 = %p", _op1, _op2); + if (_op1 != NULL) lf_print("DEBUG: *_op1 = %lld", *_op1); + if (_op2 != NULL) lf_print("DEBUG: *_op2 = %lld", *_op2); + } + if (_op1 != NULL && _op2 != NULL && *_op1 == *_op2) *pc = op3.imm; + else *pc += 1; +#if TRACE_ALL_INSTRUCTIONS + tracepoint_static_scheduler_BEQ_ends(worker_number, pc_orig); +#endif +} + +/** + * @brief The implementation of the BGE instruction + */ +void execute_inst_BGE(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop) { + char* op_str = "BGE"; + LF_PRINT_DEBUG("*** Worker %zu executing instruction: [Line %zu] %s %" PRIu64 " %" PRIu64 " %" PRIu64, worker_number, *pc, op_str, op1.imm, op2.imm, op3.imm); +#if TRACE_ALL_INSTRUCTIONS + int pc_orig = (int) *pc; + tracepoint_static_scheduler_BGE_starts(worker_number, pc_orig); +#endif + reg_t *_op1 = op1.reg; + reg_t *_op2 = op2.reg; + LF_PRINT_DEBUG("Worker %zu: BGE : operand 1 = %lld, operand 2 = %lld", worker_number, *_op1, *_op2); + if (_op1 != NULL && _op2 != NULL && *_op1 >= *_op2) *pc = op3.imm; + else *pc += 1; +#if TRACE_ALL_INSTRUCTIONS + tracepoint_static_scheduler_BGE_ends(worker_number, pc_orig); +#endif +} + +/** + * @brief The implementation of the BLT instruction + */ +void execute_inst_BLT(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop) { + char* op_str = "BLT"; + LF_PRINT_DEBUG("*** Worker %zu executing instruction: [Line %zu] %s %" PRIu64 " %" PRIu64 " %" PRIu64, worker_number, *pc, op_str, op1.imm, op2.imm, op3.imm); +#if TRACE_ALL_INSTRUCTIONS + int pc_orig = (int) *pc; + tracepoint_static_scheduler_BLT_starts(worker_number, pc_orig); +#endif + reg_t *_op1 = op1.reg; + reg_t *_op2 = op2.reg; + if (_op1 != NULL && _op2 != NULL && *_op1 < *_op2) *pc = op3.imm; + else *pc += 1; + if (debug) lf_print("op1: %lld, op2: %lld, op1 < op2: %d", *_op1, *_op2, *_op1 < *_op2); +#if TRACE_ALL_INSTRUCTIONS + tracepoint_static_scheduler_BLT_ends(worker_number, pc_orig); +#endif +} + +/** + * @brief The implementation of the BNE instruction + */ +void execute_inst_BNE(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop) { + char* op_str = "BNE"; + LF_PRINT_DEBUG("*** Worker %zu executing instruction: [Line %zu] %s %" PRIu64 " %" PRIu64 " %" PRIu64, worker_number, *pc, op_str, op1.imm, op2.imm, op3.imm); +#if TRACE_ALL_INSTRUCTIONS + int pc_orig = (int) *pc; + tracepoint_static_scheduler_BNE_starts(worker_number, pc_orig); +#endif + reg_t *_op1 = op1.reg; + reg_t *_op2 = op2.reg; + if (_op1 != NULL && _op2 != NULL && *_op1 != *_op2) *pc = op3.imm; + else *pc += 1; +#if TRACE_ALL_INSTRUCTIONS + tracepoint_static_scheduler_BNE_ends(worker_number, pc_orig); +#endif +} + +/** + * @brief The implementation of the DU instruction + */ +void execute_inst_DU(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop) { + char* op_str = "DU"; + LF_PRINT_DEBUG("*** Worker %zu executing instruction: [Line %zu] %s %" PRIu64 " %" PRIu64 " %" PRIu64, worker_number, *pc, op_str, op1.imm, op2.imm, op3.imm); +#if TRACE_ALL_INSTRUCTIONS + int pc_orig = (int) *pc; + tracepoint_static_scheduler_DU_starts(worker_number, pc_orig); +#endif + reg_t *src = op1.reg; + instant_t wakeup_time = *src + op2.imm; + LF_PRINT_DEBUG("DU wakeup time: %lld, base: %lld, offset: %lld", wakeup_time, *src, op2.imm); + // Check if we need to sleep. + instant_t current_time; + _lf_clock_gettime(¤t_time); + instant_t wait_interval = wakeup_time - current_time; + LF_PRINT_DEBUG( + "*** [Line %zu] Worker %zu delaying, current_physical_time: %lld, wakeup_time: %lld, wait_interval: %lld", *pc, + worker_number, current_time, wakeup_time, wait_interval); + if (wait_interval > 0) { + // Recalculate the wakeup time for max accuracy. + _lf_clock_gettime(¤t_time); + lf_sleep(wakeup_time - current_time); + } + LF_PRINT_DEBUG("*** [Line %zu] Worker %zu done delaying", *pc, worker_number); + *pc += 1; // Increment pc. +#if TRACE_ALL_INSTRUCTIONS + tracepoint_static_scheduler_DU_ends(worker_number, pc_orig); +#endif +} + +/** + * @brief The implementation of the EXE instruction + */ +void execute_inst_EXE(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop) { + char* op_str = "EXE"; + LF_PRINT_DEBUG("*** Worker %zu executing instruction: [Line %zu] %s %" PRIu64 " %" PRIu64 " %" PRIu64, worker_number, *pc, op_str, op1.imm, op2.imm, op3.imm); +#if TRACE_ALL_INSTRUCTIONS + int pc_orig = (int) *pc; + if (op3.imm != ULLONG_MAX) {tracepoint_static_scheduler_EXE_reaction_starts((void*)op2.reg, worker_number, op3.imm);} + else {tracepoint_static_scheduler_EXE_starts(worker_number, pc_orig);} +#else + if (op3.imm != ULLONG_MAX) {tracepoint_static_scheduler_EXE_reaction_starts((void*)op2.reg, worker_number, op3.imm);} +#endif + function_generic_t function = (function_generic_t)(uintptr_t)op1.reg; + void *args = (void*)op2.reg; + // Execute the function directly. + LF_PRINT_DEBUG("*** [Line %zu] Worker %zu executing reaction", *pc, worker_number); + function(args); + LF_PRINT_DEBUG("*** [Line %zu] Worker %zu done executing reaction", *pc, worker_number); + *pc += 1; // Increment pc. +#if TRACE_ALL_INSTRUCTIONS + if (op3.imm != ULLONG_MAX) {tracepoint_static_scheduler_EXE_reaction_ends((void*)op2.reg, worker_number, op3.imm);} + else {tracepoint_static_scheduler_EXE_ends(worker_number, pc_orig);} +#else + if (op3.imm != ULLONG_MAX) {tracepoint_static_scheduler_EXE_reaction_ends((void*)op2.reg, worker_number, op3.imm);} +#endif + // This full memory barrier is required to ensure that the worker + // counter after this EXE instruction only gets incremented when the + // reaction body or auxiliary function runs to completion, i.e., it + // prevents the compiler or the CPU hardware from making bad + // out-of-order execution decisions. + // See RaceCondition.lf for more details. + __sync_synchronize(); +} + + +/** + * @brief The implementation of the WLT instruction + */ +void execute_inst_WLT(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop) { + char* op_str = "WLT"; + LF_PRINT_DEBUG("*** Worker %zu executing instruction: [Line %zu] %s %" PRIu64 " %" PRIu64 " %" PRIu64, worker_number, *pc, op_str, op1.imm, op2.imm, op3.imm); +#if TRACE_ALL_INSTRUCTIONS + int pc_orig = (int) *pc; + tracepoint_static_scheduler_WLT_starts(worker_number, pc_orig); +#endif + LF_PRINT_DEBUG("*** Worker %zu waiting", worker_number); + reg_t *var = op1.reg; + while(*var >= op2.imm); + LF_PRINT_DEBUG("*** Worker %zu done waiting", worker_number); + *pc += 1; // Increment pc. +#if TRACE_ALL_INSTRUCTIONS + tracepoint_static_scheduler_WLT_ends(worker_number, pc_orig); +#endif +} + +/** + * @brief The implementation of the WU instruction + */ +void execute_inst_WU(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop) { + char* op_str = "WU"; + LF_PRINT_DEBUG("*** Worker %zu executing instruction: [Line %zu] %s %" PRIu64 " %" PRIu64 " %" PRIu64, worker_number, *pc, op_str, op1.imm, op2.imm, op3.imm); +#if TRACE_ALL_INSTRUCTIONS + int pc_orig = (int) *pc; + tracepoint_static_scheduler_WU_starts(worker_number, pc_orig); +#endif + LF_PRINT_DEBUG("*** Worker %zu waiting", worker_number); + reg_t *var = op1.reg; + while(*var < op2.imm); + LF_PRINT_DEBUG("*** Worker %zu done waiting", worker_number); + *pc += 1; // Increment pc. +#if TRACE_ALL_INSTRUCTIONS + tracepoint_static_scheduler_WU_ends(worker_number, pc_orig); +#endif +} + +/** + * @brief The implementation of the JAL instruction + */ +void execute_inst_JAL(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop) { + char* op_str = "JAL"; + LF_PRINT_DEBUG("*** Worker %zu executing instruction: [Line %zu] %s %" PRIu64 " %" PRIu64 " %" PRIu64, worker_number, *pc, op_str, op1.imm, op2.imm, op3.imm); +#if TRACE_ALL_INSTRUCTIONS + int pc_orig = (int) *pc; + tracepoint_static_scheduler_JAL_starts(worker_number, pc_orig); +#endif + // Use the destination register as the return address and, if the + // destination register is not the zero register, store pc+1 in it. + reg_t *destReg = op1.reg; + if (destReg != &zero) *destReg = *pc + 1; + *pc = op2.imm + op3.imm; // New pc = label + offset +#if TRACE_ALL_INSTRUCTIONS + tracepoint_static_scheduler_JAL_ends(worker_number, pc_orig); +#endif +} + +/** + * @brief The implementation of the JALR instruction + */ +void execute_inst_JALR(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop) { + char* op_str = "JALR"; + LF_PRINT_DEBUG("*** Worker %zu executing instruction: [Line %zu] %s %" PRIu64 " %" PRIu64 " %" PRIu64, worker_number, *pc, op_str, op1.imm, op2.imm, op3.imm); +#if TRACE_ALL_INSTRUCTIONS + int pc_orig = (int) *pc; + tracepoint_static_scheduler_JALR_starts(worker_number, pc_orig); +#endif + // Use the destination register as the return address and, if the + // destination register is not the zero register, store pc+1 in it. + reg_t *destReg = op1.reg; + if (destReg != &zero) *destReg = *pc + 1; + // Set pc to base addr + immediate. + reg_t *baseAddr = op2.reg; + *pc = *baseAddr + op3.imm; +#if TRACE_ALL_INSTRUCTIONS + tracepoint_static_scheduler_JALR_ends(worker_number, pc_orig); +#endif +} + +/** + * @brief The implementation of the STP instruction + */ +void execute_inst_STP(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop) { + char* op_str = "STP"; + LF_PRINT_DEBUG("*** Worker %zu executing instruction: [Line %zu] %s %" PRIu64 " %" PRIu64 " %" PRIu64, worker_number, *pc, op_str, op1.imm, op2.imm, op3.imm); +#if TRACE_ALL_INSTRUCTIONS + int pc_orig = (int) *pc; + tracepoint_static_scheduler_STP_starts(worker_number, pc_orig); +#endif + *exit_loop = true; +#if TRACE_ALL_INSTRUCTIONS + tracepoint_static_scheduler_STP_ends(worker_number, pc_orig); +#endif +} + +///////////////////// Scheduler Init and Destroy API ///////////////////////// +/** + * @brief Initialize the scheduler. + * + * This has to be called before other functions of the scheduler can be used. + * If the scheduler is already initialized, this will be a no-op. + * + * @param number_of_workers Indicate how many workers this scheduler will be + * managing. + * @param option Pointer to a `sched_params_t` struct containing additional + * scheduler parameters. + */ +void lf_sched_init( + environment_t *env, + size_t number_of_workers, + sched_params_t* params +) { + LF_PRINT_DEBUG("Scheduler: Initializing with %zu workers", number_of_workers); + + // Scheduler already initialized + if (!init_sched_instance(env, &env->scheduler, number_of_workers, params)) { + // FIXME: This is not the best practice and seems to take advantage of a + // bug in the runtime. + // lf_sched_init() is for some reason called twice. + // Once in lf_reactor_c_main() in reactor_threaded.c. + // Another in initialize() -> _lf_initialize_trigger_objects() + // -> lf_sched_init(), also in reactor_threaded.c. + // This implementation takes advantage of the fact that when + // lf_sched_init() is called the second time, start_time is set + // to a meaningful value. When the first time lf_sched_init() is + // called, start_time has not been set. + + // Initialize the local tags for the SCHED_STATIC scheduler. + for (int i = 0; i < env->scheduler->num_reactor_self_instances; i++) { + env->scheduler->reactor_self_instances[i]->tag.time = start_time; + env->scheduler->reactor_self_instances[i]->tag.microstep = 0; + } + + // Already initialized + return; + } + + env->scheduler->pc = calloc(number_of_workers, sizeof(size_t)); + env->scheduler->static_schedules = &static_schedules[0]; + env->scheduler->counters = counters; + + initialize_static_schedule(); +} + +/** + * @brief Free the memory used by the scheduler. + * + * This must be called when the scheduler is no longer needed. + */ +void lf_sched_free(lf_scheduler_t* scheduler) { + LF_PRINT_DEBUG("Freeing the pointers in the scheduler struct."); + free(scheduler->pc); + free(scheduler->reactor_self_instances); + free(scheduler->reaction_instances); +} + +///////////////////// Scheduler Worker API (public) ///////////////////////// +/** + * @brief Ask the scheduler for one more reaction. + * + * This function blocks until it can return a ready reaction for worker thread + * 'worker_number' or it is time for the worker thread to stop and exit (where a + * NULL value would be returned). + * + * @param worker_number + * @return reaction_t* A reaction for the worker to execute. NULL if the calling + * worker thread should exit. + */ +reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_number) { + LF_PRINT_DEBUG("Worker %d inside lf_sched_get_ready_reaction", worker_number); + + const inst_t* current_schedule = scheduler->static_schedules[worker_number]; + reaction_t* returned_reaction = NULL; + bool exit_loop = false; + size_t* pc = &scheduler->pc[worker_number]; + + function_virtual_instruction_t func; + operand_t op1; + operand_t op2; + operand_t op3; + bool debug; + + while (!exit_loop) { + func = current_schedule[*pc].func; + op1 = current_schedule[*pc].op1; + op2 = current_schedule[*pc].op2; + op3 = current_schedule[*pc].op3; + debug = current_schedule[*pc].debug; + + // Execute the current instruction + func(scheduler, worker_number, op1, op2, op3, debug, pc, + &returned_reaction, &exit_loop); + } + + LF_PRINT_DEBUG("Worker %d leaves lf_sched_get_ready_reaction", worker_number); + return returned_reaction; +} + +/** + * @brief Inform the scheduler that worker thread 'worker_number' is done + * executing the 'done_reaction'. + * + * @param worker_number The worker number for the worker thread that has + * finished executing 'done_reaction'. + * @param done_reaction The reaction that is done. + */ +void lf_sched_done_with_reaction(size_t worker_number, + reaction_t* done_reaction) {} + +/** + * @brief Inform the scheduler that worker thread 'worker_number' would like to + * trigger 'reaction' at the current tag. + * + * If a worker number is not available (e.g., this function is not called by a + * worker thread), -1 should be passed as the 'worker_number'. + * + * This scheduler ignores the worker number. + * + * The scheduler will ensure that the same reaction is not triggered twice in + * the same tag. + * + * @param reaction The reaction to trigger at the current tag. + * @param worker_number The ID of the worker that is making this call. 0 should + * be used if there is only one worker (e.g., when the program is using the + * unthreaded C runtime). -1 is used for an anonymous call in a context where a + * worker number does not make sense (e.g., the caller is not a worker thread). + * + */ +void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number) {} +#endif +#endif diff --git a/core/utils/CMakeLists.txt b/core/utils/CMakeLists.txt index 723b942c8..9e4928ae0 100644 --- a/core/utils/CMakeLists.txt +++ b/core/utils/CMakeLists.txt @@ -1,4 +1,4 @@ -set(UTIL_SOURCES vector.c pqueue_base.c pqueue_tag.c pqueue.c util.c) +set(UTIL_SOURCES vector.c pqueue_base.c pqueue_tag.c pqueue.c util.c circular_buffer.c) if(NOT DEFINED LF_SINGLE_THREADED) list(APPEND UTIL_SOURCES lf_semaphore.c) diff --git a/core/utils/circular_buffer.c b/core/utils/circular_buffer.c new file mode 100644 index 000000000..67c47abc7 --- /dev/null +++ b/core/utils/circular_buffer.c @@ -0,0 +1,93 @@ +/** + * @file circular_buffer.c + * @brief A circular buffer implementation from stack overflow + * (https://stackoverflow.com/questions/827691/how-do-you-implement-a-circular-buffer-in-c) + * + * @copyright Copyright (c) 2024 + * + */ + +#include "util.h" +#include "circular_buffer.h" +#include "lf_types.h" + +void cb_init(circular_buffer *cb, size_t capacity, size_t sz) +{ + cb->buffer = malloc(capacity * sz); + if(cb->buffer == NULL) { + // handle error + lf_print("ERROR: Fail to allocate memory to circular buffer."); + return; + } + cb->buffer_end = (char *)cb->buffer + capacity * sz; + cb->capacity = capacity; + cb->count = 0; + cb->sz = sz; + cb->head = cb->buffer; + cb->tail = cb->buffer; +} + +void cb_free(circular_buffer *cb) +{ + free(cb->buffer); + // clear out other fields too, just to be safe +} + +void cb_push_back(circular_buffer *cb, const void *item) +{ + if(cb->count == cb->capacity){ + lf_print("ERROR: Buffer is full. Some in-flight events will be overwritten!"); + } + memcpy(cb->head, item, cb->sz); + cb->head = (char*)cb->head + cb->sz; + if(cb->head == cb->buffer_end) + cb->head = cb->buffer; + cb->count++; +} + +void cb_pop_front(circular_buffer *cb, void *item) +{ + if(cb->count == 0){ + // handle error + lf_print("ERROR: Popping from an empty buffer!"); + return; + } + memcpy(item, cb->tail, cb->sz); + cb->tail = (char*)cb->tail + cb->sz; + if(cb->tail == cb->buffer_end) + cb->tail = cb->buffer; + cb->count--; +} + +void cb_remove_front(circular_buffer *cb) +{ + if(cb->count == 0){ + // handle error + lf_print("ERROR: Removing from an empty buffer!"); + return; + } + cb->tail = (char*)cb->tail + cb->sz; + if(cb->tail == cb->buffer_end) + cb->tail = cb->buffer; + cb->count--; +} + +void* cb_peek(circular_buffer *cb) +{ + if(cb->count == 0) + return NULL; + return cb->tail; +} + +void cb_dump_events(circular_buffer *cb) +{ + lf_print("*** Dumping Events ***"); + void *p = cb->tail; + while (p != cb->head) { + event_t* e = (event_t*)p; + lf_print("Event @ %lld w/ token %p", e->base.tag.time, e->token); + p += cb->sz; + if (p == cb->buffer_end) p = cb->buffer; + } + lf_print("**********************"); +} \ No newline at end of file diff --git a/include/api/reaction_macros.h b/include/api/reaction_macros.h index 52c39e125..fac8cd462 100644 --- a/include/api/reaction_macros.h +++ b/include/api/reaction_macros.h @@ -69,6 +69,14 @@ * reactor in form input_name.port_name. * @param value The value to insert into the self struct. */ +#if SCHEDULER == SCHED_STATIC +#define lf_set(out, val) \ + do { \ + out->value = val; \ + lf_set_present(out); \ + out->token = (lf_token_t *)(uintptr_t)val; /* The long-term solution is to generate an event type for each connection buffer of primitive type. */ \ + } while(0) +#else #define lf_set(out, val) \ do { \ out->value = val; \ @@ -80,6 +88,7 @@ out->token = token; \ } \ } while (0) +#endif /** * @brief Set the specified output (or input of a contained reactor) @@ -183,6 +192,29 @@ // As long as this is done from the context of a reaction, `self` is in scope and is a pointer to the self-struct // of the current reactor. +// The fully static (SCHED_STATIC) runtime, uses time local to each reactor. If this is the case +// then we defined these macros to access that timestamp rather than using the standard API +// FIXME (erj): I am not really stoked about this added complexity +#if defined REACTOR_LOCAL_TIME + +/** + * Return the current tag of the environment invoking this reaction. + */ +#define lf_tag() self->base.tag + +/** + * Return the current logical time in nanoseconds of the environment invoking this reaction. + */ +#define lf_time_logical() self->base.tag.time + +/** + * Return the current logical time of the environment invoking this reaction relative to the + * start time in nanoseconds. + */ +#define lf_time_logical_elapsed() (self->base.tag.time - lf_time_start()) + +#else + /** * Return the current tag of the environment invoking this reaction. */ @@ -199,4 +231,5 @@ */ #define lf_time_logical_elapsed() lf_time_logical_elapsed(self->base.environment) +#endif // REACTOR_LOCAL_TIME #endif // REACTION_MACROS_H diff --git a/include/core/environment.h b/include/core/environment.h index 9f3960a6f..dc96821d3 100644 --- a/include/core/environment.h +++ b/include/core/environment.h @@ -85,10 +85,18 @@ typedef struct environment_t { lf_cond_t global_tag_barrier_requestors_reached_zero; #endif // LF_SINGLE_THREADED #if defined(FEDERATED) - tag_t** _lf_intended_tag_fields; - int _lf_intended_tag_fields_size; - bool need_to_send_LTC; -#endif // FEDERATED + tag_t** _lf_intended_tag_fields; + int _lf_intended_tag_fields_size; + bool need_to_send_LTC; +#endif // FEDERATED +#if SCHEDULER == SCHED_STATIC + self_base_t** reactor_self_array; + int reactor_self_array_size; + reaction_t** reaction_array; + int reaction_array_size; + event_t** pqueue_heads; + int num_pqueue_heads; +#endif #ifdef LF_ENCLAVES // TODO: Consider dropping #ifdef enclave_info_t* enclave_info; #endif diff --git a/include/core/lf_types.h b/include/core/lf_types.h index b6d754ca2..0e7625377 100644 --- a/include/core/lf_types.h +++ b/include/core/lf_types.h @@ -47,6 +47,14 @@ typedef unsigned short int ushort; #define SCHED_ADAPTIVE 1 #define SCHED_GEDF_NP 2 #define SCHED_NP 3 +#define SCHED_STATIC 4 + +// If we use the fully static scheduler, then we want local time at each reactor +#if SCHEDULER == SCHED_STATIC + #ifndef REACTOR_LOCAL_TIME + #define REACTOR_LOCAL_TIME + #endif +#endif /* * A struct representing a barrier in threaded @@ -284,6 +292,13 @@ typedef struct self_base_t { #if defined(MODAL_REACTORS) reactor_mode_state_t _lf__mode_state; // The current mode (for modal models). #endif +// This is used by e.g. the fully static scheduler (SCHED_STATIC) +#if defined REACTOR_LOCAL_TIME // FIXME: The output_ports pointers isnt obviously related to local time + tag_t tag; // The current tag of the reactor instance. + lf_port_base_t **output_ports; // An array of pointers to output ports of this reactor. + // Used to reset the is_present fields + int num_output_ports; +#endif } self_base_t; /** diff --git a/include/core/reactor.h b/include/core/reactor.h index fffa9ba19..3049c1ba3 100644 --- a/include/core/reactor.h +++ b/include/core/reactor.h @@ -18,6 +18,7 @@ #ifndef REACTOR_H #define REACTOR_H +#include // memcpy #include "lf_types.h" #include "modes.h" // Modal model support #include "port.h" @@ -25,6 +26,7 @@ #include "clock.h" // Time-related functions. #include "tracepoint.h" #include "util.h" +#include "circular_buffer.h" // HACK: So that circular buffer is visible in all user-facing reactor header files. /** * @brief Macro to suppress warnings about unused variables. diff --git a/include/core/reactor_common.h b/include/core/reactor_common.h index 8086ed7db..db0bbda5c 100644 --- a/include/core/reactor_common.h +++ b/include/core/reactor_common.h @@ -31,6 +31,12 @@ ////////////////////// Constants & Macros ////////////////////// +// FIXME (erj): Super hack to disable chain optimzation when we are using the SCHED_STATIC runtime. +#if SCHEDULER == SCHED_STATIC +#else +#define REACTION_CHAIN_OPTIMIZATION +#endif + /** * @brief Constant giving the minimum amount of time to sleep to wait * for physical time to reach a logical time. diff --git a/include/core/threaded/scheduler_instance.h b/include/core/threaded/scheduler_instance.h index df55a86be..e620731b1 100644 --- a/include/core/threaded/scheduler_instance.h +++ b/include/core/threaded/scheduler_instance.h @@ -19,6 +19,14 @@ #include #include // for size_t +#if SCHEDULER == SCHED_STATIC +// Forward declaration so that lf_scheduler_t is visible in +// scheduler_instructions.h +typedef struct lf_scheduler_t lf_scheduler_t; +#include "lf_types.h" +#include "scheduler_instructions.h" +#endif + #define DEFAULT_MAX_REACTION_LEVEL 100 // Forward declarations @@ -31,13 +39,18 @@ typedef struct custom_scheduler_data_t custom_scheduler_data_t; * @note Members of this struct are added based on existing schedulers' needs. * These should be expanded to accommodate new schedulers. */ -typedef struct lf_scheduler_t { - struct environment_t* env; - /** - * @brief Maximum number of levels for reactions in the program. - * - */ - size_t max_reaction_level; +typedef struct lf_scheduler_t { + /** + * @brief Environment which the scheduler has access to. + * + */ + struct environment_t * env; + + /** + * @brief Maximum number of levels for reactions in the program. + * + */ + size_t max_reaction_level; /** * @brief Indicate whether the program should stop @@ -68,6 +81,45 @@ typedef struct lf_scheduler_t { */ volatile size_t number_of_idle_workers; +#if SCHEDULER == SCHED_STATIC + + /** + * @brief Points to an array of program counters for each worker. + * + */ + size_t* pc; + + /** + * @brief Points to a read-only array of static schedules. + * + */ + const inst_t** static_schedules; + + /** + * @brief Points to an array of pointers to reactor self instances. + * + */ + self_base_t** reactor_self_instances; + + /** + * @brief The total number of reactor self instances. + * + */ + size_t num_reactor_self_instances; + + /** + * @brief Points to an array of pointers to reaction instances. + * + */ + reaction_t** reaction_instances; + + /** + * @brief Points to an array of integer counters. + * + */ + volatile uint32_t* counters; + +#endif // Pointer to an optional custom data structure that each scheduler can define. // The type is forward declared here and must be declared again in the scheduler source file // Is not touched by `init_sched_instance` and must be initialized by each scheduler that needs it @@ -110,4 +162,12 @@ typedef struct { bool init_sched_instance(struct environment_t* env, lf_scheduler_t** instance, size_t number_of_workers, sched_params_t* params); +#if SCHEDULER == SCHED_STATIC +/** + * @brief Initialize the static schedule by filling in placeholders which are + * not considered "compile-time constants" by the compiler. + */ +void initialize_static_schedule(); +#endif + #endif // LF_SCHEDULER_PARAMS_H diff --git a/include/core/threaded/scheduler_instructions.h b/include/core/threaded/scheduler_instructions.h new file mode 100644 index 000000000..91b8886e9 --- /dev/null +++ b/include/core/threaded/scheduler_instructions.h @@ -0,0 +1,76 @@ +/** + * @brief Format of the instruction set + */ +#ifndef SCHEDULER_INSTRUCTIONS_H +#define SCHEDULER_INSTRUCTIONS_H + +typedef enum { + ADD, + ADDI, + ADV, + ADVI, + BEQ, + BGE, + BLT, + BNE, + DU, + EXE, + JAL, + JALR, + STP, + WLT, + WU, +} opcode_t; + + +/** + * @brief Convenient typedefs for the data types used by the C implementation of + * PRET VM. A register is 64bits and an immediate is 64bits. This avoids any + * issue with time and overflow. Arguably it is worth it even for smaller + * platforms. + * + */ +typedef volatile uint64_t reg_t; +typedef uint64_t imm_t; + +/** + * @brief An union representing a single operand for the PRET VM. A union + * means that we have one piece of memory, which is big enough to fit either + * one of the two members of the union. + * + */ +typedef union { + reg_t* reg; + imm_t imm; +} operand_t; + +/** + * @brief Virtual instruction function pointer + */ +typedef void (*function_virtual_instruction_t)( + lf_scheduler_t* scheduler, + size_t worker_number, + operand_t op1, + operand_t op2, + operand_t op3, + bool debug, + size_t* pc, + reaction_t** returned_reaction, + bool* exit_loop); + +/** + * @brief This struct represents a PRET VM instruction for C platforms. + * There is an opcode and three operands. The operands are unions so they + * can be either a pointer or an immediate + * + */ +typedef struct inst_t { + function_virtual_instruction_t func; + opcode_t opcode; + operand_t op1; + operand_t op2; + operand_t op3; + bool debug; +} inst_t; + +#endif \ No newline at end of file diff --git a/include/core/threaded/scheduler_static_functions.h b/include/core/threaded/scheduler_static_functions.h new file mode 100644 index 000000000..11874dac3 --- /dev/null +++ b/include/core/threaded/scheduler_static_functions.h @@ -0,0 +1,48 @@ +#ifndef SCHEDULER_STATIC_FUNCTION_H +#define SCHEDULER_STATIC_FUNCTION_H + +/** + * @brief Function type with a void* argument. To make this type represent a + * generic function, one can write a wrapper function around the target function + * and use the first argument as a pointer to a struct of input arguments + * and return values. + */ +typedef void(*function_generic_t)(void*); + +/** + * @brief Wrapper function for peeking a priority queue. + */ +void push_pop_peek_pqueue(void* self); + +void execute_inst_ADD(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop); +void execute_inst_ADDI(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop); +void execute_inst_ADV(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop); +void execute_inst_ADVI(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop); +void execute_inst_BEQ(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop); +void execute_inst_BGE(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop); +void execute_inst_BLT(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop); +void execute_inst_BNE(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop); +void execute_inst_DU(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop); +void execute_inst_EXE(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop); +void execute_inst_WLT(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop); +void execute_inst_WU(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop); +void execute_inst_JAL(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop); +void execute_inst_JALR(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop); +void execute_inst_STP(lf_scheduler_t* scheduler, size_t worker_number, operand_t op1, operand_t op2, operand_t op3, bool debug, size_t* pc, + reaction_t** returned_reaction, bool* exit_loop); + +#endif \ No newline at end of file diff --git a/include/core/tracepoint.h b/include/core/tracepoint.h index 4a8a5bc79..8a96d559a 100644 --- a/include/core/tracepoint.h +++ b/include/core/tracepoint.h @@ -198,6 +198,117 @@ void tracepoint_user_value(void* self, char* description, long long value); */ void lf_tracing_check_version(); +//////////////////////////////////////////////////////////// +//// For quasi-static scheduling + +#if SCHEDULER == SCHED_STATIC + +#define NULL_TAG ((tag_t){.time=LLONG_MAX, .microstep=0}) + +#define tracepoint_static_scheduler_ADD_starts(worker, pc) \ + call_tracepoint(static_scheduler_ADD_starts, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_ADD_ends(worker, pc) \ + call_tracepoint(static_scheduler_ADD_ends, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_ADDI_starts(worker, pc) \ + call_tracepoint(static_scheduler_ADDI_starts, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_ADDI_ends(worker, pc) \ + call_tracepoint(static_scheduler_ADDI_ends, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_ADV_starts(worker, pc) \ + call_tracepoint(static_scheduler_ADV_starts, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_ADV_ends(worker, pc) \ + call_tracepoint(static_scheduler_ADV_ends, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_ADVI_starts(worker, pc) \ + call_tracepoint(static_scheduler_ADVI_starts, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_ADVI_ends(worker, pc) \ + call_tracepoint(static_scheduler_ADVI_ends, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_BEQ_starts(worker, pc) \ + call_tracepoint(static_scheduler_BEQ_starts, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_BEQ_ends(worker, pc) \ + call_tracepoint(static_scheduler_BEQ_ends, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_BGE_starts(worker, pc) \ + call_tracepoint(static_scheduler_BGE_starts, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_BGE_ends(worker, pc) \ + call_tracepoint(static_scheduler_BGE_ends, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_BLT_starts(worker, pc) \ + call_tracepoint(static_scheduler_BLT_starts, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_BLT_ends(worker, pc) \ + call_tracepoint(static_scheduler_BLT_ends, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_BNE_starts(worker, pc) \ + call_tracepoint(static_scheduler_BNE_starts, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_BNE_ends(worker, pc) \ + call_tracepoint(static_scheduler_BNE_ends, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_DU_starts(worker, pc) \ + call_tracepoint(static_scheduler_DU_starts, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_DU_ends(worker, pc) \ + call_tracepoint(static_scheduler_DU_ends, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_EXE_starts(worker, pc) \ + call_tracepoint(static_scheduler_EXE_starts, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_EXE_ends(worker, pc) \ + call_tracepoint(static_scheduler_EXE_ends, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_JAL_starts(worker, pc) \ + call_tracepoint(static_scheduler_JAL_starts, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_JAL_ends(worker, pc) \ + call_tracepoint(static_scheduler_JAL_ends, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_JALR_starts(worker, pc) \ + call_tracepoint(static_scheduler_JALR_starts, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_JALR_ends(worker, pc) \ + call_tracepoint(static_scheduler_JALR_ends, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_STP_starts(worker, pc) \ + call_tracepoint(static_scheduler_STP_starts, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_STP_ends(worker, pc) \ + call_tracepoint(static_scheduler_STP_ends, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_WLT_starts(worker, pc) \ + call_tracepoint(static_scheduler_WLT_starts, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_WLT_ends(worker, pc) \ + call_tracepoint(static_scheduler_WLT_ends, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_WU_starts(worker, pc) \ + call_tracepoint(static_scheduler_WU_starts, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +#define tracepoint_static_scheduler_WU_ends(worker, pc) \ + call_tracepoint(static_scheduler_WU_ends, NULL, NULL_TAG, worker, worker, pc, NULL, NULL, 0) + +/** + * A special case when EXE is called on a reaction. In this case, + * "reaction_starts" is used similar to tracepoint_reaction_starts(). + */ +#define tracepoint_static_scheduler_EXE_reaction_starts(reactor, worker, reaction_number) \ + tag_t tag = ((self_base_t *)reactor)->tag; \ + call_tracepoint(reaction_starts, reactor, tag, worker, worker, reaction_number, NULL, NULL, 0) + +#define tracepoint_static_scheduler_EXE_reaction_ends(reactor, worker, reaction_number) \ + tag_t tag = ((self_base_t *)reactor)->tag; \ + call_tracepoint(reaction_ends, reactor, tag, worker, worker, reaction_number, NULL, NULL, 0) + +#endif // SCHED_STATIC + //////////////////////////////////////////////////////////// //// For federated execution @@ -401,6 +512,18 @@ static inline void lf_tracing_set_start_time(int64_t start_time) { (void)start_t (void)reaction; \ (void)worker; \ } +#define tracepoint_static_scheduler_EXE_reaction_starts(reactor, worker, reaction_number) \ + while (0) { \ + (void)reactor; \ + (void)worker; \ + (void)reaction_number; \ + } +#define tracepoint_static_scheduler_EXE_reaction_ends(reactor, worker, reaction_number) \ + while (0) { \ + (void)reactor; \ + (void)worker; \ + (void)reaction_number; \ + } #endif // LF_TRACE #endif // TRACEPOINT_H diff --git a/include/core/utils/circular_buffer.h b/include/core/utils/circular_buffer.h new file mode 100644 index 000000000..b8f25928f --- /dev/null +++ b/include/core/utils/circular_buffer.h @@ -0,0 +1,26 @@ +#ifndef CIRCULAR_BUFFER_H +#define CIRCULAR_BUFFER_H + +#include +#include + +typedef struct circular_buffer +{ + void *buffer; // data buffer + void *buffer_end; // end of data buffer + size_t capacity; // maximum number of items in the buffer + size_t count; // number of items in the buffer + size_t sz; // size of each item in the buffer + void *head; // pointer to head + void *tail; // pointer to tail +} circular_buffer; + +void cb_init(circular_buffer *cb, size_t capacity, size_t sz); +void cb_free(circular_buffer *cb); +void cb_push_back(circular_buffer *cb, const void *item); +void cb_pop_front(circular_buffer *cb, void *item); +void cb_remove_front(circular_buffer *cb); +void* cb_peek(circular_buffer *cb); +void cb_dump_events(circular_buffer *cb); + +#endif \ No newline at end of file diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt new file mode 100644 index 000000000..def1bbd2e --- /dev/null +++ b/test/CMakeLists.txt @@ -0,0 +1 @@ +add_library(test-lib SCHED_STATIC src_gen_stub.c rand_utils.c) \ No newline at end of file diff --git a/trace/api/types/trace_types.h b/trace/api/types/trace_types.h index 6d8758fa4..7001ac9fa 100644 --- a/trace/api/types/trace_types.h +++ b/trace/api/types/trace_types.h @@ -15,70 +15,102 @@ * string representation below. Also, create a tracepoint function * for each event type. */ -typedef enum { - reaction_starts, - reaction_ends, - reaction_deadline_missed, - schedule_called, - user_event, - user_value, - worker_wait_starts, - worker_wait_ends, - scheduler_advancing_time_starts, - scheduler_advancing_time_ends, - federated, // Everything below this is for tracing federated interactions. - // Sending messages - send_ACK, - send_FAILED, - send_TIMESTAMP, - send_NET, - send_LTC, - send_STOP_REQ, - send_STOP_REQ_REP, - send_STOP_GRN, - send_FED_ID, - send_PTAG, - send_TAG, - send_REJECT, - send_RESIGN, - send_PORT_ABS, - send_CLOSE_RQ, - send_TAGGED_MSG, - send_P2P_TAGGED_MSG, - send_MSG, - send_P2P_MSG, - send_ADR_AD, - send_ADR_QR, - // Receiving messages - receive_ACK, - receive_FAILED, - receive_TIMESTAMP, - receive_NET, - receive_LTC, - receive_STOP_REQ, - receive_STOP_REQ_REP, - receive_STOP_GRN, - receive_FED_ID, - receive_PTAG, - receive_TAG, - receive_REJECT, - receive_RESIGN, - receive_PORT_ABS, - receive_CLOSE_RQ, - receive_TAGGED_MSG, - receive_P2P_TAGGED_MSG, - receive_MSG, - receive_P2P_MSG, - receive_ADR_AD, - receive_ADR_QR, - receive_UNIDENTIFIED, - NUM_EVENT_TYPES +typedef enum +{ + reaction_starts, + reaction_ends, + reaction_deadline_missed, + schedule_called, + user_event, + user_value, + worker_wait_starts, + worker_wait_ends, + scheduler_advancing_time_starts, + scheduler_advancing_time_ends, + // Static scheduler instructions + static_scheduler_ADD_starts, + static_scheduler_ADDI_starts, + static_scheduler_ADV_starts, + static_scheduler_ADVI_starts, + static_scheduler_BEQ_starts, + static_scheduler_BGE_starts, + static_scheduler_BLT_starts, + static_scheduler_BNE_starts, + static_scheduler_DU_starts, + static_scheduler_EXE_starts, + static_scheduler_JAL_starts, + static_scheduler_JALR_starts, + static_scheduler_STP_starts, + static_scheduler_WLT_starts, + static_scheduler_WU_starts, + static_scheduler_ADD_ends, + static_scheduler_ADDI_ends, + static_scheduler_ADV_ends, + static_scheduler_ADVI_ends, + static_scheduler_BEQ_ends, + static_scheduler_BGE_ends, + static_scheduler_BLT_ends, + static_scheduler_BNE_ends, + static_scheduler_DU_ends, + static_scheduler_EXE_ends, + static_scheduler_JAL_ends, + static_scheduler_JALR_ends, + static_scheduler_STP_ends, + static_scheduler_WLT_ends, + static_scheduler_WU_ends, + federated, // Everything below this is for tracing federated interactions. + // Sending messages + send_ACK, + send_FAILED, + send_TIMESTAMP, + send_NET, + send_LTC, + send_STOP_REQ, + send_STOP_REQ_REP, + send_STOP_GRN, + send_FED_ID, + send_PTAG, + send_TAG, + send_REJECT, + send_RESIGN, + send_PORT_ABS, + send_CLOSE_RQ, + send_TAGGED_MSG, + send_P2P_TAGGED_MSG, + send_MSG, + send_P2P_MSG, + send_ADR_AD, + send_ADR_QR, + // Receiving messages + receive_ACK, + receive_FAILED, + receive_TIMESTAMP, + receive_NET, + receive_LTC, + receive_STOP_REQ, + receive_STOP_REQ_REP, + receive_STOP_GRN, + receive_FED_ID, + receive_PTAG, + receive_TAG, + receive_REJECT, + receive_RESIGN, + receive_PORT_ABS, + receive_CLOSE_RQ, + receive_TAGGED_MSG, + receive_P2P_TAGGED_MSG, + receive_MSG, + receive_P2P_MSG, + receive_ADR_AD, + receive_ADR_QR, + receive_UNIDENTIFIED, + NUM_EVENT_TYPES } trace_event_t; /** * String description of event types. */ -static const char* trace_event_names[] = { +static const char *trace_event_names[] = { "Reaction starts", "Reaction ends", "Reaction deadline missed", @@ -89,6 +121,37 @@ static const char* trace_event_names[] = { "Worker wait ends", "Scheduler advancing time starts", "Scheduler advancing time ends", + // Static scheduler instructions + "ADD", + "ADDI", + "ADV", + "ADVI", + "BEQ", + "BGE", + "BLT", + "BNE", + "DU", + "EXE", + "JAL", + "JALR", + "STP", + "WLT", + "WU", + "End ADD", + "End ADDI", + "End ADV", + "End ADVI", + "End BEQ", + "End BGE", + "End BLT", + "End BNE", + "End DU", + "End EXE", + "End JAL", + "End JALR", + "End STP", + "End WLT", + "End WU", "Federated marker", // Sending messages "Sending ACK", diff --git a/util/tracing/trace_to_chrome.c b/util/tracing/trace_to_chrome.c index ff941cd4a..171364fbf 100644 --- a/util/tracing/trace_to_chrome.c +++ b/util/tracing/trace_to_chrome.c @@ -52,6 +52,16 @@ FILE* trace_file = NULL; /** File for writing the output data. */ FILE* output_file = NULL; +/** + * SPECIFIC TO QUASI-STATIC SCHEDULING: + * By default, the Chrome tracing displays events in us granularity. So + * timestamps by default are divided by scaling=1000 to show correct units in + * the GUI. However, for sub-us events, it is preferable to set scaling=1 so + * that the execution time of events are not abstracted to 0. + */ +int scaling_factor = 1000; +// int scaling_factor = 1; // For seeing sub-us events. + /** * Print a usage message. */ @@ -69,6 +79,79 @@ int max_reaction_number = 0; /** Indicator to plot vs. physical time only. */ bool physical_time_only = false; +/** + * SPECIFIC TO QUASI-STATIC SCHEDULING: + * A helper function for retriving virtual instruction name from event type + */ +char* get_instruction_name(trace_event_t event_type) { + switch(event_type) { + case static_scheduler_ADD_starts: + case static_scheduler_ADD_ends: + return "ADD"; + break; + case static_scheduler_ADDI_starts: + case static_scheduler_ADDI_ends: + return "ADDI"; + break; + case static_scheduler_ADV_starts: + case static_scheduler_ADV_ends: + return "ADV"; + break; + case static_scheduler_ADVI_starts: + case static_scheduler_ADVI_ends: + return "ADVI"; + break; + case static_scheduler_BEQ_starts: + case static_scheduler_BEQ_ends: + return "BEQ"; + break; + case static_scheduler_BGE_starts: + case static_scheduler_BGE_ends: + return "BGE"; + break; + case static_scheduler_BLT_starts: + case static_scheduler_BLT_ends: + return "BLT"; + break; + case static_scheduler_BNE_starts: + case static_scheduler_BNE_ends: + return "BNE"; + break; + case static_scheduler_DU_starts: + case static_scheduler_DU_ends: + return "DU"; + break; + case static_scheduler_EXE_starts: + case static_scheduler_EXE_ends: + return "EXE"; + break; + case static_scheduler_JAL_starts: + case static_scheduler_JAL_ends: + return "JAL"; + break; + case static_scheduler_JALR_starts: + case static_scheduler_JALR_ends: + return "JALR"; + break; + case static_scheduler_STP_starts: + case static_scheduler_STP_ends: + return "STP"; + break; + case static_scheduler_WLT_starts: + case static_scheduler_WLT_ends: + return "WLT"; + break; + case static_scheduler_WU_starts: + case static_scheduler_WU_ends: + return "WU"; + break; + default: + fprintf(stderr, "WARNING: Unrecognized virtual instruction detected: %s\n", + trace_event_names[event_type]); + return "UNKNOWN"; + } +} + /** * Read a trace in the trace_file and write it to the output_file in json. * @return The number of records read or 0 upon seeing an EOF. @@ -96,8 +179,11 @@ size_t read_and_write_trace() { if (trace[i].event_type == worker_wait_starts || trace[i].event_type == worker_wait_ends) { reactor_name = "WAIT"; } else if (trace[i].event_type == scheduler_advancing_time_starts || - trace[i].event_type == scheduler_advancing_time_starts) { + trace[i].event_type == scheduler_advancing_time_ends) { reactor_name = "ADVANCE TIME"; + } else if (strcmp(get_instruction_name(trace[i].event_type), "UNKNOWN") != 0) { + // Check if the event is related to PretVM. + reactor_name = get_instruction_name(trace[i].event_type); } else { reactor_name = "NO REACTOR"; } @@ -195,6 +281,126 @@ size_t read_and_write_trace() { pid = PID_FOR_WORKER_ADVANCING_TIME; phase = "E"; break; + case static_scheduler_ADD_starts: + pid = 0; + phase = "B"; + break; + case static_scheduler_ADDI_starts: + pid = 0; + phase = "B"; + break; + case static_scheduler_ADV_starts: + pid = 0; + phase = "B"; + break; + case static_scheduler_ADVI_starts: + pid = 0; + phase = "B"; + break; + case static_scheduler_BEQ_starts: + pid = 0; + phase = "B"; + break; + case static_scheduler_BGE_starts: + pid = 0; + phase = "B"; + break; + case static_scheduler_BLT_starts: + pid = 0; + phase = "B"; + break; + case static_scheduler_BNE_starts: + pid = 0; + phase = "B"; + break; + case static_scheduler_DU_starts: + pid = 0; + phase = "B"; + break; + case static_scheduler_EXE_starts: + pid = 0; + phase = "B"; + break; + case static_scheduler_JAL_starts: + pid = 0; + phase = "B"; + break; + case static_scheduler_JALR_starts: + pid = 0; + phase = "B"; + break; + case static_scheduler_STP_starts: + pid = 0; + phase = "B"; + break; + case static_scheduler_WLT_starts: + pid = 0; + phase = "B"; + break; + case static_scheduler_WU_starts: + pid = 0; + phase = "B"; + break; + case static_scheduler_ADD_ends: + pid = 0; + phase = "E"; + break; + case static_scheduler_ADDI_ends: + pid = 0; + phase = "E"; + break; + case static_scheduler_ADV_ends: + pid = 0; + phase = "E"; + break; + case static_scheduler_ADVI_ends: + pid = 0; + phase = "E"; + break; + case static_scheduler_BEQ_ends: + pid = 0; + phase = "E"; + break; + case static_scheduler_BGE_ends: + pid = 0; + phase = "E"; + break; + case static_scheduler_BLT_ends: + pid = 0; + phase = "E"; + break; + case static_scheduler_BNE_ends: + pid = 0; + phase = "E"; + break; + case static_scheduler_DU_ends: + pid = 0; + phase = "E"; + break; + case static_scheduler_EXE_ends: + pid = 0; + phase = "E"; + break; + case static_scheduler_JAL_ends: + pid = 0; + phase = "E"; + break; + case static_scheduler_JALR_ends: + pid = 0; + phase = "E"; + break; + case static_scheduler_STP_ends: + pid = 0; + phase = "E"; + break; + case static_scheduler_WLT_ends: + pid = 0; + phase = "E"; + break; + case static_scheduler_WU_ends: + pid = 0; + phase = "E"; + break; default: fprintf(stderr, "WARNING: Unrecognized event type %d: %s\n", trace[i].event_type, trace_event_names[trace[i].event_type]); @@ -443,4 +649,4 @@ int main(int argc, char* argv[]) { write_metadata_events(output_file); fprintf(output_file, "]}\n"); } -} +} \ No newline at end of file