Skip to content

Commit 71fa836

Browse files
committed
Improve consumer.subscribe(...)
1 parent f4e2e6a commit 71fa836

File tree

10 files changed

+135
-77
lines changed

10 files changed

+135
-77
lines changed

.github/workflows/kafka_api_ci_tests.yml

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
name: "[Kafka API] CI Tests"
1+
name: '[Kafka API] CI Tests'
22

33
on: [push, pull_request]
44

@@ -20,40 +20,49 @@ jobs:
2020
CHECK_OPTION: ${{ matrix.check-option }}
2121
GENERATE_DOC: ${{ matrix.generate-doc }}
2222
WITH_INSTALLATION: ${{ matrix.with-installation }}
23+
TEST_LABELS: ${{ matrix.test-labels }}
2324

2425
strategy:
2526
matrix:
2627
include:
2728
- os: macos-10.15
2829
build-cxx: clang++
30+
test-labels: UT|IT
2931

3032
- os: ubuntu-20.04
3133
build-cxx: g++
3234
build-type: Debug
35+
test-labels: UT|IT
3336

3437
- os: ubuntu-20.04
3538
build-cxx: g++
3639
build-type: Release
40+
test-labels: RT
3741

3842
- os: ubuntu-20.04
3943
build-cxx: g++
4044
build-type: Release
4145
cxx-standard: 14
46+
test-labels: UT|IT
4247

4348
- os: ubuntu-20.04
4449
build-cxx: g++
4550
check-option: asan
51+
test-labels: UT|IT
4652

4753
- os: ubuntu-18.04
4854
build-cxx: g++
4955
check-option: tsan
56+
test-labels: UT|IT
5057

5158
- os: ubuntu-20.04
5259
build-cxx: g++
5360
check-option: ubsan
61+
test-labels: UT|IT
5462

5563
- os: ubuntu-20.04
5664
build-cxx: clang++
65+
test-labels: UT|IT
5766
generate-doc: true
5867
with-installation: true
5968

@@ -63,17 +72,19 @@ jobs:
6372

6473
- os: ubuntu-18.04
6574
build-cxx: g++
75+
test-labels: UT|IT
6676

6777
- os: ubuntu-18.04
6878
build-cxx: clang++
79+
test-labels: RT
6980

7081
steps:
7182
- uses: actions/checkout@v2
7283

7384
- name: Update Repo
7485
run: |
75-
if [[ ${OS_VERSION} == "ubuntu"* ]]; then
76-
echo "about to update repo"
86+
if [[ ${OS_VERSION} == 'ubuntu'* ]]; then
87+
echo 'about to update repo'
7788
sed -e 's/azure.archive.ubuntu.com/us.archive.ubuntu.com/g' -e t -e d /etc/apt/sources.list | sudo tee /etc/apt/sources.list.d/nonazure.list
7889
sudo apt-get update
7990
fi
@@ -85,51 +96,62 @@ jobs:
8596
cd ${BUILD_SUB_DIR}
8697
8798
# 1. Install cmake
88-
if [ ${OS_VERSION} == "ubuntu-18.04" ]; then
99+
if [ ${OS_VERSION} == 'ubuntu-18.04' ]; then
89100
sudo snap install cmake --classic
90101
export PATH=/snap/bin:$PATH
91102
fi
92103
93-
# 2. Install clang-tidy
94-
if [[ ${CHECK_OPTION} == *"clang-tidy"* ]]; then
104+
# 2. Install clang/clang-tidy
105+
if [[ ${BUILD_CXX} == 'clang'* ]] && [[ ${OS_VERSION} == 'ubuntu'* ]]; then
106+
sudo rm /usr/bin/clang /usr/bin/clang++
107+
sudo apt install -y clang-10
108+
sudo ln -s clang-10 /usr/bin/clang
109+
sudo ln -s clang++-10 /usr/bin/clang++
110+
fi
111+
if [[ ${CHECK_OPTION} == 'clang-tidy' ]]; then
95112
sudo apt install -y clang-tidy
96113
fi
97114
98-
# 2. Install googletest (v1.10.0)
115+
# 3. Install googletest (v1.10.0)
99116
wget -nv https://github.com/google/googletest/archive/release-1.10.0.tar.gz
100117
tar -xzf release-1.10.0.tar.gz
101118
cd googletest-release-1.10.0
102119
env CXX=${BUILD_CXX} cmake ./
103120
make -j${CPU_CORE_NUM} && sudo make install
104121
cd ../
105122
106-
# 3. Install boost lib
107-
if [[ ${OS_VERSION} == "ubuntu"* ]]; then
123+
# 4. Install boost lib
124+
if [[ ${OS_VERSION} == 'ubuntu'* ]]; then
108125
sudo apt install -y libboost-all-dev
109-
elif [[ ${OS_VERSION} == "macos"* ]]; then
126+
elif [[ ${OS_VERSION} == 'macos'* ]]; then
110127
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
111128
brew install boost
112129
fi
113130
114-
# 4. Install librdkafka
131+
# 5. Install librdkafka
115132
wget -nv https://github.com/edenhill/librdkafka/archive/v${LIBRDKAFKA_VERSION}.tar.gz
116133
tar -xzf v${LIBRDKAFKA_VERSION}.tar.gz
117134
cd librdkafka-${LIBRDKAFKA_VERSION}
118-
./configure
135+
./configure --cxx=${BUILD_CXX}
119136
make -j${CPU_CORE_NUM} && sudo make install
120137
cd ../
121138
122-
# 5. Install kafka
139+
# 6. Install kafka
123140
wget -nv ${KAFKA_SRC_LINK}
124141
tar -xzf `basename ${KAFKA_SRC_LINK}`
125142
126-
# 6. Install tools to generate document
143+
# 7. Install tools to generate document
127144
if [ ${GENERATE_DOC} ]; then
128145
sudo apt install -y python3-pip
129146
sudo pip3 install markdown
130147
sudo apt install -y doxygen
131148
fi
132149
150+
# Print dev env
151+
cmake --version | head -n 1
152+
g++ --version | head -n 1
153+
clang++ --version | head -n 1
154+
133155
- name: Config
134156
run: |
135157
cd ${BUILD_SUB_DIR}
@@ -144,19 +166,19 @@ jobs:
144166
export CMAKE_BUILD_TYPE=""
145167
fi
146168
147-
if [[ ${CHECK_OPTION} == *"clang-tidy"* ]]; then
169+
if [[ ${CHECK_OPTION} == 'clang-tidy' ]]; then
148170
export BUILD_OPTION='-DBUILD_OPTION_CLANG_TIDY=ON'
149171
fi
150172
151-
if [[ ${CHECK_OPTION} == *"asan"* ]]; then
173+
if [[ ${CHECK_OPTION} == 'asan' ]]; then
152174
export BUILD_OPTION="${BUILD_OPTION} -DBUILD_OPTION_USE_ASAN=ON"
153175
fi
154176
155-
if [[ ${CHECK_OPTION} == *"tsan"* ]]; then
177+
if [[ ${CHECK_OPTION} == 'tsan' ]]; then
156178
export BUILD_OPTION="${BUILD_OPTION} -DBUILD_OPTION_USE_TSAN=ON"
157179
fi
158180
159-
if [[ ${CHECK_OPTION} == *"ubsan"* ]]; then
181+
if [[ ${CHECK_OPTION} == *"ubsan" ]]; then
160182
export BUILD_OPTION="${BUILD_OPTION} -DBUILD_OPTION_USE_UBSAN=ON"
161183
fi
162184
@@ -179,6 +201,7 @@ jobs:
179201
fi
180202
181203
- name: Test
204+
if: matrix.test-labels
182205
timeout-minutes: 20
183206
run: |
184207
cd ${BUILD_SUB_DIR}
@@ -191,7 +214,7 @@ jobs:
191214
for i in {1..60}; do cat test.env 2>/dev/null && break || sleep 1; done
192215
193216
source test.env
194-
ctest -VV
217+
ctest -VV -L "${TEST_LABELS}"
195218
# stop kafka cluster
196219
kafka-server-stop.sh
197220
zookeeper-server-stop.sh

include/kafka/KafkaConsumer.h

Lines changed: 27 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,9 @@ class KafkaConsumer: public KafkaClient
7474
* Subscribe to the given list of topics to get dynamically assigned partitions.
7575
* An exception would be thrown if assign is called previously (without a subsequent call to unsubscribe())
7676
*/
77-
void subscribe(const Topics& topics, Consumer::RebalanceCallback cb = Consumer::RebalanceCallback());
78-
77+
void subscribe(const Topics& topics,
78+
Consumer::RebalanceCallback cb = Consumer::RebalanceCallback(),
79+
std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SUBSCRIBE_TIMEOUT_MS));
7980
/**
8081
* Get the current subscription.
8182
*/
@@ -84,7 +85,7 @@ class KafkaConsumer: public KafkaClient
8485
/**
8586
* Unsubscribe from topics currently subscribed.
8687
*/
87-
void unsubscribe();
88+
void unsubscribe(std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_UNSUBSCRIBE_TIMEOUT_MS));
8889

8990
/**
9091
* Manually assign a list of partitions to this consumer.
@@ -119,7 +120,7 @@ class KafkaConsumer: public KafkaClient
119120
*/
120121
void seekToBeginning(const TopicPartitions& tps,
121122
std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS)) { seekToBeginningOrEnd(tps, true, timeout); }
122-
void seekToBeginning(std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS)) { seekToBeginningOrEnd(assignment(false), true, timeout); }
123+
void seekToBeginning(std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS)) { seekToBeginningOrEnd(_assignment, true, timeout); }
123124

124125
/**
125126
* Seek to the last offset for each of the given partitions.
@@ -132,7 +133,7 @@ class KafkaConsumer: public KafkaClient
132133
*/
133134
void seekToEnd(const TopicPartitions& tps,
134135
std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS)) { seekToBeginningOrEnd(tps, false, timeout); }
135-
void seekToEnd(std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS)) { seekToBeginningOrEnd(assignment(false), false, timeout); }
136+
void seekToEnd(std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SEEK_TIMEOUT_MS)) { seekToBeginningOrEnd(_assignment, false, timeout); }
136137

137138
/**
138139
* Get the offset of the next record that will be fetched (if a record with that offset exists).
@@ -229,13 +230,17 @@ class KafkaConsumer: public KafkaClient
229230
static const constexpr char* AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
230231

231232
#if __cplusplus >= 201703L
232-
static constexpr int DEFAULT_QUERY_TIMEOUT_MS = 10000;
233-
static constexpr int DEFAULT_SEEK_TIMEOUT_MS = 10000;
234-
static constexpr int SEEK_RETRY_INTERVAL_MS = 5000;
233+
static constexpr int DEFAULT_SUBSCRIBE_TIMEOUT_MS = 30000;
234+
static constexpr int DEFAULT_UNSUBSCRIBE_TIMEOUT_MS = 10000;
235+
static constexpr int DEFAULT_QUERY_TIMEOUT_MS = 10000;
236+
static constexpr int DEFAULT_SEEK_TIMEOUT_MS = 10000;
237+
static constexpr int SEEK_RETRY_INTERVAL_MS = 5000;
235238
#else
236-
enum { DEFAULT_QUERY_TIMEOUT_MS = 10000 };
237-
enum { DEFAULT_SEEK_TIMEOUT_MS = 10000 };
238-
enum { SEEK_RETRY_INTERVAL_MS = 5000 };
239+
enum { DEFAULT_SUBSCRIBE_TIMEOUT_MS = 30000 };
240+
enum { DEFAULT_UNSUBSCRIBE_TIMEOUT_MS = 10000 };
241+
enum { DEFAULT_QUERY_TIMEOUT_MS = 10000 };
242+
enum { DEFAULT_SEEK_TIMEOUT_MS = 10000 };
243+
enum { SEEK_RETRY_INTERVAL_MS = 5000 };
239244
#endif
240245

241246
const OffsetCommitOption _offsetCommitOption;
@@ -260,8 +265,6 @@ class KafkaConsumer: public KafkaClient
260265

261266
// Internal interface for "assign"
262267
void _assign(const TopicPartitions& tps);
263-
// Internal interface for "assignment"
264-
TopicPartitions assignment(bool withQueryRequest) const;
265268

266269
std::string _groupId;
267270

@@ -357,7 +360,7 @@ KafkaConsumer::close()
357360

358361
// Subscription
359362
inline void
360-
KafkaConsumer::subscribe(const Topics& topics, RebalanceCallback cb)
363+
KafkaConsumer::subscribe(const Topics& topics, RebalanceCallback cb, std::chrono::milliseconds timeout)
361364
{
362365
std::string topicsStr = toString(topics);
363366

@@ -376,21 +379,21 @@ KafkaConsumer::subscribe(const Topics& topics, RebalanceCallback cb)
376379
KAFKA_THROW_IF_WITH_RESP_ERROR(err);
377380

378381
// The rebalcance callback (e.g. "assign", etc) would be served during the time (within this thread)
379-
rd_kafka_poll(getClientHandle(), TIMEOUT_INFINITE);
382+
rd_kafka_poll(getClientHandle(), timeout.count());
380383

381384
KAFKA_API_DO_LOG(LOG_INFO, "subscribed, topics[%s]", topicsStr.c_str());
382385
}
383386

384387
inline void
385-
KafkaConsumer::unsubscribe()
388+
KafkaConsumer::unsubscribe(std::chrono::milliseconds timeout)
386389
{
387390
KAFKA_API_DO_LOG(LOG_INFO, "will unsubscribe");
388391

389392
rd_kafka_resp_err_t err = rd_kafka_unsubscribe(getClientHandle());
390393
KAFKA_THROW_IF_WITH_RESP_ERROR(err);
391394

392395
// The rebalcance callback (e.g. "assign", etc) would be served during the time (within this thread)
393-
rd_kafka_poll(getClientHandle(), TIMEOUT_INFINITE);
396+
rd_kafka_poll(getClientHandle(), timeout.count());
394397

395398
KAFKA_API_DO_LOG(LOG_INFO, "unsubscribed");
396399
}
@@ -438,31 +441,20 @@ KafkaConsumer::assign(const TopicPartitions& tps)
438441
_assign(tps);
439442
}
440443

441-
// Assignment, -- internal interface
444+
// Assignment
442445
inline TopicPartitions
443-
KafkaConsumer::assignment(bool withQueryRequest) const
446+
KafkaConsumer::assignment() const
444447
{
445-
if (withQueryRequest)
446-
{
447-
rd_kafka_topic_partition_list_t* raw_tps = nullptr;
448-
rd_kafka_resp_err_t err = rd_kafka_assignment(getClientHandle(), &raw_tps);
448+
rd_kafka_topic_partition_list_t* raw_tps = nullptr;
449+
rd_kafka_resp_err_t err = rd_kafka_assignment(getClientHandle(), &raw_tps);
449450

450-
auto rk_tps = rd_kafka_topic_partition_list_unique_ptr(raw_tps);
451+
auto rk_tps = rd_kafka_topic_partition_list_unique_ptr(raw_tps);
451452

452-
KAFKA_THROW_IF_WITH_RESP_ERROR(err);
453-
454-
return getTopicPartitions(rk_tps.get());
455-
}
453+
KAFKA_THROW_IF_WITH_RESP_ERROR(err);
456454

457-
return _assignment;
455+
return getTopicPartitions(rk_tps.get());
458456
}
459457

460-
// Assignment, -- external interface
461-
inline TopicPartitions
462-
KafkaConsumer::assignment() const
463-
{
464-
return subscription().empty() ? assignment(false) : TopicPartitions();
465-
}
466458

467459
// Seek & Position
468460
inline void

tests/integration/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ add_executable("${PROJECT_NAME}" ${TEST_SRCS})
1212
target_link_libraries("${PROJECT_NAME}" modern-cpp-kafka-api gtest gmock gtest_main pthread)
1313

1414
add_test(NAME ${PROJECT_NAME} COMMAND ./${PROJECT_NAME})
15+
set_tests_properties(${PROJECT_NAME} PROPERTIES LABELS "IT")
1516

1617
if (BUILD_OPTION_USE_ASAN OR BUILD_OPTION_USE_TSAN)
1718
target_compile_options(${PROJECT_NAME} PRIVATE "-fno-sanitize=all")

0 commit comments

Comments
 (0)