Skip to content

Commit 852ca94

Browse files
committed
Add addons
1 parent 98b8f4b commit 852ca94

File tree

2 files changed

+365
-0
lines changed

2 files changed

+365
-0
lines changed
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
#pragma once
2+
3+
#include "kafka/Project.h"
4+
5+
#include "kafka/KafkaClient.h"
6+
#include "kafka/Types.h"
7+
8+
#include <deque>
9+
#include <vector>
10+
11+
namespace KAFKA_API {
12+
13+
/**
 * \brief A minimal binary min-heap stored in a `std::vector`.
 *
 * Elements are compared with `operator<` only, and the smallest one is always at `front()`.
 * The tree is implicit: the children of the node at index `i` are at `2 * i + 1` and `2 * i + 2`.
 */
template <typename T>
class Heap
{
public:
    /** \brief Return true if the heap holds no element. */
    bool        empty() const { return data.empty(); }

    /** \brief Return the number of elements held. */
    std::size_t size()  const { return data.size(); }

    /**
     * \brief Return the smallest element.
     * Note: The heap must not be empty.
     */
    const T& front() const { return data[0]; }

    /**
     * \brief Insert a copy of `t`, keeping the smallest element at the front.
     */
    void push(const T& t)
    {
        data.push_back(t);

        // Sift the new element up until its parent is no longer greater than it
        std::size_t indexCurrent = data.size() - 1;
        while (indexCurrent > 0)
        {
            const std::size_t indexParent = (indexCurrent - 1) / 2;

            if (!(data[indexCurrent] < data[indexParent])) break;

            std::swap(data[indexCurrent], data[indexParent]);
            indexCurrent = indexParent;
        }
    }

    /**
     * \brief Remove the smallest element.
     * Note: Calling it on an empty heap is a no-op.
     */
    void pop_front()
    {
        if (data.empty()) return;

        // Move the last element to the root (nothing to move if it is the only one), then drop the old root
        if (data.size() > 1) std::swap(data.front(), data.back());
        data.pop_back();

        // Sift the new root down until neither child is smaller than it
        const std::size_t count = data.size();
        std::size_t indexCurrent = 0;
        for (;;)
        {
            const std::size_t indexLeftChild  = indexCurrent * 2 + 1;
            const std::size_t indexRightChild = indexLeftChild + 1;

            if (indexLeftChild >= count) break;

            const std::size_t indexMinChild =
                (indexRightChild >= count || data[indexLeftChild] < data[indexRightChild]) ? indexLeftChild : indexRightChild;

            if (!(data[indexMinChild] < data[indexCurrent])) break;

            std::swap(data[indexCurrent], data[indexMinChild]);
            indexCurrent = indexMinChild;
        }
    }

private:
    std::vector<T> data;
};
63+
64+
65+
/**
66+
* \brief The queue can be used to determine the right offset to commit.
67+
* A `KafkaManuallyCommitConsumer` might forward the received records to different handlers, while these handlers could not ack the records in order.
68+
* Then, the `UnorderedOffsetCommitQueue` would help,
69+
* 1. Prepare an `UnorderedOffsetCommitQueue` for each topic-partition.
70+
* 2. Make sure call `waitOffset()` for each record received.
71+
* 3. Make sure call `ackOffset()` while a handler acks for an record.
72+
* 4. Figure out whether there's offset to commit with `popOffsetToCommit()` and commit the offset then.
73+
*/
74+
class UnorderedOffsetCommitQueue
75+
{
76+
public:
77+
UnorderedOffsetCommitQueue(const Topic& topic, Partition partition)
78+
: _partitionInfo(std::string("topic[").append(topic).append("], paritition[").append(std::to_string(partition)).append("]"))
79+
{
80+
}
81+
UnorderedOffsetCommitQueue() = default;
82+
83+
/**
84+
* \brief Return how many received offsets have not been popped to commit (with `popOffsetToCommit()`).
85+
*/
86+
std::size_t size() const { return _offsetsReceived.size(); }
87+
88+
/**
89+
* \brief Add an offset (for a ConsumerRecord) to the waiting list, until it being acked (with `ackOffset`).
90+
* Note: Make sure the offset would be `ack` later with `ackOffset()`.
91+
*/
92+
void waitOffset(Offset offset)
93+
{
94+
if (offset < 0 || (!_offsetsReceived.empty() && offset <= _offsetsReceived.back()))
95+
{
96+
// Invalid offset (might be fetched from the record which had no valid offset)
97+
KAFKA_API_LOG(LOG_ERR, "Got invalid offset to wait[%lld]! %s", offset, (_partitionInfo.empty() ? "" : _partitionInfo.c_str()));
98+
return;
99+
}
100+
101+
_offsetsReceived.emplace_back(offset);
102+
}
103+
104+
/**
105+
* \brief Ack the record has been handled and ready to be committed.
106+
* Note: If all offsets ahead has been acked, then with `popOffsetToCommit()`, we'd get `offset + 1`, which is ready to be committed for the consumer.
107+
*/
108+
void ackOffset(Offset offset)
109+
{
110+
Offset maxOffsetReceived = _offsetsReceived.back();
111+
if (offset > maxOffsetReceived)
112+
{
113+
// Runtime error
114+
KAFKA_API_LOG(LOG_ERR, "Got invalid ack offset[%lld]! Even larger than all offsets received[%lld]! %s", offset, maxOffsetReceived, (_partitionInfo.empty() ? "" : _partitionInfo.c_str()));
115+
}
116+
117+
_offsetsToCommit.push(offset);
118+
do
119+
{
120+
Offset minOffsetToCommit = _offsetsToCommit.front();
121+
Offset expectedOffset = _offsetsReceived.front();
122+
if (minOffsetToCommit == expectedOffset)
123+
{
124+
_toCommit = expectedOffset + 1;
125+
_offsetsToCommit.pop_front();
126+
_offsetsReceived.pop_front();
127+
}
128+
else if (minOffsetToCommit < expectedOffset)
129+
{
130+
// Inconsist error (might be caused by duplicated ack)
131+
KAFKA_API_LOG(LOG_ERR, "Got invalid ack offset[%lld]! Even smaller than expected[%lld]! %s", minOffsetToCommit, expectedOffset, (_partitionInfo.empty() ? "" : _partitionInfo.c_str()));
132+
_offsetsToCommit.pop_front();
133+
}
134+
else
135+
{
136+
break;
137+
}
138+
} while (!_offsetsToCommit.empty());
139+
}
140+
141+
/**
142+
* \brief Pop the offset which is ready for the consumer (if any).
143+
*/
144+
Optional<Offset> popOffsetToCommit()
145+
{
146+
Optional<Offset> ret;
147+
if (_committed != _toCommit)
148+
{
149+
ret = _committed = _toCommit;
150+
}
151+
return ret;
152+
}
153+
154+
/**
155+
* \brief Return the offset last popped.
156+
*/
157+
Optional<Offset> lastPoppedOffset()
158+
{
159+
Optional<Offset> ret;
160+
if (_committed != INVALID_OFFSET)
161+
{
162+
ret = _committed;
163+
}
164+
return ret;
165+
}
166+
167+
private:
168+
std::deque<Offset> _offsetsReceived;
169+
Heap<Offset> _offsetsToCommit;
170+
Offset _toCommit = {INVALID_OFFSET};
171+
Offset _committed = {INVALID_OFFSET};
172+
std::string _partitionInfo;
173+
174+
static constexpr Offset INVALID_OFFSET = -1;
175+
};
176+
177+
} // end of KAFKA_API
178+
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
#include "kafka/addons/UnorderedOffsetCommitQueue.h"

#include "gtest/gtest.h"

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <iostream>
#include <random>
#include <vector>
8+
9+
namespace Kafka = KAFKA_API;
10+
11+
// Normal flow: offsets are received in order, acked out of order, and the queue
// only exposes an offset to commit once every offset before it has been acked.
TEST(UnorderedOffsetCommitQueue, Functionality)
{
    Kafka::UnorderedOffsetCommitQueue queue;

    // Suppose the consumer received some records with a single `poll`, and forwarded them to several handlers
    for (Kafka::Offset received = 1; received <= 9; ++received)
    {
        queue.waitOffset(received);
    }

    // Suppose these handlers would ack these offsets occasionally
    // And we'll check whether we could get the right offset to commit
    queue.ackOffset(3);
    EXPECT_FALSE(queue.popOffsetToCommit());
    EXPECT_FALSE(queue.lastPoppedOffset());

    queue.ackOffset(2);
    EXPECT_FALSE(queue.popOffsetToCommit());
    EXPECT_FALSE(queue.lastPoppedOffset());

    queue.ackOffset(5);
    EXPECT_FALSE(queue.popOffsetToCommit());
    EXPECT_FALSE(queue.lastPoppedOffset());

    // Offsets 1, 2, 3 are all acked now
    queue.ackOffset(1);
    auto offset = queue.popOffsetToCommit();
    ASSERT_TRUE(offset);
    auto lastPopped = queue.lastPoppedOffset();
    ASSERT_TRUE(lastPopped);
    EXPECT_EQ(*offset, *lastPopped);
    EXPECT_EQ(3 + 1, *offset);

    // No new offset to commit
    offset = queue.popOffsetToCommit();
    EXPECT_FALSE(offset);

    // Offset 5 was acked before, thus acking 4 makes both committable
    queue.ackOffset(4);
    offset = queue.popOffsetToCommit();
    ASSERT_TRUE(offset);
    EXPECT_EQ(5 + 1, *offset);

    queue.ackOffset(7);
    offset = queue.popOffsetToCommit();
    EXPECT_FALSE(offset);

    queue.ackOffset(6);
    offset = queue.popOffsetToCommit();
    ASSERT_TRUE(offset);
    lastPopped = queue.lastPoppedOffset();
    ASSERT_TRUE(lastPopped);
    EXPECT_EQ(*offset, *lastPopped);
    EXPECT_EQ(7 + 1, *offset);

    queue.ackOffset(8);
    offset = queue.popOffsetToCommit();
    ASSERT_TRUE(offset);
    EXPECT_EQ(8 + 1, *offset);

    queue.ackOffset(9);
    offset = queue.popOffsetToCommit();
    ASSERT_TRUE(offset);
    EXPECT_EQ(9 + 1, *offset);

    // No more records to commit
    offset = queue.popOffsetToCommit();
    EXPECT_FALSE(offset);
}
74+
75+
// Abnormal flow: duplicated/invalid offsets to wait, and duplicated/out-of-range acks,
// must not disturb the offsets the queue reports as ready to commit.
TEST(UnorderedOffsetCommitQueue, AbnormalCases)
{
    Kafka::UnorderedOffsetCommitQueue queue("some-topic", 2);

    queue.waitOffset(1);
    queue.waitOffset(2);
    // duplicated offset
    queue.waitOffset(2);
    // invalid offset
    queue.waitOffset(-1);
    queue.waitOffset(3);
    queue.waitOffset(4);
    queue.waitOffset(5);

    queue.ackOffset(3);
    auto offset = queue.popOffsetToCommit();
    EXPECT_FALSE(offset);

    queue.ackOffset(2);
    offset = queue.popOffsetToCommit();
    EXPECT_FALSE(offset);

    queue.ackOffset(1);
    offset = queue.popOffsetToCommit();
    ASSERT_TRUE(offset);
    EXPECT_EQ(3 + 1, *offset);

    // ack an offset even smaller than expected (a duplicated ack)
    queue.ackOffset(2);
    offset = queue.popOffsetToCommit();
    EXPECT_FALSE(offset);

    // ack an offset even larger than all offsets received
    queue.ackOffset(6);
    offset = queue.popOffsetToCommit();
    EXPECT_FALSE(offset);

    queue.ackOffset(4);
    offset = queue.popOffsetToCommit();
    ASSERT_TRUE(offset);
    EXPECT_EQ(4 + 1, *offset);

    // Now only 1 offset (i.e. 5) left un-popped
    EXPECT_EQ(std::size_t{1}, queue.size());
}
118+
119+
120+
namespace {
121+
122+
auto checkTimeMsConsumedToSortOffsets(std::size_t testNum, std::size_t step)
123+
{
124+
Kafka::UnorderedOffsetCommitQueue queue;
125+
126+
std::vector<Kafka::Offset> waitSequence(testNum);
127+
for (std::size_t i = 0 ; i < testNum; ++i)
128+
{
129+
waitSequence[i] = i;
130+
}
131+
132+
std::vector<Kafka::Offset> ackSequence = waitSequence;
133+
std::random_device rd;
134+
std::mt19937 g(rd());
135+
for (std::size_t iBegin = 0; iBegin < ackSequence.size(); iBegin += step)
136+
{
137+
std::size_t iEnd = std::min(iBegin + step, ackSequence.size());
138+
std::shuffle(ackSequence.begin() + iBegin, ackSequence.begin() + iEnd, g);
139+
}
140+
141+
using namespace std::chrono;
142+
auto timestampBegin = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
143+
144+
std::size_t indexWait = 0;
145+
std::size_t indexAck = 0;
146+
while (indexAck < testNum)
147+
{
148+
for (std::size_t i = 0; i < step && indexWait < testNum; ++i)
149+
{
150+
queue.waitOffset(waitSequence[indexWait++]);
151+
}
152+
153+
for (std::size_t i = 0; i < step && indexAck < testNum; ++i)
154+
{
155+
queue.ackOffset(ackSequence[indexAck++]);
156+
}
157+
}
158+
159+
auto timestampEnd = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
160+
161+
// All offsets have been acked
162+
EXPECT_EQ(testNum, *queue.popOffsetToCommit());
163+
164+
return (timestampEnd - timestampBegin);
165+
}
166+
167+
} // end of namespace
168+
169+
// Report how long the queue takes to handle one million offsets, for several batch sizes.
TEST(UnorderedOffsetCommitQueue, CheckPerf)
{
    const std::size_t testNum = 1000000;
    const std::size_t batchSizes[] = {100, 1000, 10000, 100000};

    for (const std::size_t step : batchSizes)
    {
        std::cout << "Took " << checkTimeMsConsumedToSortOffsets(testNum, step) << " ms to sort " << testNum << " offsets (with step:" << step << ")." << std::endl;
    }
}
187+

0 commit comments

Comments
 (0)