Skip to content

Commit 9092780

Browse files
authored
Remove duplicated NetworkChannel return codes / state (#143)
* Migrate runtime and coap channel * Remove complicated LF_RETURN_TYPES from TcpIpChannel * Fix examples and add ctor asserts * Remove network_channel_state_changed callback * Implement naming convention
1 parent abf025c commit 9092780

File tree

11 files changed

+210
-204
lines changed

11 files changed

+210
-204
lines changed

examples/posix/federated/receiver.c

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ typedef struct {
2929
REACTION_INSTANCE(Receiver, r);
3030
PORT_INSTANCE(Receiver, in, 1);
3131
int cnt;
32-
REACTOR_BOOKKEEPING_INSTANCES(1,1,0);
32+
REACTOR_BOOKKEEPING_INSTANCES(1, 1, 0);
3333
} Receiver;
3434

3535
DEFINE_REACTION_BODY(Receiver, r) {
@@ -56,12 +56,12 @@ typedef struct {
5656
FederatedConnectionBundle super;
5757
TcpIpChannel channel;
5858
FEDERATED_INPUT_CONNECTION_INSTANCE(Receiver, in);
59-
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(1,0)
59+
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(1, 0)
6060
} FEDERATED_CONNECTION_BUNDLE_NAME(Receiver, Sender);
6161

6262
FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Receiver, Sender) {
6363
FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE();
64-
TcpIpChannel_ctor(&self->channel, "127.0.0.1", PORT_NUM, AF_INET, false);
64+
TcpIpChannel_ctor(&self->channel, parent->env, "127.0.0.1", PORT_NUM, AF_INET, false);
6565
FEDERATED_CONNECTION_BUNDLE_CALL_CTOR();
6666
INITIALIZE_FEDERATED_INPUT_CONNECTION(Receiver, in, deserialize_msg_t);
6767
}
@@ -71,14 +71,14 @@ typedef struct {
7171
CHILD_REACTOR_INSTANCE(Receiver, receiver, 1);
7272
FEDERATED_CONNECTION_BUNDLE_INSTANCE(Receiver, Sender);
7373
FEDERATE_BOOKKEEPING_INSTANCES(1);
74-
CHILD_INPUT_SOURCES(receiver, in,1,1, 0);
74+
CHILD_INPUT_SOURCES(receiver, in, 1, 1, 0);
7575
} MainRecv;
7676

7777
REACTOR_CTOR_SIGNATURE(MainRecv) {
7878
FEDERATE_CTOR_PREAMBLE();
7979
REACTOR_CTOR(MainRecv);
8080
DEFINE_CHILD_INPUT_ARGS(receiver, in, 1, 1);
81-
INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Receiver, receiver,1, _receiver_in_args[i]);
81+
INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Receiver, receiver, 1, _receiver_in_args[i]);
8282
INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Receiver, Sender);
8383
BUNDLE_REGISTER_DOWNSTREAM(Receiver, Sender, receiver, in);
8484
}

examples/posix/federated/sender.c

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@ DEFINE_TIMER_CTOR(Sender, t, 1, 0)
2626
DEFINE_REACTION_STRUCT(Sender, r, 1)
2727
DEFINE_REACTION_CTOR(Sender, r, 0)
2828
DEFINE_OUTPUT_STRUCT(Sender, out, 1, msg_t)
29-
DEFINE_OUTPUT_CTOR(Sender, out, 1)
29+
DEFINE_OUTPUT_CTOR(Sender, out, 1)
3030

3131
typedef struct {
3232
Reactor super;
3333
TIMER_INSTANCE(Sender, t);
3434
REACTION_INSTANCE(Sender, r);
3535
PORT_INSTANCE(Sender, out, 1);
36-
REACTOR_BOOKKEEPING_INSTANCES(1,2,0);
36+
REACTOR_BOOKKEEPING_INSTANCES(1, 2, 0);
3737
} Sender;
3838

3939
DEFINE_REACTION_BODY(Sender, r) {
@@ -65,15 +65,15 @@ typedef struct {
6565
FederatedConnectionBundle super;
6666
TcpIpChannel channel;
6767
FEDERATED_OUTPUT_CONNECTION_INSTANCE(Sender, out);
68-
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(0,1);
68+
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(0, 1);
6969
} FEDERATED_CONNECTION_BUNDLE_NAME(Sender, Receiver);
7070

7171
FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Sender, Receiver) {
7272
FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE();
73-
TcpIpChannel_ctor(&self->channel, "127.0.0.1", PORT_NUM, AF_INET, true);
73+
TcpIpChannel_ctor(&self->channel, parent->env, "127.0.0.1", PORT_NUM, AF_INET, true);
7474

7575
FEDERATED_CONNECTION_BUNDLE_CALL_CTOR();
76-
76+
7777
INITIALIZE_FEDERATED_OUTPUT_CONNECTION(Sender, out, serialize_msg_t);
7878
}
7979

@@ -82,17 +82,16 @@ typedef struct {
8282
Reactor super;
8383
CHILD_REACTOR_INSTANCE(Sender, sender, 1);
8484
FEDERATED_CONNECTION_BUNDLE_INSTANCE(Sender, Receiver);
85-
TcpIpChannel channel;
8685
FEDERATE_BOOKKEEPING_INSTANCES(1);
87-
CHILD_OUTPUT_CONNECTIONS(sender, out, 1,1, 1);
88-
CHILD_OUTPUT_EFFECTS(sender, out, 1,1, 0);
89-
CHILD_OUTPUT_OBSERVERS(sender, out, 1,1,0);
86+
CHILD_OUTPUT_CONNECTIONS(sender, out, 1, 1, 1);
87+
CHILD_OUTPUT_EFFECTS(sender, out, 1, 1, 0);
88+
CHILD_OUTPUT_OBSERVERS(sender, out, 1, 1, 0);
9089
} MainSender;
9190

9291
REACTOR_CTOR_SIGNATURE(MainSender) {
9392
FEDERATE_CTOR_PREAMBLE();
94-
DEFINE_CHILD_OUTPUT_ARGS(sender, out,1,1);
95-
INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Sender, sender,1, _sender_out_args[i]);
93+
DEFINE_CHILD_OUTPUT_ARGS(sender, out, 1, 1);
94+
INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Sender, sender, 1, _sender_out_args[i]);
9695
INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Sender, Receiver);
9796
BUNDLE_REGISTER_UPSTREAM(Sender, Receiver, sender, out);
9897
REACTOR_CTOR(MainSender);

examples/zephyr/basic_federated/common/receiver.h

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ typedef struct {
3232
char msg[32];
3333
} msg_t;
3434

35-
DEFINE_REACTION_STRUCT(Receiver, r, 0);
35+
DEFINE_REACTION_STRUCT(Receiver, r, 0);
3636
DEFINE_REACTION_CTOR(Receiver, r, 0)
3737

3838
DEFINE_INPUT_STRUCT(Receiver, in, 1, 0, msg_t, 0)
@@ -42,7 +42,7 @@ typedef struct {
4242
Reactor super;
4343
REACTION_INSTANCE(Receiver, r);
4444
PORT_INSTANCE(Receiver, in, 1);
45-
REACTOR_BOOKKEEPING_INSTANCES(1,1,0);
45+
REACTOR_BOOKKEEPING_INSTANCES(1, 1, 0);
4646
int cnt;
4747
} Receiver;
4848

@@ -59,25 +59,23 @@ REACTOR_CTOR_SIGNATURE_WITH_PARAMETERS(Receiver, InputExternalCtorArgs *in_exter
5959
REACTOR_CTOR(Receiver);
6060
INITIALIZE_REACTION(Receiver, r);
6161
INITIALIZE_INPUT(Receiver, in, 1, in_external);
62-
62+
6363
// Register reaction as an effect of in
6464
PORT_REGISTER_EFFECT(self->in, self->r, 1);
6565
}
6666

67-
6867
DEFINE_FEDERATED_INPUT_CONNECTION(Receiver, in, msg_t, 5, MSEC(100), false);
6968

7069
typedef struct {
7170
FederatedConnectionBundle super;
7271
TcpIpChannel channel;
7372
FEDERATED_INPUT_CONNECTION_INSTANCE(Receiver, in);
74-
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(1,0)
73+
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(1, 0)
7574
} FEDERATED_CONNECTION_BUNDLE_NAME(Receiver, Sender);
7675

77-
7876
FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Receiver, Sender) {
7977
FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE();
80-
TcpIpChannel_ctor(&self->channel, IP_ADDR, PORT_NUM, AF_INET, false);
78+
TcpIpChannel_ctor(&self->channel, parent->env, IP_ADDR, PORT_NUM, AF_INET, false);
8179
FEDERATED_CONNECTION_BUNDLE_CALL_CTOR();
8280
INITIALIZE_FEDERATED_INPUT_CONNECTION(Receiver, in, deserialize_payload_default);
8381
}
@@ -87,14 +85,14 @@ typedef struct {
8785
CHILD_REACTOR_INSTANCE(Receiver, receiver, 1);
8886
FEDERATED_CONNECTION_BUNDLE_INSTANCE(Receiver, Sender);
8987
FEDERATE_BOOKKEEPING_INSTANCES(1);
90-
CHILD_INPUT_SOURCES(receiver, in, 1,1, 0);
88+
CHILD_INPUT_SOURCES(receiver, in, 1, 1, 0);
9189
} MainRecv;
9290

9391
REACTOR_CTOR_SIGNATURE(MainRecv) {
9492
FEDERATE_CTOR_PREAMBLE();
9593
REACTOR_CTOR(MainRecv);
96-
DEFINE_CHILD_INPUT_ARGS(receiver, in,1,1);
97-
INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Receiver, receiver,1, _receiver_in_args[i]);
94+
DEFINE_CHILD_INPUT_ARGS(receiver, in, 1, 1);
95+
INITIALIZE_CHILD_REACTOR_WITH_PARAMETERS(Receiver, receiver, 1, _receiver_in_args[i]);
9896
INITIALIZE_FEDERATED_CONNECTION_BUNDLE(Receiver, Sender);
9997
BUNDLE_REGISTER_DOWNSTREAM(Receiver, Sender, receiver, in);
10098
}

examples/zephyr/basic_federated/federated_sender/src/sender.c

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,12 @@ void setup_led() {
6666
gpio_pin_configure_dt(&led, GPIO_OUTPUT_ACTIVE);
6767
}
6868

69-
7069
typedef struct {
7170
Reactor super;
7271
REACTION_INSTANCE(Sender, r);
7372
ACTION_INSTANCE(Sender, act);
7473
PORT_INSTANCE(Sender, out, 1);
75-
REACTOR_BOOKKEEPING_INSTANCES(1,2,0);
74+
REACTOR_BOOKKEEPING_INSTANCES(1, 2, 0);
7675
} Sender;
7776

7877
DEFINE_REACTION_BODY(Sender, r) {
@@ -104,31 +103,31 @@ typedef struct {
104103
FederatedConnectionBundle super;
105104
TcpIpChannel channel;
106105
FEDERATED_OUTPUT_CONNECTION_INSTANCE(Sender, out);
107-
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(0,1);
106+
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(0, 1);
108107
} FEDERATED_CONNECTION_BUNDLE_NAME(Sender, Receiver1);
109108

110109
typedef struct {
111110
FederatedConnectionBundle super;
112111
TcpIpChannel channel;
113112
FEDERATED_OUTPUT_CONNECTION_INSTANCE(Sender, out);
114-
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(0,1);
113+
FEDERATED_CONNECTION_BUNDLE_BOOKKEEPING_INSTANCES(0, 1);
115114
} FEDERATED_CONNECTION_BUNDLE_NAME(Sender, Receiver2);
116115

117116
FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Sender, Receiver1) {
118117
FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE();
119-
TcpIpChannel_ctor(&self->channel, "192.168.1.100", PORT_CONN_1, AF_INET, true);
118+
TcpIpChannel_ctor(&self->channel, parent->env, "192.168.1.100", PORT_CONN_1, AF_INET, true);
120119

121120
FEDERATED_CONNECTION_BUNDLE_CALL_CTOR();
122-
121+
123122
INITIALIZE_FEDERATED_OUTPUT_CONNECTION(Sender, out, serialize_payload_default);
124123
}
125124

126125
FEDERATED_CONNECTION_BUNDLE_CTOR_SIGNATURE(Sender, Receiver2) {
127126
FEDERATED_CONNECTION_BUNDLE_CTOR_PREAMBLE();
128-
TcpIpChannel_ctor(&self->channel, "192.168.1.100", PORT_CONN_2, AF_INET, true);
127+
TcpIpChannel_ctor(&self->channel, parent->env, "192.168.1.100", PORT_CONN_2, AF_INET, true);
129128

130129
FEDERATED_CONNECTION_BUNDLE_CALL_CTOR();
131-
130+
132131
INITIALIZE_FEDERATED_OUTPUT_CONNECTION(Sender, out, serialize_payload_default);
133132
}
134133

@@ -138,9 +137,9 @@ typedef struct {
138137
CHILD_REACTOR_INSTANCE(Sender, sender, 1);
139138
FEDERATED_CONNECTION_BUNDLE_INSTANCE(Sender, Receiver1);
140139
FEDERATED_CONNECTION_BUNDLE_INSTANCE(Sender, Receiver2);
141-
CHILD_OUTPUT_CONNECTIONS(sender, out, 1,1, 2);
142-
CHILD_OUTPUT_EFFECTS(sender, out, 1,1,0);
143-
CHILD_OUTPUT_OBSERVERS(sender, out,1,1, 0);
140+
CHILD_OUTPUT_CONNECTIONS(sender, out, 1, 1, 2);
141+
CHILD_OUTPUT_EFFECTS(sender, out, 1, 1, 0);
142+
CHILD_OUTPUT_OBSERVERS(sender, out, 1, 1, 0);
144143
FEDERATE_BOOKKEEPING_INSTANCES(2);
145144
} MainSender;
146145

include/reactor-uc/error.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,6 @@ typedef enum {
2121
LF_INVALID_VALUE,
2222
LF_OUT_OF_BOUNDS,
2323
LF_NO_MEM,
24-
LF_COULD_NOT_CONNECT,
25-
LF_NETWORK_SETUP_FAILED,
26-
LF_CONNECTION_CLOSED,
27-
LF_TIMEOUT,
28-
LF_TRY_AGAIN,
29-
LF_IN_PROGRESS
3024
} lf_ret_t;
3125

3226
/**

include/reactor-uc/federated.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ struct FederatedConnectionBundle {
3131
serialize_hook *serialize_hooks;
3232
size_t outputs_size;
3333
bool server; // Does this federate work as server or client
34-
void (*network_channel_state_changed)(FederatedConnectionBundle *self);
3534
};
3635

3736
void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *parent, NetworkChannel *net_channel,

include/reactor-uc/platform/posix/tcp_ip_channel.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "proto/message.pb.h"
88
#include "reactor-uc/error.h"
99
#include "reactor-uc/network_channel.h"
10+
#include "reactor-uc/environment.h"
1011

1112
#define TCP_IP_CHANNEL_BUFFERSIZE 1024
1213
#define TCP_IP_CHANNEL_NUM_RETRIES 255
@@ -45,6 +46,7 @@ struct TcpIpChannel {
4546
void (*receive_callback)(FederatedConnectionBundle *conn, const FederateMessage *message);
4647
};
4748

48-
void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port, int protocol_family, bool server);
49+
void TcpIpChannel_ctor(TcpIpChannel *self, Environment *env, const char *host, unsigned short port, int protocol_family,
50+
bool server);
4951

5052
#endif

src/federated.c

Lines changed: 11 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,25 +15,25 @@ void FederatedConnectionBundle_connect_to_peers(FederatedConnectionBundle **bund
1515
NetworkChannel *chan = bundle->net_channel;
1616
ret = chan->open_connection(chan);
1717
validate(ret == LF_OK);
18-
bundle->network_channel_state_changed(bundle);
1918
}
2019

2120
bool all_connected = false;
22-
interval_t wait_before_retry = FOREVER; // Intialize to maximum so we can find the lowest requested.
21+
interval_t wait_before_retry = FOREVER; // Initialize to maximum so we can find the lowest requested.
2322
while (!all_connected) {
2423
all_connected = true;
2524
for (size_t i = 0; i < bundles_size; i++) {
2625
FederatedConnectionBundle *bundle = bundles[i];
2726
NetworkChannel *chan = bundle->net_channel;
28-
NetworkChannelState state = chan->get_connection_state(chan);
29-
if (state != NETWORK_CHANNEL_STATE_CONNECTED) {
30-
ret = chan->try_connect(chan);
31-
switch (ret) {
32-
case LF_OK:
33-
bundle->network_channel_state_changed(bundle);
27+
ret = chan->try_connect(chan);
28+
if (ret == LF_OK) {
29+
NetworkChannelState state = chan->get_connection_state(chan);
30+
switch (state) {
31+
case NETWORK_CHANNEL_STATE_CONNECTED:
3432
break;
35-
case LF_IN_PROGRESS:
36-
case LF_TRY_AGAIN:
33+
case NETWORK_CHANNEL_STATE_OPEN:
34+
case NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS:
35+
case NETWORK_CHANNEL_STATE_CONNECTION_FAILED:
36+
case NETWORK_CHANNEL_STATE_LOST_CONNECTION:
3737
if (chan->expected_try_connect_duration < wait_before_retry && chan->expected_try_connect_duration > 0) {
3838
wait_before_retry = chan->expected_try_connect_duration;
3939
}
@@ -108,14 +108,7 @@ void FederatedOutputConnection_cleanup(Trigger *trigger) {
108108

109109
LF_DEBUG(FED, "FedOutConn %p sending tagged message with tag=%" PRId64 ":%" PRIu32, trigger, tagged_msg->tag.time,
110110
tagged_msg->tag.microstep);
111-
ret = channel->send_blocking(channel, &msg);
112-
switch (ret) {
113-
case LF_OK:
114-
break;
115-
case LF_CONNECTION_CLOSED:
116-
self->bundle->network_channel_state_changed(self->bundle);
117-
// Intentional fallthrough
118-
default:
111+
if (channel->send_blocking(channel, &msg) != LF_OK) {
119112
LF_ERR(FED, "FedOutConn %p failed to send message", trigger);
120113
}
121114
} else {
@@ -285,21 +278,6 @@ void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self,
285278
}
286279
}
287280

288-
void FederatedConnectionBundle_network_channel_state_changed(FederatedConnectionBundle *self) {
289-
NetworkChannelState state = self->net_channel->get_connection_state(self->net_channel);
290-
switch (state) {
291-
case NETWORK_CHANNEL_STATE_CONNECTED:
292-
case NETWORK_CHANNEL_STATE_LOST_CONNECTION:
293-
for (size_t i = 0; i < self->inputs_size; i++) {
294-
FederatedInputConnection *input = self->inputs[i];
295-
input->last_known_tag = FOREVER_TAG;
296-
}
297-
break;
298-
default: // Handle other states also
299-
break;
300-
}
301-
}
302-
303281
void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *parent, NetworkChannel *net_channel,
304282
FederatedInputConnection **inputs, deserialize_hook *deserialize_hooks,
305283
size_t inputs_size, FederatedOutputConnection **outputs,
@@ -316,7 +294,6 @@ void FederatedConnectionBundle_ctor(FederatedConnectionBundle *self, Reactor *pa
316294
self->deserialize_hooks = deserialize_hooks;
317295
self->serialize_hooks = serialize_hooks;
318296
self->net_channel->register_receive_callback(self->net_channel, FederatedConnectionBundle_msg_received_cb, self);
319-
self->network_channel_state_changed = FederatedConnectionBundle_network_channel_state_changed;
320297
}
321298

322299
void Federated_distribute_start_tag(Environment *env, instant_t start_time) {

0 commit comments

Comments
 (0)