Skip to content

Commit 528b804

Browse files
authored
Enable actions to be scheduled asyncronously (#280)
* Enable actions to be scheduled asyncronously * Format * Remove unnecessary info printing * Format
1 parent 61796a4 commit 528b804

File tree

5 files changed

+21
-18
lines changed

5 files changed

+21
-18
lines changed

include/reactor-uc/macros_api.h

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,11 @@
5858
#define lf_schedule_with_val(action, offset, val) \
5959
do { \
6060
__typeof__(val) __val = (val); \
61-
lf_ret_t ret = (action)->super.schedule(&(action)->super, (offset), (const void *)&__val); \
61+
Action *__a = (Action *)(action); \
62+
lf_ret_t ret = __a->schedule(__a, (offset), (const void *)&__val); \
6263
if (ret == LF_FATAL) { \
6364
LF_ERR(TRIG, "Scheduling an value, that doesn't have value!"); \
64-
Scheduler *sched = (action)->super.super.parent->env->scheduler; \
65+
Scheduler *sched = __a->super.parent->env->scheduler; \
6566
sched->do_shutdown(sched, sched->current_tag(sched)); \
6667
throw("Tried to schedule a value onto an action without a type!"); \
6768
} \
@@ -70,7 +71,8 @@
7071
/// @private
7172
#define lf_schedule_without_val(action, offset) \
7273
do { \
73-
(action)->super.schedule(&(action)->super, (offset), NULL); \
74+
Action *__a = (Action *)(action); \
75+
__a->schedule(__a, (offset), NULL); \
7476
} while (0)
7577

7678
/// @private
@@ -104,11 +106,12 @@
104106
*/
105107
#define lf_schedule_array(action, offset, array) \
106108
do { \
107-
lf_ret_t ret = (action)->super.schedule(&(action)->super, (offset), (const void *)array); \
108-
if (ret == LF_FATAL) { \
109+
Action *__a = (Action *)(action); \
110+
lf_ret_t __ret = __a->schedule(__a, (offset), (const void *)array); \
111+
if (__ret == LF_FATAL) { \
109112
LF_ERR(TRIG, "Scheduling an value, that doesn't have value!"); \
110-
Scheduler *sched = (action)->super.super.parent->env->scheduler; \
111-
sched->do_shutdown(sched, sched->current_tag(sched)); \
113+
Scheduler *__sched = __a->super.parent->env->scheduler; \
114+
__sched->do_shutdown(__sched, __sched->current_tag(__sched)); \
112115
} \
113116
} while (0)
114117

src/environments/unfederated_environment.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,5 +110,5 @@ void Environment_ctor(Environment *self, Reactor *main, Scheduler *scheduler, bo
110110

111111
void Environment_free(Environment *self) {
112112
(void)self;
113-
LF_INFO(ENV, "Freeing top-level environment.");
113+
LF_DEBUG(ENV, "Freeing top-level environment.");
114114
}

src/federated.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,8 @@ void FederatedInputConnection_ctor(FederatedInputConnection *self, Reactor *pare
138138
// a TaggedMessage available.
139139
void FederatedConnectionBundle_handle_tagged_msg(FederatedConnectionBundle *self, const FederateMessage *_msg) {
140140
const TaggedMessage *msg = &_msg->message.tagged_message;
141-
LF_INFO(FED, "Callback on FedConnBundle %p for message of size=%u with tag:" PRINTF_TAG, self, msg->payload.size,
142-
msg->tag);
141+
LF_DEBUG(FED, "Callback on FedConnBundle %p for message of size=%u with tag:" PRINTF_TAG, self, msg->payload.size,
142+
msg->tag);
143143
assert(((size_t)msg->conn_id) < self->inputs_size);
144144
lf_ret_t ret;
145145
FederatedInputConnection *input = self->inputs[msg->conn_id];

src/platform/posix/tcp_ip_channel.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ static lf_ret_t _TcpIpChannel_reset_socket(TcpIpChannel *self) {
121121

122122
static void _TcpIpChannel_spawn_worker_thread(TcpIpChannel *self) {
123123
int res;
124-
TCP_IP_CHANNEL_INFO("Spawning worker thread");
124+
TCP_IP_CHANNEL_DEBUG("Spawning worker thread");
125125

126126
memset(&self->worker_thread_stack, 0, TCP_IP_CHANNEL_RECV_THREAD_STACK_SIZE);
127127
if (pthread_attr_init(&self->worker_thread_attr) != 0) {
@@ -414,7 +414,7 @@ static void *_TcpIpChannel_worker_thread(void *untyped_self) {
414414
lf_ret_t ret;
415415
int res;
416416

417-
TCP_IP_CHANNEL_INFO("Starting worker thread");
417+
TCP_IP_CHANNEL_DEBUG("Starting worker thread");
418418

419419
while (true) {
420420
// Check if we have any pending cancel requests from the runtime.

src/startup_coordinator.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ static lf_ret_t StartupCoordinator_connect_to_neighbors_blocking(StartupCoordina
1414
FederatedEnvironment *env_fed = (FederatedEnvironment *)self->env;
1515
validate(self->state == StartupCoordinationState_UNINITIALIZED);
1616
self->state = StartupCoordinationState_CONNECTING;
17-
LF_INFO(FED, "%s connecting to %zu federated peers", self->env->main->name, env_fed->net_bundles_size);
17+
LF_DEBUG(FED, "%s connecting to %zu federated peers", self->env->main->name, env_fed->net_bundles_size);
1818
lf_ret_t ret;
1919

2020
// Open all connections.
@@ -50,8 +50,8 @@ static lf_ret_t StartupCoordinator_connect_to_neighbors_blocking(StartupCoordina
5050
}
5151
}
5252

53-
LF_INFO(FED, "%s Established connection to all %zu federated peers", self->env->main->name,
54-
env_fed->net_bundles_size);
53+
LF_DEBUG(FED, "%s Established connection to all %zu federated peers", self->env->main->name,
54+
env_fed->net_bundles_size);
5555
self->state = StartupCoordinationState_HANDSHAKING;
5656
return LF_OK;
5757
}
@@ -184,7 +184,7 @@ static void StartupCoordinator_handle_startup_handshake_response(StartupCoordina
184184
}
185185

186186
if (all_received) {
187-
LF_INFO(FED, "Handshake completed with %zu federated peers", self->num_neighbours);
187+
LF_DEBUG(FED, "Handshake completed with %zu federated peers", self->num_neighbours);
188188
self->state = StartupCoordinationState_NEGOTIATING;
189189
// Schedule the start time negotiation to occur immediately.
190190
StartupCoordinator_schedule_system_self_event(self, self->env->get_physical_time(self->env) + MSEC(50),
@@ -289,8 +289,8 @@ static void StartupCoordinator_handle_start_time_proposal(StartupCoordinator *se
289289
}
290290

291291
if (iteration_completed) {
292-
LF_INFO(FED, "Start time negotiation round %d completed. Current start time: " PRINTF_TIME,
293-
self->start_time_proposal_step, self->start_time_proposal);
292+
LF_DEBUG(FED, "Start time negotiation round %d completed. Current start time: " PRINTF_TIME,
293+
self->start_time_proposal_step, self->start_time_proposal);
294294
if (self->start_time_proposal_step == self->longest_path) {
295295
LF_INFO(FED, "Start time negotiation completed Starting at " PRINTF_TIME, self->start_time_proposal);
296296
self->state = StartupCoordinationState_RUNNING;

0 commit comments

Comments
 (0)