|
2 | 2 | #include "reactor-uc/environment.h" |
3 | 3 | #include "reactor-uc/logging.h" |
4 | 4 | #include "reactor-uc/platform.h" |
| 5 | +#include "reactor-uc/serialization.h" |
5 | 6 |
|
6 | 7 | // TODO: Refactor so this function is available |
7 | 8 | void LogicalConnection_trigger_downstreams(Connection *self, const void *value, size_t value_size); |
@@ -91,14 +92,18 @@ void FederatedOutputConnection_cleanup(Trigger *trigger) { |
91 | 92 | tagged_msg->tag.microstep = sched->current_tag(sched).microstep; |
92 | 93 |
|
93 | 94 | assert(self->bundle->serialize_hooks[self->conn_id]); |
94 | | - size_t msg_size = (*self->bundle->serialize_hooks[self->conn_id])(self->staged_payload_ptr, self->payload_pool.size, |
95 | | - tagged_msg->payload.bytes); |
96 | | - tagged_msg->payload.size = msg_size; |
97 | | - |
98 | | - LF_DEBUG(FED, "FedOutConn %p sending tagged message with tag=%" PRId64 ":%" PRIu32, trigger, tagged_msg->tag.time, |
99 | | - tagged_msg->tag.microstep); |
100 | | - if (channel->send_blocking(channel, &msg) != LF_OK) { |
101 | | - LF_ERR(FED, "FedOutConn %p failed to send message", trigger); |
| 95 | + ssize_t msg_size = (*self->bundle->serialize_hooks[self->conn_id])( |
| 96 | + self->staged_payload_ptr, self->payload_pool.size, tagged_msg->payload.bytes); |
| 97 | + if (msg_size < 0) { |
| 98 | + LF_ERR(FED, "Failed to serialize payload for federated output connection %p", trigger); |
| 99 | + } else { |
| 100 | + tagged_msg->payload.size = msg_size; |
| 101 | + |
| 102 | + LF_DEBUG(FED, "FedOutConn %p sending tagged message with tag=%" PRId64 ":%" PRIu32, trigger, tagged_msg->tag.time, |
| 103 | + tagged_msg->tag.microstep); |
| 104 | + if (channel->send_blocking(channel, &msg) != LF_OK) { |
| 105 | + LF_ERR(FED, "FedOutConn %p failed to send message", trigger); |
| 106 | + } |
102 | 107 | } |
103 | 108 | } else { |
104 | 109 | LF_WARN(FED, "FedOutConn %p not connected. Dropping staged message", trigger); |
@@ -321,10 +326,12 @@ void FederatedConnectionBundle_validate(FederatedConnectionBundle *bundle) { |
321 | 326 | validate(bundle->inputs[i]); |
322 | 327 | validate(bundle->deserialize_hooks[i]); |
323 | 328 | validate(bundle->inputs[i]->super.super.parent); |
| 329 | + validate(bundle->inputs[i]->super.super.payload_pool->size < SERIALIZATION_MAX_PAYLOAD_SIZE); |
324 | 330 | } |
325 | 331 | for (size_t i = 0; i < bundle->outputs_size; i++) { |
326 | 332 | validate(bundle->outputs[i]); |
327 | 333 | validate(bundle->serialize_hooks[i]); |
328 | 334 | validate(bundle->outputs[i]->super.super.parent); |
| 335 | + validate(bundle->outputs[i]->super.super.payload_pool->size < SERIALIZATION_MAX_PAYLOAD_SIZE); |
329 | 336 | } |
330 | 337 | } |
0 commit comments