Skip to content

Commit 89fc1dc

Browse files
committed
DoubleBuffered Stream
1 parent 84eede3 commit 89fc1dc

File tree

2 files changed

+192
-0
lines changed

2 files changed

+192
-0
lines changed

tests/doublebuf_stream_tests.cc

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
#include "doctest.h"
2+
#include "util/doublebuf_stream.hh"
3+
4+
TEST_CASE("Transmit without errors, always complete transmission before another is started") {
5+
std::vector<uint8_t> log;
6+
7+
DoubleBufferStream<uint8_t, 8> double_buf{[&log](std::span<uint8_t> buf) {
8+
log.insert(log.end(), buf.begin(), buf.end());
9+
return true;
10+
}};
11+
12+
std::array<uint8_t, 3> a1 = {'A', 'b', 'c'};
13+
CHECK(double_buf.transmit(a1));
14+
double_buf.tx_done_callback();
15+
16+
CHECK(log.size() == 3);
17+
CHECK(log[0] == 'A');
18+
CHECK(log[1] == 'b');
19+
CHECK(log[2] == 'c');
20+
21+
// rely on unsigned overflow:
22+
uint32_t word = 0xb0227f;
23+
std::array<uint8_t, 3> bytes{uint8_t(word >> 16), uint8_t(word >> 8), uint8_t(word)};
24+
CHECK(double_buf.transmit(bytes));
25+
double_buf.tx_done_callback();
26+
27+
CHECK(log.size() == 6);
28+
CHECK(log[3] == 0xb0);
29+
CHECK(log[4] == 0x22);
30+
CHECK(log[5] == 0x7f);
31+
32+
// Can send up to max size of buffer
33+
std::array<uint8_t, 8> a2 = {1, 2, 3, 4, 5, 6, 7, 8};
34+
CHECK(double_buf.transmit(a2));
35+
double_buf.tx_done_callback();
36+
37+
CHECK(log.size() == 14);
38+
CHECK(log[6] == 1);
39+
CHECK(log[7] == 2);
40+
CHECK(log[8] == 3);
41+
CHECK(log[9] == 4);
42+
CHECK(log[10] == 5);
43+
CHECK(log[11] == 6);
44+
CHECK(log[12] == 7);
45+
CHECK(log[13] == 8);
46+
47+
// Cannot send more then the buffer size
48+
std::array<uint8_t, 9> a3 = {1, 2, 3, 4, 5, 6, 7, 8, 9};
49+
CHECK_FALSE(double_buf.transmit(a3));
50+
CHECK(log.size() == 14);
51+
}
52+
53+
TEST_CASE("Transmit without errors, interleaving transmissions") {
54+
std::vector<uint8_t> log;
55+
56+
DoubleBufferStream<uint8_t, 8> double_buf{[&log](std::span<uint8_t> buf) {
57+
log.insert(log.end(), buf.begin(), buf.end());
58+
return true;
59+
}};
60+
61+
std::array<uint8_t, 3> a1 = {1, 2, 3};
62+
CHECK(double_buf.transmit(a1));
63+
64+
std::array<uint8_t, 4> a11 = {4, 5, 6, 7};
65+
CHECK(double_buf.transmit(a11));
66+
67+
std::array<uint8_t, 1> a2 = {8};
68+
CHECK(double_buf.transmit(a2));
69+
70+
std::array<uint8_t, 2> a22 = {9, 10};
71+
CHECK(double_buf.transmit(a22));
72+
73+
// a1 has been sent in the first internal buffer.
74+
// but the inactive buffer will not be transmitted until
75+
// the tx_done_callback() is called
76+
CHECK(log.size() == 3);
77+
78+
double_buf.tx_done_callback();
79+
// Now the inactive buffer, which contains the remaining data,
80+
// will be sent
81+
CHECK(log.size() == 10);
82+
83+
CHECK(log[0] == 1);
84+
CHECK(log[1] == 2);
85+
CHECK(log[2] == 3);
86+
CHECK(log[3] == 4);
87+
CHECK(log[4] == 5);
88+
CHECK(log[5] == 6);
89+
CHECK(log[6] == 7);
90+
CHECK(log[7] == 8);
91+
CHECK(log[8] == 9);
92+
CHECK(log[9] == 10);
93+
}

util/doublebuf_stream.hh

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
#pragma once
2+
#include "util/fixed_vector.hh"
3+
#include <functional>
4+
#include <optional>
5+
#include <span>
6+
7+
// A double-buffered data transmitter.
8+
// Allows for calling transmit() at any time, even if a transmission is in progress.
9+
// Not thread-safe (would need to use std::atomic or semaphors to make that work).
10+
//
11+
// Call transmit() to send data using the transmit_func provided in the constructor.
12+
// When transmission is complete (hardware is ready to send more), then call tx_done_callback().
13+
//
14+
// When you call transmit():
15+
// -- if a transmission is in progress, then the data will be queued into the inactive buffer
16+
// -- if a transmission is NOT in progress, then the data will be sent immediately.
17+
//
18+
// When a transmission ends (tx_done_callback is called),
19+
// if the inactive buffer has data, then a new transmission will be started with it.
20+
21+
// Note: This is not safe if tx_done_callback() interrupts transmit() or start_tx().
22+
// A valid use case is in the STM USB host library, which does not call
23+
// the tx callback in an IRQ, so we do not run the risk of tx_done_callback()
24+
// interrupting transmit() or start_tx();
25+
26+
// #define DEBUG_DBLBUFSTREAM 1
27+
#if DEBUG_DBLBUFSTREAM
28+
#define pr_dbg printf
29+
#else
30+
#define pr_dbg(...)
31+
#endif
32+
33+
template<typename T, size_t Size>
34+
struct DoubleBufferStream {
35+
// This size can be adjusted if needed
36+
FixedVector<T, Size> tx_buffer[2];
37+
std::optional<unsigned> in_progress_idx = std::nullopt;
38+
39+
std::function<bool(std::span<T>)> transmit_func;
40+
41+
DoubleBufferStream(std::function<bool(std::span<T>)> xmit_func)
42+
: transmit_func{xmit_func} {
43+
}
44+
45+
// Pushes to the inactive buffer and starts transmission if it's not already started
46+
bool transmit(std::span<T> word) {
47+
auto inactive_idx = 1 - in_progress_idx.value_or(1);
48+
auto &inactive_buf = tx_buffer[inactive_idx];
49+
50+
if (inactive_buf.available() >= word.size()) {
51+
for (auto d : word)
52+
inactive_buf.push_back(d);
53+
54+
pr_dbg("DBS::transmit: queued tx data in buffer %u\n", inactive_idx);
55+
56+
// Start a new transmission if one isn't in progress
57+
if (!in_progress_idx.has_value()) {
58+
pr_dbg("DBS::transmit: not tx in progress, so starting new tx\n");
59+
return start_tx(inactive_idx);
60+
}
61+
62+
// tx_done_callback() will transmit our data when it's called
63+
pr_dbg("DBS::transmit: tx is progress, so not starting new tx\n");
64+
return true;
65+
} else {
66+
return false;
67+
}
68+
}
69+
70+
void tx_done_callback() {
71+
if (!in_progress_idx.has_value()) {
72+
pr_dbg("MIDI Host internal error: tx_done_callback called but no buffer is in progress\n");
73+
return;
74+
}
75+
76+
tx_buffer[in_progress_idx.value()].clear();
77+
78+
pr_dbg("DBS::tx_done_callback: finished tx of buffer[%u]\n", in_progress_idx.value());
79+
80+
// Check if we should start transmitting the other buffer
81+
auto other_buffer = 1 - in_progress_idx.value();
82+
83+
if (tx_buffer[other_buffer].size()) {
84+
pr_dbg("DBS: tx_done_callback starting tx for %u\n", other_buffer);
85+
start_tx(other_buffer);
86+
} else {
87+
in_progress_idx = std::nullopt;
88+
}
89+
}
90+
91+
private:
92+
bool start_tx(unsigned idx) {
93+
in_progress_idx = idx;
94+
pr_dbg("DBS::start_tx: starting tx of buffer[%u]\n", idx);
95+
auto &active_buf = tx_buffer[idx];
96+
auto res = transmit_func({active_buf.begin(), active_buf.size()});
97+
return res;
98+
}
99+
};

0 commit comments

Comments
 (0)