Skip to content

Commit 37b40e8

Browse files
kuzin57Roman KuzinGazizonoki
authored
LOGBROKER-9686 Add limits to in flight bytes per partition (ydb-platform#33024)
Co-authored-by: Roman Kuzin <[email protected]> Co-authored-by: Bulat <[email protected]>
1 parent 1564235 commit 37b40e8

File tree

14 files changed

+608
-8
lines changed

14 files changed

+608
-8
lines changed
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
#include "inflight_limiter.h"
2+
#include <ydb/library/actors/core/log.h>
3+
4+
namespace NKikimr::NPQ {
5+
6+
TInFlightController::TInFlightController(ui64 MaxAllowedSize)
7+
: LayoutUnitSize(MaxAllowedSize / MAX_LAYOUT_COUNT)
8+
, TotalSize(0)
9+
, MaxAllowedSize(MaxAllowedSize)
10+
{
11+
if (MaxAllowedSize > 0 && LayoutUnitSize == 0) {
12+
LayoutUnitSize = 1;
13+
}
14+
}
15+
16+
bool TInFlightController::Add(ui64 Offset, ui64 Size) {
17+
if (MaxAllowedSize == 0) {
18+
// means that there are no limits were set
19+
return true;
20+
}
21+
22+
if (Size == 0) {
23+
return TotalSize < MaxAllowedSize;
24+
}
25+
26+
AFL_ENSURE(Layout.empty() || Offset > Layout.back());
27+
28+
if (TotalSize % LayoutUnitSize != 0) {
29+
AFL_ENSURE(!Layout.empty());
30+
Layout.back() = Offset;
31+
}
32+
33+
auto unitsBefore = (TotalSize + LayoutUnitSize - 1) / LayoutUnitSize;
34+
TotalSize += Size;
35+
auto unitsAfter = std::min(MAX_LAYOUT_COUNT, (TotalSize + LayoutUnitSize - 1) / LayoutUnitSize);
36+
for (auto currentUnits = unitsBefore; currentUnits < unitsAfter; currentUnits++) {
37+
Layout.push_back(Offset);
38+
}
39+
40+
AFL_ENSURE(!Layout.empty());
41+
Layout.back() = Offset;
42+
43+
return TotalSize < MaxAllowedSize;
44+
}
45+
46+
bool TInFlightController::Remove(ui64 Offset) {
47+
if (MaxAllowedSize == 0) {
48+
// means that there are no limits were set
49+
return true;
50+
}
51+
52+
for (auto it = Layout.begin(); it != Layout.end(); it = Layout.erase(it)) {
53+
if (*it >= Offset) {
54+
break;
55+
}
56+
57+
if (Layout.size() == 1) {
58+
TotalSize = 0;
59+
} else {
60+
AFL_ENSURE(TotalSize >= LayoutUnitSize);
61+
TotalSize -= LayoutUnitSize;
62+
}
63+
}
64+
65+
return TotalSize < MaxAllowedSize;
66+
}
67+
68+
bool TInFlightController::IsMemoryLimitReached() const {
69+
if (MaxAllowedSize == 0) {
70+
// means that there are no limits were set
71+
return false;
72+
}
73+
74+
return TotalSize >= MaxAllowedSize;
75+
}
76+
77+
} // namespace NKikimr::NPQ
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#include <util/system/types.h>
2+
#include <deque>
3+
4+
namespace NKikimr::NPQ {
5+
6+
// This class is used to control in-flight data.
7+
// Contoller handles layout of data units, max units count is MAX_LAYOUT_COUNT constant.
8+
// Layout is a deque of offsets
9+
// Each item (associated with data unit) in the deque is max offset which intersects with this unit
10+
// For example, if we have 3 units:
11+
// Unit 1: [0, 100]
12+
// Unit 2: [100, 200]
13+
// Unit 3: [200, 300]
14+
// Offsets are:
15+
// 1 - Size 60
16+
// 2 - Size 60
17+
// 3 - Size 60
18+
// 4 - Size 80
19+
// Then the deque will be [2, 4, 4]
20+
struct TInFlightController {
21+
constexpr static ui64 MAX_LAYOUT_COUNT = 1024;
22+
23+
TInFlightController() = default;
24+
TInFlightController(ui64 MaxAllowedSize);
25+
26+
ui64 LayoutUnitSize = 0;
27+
std::deque<ui64> Layout;
28+
ui64 TotalSize = 0;
29+
ui64 MaxAllowedSize = 0;
30+
31+
// Adds an offset with size
32+
bool Add(ui64 Offset, ui64 Size);
33+
// Removes offsets <= given offset
34+
bool Remove(ui64 Offset);
35+
bool IsMemoryLimitReached() const;
36+
};
37+
38+
} // namespace NKikimr::NPQ

ydb/core/persqueue/public/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ LIBRARY()
22

33
SRCS(
44
config.cpp
5+
inflight_limiter.cpp
56
pq_database.cpp
67
pq_rl_helpers.cpp
78
utils.cpp

0 commit comments

Comments
 (0)