Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions include/reactor-uc/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ struct Connection {
size_t downstreams_registered; // Number of downstreams currently registered
void (*register_downstream)(Connection *, Port *);
Port *(*get_final_upstream)(Connection *);
void (*trigger_downstreams)(Connection *, const void *value_ptr, size_t value_size);
void (*trigger_downstreams)(Connection *, tag_t intended_tag, const void *value_ptr, size_t value_size);
};

void Connection_ctor(Connection *self, TriggerType type, Reactor *parent, Port **downstreams, size_t num_downstreams,
EventPayloadPool *payload_pool, void (*prepare)(Trigger *, Event *), void (*cleanup)(Trigger *),
void (*trigger_downstreams)(Connection *, const void *, size_t));
void (*trigger_downstreams)(Connection *, tag_t, const void *, size_t));

struct LogicalConnection {
Connection super;
Expand All @@ -44,6 +44,7 @@ struct DelayedConnection {
ConnectionType type;
EventPayloadPool payload_pool;
void *staged_payload_ptr;
tag_t intended_tag;
};

void DelayedConnection_ctor(DelayedConnection *self, Reactor *parent, Port **downstreams, size_t num_downstreams,
Expand Down
30 changes: 5 additions & 25 deletions lfc/core/src/main/java/org/lflang/generator/LFGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,33 +37,14 @@ public static FileConfig createFileConfig(
final Target target = Target.fromDecl(ASTUtils.targetDecl(resource));
assert target != null;

// if (FedASTUtils.findFederatedReactor(resource) != null) {
// return new FederationFileConfig(resource, srcGenBasePath, useHierarchicalBin);
// }

return switch (target) {
// case CCPP, C -> new CFileConfig(resource, srcGenBasePath, useHierarchicalBin);
// case Python -> new PyFileConfig(resource, srcGenBasePath, useHierarchicalBin);
// case CPP -> new CppFileConfig(resource, srcGenBasePath, useHierarchicalBin);
// case Rust -> new RustFileConfig(resource, srcGenBasePath, useHierarchicalBin);
// case TS -> new TSFileConfig(resource, srcGenBasePath, useHierarchicalBin);
case UC -> new UcFileConfig(resource, srcGenBasePath, useHierarchicalBin, runtimeSymlink);
};
return new UcFileConfig(resource, srcGenBasePath, useHierarchicalBin, runtimeSymlink);
}

/** Create a generator object for the given target. */
private GeneratorBase createGenerator(LFGeneratorContext context) {
final Target target = Target.fromDecl(ASTUtils.targetDecl(context.getFileConfig().resource));
assert target != null;
return switch (target) {
// case C -> new CGenerator(context, false);
// case CCPP -> new CGenerator(context, true);
// case Python -> new PythonGenerator(context);
// case CPP -> new CppGenerator(context, scopeProvider);
// case TS -> new TSGenerator(context);
// case Rust -> new RustGenerator(context, scopeProvider);
case UC -> createUcGenerator(context, scopeProvider);
};
return createUcGenerator(context, scopeProvider);
}

@Override
Expand All @@ -80,11 +61,10 @@ public void doGenerate(Resource resource, IFileSystemAccess2 fsa, IGeneratorCont
if (lfContext.getMode() == LFGeneratorContext.Mode.LSP_FAST) return;

final GeneratorBase generator = createGenerator(lfContext);
if (generator != null) {
generatorErrorsOccurred = generator.errorsOccurred();
generator.doGenerate(resource, lfContext);
}
generatorErrorsOccurred = generator.errorsOccurred();
generator.doGenerate(resource, lfContext);
final MessageReporter messageReporter = lfContext.getErrorReporter();

if (messageReporter instanceof LanguageServerMessageReporter) {
((LanguageServerMessageReporter) messageReporter).publishDiagnostics();
}
Expand Down
15 changes: 9 additions & 6 deletions src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void Connection_register_downstream(Connection *self, Port *port) {
/**
* @brief Recursively walks down connection graph and copies value into Input ports and triggers reactions.
*/
void LogicalConnection_trigger_downstreams(Connection *self, const void *value, size_t value_size) {
void LogicalConnection_trigger_downstreams(Connection *self, tag_t intended_tag, const void *value, size_t value_size) {
LF_DEBUG(CONN, "Triggering downstreams of %p with value %p", self, value);
for (size_t i = 0; i < self->downstreams_registered; i++) {
Port *down = self->downstreams[i];
Expand All @@ -54,19 +54,20 @@ void LogicalConnection_trigger_downstreams(Connection *self, const void *value,
// "last write wins"
if (!down->super.is_present) {
down->super.prepare(&down->super, NULL);
down->intended_tag = intended_tag;
}
}

for (size_t i = 0; i < down->conns_out_registered; i++) {
LF_DEBUG(CONN, "Found further downstream connection %p to recurse down", down->conns_out[i]);
down->conns_out[i]->trigger_downstreams(down->conns_out[i], value, value_size);
down->conns_out[i]->trigger_downstreams(down->conns_out[i], intended_tag, value, value_size);
}
}
}

void Connection_ctor(Connection *self, TriggerType type, Reactor *parent, Port **downstreams, size_t num_downstreams,
EventPayloadPool *payload_pool, void (*prepare)(Trigger *, Event *), void (*cleanup)(Trigger *),
void (*trigger_downstreams)(Connection *, const void *, size_t)) {
void (*trigger_downstreams)(Connection *, tag_t, const void *, size_t)) {

self->upstream = NULL;
self->downstreams_size = num_downstreams;
Expand Down Expand Up @@ -100,7 +101,7 @@ void DelayedConnection_prepare(Trigger *trigger, Event *event) {
trigger->is_present = true;
sched->register_for_cleanup(sched, trigger);

LogicalConnection_trigger_downstreams(&self->super, event->super.payload, pool->payload_size);
LogicalConnection_trigger_downstreams(&self->super, event->intended_tag, event->super.payload, pool->payload_size);
validate(pool->free(pool, event->super.payload) == LF_OK);
}

Expand All @@ -123,7 +124,7 @@ void DelayedConnection_cleanup(Trigger *trigger) {
if (self->type == PHYSICAL_CONNECTION) {
base_tag.time = env->get_physical_time(env);
} else {
base_tag = sched->current_tag(sched);
base_tag = self->intended_tag;
}
tag_t tag = lf_delay_tag(base_tag, self->delay);
Event event = EVENT_INIT(tag, &self->super.super, self->staged_payload_ptr);
Expand All @@ -132,7 +133,8 @@ void DelayedConnection_cleanup(Trigger *trigger) {
}
}

void DelayedConnection_trigger_downstreams(Connection *_self, const void *value, size_t value_size) {
void DelayedConnection_trigger_downstreams(Connection *_self, tag_t intended_tag, const void *value,
size_t value_size) {
DelayedConnection *self = (DelayedConnection *)_self;
assert(value);
assert(value_size > 0);
Expand All @@ -149,6 +151,7 @@ void DelayedConnection_trigger_downstreams(Connection *_self, const void *value,
return;
}
}
self->intended_tag = intended_tag;
memcpy(self->staged_payload_ptr, value, value_size);
sched->register_for_cleanup(sched, &_self->super);
}
Expand Down
7 changes: 5 additions & 2 deletions src/federated.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

// Called when a reaction does lf_set(outputPort). Should buffer the output data
// for later transmission.
void FederatedOutputConnection_trigger_downstream(Connection *_self, const void *value, size_t value_size) {
void FederatedOutputConnection_trigger_downstream(Connection *_self, tag_t intended_tag, const void *value,
size_t value_size) {
(void)intended_tag;
LF_DEBUG(FED, "Triggering downstreams on federated output connection %p. Stage for later TX", _self);
lf_ret_t ret;
FederatedOutputConnection *self = (FederatedOutputConnection *)_self;
Expand Down Expand Up @@ -111,7 +113,8 @@ void FederatedInputConnection_prepare(Trigger *trigger, Event *event) {

for (size_t i = 0; i < down->conns_out_registered; i++) {
LF_DEBUG(CONN, "Found further downstream connection %p to recurse down", down->conns_out[i]);
down->conns_out[i]->trigger_downstreams(down->conns_out[i], event->super.payload, pool->payload_size);
down->conns_out[i]->trigger_downstreams(down->conns_out[i], event->intended_tag, event->super.payload,
pool->payload_size);
}

pool->free(pool, event->super.payload);
Expand Down
3 changes: 2 additions & 1 deletion src/port.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ void Port_set(Port *self, const void *value) {

for (size_t i = 0; i < self->conns_out_registered; i++) {
Connection *conn = self->conns_out[i];
conn->trigger_downstreams(conn, value, self->value_size);
const Environment *env = self->super.parent->env;
conn->trigger_downstreams(conn, env->scheduler->current_tag(env->scheduler), value, self->value_size);
}
}

Expand Down
8 changes: 5 additions & 3 deletions src/schedulers/dynamic/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,21 @@ void Scheduler_clean_up_timestep(Scheduler *untyped_self) {
}

/**
* @brief Checks for safe-to-prcess violations for the given reaction. If a violation is detected
* @brief Checks for safe-to-process violations for the given reaction. If a violation is detected
* the violation handler is called.
*
* @param self
* @param reaction
* @return true if a violation was detected and handled, false otherwise.
*/
static bool _Scheduler_check_and_handle_stp_violations(DynamicScheduler *self, Reaction *reaction) {
Reactor *parent = reaction->parent;
const Reactor *parent = reaction->parent;
for (size_t i = 0; i < parent->triggers_size; i++) {
Trigger *trigger = parent->triggers[i];
if (trigger->type == TRIG_INPUT && trigger->is_present) {
Port *port = (Port *)trigger;
const Port *port = (Port *)trigger;
LF_DEBUG(SCHED, "Intended Tag: " PRINTF_TAG, port->intended_tag);
LF_DEBUG(SCHED, "Current Tag: " PRINTF_TAG, self->current_tag);
if (lf_tag_compare(port->intended_tag, self->current_tag) == 0) {
continue;
}
Expand Down
2 changes: 1 addition & 1 deletion test/lf/src/FederatedMaxWait2.lf
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ federated reactor {
@maxwait(forever)
r2 = new Dst()
r1.out -> r2.in
}
}
51 changes: 51 additions & 0 deletions test/lf/src/FederatedMaxWaitContained.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
target uC {
platform: Native,
timeout: 5sec,
logging: DEBUG
}

reactor Src(id: int = 0) {
output out: int
reaction(startup) -> out{=
printf("Hello from Src!\n");
env->platform->wait_for(env->platform, SEC(2));
lf_set(out, 42);
env->request_shutdown(env);
=}
}

reactor Dst {
input in: int
state check2: bool = false
reaction(startup, in) {=
printf("Dst startup\n");
validate(!self->check2);
printf("Dst is input present? %d\n", lf_is_present(in));
validate(lf_is_present(in));
printf("Hello from Dst!\n");
self->check2 = true;
env->request_shutdown(env);
=} tardy {=
printf("STP violation\n");
// STP violation should not happen because maxwait is forever.
validate(false);
env->request_shutdown(env);
=}

reaction(shutdown) {=
validate(self->check2);
=}
}

reactor Container {
input in: int
r1 = new Dst()
in -> r1.in
}

federated reactor {
r1 = new Src()
@maxwait(forever)
r2 = new Container()
r1.out -> r2.in
}
Loading