Skip to content

Commit f4e2e6a

Browse files
committed
Upgrade with librdkafka v1.7.0
1 parent 009ea52 commit f4e2e6a

File tree

6 files changed

+13
-9
lines changed

6 files changed

+13
-9
lines changed

.github/workflows/kafka_api_ci_tests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ on: [push, pull_request]
55
env:
66
KAFKA_SRC_LINK: https://archive.apache.org/dist/kafka/2.6.0/kafka_2.13-2.6.0.tgz
77
CPU_CORE_NUM: 2
8-
LIBRDKAFKA_VERSION: 1.6.0
8+
LIBRDKAFKA_VERSION: 1.7.0
99
BUILD_SUB_DIR: build/sub-build
1010

1111
jobs:
@@ -179,7 +179,7 @@ jobs:
179179
fi
180180
181181
- name: Test
182-
timeout-minutes: 15
182+
timeout-minutes: 20
183183
run: |
184184
cd ${BUILD_SUB_DIR}
185185

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
The `Modern C++ based Kafka API` (`modern-cpp-kafka`) is a layer of C++ wrapper based on [librdkafka](https://github.com/edenhill/librdkafka) (the C part), with high quality, but more friendly to users.
66

7-
- By now, `modern-cpp-kafka` is compatible with `librdkafka` **v1.6.0**.
7+
- By now, `modern-cpp-kafka` is compatible with `librdkafka` **v1.7.0**.
88

99
```
1010
KAFKA is a registered trademark of The Apache Software Foundation and

tests/integration/TestTransaction.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ TEST(Transaction, CatchException)
234234

235235
producer.beginTransaction();
236236

237-
EXPECT_KAFKA_THROW(producer.commitTransaction(), RD_KAFKA_RESP_ERR_INVALID_TXN_STATE);
237+
producer.commitTransaction();
238238
}
239239
}
240240

tests/robustness/TestAdminClient.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ TEST(AdminClient, BrokersTimeout)
3434
std::cout << "[" << Utility::getCurrentTime() << "] will ListTopics" << std::endl;
3535
{
3636
auto listResult = adminClient.listTopics(std::chrono::seconds(1));
37-
std::cout << "[" << Utility::getCurrentTime() << "] ListTopics: result[" << listResult.message() << "]" << std::endl;
37+
std::cout << "[" << Utility::getCurrentTime() << "] ListTopics: result[" << listResult.message() << "]. Result: " << listResult.message() << std::endl;
3838
EXPECT_TRUE(listResult.errorCode().value() == RD_KAFKA_RESP_ERR__TRANSPORT || listResult.errorCode().value() == RD_KAFKA_RESP_ERR__TIMED_OUT);
3939
EXPECT_EQ(0, listResult.topics.size());
4040
}

tests/unit/TestKafkaClientDefaultProperties.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ TEST(KafkaClient, KafkaConsumerDefaultProperties)
107107
{ Kafka::ConsumerConfig::ENABLE_PARTITION_EOF, "false" },
108108
{ Kafka::ConsumerConfig::MAX_POLL_RECORDS, "500" },
109109
{ Kafka::ConsumerConfig::QUEUED_MIN_MESSAGES, "100000" },
110-
{ Kafka::ConsumerConfig::SESSION_TIMEOUT_MS, "10000" },
110+
{ Kafka::ConsumerConfig::SESSION_TIMEOUT_MS, "45000" },
111111
{ Kafka::ConsumerConfig::SOCKET_TIMEOUT_MS, "60000" },
112112
{ Kafka::ConsumerConfig::SECURITY_PROTOCOL, "plaintext" },
113113
{ "enable.auto.commit", "false" },

tests/utils/TestUtility.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,10 @@ CreateKafkaTopic(const Kafka::Topic& topic, int numPartitions, int replicationFa
172172
{
173173
Kafka::AdminClient adminClient(GetKafkaClientCommonConfig());
174174
auto createResult = adminClient.createTopics({topic}, numPartitions, replicationFactor);
175+
std::cout << "[" << Kafka::Utility::getCurrentTime() << "] " << __FUNCTION__ << ": create topic[" << topic << "] "
176+
<< "with numPartitions[" << numPartitions << "], replicationFactor[" << replicationFactor << "]. Result: " << createResult.message() << std::endl;
175177
ASSERT_FALSE(createResult.errorCode());
176-
std::cout << "[" << Kafka::Utility::getCurrentTime() << "] " << __FUNCTION__ << ": topic[" << topic << "] created with numPartitions[" << numPartitions << "], replicationFactor[" << replicationFactor << "]." << std::endl;
177-
std::this_thread::sleep_for(std::chrono::seconds(1));
178+
std::this_thread::sleep_for(std::chrono::seconds(5));
178179
}
179180

180181
class JoiningThread {
@@ -220,20 +221,23 @@ signalToAllBrokers(int sig)
220221
else if (sig == SIGCONT)
221222
{
222223
std::cout << "[" << Kafka::Utility::getCurrentTime() << "] Brokers resumed" << std::endl;
223-
std::this_thread::sleep_for(std::chrono::seconds(1));
224224
}
225225
}
226226

227227
inline void
228228
PauseBrokers()
229229
{
230+
constexpr int WAIT_AFTER_PAUSE_MS = 100;
230231
signalToAllBrokers(SIGSTOP);
232+
std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_AFTER_PAUSE_MS));
231233
}
232234

233235
inline void
234236
ResumeBrokers()
235237
{
238+
constexpr int WAIT_AFTER_RESUME_SEC = 5;
236239
signalToAllBrokers(SIGCONT);
240+
std::this_thread::sleep_for(std::chrono::seconds(WAIT_AFTER_RESUME_SEC));
237241
}
238242

239243
inline std::shared_ptr<JoiningThread>

0 commit comments

Comments
 (0)