Skip to content

Commit 6727ed4

Browse files
committed
Synchronized Stream
1 parent 0232386 commit 6727ed4

File tree

1 file changed

+79
-0
lines changed

1 file changed

+79
-0
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
#pragma once
2+
#include "AudioTools/CoreAudio/Buffers.h"
3+
#include "LockGuard.h"
4+
#include "Mutex.h"
5+
#include "Stream.h"
6+
7+
namespace audio_tools {
8+
9+
/***
10+
* @brief Wrapper class that can turn any Stream into a thread save
11+
* implementation. This is done by adding a Mutex to the Stream. The
12+
* read and write operations are buffered and the access to the stream is
13+
* protected by the Mutex.
14+
* @ingroup streams
15+
* @ingroup concurrency
16+
* @author Phil Schatzmann
17+
*/
18+
19+
class SynchronizedStream : public Stream {
20+
public:
21+
SynchronizedStream(Stream &stream, MutexBase &mutex) {
22+
p_stream = &stream;
23+
p_mutex = &mutex;
24+
}
25+
26+
// reads a single value
27+
int read() override {
28+
if (read_buffer.isEmpty()) {
29+
LockGuard guard(p_mutex);
30+
p_stream->readBytes(read_buffer.address(), read_buffer.size());
31+
}
32+
return read_buffer.read();
33+
}
34+
35+
// peeks the actual entry from the buffer
36+
int peek() override {
37+
LockGuard guard(p_mutex);
38+
return p_stream->peek();
39+
}
40+
41+
// write add an entry to the buffer
42+
size_t write(uint8_t data) override {
43+
write_buffer.write(data);
44+
if (write_buffer.isFull()) {
45+
LockGuard guard(p_mutex);
46+
size_t written = p_stream->write((const uint8_t *)write_buffer.data(),
47+
write_buffer.size());
48+
assert(written == write_buffer.size());
49+
write_buffer.reset();
50+
}
51+
return 1;
52+
}
53+
54+
// provides the number of entries that are available to read
55+
int available() override {
56+
LockGuard guard(p_mutex);
57+
return p_stream->available();
58+
}
59+
60+
// provides the number of entries that are available to write
61+
int availableForWrite() override {
62+
LockGuard guard(p_mutex);
63+
return p_stream->availableForWrite();
64+
}
65+
66+
/// Defines the size of the internal buffers
67+
void setBufferSize(int size) {
68+
read_buffer.resize(size);
69+
write_buffer.resize(size);
70+
}
71+
72+
protected:
73+
Stream *p_stream = nullptr;
74+
MutexBase *p_mutex = nullptr;
75+
SingleBuffer<uint8_t> read_buffer;
76+
SingleBuffer<uint8_t> write_buffer;
77+
};
78+
79+
} // namespace audio_tools

0 commit comments

Comments
 (0)