Skip to content

Commit c3d4cf3

Browse files
committed
fix(test): restore consumer_timestamp_polling_scenario
Restore the scenario file that was incorrectly removed. This test covers high-level consumer polling strategies and is unrelated to the message retrieval API tests. Signed-off-by: shin <sars21@hanmail.net>
1 parent eac8255 commit c3d4cf3

File tree

4 files changed

+229
-4
lines changed

4 files changed

+229
-4
lines changed

core/integration/tests/server/general.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@
1616
// under the License.
1717

1818
use crate::server::{
19-
ScenarioFn, bench_scenario, create_message_payload_scenario, message_headers_scenario,
20-
run_scenario, stream_size_validation_scenario, system_scenario, user_scenario,
19+
ScenarioFn, bench_scenario, consumer_timestamp_polling_scenario,
20+
create_message_payload_scenario, message_headers_scenario, run_scenario,
21+
stream_size_validation_scenario, system_scenario, user_scenario,
2122
};
2223
use iggy_common::TransportProtocol;
2324
use serial_test::parallel;
@@ -32,6 +33,7 @@ use test_case::test_matrix;
3233
create_message_payload_scenario(),
3334
stream_size_validation_scenario(),
3435
bench_scenario(),
36+
consumer_timestamp_polling_scenario(),
3537
]
3638
)]
3739
#[tokio::test]

core/integration/tests/server/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,9 @@ use scenarios::{
3636
bench_scenario, consumer_group_auto_commit_reconnection_scenario, consumer_group_join_scenario,
3737
consumer_group_offset_cleanup_scenario,
3838
consumer_group_with_multiple_clients_polling_messages_scenario,
39-
consumer_group_with_single_client_polling_messages_scenario, create_message_payload,
40-
message_headers_scenario, stream_size_validation_scenario, system_scenario, user_scenario,
39+
consumer_group_with_single_client_polling_messages_scenario, consumer_timestamp_polling_scenario,
40+
create_message_payload, message_headers_scenario, stream_size_validation_scenario,
41+
system_scenario, user_scenario,
4142
};
4243
use std::pin::Pin;
4344
use std::sync::{Arc, Mutex};
@@ -94,6 +95,10 @@ fn bench_scenario() -> ScenarioFn {
9495
|factory| Box::pin(bench_scenario::run(factory))
9596
}
9697

98+
fn consumer_timestamp_polling_scenario() -> ScenarioFn {
99+
|factory| Box::pin(consumer_timestamp_polling_scenario::run(factory))
100+
}
101+
97102
async fn run_scenario(transport: TransportProtocol, scenario: ScenarioFn) {
98103
// TODO: Need to enable `TCP_NODELAY` flag for TCP transports, due to small messages being used in the test.
99104
// For some reason TCP in compio can't deal with it, but in tokio it works fine.
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
/* Licensed to the Apache Software Foundation (ASF) under one
2+
* or more contributor license agreements. See the NOTICE file
3+
* distributed with this work for additional information
4+
* regarding copyright ownership. The ASF licenses this file
5+
* to you under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance
7+
* with the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing,
12+
* software distributed under the License is distributed on an
13+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
* KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations
16+
* under the License.
17+
*/
18+
19+
use crate::server::scenarios::{PARTITION_ID, STREAM_NAME, TOPIC_NAME, create_client};
20+
use futures::StreamExt;
21+
use iggy::prelude::*;
22+
use integration::test_server::{ClientFactory, login_root};
23+
use std::str::FromStr;
24+
use tokio::time::{Duration, timeout};
25+
26+
const MESSAGES_COUNT: u32 = 2000;
27+
const BATCH_LENGTH: u32 = 100;
28+
const CONSUME_TIMEOUT: Duration = Duration::from_secs(10);
29+
30+
pub async fn run(client_factory: &dyn ClientFactory) {
31+
let client = create_client(client_factory).await;
32+
login_root(&client).await;
33+
34+
test_offset_strategy(&client).await;
35+
test_timestamp_strategy(&client).await;
36+
test_first_strategy(&client).await;
37+
test_last_strategy(&client).await;
38+
}
39+
40+
async fn test_offset_strategy(client: &IggyClient) {
41+
init_system(client).await;
42+
produce_messages(client).await;
43+
44+
let received =
45+
consume_with_strategy(client, "offset-consumer", PollingStrategy::offset(0)).await;
46+
assert_received_messages(&received, "Offset");
47+
48+
cleanup(client).await;
49+
}
50+
51+
async fn test_timestamp_strategy(client: &IggyClient) {
52+
init_system(client).await;
53+
let start_timestamp = IggyTimestamp::now();
54+
produce_messages(client).await;
55+
56+
let received = consume_with_strategy(
57+
client,
58+
"timestamp-consumer",
59+
PollingStrategy::timestamp(start_timestamp),
60+
)
61+
.await;
62+
assert_received_messages(&received, "Timestamp");
63+
64+
cleanup(client).await;
65+
}
66+
67+
async fn test_first_strategy(client: &IggyClient) {
68+
init_system(client).await;
69+
produce_messages(client).await;
70+
71+
let received = consume_with_strategy(client, "first-consumer", PollingStrategy::first()).await;
72+
assert_received_messages(&received, "First");
73+
74+
cleanup(client).await;
75+
}
76+
77+
async fn test_last_strategy(client: &IggyClient) {
78+
init_system(client).await;
79+
produce_messages(client).await;
80+
81+
// Last strategy with batch_length=100 returns the last 100 messages
82+
let received = consume_with_strategy(client, "last-consumer", PollingStrategy::last()).await;
83+
84+
assert_eq!(
85+
received.len(),
86+
BATCH_LENGTH as usize,
87+
"Last strategy: expected {} messages, received {}",
88+
BATCH_LENGTH,
89+
received.len()
90+
);
91+
92+
// Verify messages are from the last batch (offsets 1900-1999 -> message_1901 to message_2000)
93+
let start_message_num = MESSAGES_COUNT - BATCH_LENGTH + 1;
94+
for (i, msg) in received.iter().enumerate() {
95+
let expected_payload = format!("message_{}", start_message_num + i as u32);
96+
let actual_payload =
97+
String::from_utf8(msg.payload.to_vec()).expect("Payload should be valid UTF-8");
98+
assert_eq!(
99+
actual_payload, expected_payload,
100+
"Last strategy: message {} payload mismatch",
101+
i
102+
);
103+
}
104+
105+
cleanup(client).await;
106+
}
107+
108+
fn assert_received_messages(received: &[IggyMessage], strategy_name: &str) {
109+
assert_eq!(
110+
received.len(),
111+
MESSAGES_COUNT as usize,
112+
"{} strategy: expected {} messages, received {}",
113+
strategy_name,
114+
MESSAGES_COUNT,
115+
received.len()
116+
);
117+
118+
for (i, msg) in received.iter().enumerate() {
119+
let expected_payload = format!("message_{}", i + 1);
120+
let actual_payload =
121+
String::from_utf8(msg.payload.to_vec()).expect("Payload should be valid UTF-8");
122+
assert_eq!(
123+
actual_payload, expected_payload,
124+
"{} strategy: message {} payload mismatch",
125+
strategy_name, i
126+
);
127+
}
128+
}
129+
130+
async fn init_system(client: &IggyClient) {
131+
client.create_stream(STREAM_NAME).await.unwrap();
132+
133+
client
134+
.create_topic(
135+
&Identifier::named(STREAM_NAME).unwrap(),
136+
TOPIC_NAME,
137+
1,
138+
CompressionAlgorithm::default(),
139+
None,
140+
IggyExpiry::NeverExpire,
141+
MaxTopicSize::ServerDefault,
142+
)
143+
.await
144+
.unwrap();
145+
}
146+
147+
async fn produce_messages(client: &IggyClient) {
148+
let mut messages = Vec::with_capacity(MESSAGES_COUNT as usize);
149+
for i in 1..=MESSAGES_COUNT {
150+
let payload = format!("message_{}", i);
151+
let message = IggyMessage::from_str(&payload).unwrap();
152+
messages.push(message);
153+
}
154+
155+
client
156+
.send_messages(
157+
&Identifier::named(STREAM_NAME).unwrap(),
158+
&Identifier::named(TOPIC_NAME).unwrap(),
159+
&Partitioning::partition_id(PARTITION_ID),
160+
&mut messages,
161+
)
162+
.await
163+
.unwrap();
164+
}
165+
166+
async fn consume_with_strategy(
167+
client: &IggyClient,
168+
consumer_name: &str,
169+
strategy: PollingStrategy,
170+
) -> Vec<IggyMessage> {
171+
let expected_count = if strategy.kind == PollingKind::Last {
172+
BATCH_LENGTH
173+
} else {
174+
MESSAGES_COUNT
175+
};
176+
177+
let mut consumer = client
178+
.consumer(consumer_name, STREAM_NAME, TOPIC_NAME, PARTITION_ID)
179+
.unwrap()
180+
.auto_commit(AutoCommit::IntervalOrWhen(
181+
IggyDuration::from_str("2ms").unwrap(),
182+
AutoCommitWhen::ConsumingAllMessages,
183+
))
184+
.polling_strategy(strategy)
185+
.poll_interval(IggyDuration::from_str("2ms").unwrap())
186+
.batch_length(BATCH_LENGTH)
187+
.build();
188+
189+
consumer.init().await.unwrap();
190+
191+
let mut received_messages = Vec::with_capacity(expected_count as usize);
192+
let mut consumed_count = 0u32;
193+
194+
while consumed_count < expected_count {
195+
match timeout(CONSUME_TIMEOUT, consumer.next()).await {
196+
Ok(Some(Ok(received))) => {
197+
received_messages.push(received.message);
198+
consumed_count += 1;
199+
}
200+
Ok(Some(Err(e))) => panic!("Error consuming message: {}", e),
201+
Ok(None) => break,
202+
Err(_) => panic!(
203+
"Timeout after {:?} waiting for message {}/{} with {:?} strategy",
204+
CONSUME_TIMEOUT, consumed_count, expected_count, strategy.kind
205+
),
206+
}
207+
}
208+
209+
received_messages
210+
}
211+
212+
async fn cleanup(client: &IggyClient) {
213+
client
214+
.delete_stream(&Identifier::named(STREAM_NAME).unwrap())
215+
.await
216+
.unwrap();
217+
}

core/integration/tests/server/scenarios/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub mod consumer_group_join_scenario;
2323
pub mod consumer_group_offset_cleanup_scenario;
2424
pub mod consumer_group_with_multiple_clients_polling_messages_scenario;
2525
pub mod consumer_group_with_single_client_polling_messages_scenario;
26+
pub mod consumer_timestamp_polling_scenario;
2627
pub mod create_message_payload;
2728
pub mod delete_segments_scenario;
2829
pub mod encryption_scenario;

0 commit comments

Comments
 (0)