diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs index 65fb470ae..5addf1516 100644 --- a/core/partitions/src/iggy_partition.rs +++ b/core/partitions/src/iggy_partition.rs @@ -15,12 +15,18 @@ // specific language governing permissions and limitations // under the License. -use crate::journal::{PartitionJournal, PartitionJournalMemStorage}; +use crate::journal::{ + MessageLookup, PartitionJournal, PartitionJournalMemStorage, QueryableJournal, +}; use crate::log::SegmentedLog; -use crate::{AppendResult, Partition, decode_send_messages_batch}; +use crate::{ + AppendResult, Partition, PartitionOffsets, PollingArgs, PollingConsumer, + decode_send_messages_batch, +}; use iggy_common::{ - ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError, IggyMessagesBatchMut, - IggyTimestamp, PartitionStats, + ConsumerGroupId, ConsumerGroupOffsets, ConsumerKind, ConsumerOffset, ConsumerOffsets, + IggyByteSize, IggyError, IggyMessagesBatchMut, IggyMessagesBatchSet, IggyTimestamp, + PartitionStats, PollingKind, header::{Operation, PrepareHeader}, message::Message, }; @@ -28,6 +34,7 @@ use journal::Journal as _; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use tokio::sync::Mutex as TokioMutex; +use tracing::warn; // This struct aliases in terms of the code contained the `LocalPartition from `core/server/src/streaming/partitions/local_partition.rs`. #[derive(Debug)] @@ -165,4 +172,145 @@ impl Partition for IggyPartition { batch_messages_count, )) } + + async fn poll_messages( + &self, + consumer: PollingConsumer, + args: PollingArgs, + ) -> Result { + if !self.should_increment_offset || args.count == 0 { + return Ok(IggyMessagesBatchSet::empty()); + } + + let committed_offset = self.offset.load(Ordering::Relaxed); + + let start_offset = match args.strategy.kind { + PollingKind::Offset => args.strategy.value, + PollingKind::First => 0, + PollingKind::Last => committed_offset.saturating_sub(u64::from(args.count) - 1), + PollingKind::Timestamp => { + let result = self + .log + .journal() + .inner + .get(&MessageLookup::Timestamp { + timestamp: args.strategy.value, + count: args.count, + }) + .await; + let batch_set = result.unwrap_or_else(IggyMessagesBatchSet::empty); + if let Some(first) = batch_set.first_offset() { + if first > committed_offset { + return Ok(IggyMessagesBatchSet::empty()); + } + let max_count = u32::try_from(committed_offset - first + 1).unwrap_or(u32::MAX); + return Ok(batch_set.get_by_offset(first, batch_set.count().min(max_count))); + } + return Ok(batch_set); + } + PollingKind::Next => self + .get_consumer_offset(consumer) + .map_or(0, |offset| offset + 1), + }; + + if start_offset > committed_offset { + return Ok(IggyMessagesBatchSet::empty()); + } + + let max_count = u32::try_from(committed_offset - start_offset + 1).unwrap_or(u32::MAX); + let count = args.count.min(max_count); + + let result = self + .log + .journal() + .inner + .get(&MessageLookup::Offset { + offset: start_offset, + count, + }) + .await; + + let batch_set = result.unwrap_or_else(IggyMessagesBatchSet::empty); + + if args.auto_commit && !batch_set.is_empty() { + let last_offset = start_offset + u64::from(batch_set.count()) - 1; + if let Err(err) = self.store_consumer_offset(consumer, last_offset) { + // warning for now. + warn!( + consumer = ?consumer, + last_offset, + %err, + "poll_messages: failed to store consumer offset" + ); + } + } + + Ok(batch_set) + } + + #[allow(clippy::cast_possible_truncation)] + fn store_consumer_offset( + &self, + consumer: PollingConsumer, + offset: u64, + ) -> Result<(), IggyError> { + match consumer { + PollingConsumer::Consumer(id, _) => { + let guard = self.consumer_offsets.pin(); + if let Some(existing) = guard.get(&id) { + existing.offset.store(offset, Ordering::Relaxed); + } else { + guard.insert( + id, + ConsumerOffset::new( + ConsumerKind::Consumer, + id as u32, + offset, + String::new(), + ), + ); + } + } + PollingConsumer::ConsumerGroup(group_id, _) => { + let guard = self.consumer_group_offsets.pin(); + let key = ConsumerGroupId(group_id); + if let Some(existing) = guard.get(&key) { + existing.offset.store(offset, Ordering::Relaxed); + } else { + guard.insert( + key, + ConsumerOffset::new( + ConsumerKind::ConsumerGroup, + group_id as u32, + offset, + String::new(), + ), + ); + } + } + } + Ok(()) + } + + fn get_consumer_offset(&self, consumer: PollingConsumer) -> Option { + match consumer { + PollingConsumer::Consumer(id, _) => self + .consumer_offsets + .pin() + .get(&id) + .map(|co| co.offset.load(Ordering::Relaxed)), + PollingConsumer::ConsumerGroup(group_id, _) => self + .consumer_group_offsets + .pin() + .get(&ConsumerGroupId(group_id)) + .map(|co| co.offset.load(Ordering::Relaxed)), + } + } + + fn offsets(&self) -> PartitionOffsets { + PartitionOffsets::new( + self.offset.load(Ordering::Relaxed), + self.dirty_offset.load(Ordering::Relaxed), + ) + } } diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index 931e804a7..d219c905e 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -19,6 +19,7 @@ use crate::IggyPartition; use crate::Partition; +use crate::PollingConsumer; use crate::log::JournalInfo; use crate::types::PartitionsConfig; use consensus::PlaneIdentity; @@ -562,11 +563,36 @@ where ); } Operation::StoreConsumerOffset => { - // TODO: Deserialize consumer offset from prepare body - // and store in partition's consumer_offsets. + let body = message.body_bytes(); + let body = body.as_ref(); + let consumer_kind = body[0]; + let consumer_id = u32::from_le_bytes(body[1..5].try_into().unwrap()) as usize; + let offset = u64::from_le_bytes(body[5..13].try_into().unwrap()); + let consumer = match consumer_kind { + 1 => PollingConsumer::Consumer(consumer_id, 0), + 2 => PollingConsumer::ConsumerGroup(consumer_id, 0), + _ => { + warn!( + replica = consensus.replica(), + op = header.op, + consumer_kind, + "on_replicate: unknown consumer kind" + ); + return; + } + }; + + let partition = self + .get_by_ns(namespace) + .expect("store_consumer_offset: partition not found for namespace"); + let _ = partition.store_consumer_offset(consumer, offset); + debug!( replica = consensus.replica(), op = header.op, + consumer_kind, + consumer_id, + offset, "on_replicate: consumer offset stored" ); } diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs index 7b55af548..6dc9e1eab 100644 --- a/core/simulator/src/client.rs +++ b/core/simulator/src/client.rs @@ -16,7 +16,7 @@ // under the License. use iggy_common::{ - BytesSerializable, INDEX_SIZE, Identifier, + BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, Identifier, create_stream::CreateStream, delete_stream::DeleteStream, header::{Operation, RequestHeader}, @@ -72,23 +72,34 @@ impl SimClient { namespace: IggyNamespace, messages: &[&[u8]], ) -> Message { - // Build batch: count | indexes | messages let count = messages.len() as u32; let mut indexes = Vec::with_capacity(count as usize * INDEX_SIZE); let mut messages_buf = Vec::new(); let mut current_position = 0u32; - for msg in messages { - // Write index: position (u32) + length (u32) - indexes.extend_from_slice(¤t_position.to_le_bytes()); - indexes.extend_from_slice(&(msg.len() as u32).to_le_bytes()); - - // Append message + for (i, msg) in messages.iter().enumerate() { + let msg_total_len = (IGGY_MESSAGE_HEADER_SIZE + msg.len()) as u32; + + // Index: offset(u32) + position(u32) + timestamp(u64) + indexes.extend_from_slice(&(i as u32).to_le_bytes()); // offset (relative) + indexes.extend_from_slice(¤t_position.to_le_bytes()); // position + indexes.extend_from_slice(&0u64.to_le_bytes()); // timestamp (set in prepare) + + // Message header (64 bytes) + messages_buf.extend_from_slice(&0u64.to_le_bytes()); // checksum + messages_buf.extend_from_slice(&0u128.to_le_bytes()); // id + messages_buf.extend_from_slice(&0u64.to_le_bytes()); // offset + messages_buf.extend_from_slice(&0u64.to_le_bytes()); // timestamp + messages_buf.extend_from_slice(&0u64.to_le_bytes()); // origin_timestamp + messages_buf.extend_from_slice(&0u32.to_le_bytes()); // user_headers_length + messages_buf.extend_from_slice(&(msg.len() as u32).to_le_bytes()); // payload_length + messages_buf.extend_from_slice(&0u64.to_le_bytes()); // reserved + + // Payload messages_buf.extend_from_slice(msg); - current_position += msg.len() as u32; + current_position += msg_total_len; } - // Build payload: count | indexes | messages let mut payload = Vec::with_capacity(4 + indexes.len() + messages_buf.len()); payload.extend_from_slice(&count.to_le_bytes()); payload.extend_from_slice(&indexes); @@ -97,6 +108,21 @@ impl SimClient { self.build_request_with_namespace(Operation::SendMessages, &payload, namespace) } + pub fn store_consumer_offset( + &self, + namespace: IggyNamespace, + consumer_kind: u8, + consumer_id: u32, + offset: u64, + ) -> Message { + let mut payload = Vec::with_capacity(13); + payload.push(consumer_kind); + payload.extend_from_slice(&consumer_id.to_le_bytes()); + payload.extend_from_slice(&offset.to_le_bytes()); + + self.build_request_with_namespace(Operation::StoreConsumerOffset, &payload, namespace) + } + #[allow(clippy::cast_possible_truncation)] fn build_request_with_namespace( &self, diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs index f3cf63ea3..5e64aca81 100644 --- a/core/simulator/src/lib.rs +++ b/core/simulator/src/lib.rs @@ -24,9 +24,13 @@ pub mod ready_queue; pub mod replica; use bus::MemBus; +use consensus::PartitionsHandle; use iggy_common::header::ReplyHeader; use iggy_common::message::Message; +use iggy_common::sharding::IggyNamespace; +use iggy_common::{IggyError, IggyMessagesBatchSet}; use message_bus::MessageBus; +use partitions::{Partition, PartitionOffsets, PollingArgs, PollingConsumer}; use replica::{Replica, new_replica}; use std::sync::Arc; @@ -139,6 +143,44 @@ impl Simulator { } } +impl Simulator { + /// Poll messages directly from a replica's partition. + /// + /// # Errors + /// Returns `IggyError::ResourceNotFound` if the namespace does not exist on this replica. + #[allow(clippy::future_not_send)] + pub async fn poll_messages( + &self, + replica_idx: usize, + namespace: IggyNamespace, + consumer: PollingConsumer, + args: PollingArgs, + ) -> Result { + let replica = &self.replicas[replica_idx]; + let partition = + replica + .plane + .partitions() + .get_by_ns(&namespace) + .ok_or(IggyError::ResourceNotFound(format!( + "partition not found for namespace {namespace:?} on replica {replica_idx}" + )))?; + partition.poll_messages(consumer, args).await + } + + /// Get partition offsets from a replica. + #[must_use] + pub fn offsets( + &self, + replica_idx: usize, + namespace: IggyNamespace, + ) -> Option { + let replica = &self.replicas[replica_idx]; + let partition = replica.plane.partitions().get_by_ns(&namespace)?; + Some(partition.offsets()) + } +} + // TODO(IGGY-66): Add acceptance test for per-partition consensus independence. // Setup: 3-replica simulator, two partitions (ns_a, ns_b). // 1. Fill ns_a's pipeline to PIPELINE_PREPARE_QUEUE_MAX without delivering acks. diff --git a/core/simulator/src/main.rs b/core/simulator/src/main.rs index de2288273..1ff966253 100644 --- a/core/simulator/src/main.rs +++ b/core/simulator/src/main.rs @@ -15,10 +15,13 @@ // specific language governing permissions and limitations // under the License. +use iggy_common::PollingStrategy; use iggy_common::header::ReplyHeader; use iggy_common::message::Message; use iggy_common::sharding::IggyNamespace; +use iggy_common::{IggyByteSize, MemoryPool, MemoryPoolConfigOther}; use message_bus::MessageBus; +use partitions::{PollingArgs, PollingConsumer}; use simulator::{Simulator, client::SimClient}; use std::collections::VecDeque; use std::sync::{Arc, Mutex}; @@ -39,7 +42,16 @@ impl Responses { } } +#[allow(clippy::too_many_lines)] fn main() { + // PooledBuffer::from (used by poll_messages) panics if the global pool is uninitialized. + // Disabled pooling just falls through to the system allocator. + MemoryPool::init_pool(&MemoryPoolConfigOther { + enabled: false, + size: IggyByteSize::from(0u64), + bucket_capacity: 1, + }); + let client_id: u128 = 1; let leader: u8 = 0; let mut sim = Simulator::new(3, std::iter::once(client_id)); @@ -126,6 +138,52 @@ fn main() { break; } } + + // Poll messages directly from the leader's partition (bypassing consensus) + let consumer = PollingConsumer::Consumer(1, 0); + let args = PollingArgs::new(PollingStrategy::first(), 10, false); + match sim + .poll_messages(leader as usize, test_namespace, consumer, args) + .await + { + Ok(batch_set) => { + println!( + "[sim] Poll returned {} messages (expected 3)", + batch_set.count() + ); + } + Err(e) => { + println!("[sim] Poll failed: {e}"); + } + } + + let args_auto = PollingArgs::new(PollingStrategy::first(), 2, true); + if let Ok(batch) = sim + .poll_messages(leader as usize, test_namespace, consumer, args_auto) + .await + { + println!("[sim] Auto-commit poll returned {} messages", batch.count()); + } + + // Next poll should start from offset 2 (after auto-commit of 0,1) + let args_next = PollingArgs::new(PollingStrategy::next(), 10, false); + if let Ok(batch) = sim + .poll_messages(leader as usize, test_namespace, consumer, args_next) + .await + { + println!( + "[sim] Next poll returned {} messages (expected 1)", + batch.count() + ); + } + + // Check offsets + if let Some(offsets) = sim.offsets(leader as usize, test_namespace) { + println!( + "[sim] Partition offsets: commit={}, write={}", + offsets.commit_offset, offsets.write_offset + ); + } }); client_handle.join().expect("client thread panicked");