From 77d62058f6e88ca8ce068202e253ed92154d1cf6 Mon Sep 17 00:00:00 2001 From: tanneberger Date: Fri, 31 Oct 2025 20:21:32 +0100 Subject: [PATCH] src: fixing problem where tardy handlers inside hierarchy where not invoked --- include/reactor-uc/connection.h | 5 +- .../org/lflang/generator/LFGenerator.java | 30 ++--------- src/connection.c | 15 +++--- src/federated.c | 7 ++- src/port.c | 3 +- src/schedulers/dynamic/scheduler.c | 8 +-- test/lf/src/FederatedMaxWait2.lf | 2 +- test/lf/src/FederatedMaxWaitContained.lf | 51 +++++++++++++++++++ 8 files changed, 81 insertions(+), 40 deletions(-) create mode 100644 test/lf/src/FederatedMaxWaitContained.lf diff --git a/include/reactor-uc/connection.h b/include/reactor-uc/connection.h index be6de2b66..01b846438 100644 --- a/include/reactor-uc/connection.h +++ b/include/reactor-uc/connection.h @@ -25,12 +25,12 @@ struct Connection { size_t downstreams_registered; // Number of downstreams currently registered void (*register_downstream)(Connection *, Port *); Port *(*get_final_upstream)(Connection *); - void (*trigger_downstreams)(Connection *, const void *value_ptr, size_t value_size); + void (*trigger_downstreams)(Connection *, tag_t intended_tag, const void *value_ptr, size_t value_size); }; void Connection_ctor(Connection *self, TriggerType type, Reactor *parent, Port **downstreams, size_t num_downstreams, EventPayloadPool *payload_pool, void (*prepare)(Trigger *, Event *), void (*cleanup)(Trigger *), - void (*trigger_downstreams)(Connection *, const void *, size_t)); + void (*trigger_downstreams)(Connection *, tag_t, const void *, size_t)); struct LogicalConnection { Connection super; @@ -44,6 +44,7 @@ struct DelayedConnection { ConnectionType type; EventPayloadPool payload_pool; void *staged_payload_ptr; + tag_t intended_tag; }; void DelayedConnection_ctor(DelayedConnection *self, Reactor *parent, Port **downstreams, size_t num_downstreams, diff --git a/lfc/core/src/main/java/org/lflang/generator/LFGenerator.java b/lfc/core/src/main/java/org/lflang/generator/LFGenerator.java index 47cf0485a..48567e81b 100644 --- a/lfc/core/src/main/java/org/lflang/generator/LFGenerator.java +++ b/lfc/core/src/main/java/org/lflang/generator/LFGenerator.java @@ -37,33 +37,14 @@ public static FileConfig createFileConfig( final Target target = Target.fromDecl(ASTUtils.targetDecl(resource)); assert target != null; - // if (FedASTUtils.findFederatedReactor(resource) != null) { - // return new FederationFileConfig(resource, srcGenBasePath, useHierarchicalBin); - // } - - return switch (target) { - // case CCPP, C -> new CFileConfig(resource, srcGenBasePath, useHierarchicalBin); - // case Python -> new PyFileConfig(resource, srcGenBasePath, useHierarchicalBin); - // case CPP -> new CppFileConfig(resource, srcGenBasePath, useHierarchicalBin); - // case Rust -> new RustFileConfig(resource, srcGenBasePath, useHierarchicalBin); - // case TS -> new TSFileConfig(resource, srcGenBasePath, useHierarchicalBin); - case UC -> new UcFileConfig(resource, srcGenBasePath, useHierarchicalBin, runtimeSymlink); - }; + return new UcFileConfig(resource, srcGenBasePath, useHierarchicalBin, runtimeSymlink); } /** Create a generator object for the given target. */ private GeneratorBase createGenerator(LFGeneratorContext context) { final Target target = Target.fromDecl(ASTUtils.targetDecl(context.getFileConfig().resource)); assert target != null; - return switch (target) { - // case C -> new CGenerator(context, false); - // case CCPP -> new CGenerator(context, true); - // case Python -> new PythonGenerator(context); - // case CPP -> new CppGenerator(context, scopeProvider); - // case TS -> new TSGenerator(context); - // case Rust -> new RustGenerator(context, scopeProvider); - case UC -> createUcGenerator(context, scopeProvider); - }; + return createUcGenerator(context, scopeProvider); } @Override @@ -80,11 +61,10 @@ public void doGenerate(Resource resource, IFileSystemAccess2 fsa, IGeneratorCont if (lfContext.getMode() == LFGeneratorContext.Mode.LSP_FAST) return; final GeneratorBase generator = createGenerator(lfContext); - if (generator != null) { - generatorErrorsOccurred = generator.errorsOccurred(); - generator.doGenerate(resource, lfContext); - } + generatorErrorsOccurred = generator.errorsOccurred(); + generator.doGenerate(resource, lfContext); final MessageReporter messageReporter = lfContext.getErrorReporter(); + if (messageReporter instanceof LanguageServerMessageReporter) { ((LanguageServerMessageReporter) messageReporter).publishDiagnostics(); } diff --git a/src/connection.c b/src/connection.c index 262b115b4..db1bf837f 100644 --- a/src/connection.c +++ b/src/connection.c @@ -40,7 +40,7 @@ void Connection_register_downstream(Connection *self, Port *port) { /** * @brief Recursively walks down connection graph and copies value into Input ports and triggers reactions. */ -void LogicalConnection_trigger_downstreams(Connection *self, const void *value, size_t value_size) { +void LogicalConnection_trigger_downstreams(Connection *self, tag_t intended_tag, const void *value, size_t value_size) { LF_DEBUG(CONN, "Triggering downstreams of %p with value %p", self, value); for (size_t i = 0; i < self->downstreams_registered; i++) { Port *down = self->downstreams[i]; @@ -54,19 +54,20 @@ void LogicalConnection_trigger_downstreams(Connection *self, const void *value, // "last write wins" if (!down->super.is_present) { down->super.prepare(&down->super, NULL); + down->intended_tag = intended_tag; } } for (size_t i = 0; i < down->conns_out_registered; i++) { LF_DEBUG(CONN, "Found further downstream connection %p to recurse down", down->conns_out[i]); - down->conns_out[i]->trigger_downstreams(down->conns_out[i], value, value_size); + down->conns_out[i]->trigger_downstreams(down->conns_out[i], intended_tag, value, value_size); } } } void Connection_ctor(Connection *self, TriggerType type, Reactor *parent, Port **downstreams, size_t num_downstreams, EventPayloadPool *payload_pool, void (*prepare)(Trigger *, Event *), void (*cleanup)(Trigger *), - void (*trigger_downstreams)(Connection *, const void *, size_t)) { + void (*trigger_downstreams)(Connection *, tag_t, const void *, size_t)) { self->upstream = NULL; self->downstreams_size = num_downstreams; @@ -100,7 +101,7 @@ void DelayedConnection_prepare(Trigger *trigger, Event *event) { trigger->is_present = true; sched->register_for_cleanup(sched, trigger); - LogicalConnection_trigger_downstreams(&self->super, event->super.payload, pool->payload_size); + LogicalConnection_trigger_downstreams(&self->super, event->intended_tag, event->super.payload, pool->payload_size); validate(pool->free(pool, event->super.payload) == LF_OK); } @@ -123,7 +124,7 @@ void DelayedConnection_cleanup(Trigger *trigger) { if (self->type == PHYSICAL_CONNECTION) { base_tag.time = env->get_physical_time(env); } else { - base_tag = sched->current_tag(sched); + base_tag = self->intended_tag; } tag_t tag = lf_delay_tag(base_tag, self->delay); Event event = EVENT_INIT(tag, &self->super.super, self->staged_payload_ptr); @@ -132,7 +133,8 @@ void DelayedConnection_cleanup(Trigger *trigger) { } } -void DelayedConnection_trigger_downstreams(Connection *_self, const void *value, size_t value_size) { +void DelayedConnection_trigger_downstreams(Connection *_self, tag_t intended_tag, const void *value, + size_t value_size) { DelayedConnection *self = (DelayedConnection *)_self; assert(value); assert(value_size > 0); @@ -149,6 +151,7 @@ void DelayedConnection_trigger_downstreams(Connection *_self, const void *value, return; } } + self->intended_tag = intended_tag; memcpy(self->staged_payload_ptr, value, value_size); sched->register_for_cleanup(sched, &_self->super); } diff --git a/src/federated.c b/src/federated.c index 6fc9d91ec..fd52ad015 100644 --- a/src/federated.c +++ b/src/federated.c @@ -7,7 +7,9 @@ // Called when a reaction does lf_set(outputPort). Should buffer the output data // for later transmission. -void FederatedOutputConnection_trigger_downstream(Connection *_self, const void *value, size_t value_size) { +void FederatedOutputConnection_trigger_downstream(Connection *_self, tag_t intended_tag, const void *value, + size_t value_size) { + (void)intended_tag; LF_DEBUG(FED, "Triggering downstreams on federated output connection %p. Stage for later TX", _self); lf_ret_t ret; FederatedOutputConnection *self = (FederatedOutputConnection *)_self; @@ -111,7 +113,8 @@ void FederatedInputConnection_prepare(Trigger *trigger, Event *event) { for (size_t i = 0; i < down->conns_out_registered; i++) { LF_DEBUG(CONN, "Found further downstream connection %p to recurse down", down->conns_out[i]); - down->conns_out[i]->trigger_downstreams(down->conns_out[i], event->super.payload, pool->payload_size); + down->conns_out[i]->trigger_downstreams(down->conns_out[i], event->intended_tag, event->super.payload, + pool->payload_size); } pool->free(pool, event->super.payload); diff --git a/src/port.c b/src/port.c index c04616c56..6f161950f 100644 --- a/src/port.c +++ b/src/port.c @@ -37,7 +37,8 @@ void Port_set(Port *self, const void *value) { for (size_t i = 0; i < self->conns_out_registered; i++) { Connection *conn = self->conns_out[i]; - conn->trigger_downstreams(conn, value, self->value_size); + const Environment *env = self->super.parent->env; + conn->trigger_downstreams(conn, env->scheduler->current_tag(env->scheduler), value, self->value_size); } } diff --git a/src/schedulers/dynamic/scheduler.c b/src/schedulers/dynamic/scheduler.c index fe7831802..a056515af 100644 --- a/src/schedulers/dynamic/scheduler.c +++ b/src/schedulers/dynamic/scheduler.c @@ -131,7 +131,7 @@ void Scheduler_clean_up_timestep(Scheduler *untyped_self) { } /** - * @brief Checks for safe-to-prcess violations for the given reaction. If a violation is detected + * @brief Checks for safe-to-process violations for the given reaction. If a violation is detected * the violation handler is called. * * @param self @@ -139,11 +139,13 @@ void Scheduler_clean_up_timestep(Scheduler *untyped_self) { * @return true if a violation was detected and handled, false otherwise. */ static bool _Scheduler_check_and_handle_stp_violations(DynamicScheduler *self, Reaction *reaction) { - Reactor *parent = reaction->parent; + const Reactor *parent = reaction->parent; for (size_t i = 0; i < parent->triggers_size; i++) { Trigger *trigger = parent->triggers[i]; if (trigger->type == TRIG_INPUT && trigger->is_present) { - Port *port = (Port *)trigger; + const Port *port = (Port *)trigger; + LF_DEBUG(SCHED, "Intended Tag: " PRINTF_TAG, port->intended_tag); + LF_DEBUG(SCHED, "Current Tag: " PRINTF_TAG, self->current_tag); if (lf_tag_compare(port->intended_tag, self->current_tag) == 0) { continue; } diff --git a/test/lf/src/FederatedMaxWait2.lf b/test/lf/src/FederatedMaxWait2.lf index f00cd2c7c..be56b696c 100644 --- a/test/lf/src/FederatedMaxWait2.lf +++ b/test/lf/src/FederatedMaxWait2.lf @@ -41,4 +41,4 @@ federated reactor { @maxwait(forever) r2 = new Dst() r1.out -> r2.in -} \ No newline at end of file +} diff --git a/test/lf/src/FederatedMaxWaitContained.lf b/test/lf/src/FederatedMaxWaitContained.lf new file mode 100644 index 000000000..91456e6f7 --- /dev/null +++ b/test/lf/src/FederatedMaxWaitContained.lf @@ -0,0 +1,51 @@ +target uC { + platform: Native, + timeout: 5sec, + logging: DEBUG +} + +reactor Src(id: int = 0) { + output out: int + reaction(startup) -> out{= + printf("Hello from Src!\n"); + env->platform->wait_for(env->platform, SEC(2)); + lf_set(out, 42); + env->request_shutdown(env); + =} +} + +reactor Dst { + input in: int + state check2: bool = false + reaction(startup, in) {= + printf("Dst startup\n"); + validate(!self->check2); + printf("Dst is input present? %d\n", lf_is_present(in)); + validate(lf_is_present(in)); + printf("Hello from Dst!\n"); + self->check2 = true; + env->request_shutdown(env); + =} tardy {= + printf("STP violation\n"); + // STP violation should not happen because maxwait is forever. + validate(false); + env->request_shutdown(env); + =} + + reaction(shutdown) {= + validate(self->check2); + =} +} + +reactor Container { + input in: int + r1 = new Dst() + in -> r1.in +} + +federated reactor { + r1 = new Src() + @maxwait(forever) + r2 = new Container() + r1.out -> r2.in +}