
Commit 23f5aae

feat(tests): Consumer and Producer ping test.
1 parent 8906293 commit 23f5aae

2 files changed (+237, -0 lines)


tests/CMakeLists.txt

Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
# CMakeLists.txt for the test directory

# Ensure this directory is part of a parent project
if (NOT PROJECT_NAME)
    message(FATAL_ERROR "This CMakeLists.txt must be included in a parent project.")
endif()

# Get all subdirectories in the current directory
file(GLOB SUBDIRS RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/*)

# Filter only directories
set(DIRECTORY_LIST "")
foreach(SUBDIR ${SUBDIRS})
    if (IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/${SUBDIR})
        list(APPEND DIRECTORY_LIST ${SUBDIR})
    endif()
endforeach()

# Print the directories
message(STATUS "Found directories in ${CMAKE_CURRENT_SOURCE_DIR}: ${DIRECTORY_LIST}")

# Create a CTest target for each directory
foreach(DIR ${DIRECTORY_LIST})
    # Define the test target name
    set(TEST_TARGET_NAME test_${DIR})

    # Collect all *.cpp files in the directory. GLOB_RECURSE already descends
    # into subdirectories, so a single pattern suffices; adding **/*.cpp as
    # well would match subdirectory sources twice.
    file(GLOB_RECURSE TEST_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/${DIR}/*.cpp)
    if (NOT TEST_SOURCES)
        message(WARNING "No test sources found in ${CMAKE_CURRENT_SOURCE_DIR}/${DIR}. Skipping.")
        continue()
    endif()

    # Add the test target
    add_executable(${TEST_TARGET_NAME} ${TEST_SOURCES})

    # Link the test target with any necessary libraries (adjust as needed)
    target_link_libraries(${TEST_TARGET_NAME} PRIVATE KafkaLib)

    # Add the test to CTest
    add_test(NAME ${TEST_TARGET_NAME} COMMAND ${TEST_TARGET_NAME})

    # Set properties for the test target
    set_target_properties(${TEST_TARGET_NAME} PROPERTIES
        FOLDER "Tests"
    )

    # Set the output path for all configurations
    foreach(OUTPUTCONFIG ${CMAKE_CONFIGURATION_TYPES})
        string(TOUPPER ${OUTPUTCONFIG} OUTPUTCONFIG_UPPER)
        set_target_properties(${TEST_TARGET_NAME} PROPERTIES
            RUNTIME_OUTPUT_DIRECTORY_${OUTPUTCONFIG_UPPER} "${PROJECT_SOURCE_DIR}/.build/bin/${CMAKE_SYSTEM_NAME}/${OUTPUTCONFIG}"
        )
    endforeach()
endforeach()
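
For context, here is a minimal sketch of a parent CMakeLists.txt that would satisfy the PROJECT_NAME guard above and register the generated targets with CTest. The project name, the src/ subdirectory, and the home of the KafkaLib target are illustrative assumptions, not part of this commit:

# Hypothetical top-level CMakeLists.txt (illustrative only; not part of this commit).
cmake_minimum_required(VERSION 3.16)
project(GodotStreaming CXX)   # defines PROJECT_NAME, satisfying the guard above

add_subdirectory(src)         # assumed location of the KafkaLib target linked above
enable_testing()              # must come before the add_test() calls for ctest to see them
add_subdirectory(tests)       # pulls in the CMakeLists.txt shown above

With that wiring in place, running ctest from the build tree discovers one test executable per subdirectory of tests/.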
Lines changed: 182 additions & 0 deletions
@@ -0,0 +1,182 @@
// -------------------------------------------------------------------------
// TEST PROGRAM
// -------------------------------------------------------------------------
// Purpose:
//    This program tests basic consumer and producer functionality by
//    sending packets back and forth and measuring ping latency.
// -------------------------------------------------------------------------
// Author(s):
//    Brandon J. Smith - July 4th, 2025
// -------------------------------------------------------------------------
// License:
//    This file is part of the "Test Program" project, distributed under the
//    MIT License.
//    See the LICENSE file in the project root for more information.
// -------------------------------------------------------------------------

#include <kafkalib.hpp>
#include <rdkafkacpp.h>

#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <string>
#include <vector>

#define ASSERT(condition, message)                              \
    do                                                          \
    {                                                           \
        if (!(condition))                                       \
        {                                                       \
            fprintf(stderr, "Assertion failed: %s\n", message); \
            exit(EXIT_FAILURE);                                 \
        }                                                       \
    } while (0)

static void _delivery_report_callback(const RdKafka::Message &message)
{
    if (message.err() != RdKafka::ErrorCode::ERR_NO_ERROR)
    {
        fprintf(stderr, "Message delivery failed: %s\n", message.errstr().c_str());
    }
    /*else
    {
        fprintf(stderr, "Message delivered successfully to %s [%d] at offset %lld\n",
                message.topic_name().c_str(), message.partition(), message.offset());
    }*/
}

static void _logger_callback(const RdKafka::Severity level, const std::string &message)
{
    switch (level)
    {
    case RdKafka::Event::Severity::EVENT_SEVERITY_DEBUG:
        fprintf(stderr, "> DEBUG: %s\n", message.c_str());
        break;
    case RdKafka::Event::Severity::EVENT_SEVERITY_INFO:
        fprintf(stderr, "> INFO: %s\n", message.c_str());
        break;
    case RdKafka::Event::Severity::EVENT_SEVERITY_NOTICE:
        fprintf(stderr, "> NOTICE: %s\n", message.c_str());
        break;
    case RdKafka::Event::Severity::EVENT_SEVERITY_WARNING:
        fprintf(stderr, "> WARNING: %s\n", message.c_str());
        break;
    case RdKafka::Event::Severity::EVENT_SEVERITY_ERROR:
        fprintf(stderr, "> ERROR: %s\n", message.c_str());
        break;
    default:
        fprintf(stderr, "> UNKNOWN: %s\n", message.c_str());
        break;
    }
}

static void _rebalance_callback(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition *> &partitions)
{
    if (err == RdKafka::ErrorCode::ERR__ASSIGN_PARTITIONS)
    {
        fprintf(stderr, "Rebalance: Assigning partitions\n");
        consumer->assign(partitions); // Assign the partitions to the consumer.
    }
    else if (err == RdKafka::ErrorCode::ERR__REVOKE_PARTITIONS)
    {
        fprintf(stderr, "Rebalance: Revoking partitions\n");
        consumer->unassign(); // Unassign the partitions from the consumer.
    }
    else
    {
        fprintf(stderr, "Rebalance error: %s\n", RdKafka::err2str(err).c_str());
    }
}

int main()
{
    GodotStreaming::KafkaController controller;

    // Initialize a Publisher...
    GodotStreaming::KafkaPublisherMetadata publisherMetadata;
    publisherMetadata.brokers = "localhost:19092";
    publisherMetadata.severity_log_level = RdKafka::Severity::EVENT_SEVERITY_DEBUG;
    publisherMetadata.delivery_report_callback = _delivery_report_callback;
    publisherMetadata.logger_callback = _logger_callback;
    //publisherMetadata.flush_immediately = true;

    GodotStreaming::Result<GodotStreaming::KafkaPublisher> publisher = controller.CreatePublisher(publisherMetadata);
    if (!publisher)
    {
        fprintf(stderr, "Failed to create publisher: %s\n", publisher.error_message.c_str());
        return 1;
    }

    // Initialize a Consumer...
    GodotStreaming::KafkaSubscriberMetadata consumerMetadata;
    consumerMetadata.brokers = "127.0.0.1:19092";
    consumerMetadata.topics = std::vector<std::string>{"test_topic"};
    consumerMetadata.group_id = "test_group";
    consumerMetadata.severity_log_level = RdKafka::Severity::EVENT_SEVERITY_DEBUG;
    consumerMetadata.logger_callback = _logger_callback;
    consumerMetadata.delivery_report_callback = _delivery_report_callback;
    consumerMetadata.rebalance_callback = _rebalance_callback;
    consumerMetadata.offset_reset = RdKafka::OffsetSpec_t::OFFSET_END;
    consumerMetadata.enable_auto_commit = true;
    consumerMetadata.enable_partition_eof = true;

    GodotStreaming::Result<GodotStreaming::KafkaSubscriber> consumer = controller.CreateConsumer(consumerMetadata);
    if (!consumer)
    {
        fprintf(stderr, "Failed to create consumer: %s\n", consumer.error_message.c_str());
        return 1;
    }

    GodotStreaming::KafkaPublisher &kafkaPublisher = *publisher.value;
    GodotStreaming::KafkaSubscriber &kafkaConsumer = *consumer.value;

    do
    {
        // Each iteration sends one timestamped packet and measures its round trip.
        // Get the current time in milliseconds since the epoch.
        auto now = std::chrono::system_clock::now();
        auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();

        // Create a packet and serialize the current time as a timestamp.
        GodotStreaming::Packet packet;
        packet.Serialize<int64_t>(now_ms);

        GodotStreaming::Status sent_status = kafkaPublisher.Publish(consumerMetadata.topics[0], packet, std::to_string(now_ms));
        if (!sent_status)
        {
            fprintf(stderr, "Failed to send packet: %s\n", sent_status.message.c_str());
            return 1;
        }

        // Now consume the packet back and read the timestamp.
        std::vector<GodotStreaming::Packet> packets;
        GodotStreaming::Status poll_status = kafkaConsumer.Poll(packets, 15, 1);
        if (!poll_status)
        {
            fprintf(stderr, "Polling failed: %s\n", poll_status.message.c_str());
            continue; // Polling failed, continue to the next iteration.
        }
        if (packets.empty())
        {
            continue; // No packets received, continue to the next iteration.
        }

        // At least one packet was received; read the timestamp from the latest one.
        GodotStreaming::Packet &received_packet = packets.back();
        GodotStreaming::Result<int64_t> received_timestamp = received_packet.Deserialize<int64_t>();
        if (!received_timestamp)
        {
            fprintf(stderr, "Failed to deserialize packet: %s\n", received_timestamp.error_message.c_str());
            return 1;
        }
        int64_t received_time = *received_timestamp.value;

        // Only measure when the received packet is the one sent this iteration.
        if (now_ms == received_time)
        {
            // Calculate the latency.
            auto post_now = std::chrono::system_clock::now();
            auto latency = post_now - std::chrono::system_clock::time_point(std::chrono::milliseconds(received_time));
            auto latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>(latency).count();

            fprintf(stderr, "Ping latency: %lld ms, %lld | %lld | %zu\n",
                    (long long)latency_ms, (long long)now_ms, (long long)received_time, packets.size());
            ASSERT(latency_ms >= 0, "Latency should not be negative."); // If this fires, purge the Kafka topic; if it persists, something is seriously wrong.
        }
    } while (true);

    return 0;
}
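
The latency calculation above reconstructs a time_point from the echoed epoch-millisecond value and subtracts it from a fresh clock reading. Below is a minimal, Kafka-free sketch of that same round-trip timing technique; the sleep stands in for the broker round trip, and all names are illustrative rather than part of the commit:

#include <chrono>
#include <cstdint>
#include <cstdio>
#include <thread>

int main()
{
    using namespace std::chrono;

    // "Send": capture the departure time as epoch milliseconds, exactly as
    // the test serializes it into the outgoing packet.
    int64_t sent_ms = duration_cast<milliseconds>(
        system_clock::now().time_since_epoch()).count();

    // Stand-in for the produce/consume round trip through the broker.
    std::this_thread::sleep_for(milliseconds(25));

    // "Receive": rebuild a time_point from the echoed value and diff it
    // against the current clock, mirroring the test's latency computation.
    system_clock::time_point sent_at{milliseconds(sent_ms)};
    int64_t latency_ms = duration_cast<milliseconds>(
        system_clock::now() - sent_at).count();

    printf("Round-trip latency: %lld ms\n", (long long)latency_ms);
    return 0;
}

system_clock is used here (and in the test) rather than steady_clock because the timestamp must survive a trip through an external system and remain comparable afterwards; the trade-off is that wall-clock adjustments can yield a negative latency, which is precisely the condition the test's ASSERT guards against.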
