Skip to content

Commit 7285de8

Browse files
committed
msgbroker fifo
1 parent 5d6aed1 commit 7285de8

File tree

3 files changed

+86
-35
lines changed

3 files changed

+86
-35
lines changed

src/imu/imu.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ class Imu : public ImuState {
9090
public:
9191
ImuConfig config;
9292
ImuGizmo *gizmo = nullptr;
93-
MsgTopic<ImuState> topic = MsgTopic<ImuState>("imu");
93+
MsgTopic<ImuState> topic = MsgTopic<ImuState>("imu", 10); //10-deep fifo
9494

9595
int setup(); // Use config to setup gizmo, returns 0 on success, or error code
9696
bool update(); // Returns true if state was updated

src/tbx/MsgBroker.cpp

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ void MsgBroker::reset_stats() {
8383
//=============================================================================
8484
// MsgTopicBase
8585
//=============================================================================
86+
MsgTopicBase::MsgTopicBase(const char* name) {
87+
strncpy(this->name, name, sizeof(this->name) - 1);
88+
this->name[sizeof(this->name) - 1] = 0;
89+
}
90+
8691
void MsgTopicBase::add_subscription(MsgSubscriptionBase *sub) {
8792
for(int i = 0; i < MF_MSGSUB_LIST_SIZE; i++) {
8893
if(!sub_list[i]) {
@@ -114,9 +119,6 @@ int MsgTopicBase::subscriber_count() {
114119
//=============================================================================
115120
// MsgSubscriptionBase
116121
//=============================================================================
117-
bool MsgSubscriptionBase::updated() {
118-
return (generation != topic->generation);
119-
}
120122

121123
MsgSubscriptionBase::MsgSubscriptionBase(const char *name, MsgTopicBase *topic) : topic{topic} {
122124
strncpy(this->name, name, sizeof(this->name) - 1);
@@ -128,16 +130,29 @@ MsgSubscriptionBase::~MsgSubscriptionBase() {
128130
topic->remove_subscription(this);
129131
}
130132

133+
134+
//returns true if new msg available
135+
bool MsgSubscriptionBase::updated() {
136+
return (generation != topic->generation);
137+
}
138+
131139
//pull message: returns true if msg was pulled, returns false if no msg available
132140
bool MsgSubscriptionBase::pull(void *msg) {
133-
if(!topic->pull(msg)) return false;
134-
generation = topic->generation;
141+
if(!topic->pull(msg, &generation)) return false;
135142
stat_pull_cnt++;
136-
return true; //NOTE: retrieved msg might be older than this->generation
143+
return true;
137144
}
138145

139146
//pull updated message: returns true when updated msg available, else returns false and does not update msg
140147
bool MsgSubscriptionBase::pull_updated(void *msg) {
141148
if(!updated()) return false;
142149
return pull(msg);
143150
}
151+
152+
//pull last message: returns true if msg was pulled, returns false if no msg available
153+
bool MsgSubscriptionBase::pull_last(void *msg) {
154+
generation = topic->generation - 1;
155+
if(!topic->pull(msg, &generation)) return false;
156+
stat_pull_cnt++;
157+
return true;
158+
}

src/tbx/MsgBroker.h

Lines changed: 64 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,10 @@ class MsgTopicBase {
7474
MsgSubscriptionBase* sub_list[MF_MSGSUB_LIST_SIZE] = {};
7575

7676
virtual ~MsgTopicBase() {}
77-
MsgTopicBase(const char* name) {
78-
strncpy(this->name, name, sizeof(this->name) - 1);
79-
this->name[sizeof(this->name) - 1] = 0;
80-
}
77+
MsgTopicBase(const char* name);
8178

82-
virtual bool pull(void* msg) = 0;
79+
//pull next message with gen
80+
virtual bool pull(void* msg, uint32_t *subscriber_gen) = 0;
8381

8482
//subscription
8583
void add_subscription(MsgSubscriptionBase *sub);
@@ -88,39 +86,62 @@ class MsgTopicBase {
8886
};
8987

9088
//=============================================================================
91-
//single publisher, multi subscriber - double buffer for storage
89+
//single publisher, multi subscriber - lockless - uses FIFO buffer for storage
9290
template <class T>
9391
class MsgTopic : public MsgTopicBase {
92+
private:
93+
uint16_t msglen = 0; //message length
94+
uint16_t buflen = 0; //message length rounded up to next 4 bytes for alignment
95+
uint16_t bufdepth_1 = 0; //size of buffer in messages minus 1 (E.g. exclusive one message used for loading new data)
96+
uint8_t *buf = nullptr; //buffer for bufdepth messages
97+
uint8_t *buflast = nullptr; //pre-calculated pointer to last message
98+
uint8_t *bufin = nullptr; //next publish message buffer
99+
94100
public:
95-
MsgTopic(const char* name) : MsgTopicBase(name) {
96-
buflen = sizeof(T);
97-
active_buf = aligned_alloc(4, buflen);
98-
inactive_buf = aligned_alloc(4, buflen);
99-
memset(active_buf, 0, buflen);
101+
MsgTopic(const char* name, uint16_t fifo_depth = 1) : MsgTopicBase(name) {
102+
bufdepth_1 = fifo_depth;
103+
msglen = sizeof(T);
104+
buflen = ((msglen + 3) / 4) * 4; //round up to the next 4 bytes
105+
buf = (uint8_t*)aligned_alloc(4, buflen * (bufdepth_1 + 1)); //add one extra message for loading new data
106+
memset(buf, 0, buflen * (bufdepth_1 + 1)); //clear buffer (not really needed)
107+
bufin = buf + buflen;
108+
buflast = buf + bufdepth_1 * buflen;
109+
100110
MsgBroker::add_topic(this);
101111
}
102112

103113
void publish(T *msg) {
104-
memcpy(inactive_buf, msg, buflen);
105-
void *temp = active_buf;
106-
active_buf = inactive_buf;
107-
inactive_buf = temp;
108-
generation++;
114+
memcpy(bufin, msg, buflen); //use buflen for speed (reading potentially some garbage at the end of msg)
115+
generation++; //update as soon as data is in buf
116+
if(bufin < buflast) bufin += buflen; else bufin = buf; //calc next bufin
109117
}
110118

111-
bool pull(void* msg) override {
119+
//pull message subscriber_gen+1 from fifo buffer, if not found then pull closest gen in fifo buffer
120+
bool pull(void* msg, uint32_t *subscriber_gen) override {
112121
if(!generation) return false; //will miss a pull every 4,000,000,000 publishes...
113-
memcpy(msg, active_buf, buflen);
114-
return true;
122+
uint8_t tries = 5;
123+
do {
124+
uint32_t topic_gen = generation; //copy the current topic generation (generation is volatile)
125+
uint32_t depth = topic_gen - (*subscriber_gen + 1); //we're interested in the message at this depth
126+
//limit depth to [0 .. bufdepth_1 - 1]
127+
if(depth & 0x80000000) { //unsigned negative
128+
depth = 0;
129+
} else if(depth >= bufdepth_1) {
130+
depth = bufdepth_1 - 1;
131+
}
132+
uint8_t* bufout = buf + ((topic_gen - depth) % (bufdepth_1 + 1)) * buflen; //get pointer to message at this depth
133+
memcpy(msg, bufout, msglen); //copy the message
134+
if((generation - topic_gen) < (bufdepth_1 - depth)) { //if the message at depth did not get overwitten by publish(), then we have a consistent copy
135+
*subscriber_gen = topic_gen - depth; //update the subscriber generation
136+
return true;
137+
}
138+
//the message got updated while we were memcpy'ing it, try again...
139+
//note: maybe increase subscriber_gen to look for next message and hopefully have a greater change of getting it
140+
}while(--tries);
141+
return false;
115142
}
116-
117-
private:
118-
int buflen = 0;
119-
void *active_buf = nullptr;
120-
void *inactive_buf = nullptr;
121143
};
122144

123-
124145
//=============================================================================
125146
//multi publisher, multi subscriber - uses FreeRTOS Queue for storage
126147
template <class T>
@@ -143,8 +164,20 @@ class MsgTopicQueue : public MsgTopicBase {
143164
generation++;
144165
}
145166

146-
bool pull(void *msg) override {
147-
return (xQueuePeek(queue, msg, 0) == pdPASS);
167+
bool pull(void* msg, uint32_t *subscriber_gen) override {
168+
uint8_t tries = 5;
169+
uint32_t topic_gen;
170+
do {
171+
topic_gen = generation; //copy the current topic generation (generation is volatile)
172+
if(xQueuePeek(queue, msg, 0) != pdPASS) return false; //exit if nothing in the queue
173+
if(topic_gen == generation) { //if generation did not change, then we got the message for that generation
174+
*subscriber_gen = topic_gen; //update the subscriber generation
175+
return true;
176+
}
177+
}while(--tries);
178+
//give up, return guess of retrieved generation
179+
*subscriber_gen = topic_gen; //update the subscriber generation
180+
return true;
148181
}
149182

150183
private:
@@ -155,6 +188,7 @@ class MsgTopicQueue : public MsgTopicBase {
155188
class MsgSubscriptionBase {
156189
public:
157190
bool updated(); //returns true if new msg available
191+
158192
protected:
159193
friend class MsgBroker;
160194
template <class T> friend class MsgSubscription;
@@ -168,6 +202,7 @@ class MsgSubscriptionBase {
168202

169203
bool pull(void *msg); //pull message: returns true if msg was pulled, returns false if no msg available
170204
bool pull_updated(void *msg); //pull updated message: returns true when updated msg available, else returns false and does not update msg
205+
bool pull_last(void *msg); //pull last message: returns true if msg was pulled, returns false if no msg available
171206

172207
private:
173208
MsgSubscriptionBase() {}
@@ -180,5 +215,6 @@ class MsgSubscription : public MsgSubscriptionBase {
180215
public:
181216
MsgSubscription(const char* name, MsgTopicBase *topic) : MsgSubscriptionBase(name, topic) {}
182217
bool pull(T *msg) { return MsgSubscriptionBase::pull(msg); }
183-
bool pull_updated(T *msg) { return MsgSubscriptionBase::pull_updated(msg); }
218+
bool pull_updated(T *msg) { return MsgSubscriptionBase::pull_updated(msg); }
219+
bool pull_last(T *msg) { return MsgSubscriptionBase::pull_last(msg); }
184220
};

0 commit comments

Comments
 (0)