Skip to content

Commit eae7c37

Browse files
committed
queue: Add occupancy histogram
1 parent d54463c commit eae7c37

File tree

4 files changed

+184
-6
lines changed

4 files changed

+184
-6
lines changed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# Copyright (c) 2016-2019, Nefeli Networks, Inc.
2+
# All rights reserved.
3+
#
4+
# Redistribution and use in source and binary forms, with or without
5+
# modification, are permitted provided that the following conditions are met:
6+
#
7+
# * Redistributions of source code must retain the above copyright notice, this
8+
# list of conditions and the following disclaimer.
9+
#
10+
# * Redistributions in binary form must reproduce the above copyright notice,
11+
# this list of conditions and the following disclaimer in the documentation
12+
# and/or other materials provided with the distribution.
13+
#
14+
# * Neither the names of the copyright holders nor the names of their
15+
# contributors may be used to endorse or promote products derived from this
16+
# software without specific prior written permission.
17+
#
18+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21+
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22+
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23+
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24+
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25+
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26+
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27+
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28+
# POSSIBILITY OF SUCH DAMAGE.
29+
30+
from test_utils import *
31+
32+
33+
class BessQueueOccupancyTest(BessModuleTestCase):
34+
def _send_packets(self, q):
35+
eth = scapy.Ether(src='02:1e:67:9f:4d:ae', dst='06:16:3e:1b:72:32')
36+
ip = scapy.IP(src='172.16.0.2', dst='8.8.8.8')
37+
tcp = scapy.TCP(sport=52428, dport=80)
38+
l7 = 'helloworld'
39+
pkt = eth / ip / tcp / l7
40+
41+
pkts = [pkt] * 100
42+
_ = self.run_module(q, 0, pkts, [0])
43+
return len(pkts)
44+
45+
def test_hist_enable(self):
46+
q = Queue(size=1024, track_occupancy=True)
47+
sent = self._send_packets(q)
48+
resp = q.get_status()
49+
self.assertEqual(resp.enqueued, sent)
50+
self.assertEqual(resp.dequeued, sent)
51+
self.assertEqual(resp.occupancy_summary.count, sent)
52+
53+
def test_hist_disable(self):
54+
q = Queue(size=1024, track_occupancy=False)
55+
sent = self._send_packets(q)
56+
resp = q.get_status()
57+
self.assertEqual(resp.enqueued, sent)
58+
self.assertEqual(resp.dequeued, sent)
59+
self.assertEqual(resp.occupancy_summary.count, 0)
60+
61+
def test_hist_size(self):
62+
q = Queue(size=1024, track_occupancy=True)
63+
resp = q.get_status()
64+
self.assertEqual(resp.size, 1024)
65+
self.assertEqual(resp.occupancy_summary.num_buckets, 32)
66+
self.assertEqual(resp.occupancy_summary.bucket_width, 32)
67+
68+
q.set_size(size=2048)
69+
resp = q.get_status()
70+
self.assertEqual(resp.size, 2048)
71+
self.assertEqual(resp.occupancy_summary.num_buckets, 32)
72+
self.assertEqual(resp.occupancy_summary.bucket_width, 64)
73+
74+
q = Queue(size=1024, track_occupancy=True, occupancy_hist_buckets=64)
75+
resp = q.get_status()
76+
self.assertEqual(resp.size, 1024)
77+
self.assertEqual(resp.occupancy_summary.num_buckets, 64)
78+
self.assertEqual(resp.occupancy_summary.bucket_width, 16)
79+
80+
def test_hist_summary(self):
81+
q = Queue(size=1024, track_occupancy=True)
82+
sent = self._send_packets(q)
83+
84+
resp = q.get_status(occupancy_percentiles=[0.5, 0.9, 0.99])
85+
self.assertEqual(resp.occupancy_summary.count, 100)
86+
self.assertEqual(len(resp.occupancy_summary.percentile_values), 3)
87+
88+
resp = q.get_status(occupancy_percentiles=[0, 0.5, 0.9, 0.99])
89+
self.assertEqual(resp.occupancy_summary.count, 100)
90+
self.assertEqual(len(resp.occupancy_summary.percentile_values), 4)
91+
92+
resp = q.get_status(clear_hist=True)
93+
self.assertEqual(resp.occupancy_summary.count, 100)
94+
95+
resp = q.get_status()
96+
self.assertEqual(resp.occupancy_summary.count, 0)
97+
98+
99+
suite = unittest.TestLoader().loadTestsFromTestCase(BessQueueOccupancyTest)
100+
results = unittest.TextTestRunner(verbosity=2, stream=sys.stdout).run(suite)
101+
102+
if results.failures or results.errors:
103+
sys.exit(1)

core/modules/queue.cc

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@
3434

3535
#include "../utils/format.h"
3636

37-
#define DEFAULT_QUEUE_SIZE 1024
38-
3937
const Commands Queue::cmds = {
4038
{"set_burst", "QueueCommandSetBurstArg",
4139
MODULE_CMD_FUNC(&Queue::CommandSetBurst), Command::THREAD_SAFE},
@@ -79,6 +77,10 @@ int Queue::Resize(int slots) {
7977
queue_ = new_queue;
8078
size_ = slots;
8179

80+
if (track_occupancy_) {
81+
occupancy_hist_.Resize(occupancy_buckets_, slots / occupancy_buckets_);
82+
}
83+
8284
if (backpressure_) {
8385
AdjustWaterLevels();
8486
}
@@ -97,6 +99,15 @@ CommandResponse Queue::Init(const bess::pb::QueueArg &arg) {
9799

98100
burst_ = bess::PacketBatch::kMaxBurst;
99101

102+
if (arg.track_occupancy()) {
103+
track_occupancy_ = true;
104+
occupancy_buckets_ = kDefaultBuckets;
105+
if (arg.occupancy_hist_buckets() != 0) {
106+
occupancy_buckets_ = arg.occupancy_hist_buckets();
107+
}
108+
VLOG(1) << "Occupancy tracking enabled for " << name() << "::Queue (" << occupancy_buckets_ << " buckets)";
109+
}
110+
100111
if (arg.backpressure()) {
101112
VLOG(1) << "Backpressure enabled for " << name() << "::Queue";
102113
backpressure_ = true;
@@ -191,7 +202,19 @@ struct task_result Queue::RunTask(Context *ctx, bess::PacketBatch *batch,
191202

192203
RunNextModule(ctx, batch);
193204

194-
if (backpressure_ && llring_count(queue_) < low_water_) {
205+
uint32_t occupancy;
206+
if (track_occupancy_ || backpressure_) {
207+
occupancy = llring_count(queue_);
208+
}
209+
210+
if (track_occupancy_) {
211+
mcslock_node_t mynode;
212+
mcs_lock(&lock_, &mynode);
213+
occupancy_hist_.Insert(occupancy);
214+
mcs_unlock(&lock_, &mynode);
215+
}
216+
217+
if (backpressure_ && occupancy < low_water_) {
195218
SignalUnderload();
196219
}
197220

@@ -236,16 +259,46 @@ CommandResponse Queue::CommandSetSize(
236259
}
237260

238261
CommandResponse Queue::CommandGetStatus(
239-
const bess::pb::QueueCommandGetStatusArg &) {
262+
const bess::pb::QueueCommandGetStatusArg &arg) {
240263
bess::pb::QueueCommandGetStatusResponse resp;
264+
265+
std::vector<double> occupancy_percentiles;
266+
std::copy(arg.occupancy_percentiles().begin(), arg.occupancy_percentiles().end(),
267+
back_inserter(occupancy_percentiles));
268+
if (!IsValidPercentiles(occupancy_percentiles)) {
269+
return CommandFailure(EINVAL, "invalid 'occupancy_percentiles'");
270+
}
271+
const auto &occupancy_summary = occupancy_hist_.Summarize(occupancy_percentiles);
272+
241273
resp.set_count(llring_count(queue_));
242274
resp.set_size(size_);
243275
resp.set_enqueued(stats_.enqueued);
244276
resp.set_dequeued(stats_.dequeued);
245277
resp.set_dropped(stats_.dropped);
278+
SetHistogram(resp.mutable_occupancy_summary(), occupancy_summary);
279+
280+
if (arg.clear_hist()) {
281+
// Note that some samples might be lost due to the small gap between
282+
// Summarize() and the next mcs_lock... but we posit that a smaller
283+
// critical section is more important.
284+
ClearOccupancyHist();
285+
}
286+
246287
return CommandSuccess(resp);
247288
}
248289

290+
void Queue::ClearOccupancyHist() {
291+
// Vector initialization is expensive, so it should happen outside the critical section
292+
decltype(occupancy_hist_) new_occupancy_hist(occupancy_hist_.num_buckets(),
293+
occupancy_hist_.bucket_width());
294+
295+
// Use move semantics to minimize the critical section
296+
mcslock_node_t mynode;
297+
mcs_lock(&lock_, &mynode);
298+
occupancy_hist_ = std::move(new_occupancy_hist);
299+
mcs_unlock(&lock_, &mynode);
300+
}
301+
249302
void Queue::AdjustWaterLevels() {
250303
high_water_ = static_cast<uint64_t>(size_ * kHighWaterRatio);
251304
low_water_ = static_cast<uint64_t>(size_ * kLowWaterRatio);

core/modules/queue.h

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@
3434
#include "../kmod/llring.h"
3535
#include "../module.h"
3636
#include "../pb/module_msg.pb.h"
37+
#include "../utils/histogram.h"
38+
#include "../utils/mcslock.h"
39+
40+
#define DEFAULT_QUEUE_SIZE 1024
3741

3842
class Queue : public Module {
3943
public:
@@ -48,7 +52,9 @@ class Queue : public Module {
4852
size_(),
4953
high_water_(),
5054
low_water_(),
51-
stats_() {
55+
stats_(),
56+
track_occupancy_(),
57+
occupancy_hist_(kDefaultBuckets, kDefaultBucketWidth) {
5258
is_task_ = true;
5359
propagate_workers_ = false;
5460
max_allowed_workers_ = Worker::kMaxWorkers;
@@ -77,6 +83,8 @@ class Queue : public Module {
7783

7884
int Resize(int slots);
7985

86+
void ClearOccupancyHist();
87+
8088
// Readjusts the water level according to `size_`.
8189
void AdjustWaterLevels();
8290

@@ -105,6 +113,14 @@ class Queue : public Module {
105113
uint64_t dequeued;
106114
uint64_t dropped;
107115
} stats_;
116+
117+
// Queue occupancy statistics
118+
const uint64_t kDefaultBuckets = 32;
119+
const uint64_t kDefaultBucketWidth = DEFAULT_QUEUE_SIZE / kDefaultBuckets;
120+
bool track_occupancy_;
121+
uint64_t occupancy_buckets_;
122+
Histogram<uint64_t> occupancy_hist_;
123+
mcslock lock_;
108124
};
109125

110126
#endif // BESS_MODULES_QUEUE_H_

protobuf/module_msg.proto

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,10 @@ message QueueCommandSetSizeArg {
320320
* Modules that are queues or contain queues may contain functions
321321
* `get_status()` that return QueueCommandGetStatusResponse.
322322
*/
323-
message QueueCommandGetStatusArg {}
323+
message QueueCommandGetStatusArg {
324+
bool clear_hist = 1; /// if true, the occupancy histogram is cleared after it is read
325+
repeated double occupancy_percentiles = 2; /// ascending list of real numbers in [0.0, 100.0]
326+
}
324327

325328
/**
326329
* Modules that are queues or contain queues may contain functions
@@ -333,6 +336,7 @@ message QueueCommandGetStatusResponse {
333336
uint64 enqueued = 3; /// total enqueued
334337
uint64 dequeued = 4; /// total dequeued
335338
uint64 dropped = 5; /// total dropped
339+
HistogramSummary occupancy_summary = 6; /// Valid only if the queue was created with track_occupancy
336340
}
337341

338342
/**
@@ -797,6 +801,8 @@ message QueueArg {
797801
uint64 size = 1; /// The maximum number of packets to store in the queue.
798802
bool prefetch = 2; /// When prefetch is enabled, the module will perform CPU prefetch on the first 64B of each packet onto CPU L1 cache. Default value is false.
799803
bool backpressure = 3; // When backpressure is enabled, the module will notify upstream if it is overloaded.
804+
bool track_occupancy = 4; // When occupancy tracking is enabled, the module will keep a histogram of queue occupancies (observations recorded after each dequeue).
805+
uint64 occupancy_hist_buckets = 5; // The number of buckets to use in the histogram when occupancy tracking is enabled.
800806
}
801807

802808
/**

0 commit comments

Comments
 (0)