Skip to content

Commit d66369f

Browse files
authored
Merge pull request #1105 from slaclab/writer-bandwidth
Add bandwidth tracking and display to StreamWriter and PyDM DataWriter
2 parents db8f437 + ed70f8f commit d66369f

File tree

5 files changed

+96
-1
lines changed

5 files changed

+96
-1
lines changed

include/rogue/utilities/fileio/StreamWriter.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,15 @@
3636

3737
#include <stdint.h>
3838

39+
#include <chrono>
3940
#include <condition_variable>
41+
#include <deque>
4042
#include <map>
4143
#include <memory>
4244
#include <mutex>
4345
#include <string>
4446
#include <thread>
47+
#include <utility>
4548

4649
#include "rogue/EnableSharedFromThis.h"
4750
#include "rogue/Logging.h"
@@ -100,6 +103,12 @@ class StreamWriter : public rogue::EnableSharedFromThis<rogue::utilities::fileio
100103
//! Total number of frames in file
101104
uint32_t frameCount_;
102105

106+
//! Bytes recorded in the last one second window
107+
uint64_t bandwidthBytes_;
108+
109+
//! Time tagged byte counts for bandwidth calculation
110+
std::deque<std::pair<std::chrono::steady_clock::time_point, uint32_t>> bandwidthHistory_;
111+
103112
//! Internal method for file writing
104113
void intWrite(void* data, uint32_t size);
105114

@@ -109,6 +118,12 @@ class StreamWriter : public rogue::EnableSharedFromThis<rogue::utilities::fileio
109118
//! Flush file
110119
void flush();
111120

121+
//! Update bandwidth accounting
122+
void recordBandwidth(uint32_t size);
123+
124+
//! Remove stale bandwidth samples
125+
void pruneBandwidth(std::chrono::steady_clock::time_point now);
126+
112127
//! Write raw data
113128
bool raw_;
114129

@@ -166,6 +181,9 @@ class StreamWriter : public rogue::EnableSharedFromThis<rogue::utilities::fileio
166181
//! Get current file size
167182
uint64_t getCurrentSize();
168183

184+
//! Get instantaneous bandwidth in bytes per second
185+
double getBandwidth();
186+
169187
//! Get current frame count
170188
uint32_t getFrameCount();
171189

python/pyrogue/_DataWriter.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,17 @@ def __init__(self, *, hidden=True, bufferSize=0, maxFileSize=0, **kwargs):
9898
localGet=self._getTotalSize,
9999
description='Total bytes written.'))
100100

101+
self.add(pr.LocalVariable(
102+
name='Bandwidth',
103+
mode='RO',
104+
value=0.0,
105+
typeStr='Float64',
106+
pollInterval=1,
107+
localGet=self._getBandwidth,
108+
units='Bytes/sec',
109+
disp='{:,.3f}',
110+
description='Instantaneous write bandwidth in bytes per second.'))
111+
101112
self.add(pr.LocalVariable(
102113
name='FrameCount',
103114
mode='RO',
@@ -162,6 +173,10 @@ def _getTotalSize(self):
162173
"""get total file size. Override in sub-class"""
163174
return 0
164175

176+
def _getBandwidth(self, dev=None, var=None):
177+
"""get instantaneous bandwidth. Override in sub-class"""
178+
return 0.0
179+
165180
def _getFrameCount(self):
166181
"""get current file frame count. Override in sub-class"""
167182
return 0

python/pyrogue/pydm/widgets/data_writer.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from pyrogue.pydm.widgets import PyRogueLineEdit
1919
from pyrogue.pydm.data_plugins.rogue_plugin import nodeFromAddress
2020
from qtpy.QtCore import Qt, Slot
21-
from qtpy.QtWidgets import QVBoxLayout, QHBoxLayout, QPushButton
21+
from qtpy.QtWidgets import QVBoxLayout, QHBoxLayout, QPushButton, QWidget, QLabel
2222
from qtpy.QtWidgets import QFormLayout, QGroupBox, QFileDialog
2323
import datetime
2424

@@ -114,6 +114,24 @@ def connection_changed(self, connected):
114114
w.alarmSensitiveBorder = True
115115
fl.addRow('Frame Count:',w)
116116

117+
bwWidget = QWidget()
118+
bwLayout = QHBoxLayout()
119+
bwLayout.setContentsMargins(0,0,0,0)
120+
bwLayout.setSpacing(4)
121+
bwWidget.setLayout(bwLayout)
122+
123+
w = PyDMLabel(parent=None, init_channel=self._path + '.Bandwidth/disp')
124+
w.alarmSensitiveContent = False
125+
w.alarmSensitiveBorder = True
126+
bwLayout.addWidget(w)
127+
128+
units = QLabel('Bytes/sec')
129+
units.setAlignment(Qt.AlignLeft | Qt.AlignVCenter)
130+
bwLayout.addWidget(units)
131+
bwLayout.setAlignment(Qt.AlignLeft | Qt.AlignVCenter)
132+
133+
fl.addRow('Bandwidth:',bwWidget)
134+
117135
w = PyDMLabel(parent=None, init_channel=self._path + '.TotalSize/disp')
118136
w.alarmSensitiveContent = False
119137
w.alarmSensitiveBorder = True

python/pyrogue/utilities/fileio/_StreamWriter.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ def _getTotalSize(self,dev,var):
7272

7373
return self._writer.getTotalSize()
7474

75+
def _getBandwidth(self,dev,var):
76+
return self._writer.getBandwidth()
77+
7578
def _getFrameCount(self,dev,var):
7679
return self._writer.getFrameCount()
7780

src/rogue/utilities/fileio/StreamWriter.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
#include "rogue/utilities/fileio/StreamWriter.h"
3434

35+
#include <chrono>
3536
#include <fcntl.h>
3637
#include <inttypes.h>
3738
#include <stdint.h>
@@ -79,6 +80,7 @@ void ruf::StreamWriter::setup_python() {
7980
.def("getChannel", &ruf::StreamWriter::getChannel)
8081
.def("getTotalSize", &ruf::StreamWriter::getTotalSize)
8182
.def("getCurrentSize", &ruf::StreamWriter::getCurrentSize)
83+
.def("getBandwidth", &ruf::StreamWriter::getBandwidth)
8284
.def("getFrameCount", &ruf::StreamWriter::getFrameCount)
8385
.def("waitFrameCount", &ruf::StreamWriter::waitFrameCount);
8486
#endif
@@ -94,6 +96,7 @@ ruf::StreamWriter::StreamWriter() {
9496
totSize_ = 0;
9597
buffer_ = NULL;
9698
frameCount_ = 0;
99+
bandwidthBytes_ = 0;
97100
currBuffer_ = 0;
98101
dropErrors_ = false;
99102
isOpen_ = false;
@@ -134,6 +137,8 @@ void ruf::StreamWriter::open(std::string file) {
134137
totSize_ = 0;
135138
currSize_ = 0;
136139
frameCount_ = 0;
140+
bandwidthBytes_ = 0;
141+
bandwidthHistory_.clear();
137142
currBuffer_ = 0;
138143

139144
// Iterate over all channels and reset their frame counts
@@ -150,6 +155,8 @@ void ruf::StreamWriter::close() {
150155
std::lock_guard<std::mutex> lock(mtx_);
151156
isOpen_ = false;
152157
flush();
158+
bandwidthBytes_ = 0;
159+
bandwidthHistory_.clear();
153160
if (fd_ >= 0) ::close(fd_);
154161
fd_ = -1;
155162
}
@@ -231,6 +238,22 @@ uint64_t ruf::StreamWriter::getCurrentSize() {
231238
return (currSize_ + currBuffer_);
232239
}
233240

241+
//! Get instantaneous bandwidth in bytes per second over the last second
242+
double ruf::StreamWriter::getBandwidth() {
243+
rogue::GilRelease noGil;
244+
std::lock_guard<std::mutex> lock(mtx_);
245+
246+
auto now = std::chrono::steady_clock::now();
247+
pruneBandwidth(now);
248+
249+
if (bandwidthHistory_.empty()) return (0.0);
250+
251+
auto windowStart = bandwidthHistory_.front().first;
252+
double seconds = std::chrono::duration<double>(now - windowStart).count();
253+
if (seconds <= 0.0) return (static_cast<double>(bandwidthBytes_));
254+
return (static_cast<double>(bandwidthBytes_) / seconds);
255+
}
256+
234257
//! Get current frame count
235258
uint32_t ruf::StreamWriter::getFrameCount() {
236259
return (frameCount_);
@@ -266,6 +289,22 @@ bool ruf::StreamWriter::waitFrameCount(uint32_t count, uint64_t timeout) {
266289
return (frameCount_ >= count);
267290
}
268291

292+
void ruf::StreamWriter::pruneBandwidth(std::chrono::steady_clock::time_point now) {
293+
auto cutoff = now - std::chrono::seconds(1);
294+
while (!bandwidthHistory_.empty() && bandwidthHistory_.front().first < cutoff) {
295+
bandwidthBytes_ -= bandwidthHistory_.front().second;
296+
bandwidthHistory_.pop_front();
297+
}
298+
}
299+
300+
void ruf::StreamWriter::recordBandwidth(uint32_t size) {
301+
if (size == 0) return;
302+
auto now = std::chrono::steady_clock::now();
303+
pruneBandwidth(now);
304+
bandwidthHistory_.emplace_back(now, size);
305+
bandwidthBytes_ += size;
306+
}
307+
269308
//! Write data to file. Called from StreamWriterChannel
270309
void ruf::StreamWriter::writeFile(uint8_t channel, std::shared_ptr<rogue::interfaces::stream::Frame> frame) {
271310
ris::Frame::BufferIterator it;
@@ -333,6 +372,8 @@ void ruf::StreamWriter::intWrite(void* data, uint32_t size) {
333372
std::memcpy(buffer_ + currBuffer_, data, size);
334373
currBuffer_ += size;
335374
}
375+
376+
recordBandwidth(size);
336377
}
337378

338379
//! Check file size for next write

0 commit comments

Comments
 (0)