Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 152 additions & 4 deletions core/partitions/src/iggy_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,26 @@
// specific language governing permissions and limitations
// under the License.

use crate::journal::{PartitionJournal, PartitionJournalMemStorage};
use crate::journal::{
MessageLookup, PartitionJournal, PartitionJournalMemStorage, QueryableJournal,
};
use crate::log::SegmentedLog;
use crate::{AppendResult, Partition, decode_send_messages_batch};
use crate::{
AppendResult, Partition, PartitionOffsets, PollingArgs, PollingConsumer,
decode_send_messages_batch,
};
use iggy_common::{
ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError, IggyMessagesBatchMut,
IggyTimestamp, PartitionStats,
ConsumerGroupId, ConsumerGroupOffsets, ConsumerKind, ConsumerOffset, ConsumerOffsets,
IggyByteSize, IggyError, IggyMessagesBatchMut, IggyMessagesBatchSet, IggyTimestamp,
PartitionStats, PollingKind,
header::{Operation, PrepareHeader},
message::Message,
};
use journal::Journal as _;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Mutex as TokioMutex;
use tracing::warn;

// In terms of the code it contains, this struct mirrors the `LocalPartition` from `core/server/src/streaming/partitions/local_partition.rs`.
#[derive(Debug)]
Expand Down Expand Up @@ -165,4 +172,145 @@ impl Partition for IggyPartition {
batch_messages_count,
))
}

/// Reads up to `args.count` messages for `consumer` from the journal.
///
/// The starting point is chosen by `args.strategy.kind`:
/// - `Offset`: start at `args.strategy.value`.
/// - `First`: start at offset `0`.
/// - `Last`: start `args.count - 1` offsets before the committed offset (saturating at `0`).
/// - `Next`: start one past the consumer's stored offset, or at `0` when none is stored.
/// - `Timestamp`: look the batch up by timestamp and return it directly, trimmed so it
///   does not extend past the committed offset. This path returns early and therefore
///   never reaches the auto-commit step below.
///
/// For every non-timestamp strategy the requested count is clamped so the read never goes
/// beyond the committed offset (the current value of `self.offset`). When
/// `args.auto_commit` is set and the result is non-empty, the offset of the last returned
/// message (computed as `start_offset + batch_set.count() - 1`) is stored for `consumer`;
/// a failure to store it is only logged.
///
/// # Errors
/// Every path visible in this body returns `Ok`; a missing journal result is mapped to an
/// empty batch set.
async fn poll_messages(
    &self,
    consumer: PollingConsumer,
    args: PollingArgs,
) -> Result<IggyMessagesBatchSet, IggyError> {
    // Nothing to serve when no messages were requested or the offset flag is unset.
    if args.count == 0 || !self.should_increment_offset {
        return Ok(IggyMessagesBatchSet::empty());
    }

    let committed_offset = self.offset.load(Ordering::Relaxed);

    // Number of offsets in `from..=committed_offset`, saturated to `u32::MAX`.
    // Callers must ensure `from <= committed_offset` before invoking it.
    let visible_from =
        move |from: u64| u32::try_from(committed_offset - from + 1).unwrap_or(u32::MAX);

    let start_offset = match args.strategy.kind {
        PollingKind::First => 0,
        PollingKind::Offset => args.strategy.value,
        PollingKind::Last => {
            // `args.count` is non-zero here, so the subtraction cannot underflow.
            let span = u64::from(args.count) - 1;
            committed_offset.saturating_sub(span)
        }
        PollingKind::Next => match self.get_consumer_offset(consumer) {
            Some(stored) => stored + 1,
            None => 0,
        },
        PollingKind::Timestamp => {
            let lookup = MessageLookup::Timestamp {
                timestamp: args.strategy.value,
                count: args.count,
            };
            let found = self
                .log
                .journal()
                .inner
                .get(&lookup)
                .await
                .unwrap_or_else(IggyMessagesBatchSet::empty);

            // No first offset: hand the batch set back untouched.
            let Some(first) = found.first_offset() else {
                return Ok(found);
            };
            if first > committed_offset {
                return Ok(IggyMessagesBatchSet::empty());
            }
            let limit = found.count().min(visible_from(first));
            return Ok(found.get_by_offset(first, limit));
        }
    };

    if start_offset > committed_offset {
        return Ok(IggyMessagesBatchSet::empty());
    }

    let count = args.count.min(visible_from(start_offset));
    let lookup = MessageLookup::Offset {
        offset: start_offset,
        count,
    };
    let batch_set = self
        .log
        .journal()
        .inner
        .get(&lookup)
        .await
        .unwrap_or_else(IggyMessagesBatchSet::empty);

    if args.auto_commit && !batch_set.is_empty() {
        let last_offset = start_offset + u64::from(batch_set.count()) - 1;
        if let Err(err) = self.store_consumer_offset(consumer, last_offset) {
            // Best effort for now: the poll still succeeds, the failure is only logged.
            warn!(
                consumer = ?consumer,
                last_offset,
                %err,
                "poll_messages: failed to store consumer offset"
            );
        }
    }

    Ok(batch_set)
}

/// Records `offset` as the stored position of `consumer` on this partition.
///
/// Individual consumers are tracked in `self.consumer_offsets`, consumer groups in
/// `self.consumer_group_offsets` (keyed by `ConsumerGroupId`). An existing entry has its
/// atomic `offset` overwritten in place; otherwise a new `ConsumerOffset` is inserted,
/// built with an empty `String` as its last constructor argument.
///
/// # Errors
/// As written, this implementation always returns `Ok(())`.
// The lint is silenced for the `as u32` narrowing of the consumer / group id below.
// NOTE(review): ids wider than `u32` would be silently truncated — confirm callers
// never pass such values.
#[allow(clippy::cast_possible_truncation)]
fn store_consumer_offset(
    &self,
    consumer: PollingConsumer,
    offset: u64,
) -> Result<(), IggyError> {
    match consumer {
        PollingConsumer::Consumer(id, _) => {
            let offsets = self.consumer_offsets.pin();
            match offsets.get(&id) {
                Some(entry) => entry.offset.store(offset, Ordering::Relaxed),
                None => {
                    let fresh = ConsumerOffset::new(
                        ConsumerKind::Consumer,
                        id as u32,
                        offset,
                        String::new(),
                    );
                    offsets.insert(id, fresh);
                }
            }
        }
        PollingConsumer::ConsumerGroup(group_id, _) => {
            let offsets = self.consumer_group_offsets.pin();
            let key = ConsumerGroupId(group_id);
            match offsets.get(&key) {
                Some(entry) => entry.offset.store(offset, Ordering::Relaxed),
                None => {
                    let fresh = ConsumerOffset::new(
                        ConsumerKind::ConsumerGroup,
                        group_id as u32,
                        offset,
                        String::new(),
                    );
                    offsets.insert(key, fresh);
                }
            }
        }
    }

    Ok(())
}

/// Returns the offset currently stored for `consumer`, or `None` when it has no entry.
///
/// Individual consumers are looked up in `self.consumer_offsets`; consumer groups are
/// looked up in `self.consumer_group_offsets` under `ConsumerGroupId(group_id)`.
fn get_consumer_offset(&self, consumer: PollingConsumer) -> Option<u64> {
    match consumer {
        PollingConsumer::Consumer(id, _) => {
            let offsets = self.consumer_offsets.pin();
            let entry = offsets.get(&id)?;
            Some(entry.offset.load(Ordering::Relaxed))
        }
        PollingConsumer::ConsumerGroup(group_id, _) => {
            let offsets = self.consumer_group_offsets.pin();
            let entry = offsets.get(&ConsumerGroupId(group_id))?;
            Some(entry.offset.load(Ordering::Relaxed))
        }
    }
}

/// Builds a `PartitionOffsets` from the current values of `self.offset` and
/// `self.dirty_offset`.
///
/// The two counters are read with separate relaxed loads, in that order.
fn offsets(&self) -> PartitionOffsets {
    let current = self.offset.load(Ordering::Relaxed);
    let dirty = self.dirty_offset.load(Ordering::Relaxed);
    PartitionOffsets::new(current, dirty)
}
}
30 changes: 28 additions & 2 deletions core/partitions/src/iggy_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use crate::IggyPartition;
use crate::Partition;
use crate::PollingConsumer;
use crate::log::JournalInfo;
use crate::types::PartitionsConfig;
use consensus::PlaneIdentity;
Expand Down Expand Up @@ -562,11 +563,36 @@ where
);
}
Operation::StoreConsumerOffset => {
// TODO: Deserialize consumer offset from prepare body
// and store in partition's consumer_offsets.
let body = message.body_bytes();
let body = body.as_ref();
let consumer_kind = body[0];
let consumer_id = u32::from_le_bytes(body[1..5].try_into().unwrap()) as usize;
let offset = u64::from_le_bytes(body[5..13].try_into().unwrap());
let consumer = match consumer_kind {
1 => PollingConsumer::Consumer(consumer_id, 0),
2 => PollingConsumer::ConsumerGroup(consumer_id, 0),
_ => {
warn!(
replica = consensus.replica(),
op = header.op,
consumer_kind,
"on_replicate: unknown consumer kind"
);
return;
}
};

let partition = self
.get_by_ns(namespace)
.expect("store_consumer_offset: partition not found for namespace");
let _ = partition.store_consumer_offset(consumer, offset);

debug!(
replica = consensus.replica(),
op = header.op,
consumer_kind,
consumer_id,
offset,
"on_replicate: consumer offset stored"
);
}
Expand Down
46 changes: 36 additions & 10 deletions core/simulator/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use iggy_common::{
BytesSerializable, INDEX_SIZE, Identifier,
BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, Identifier,
create_stream::CreateStream,
delete_stream::DeleteStream,
header::{Operation, RequestHeader},
Expand Down Expand Up @@ -72,23 +72,34 @@ impl SimClient {
namespace: IggyNamespace,
messages: &[&[u8]],
) -> Message<RequestHeader> {
// Build batch: count | indexes | messages
let count = messages.len() as u32;
let mut indexes = Vec::with_capacity(count as usize * INDEX_SIZE);
let mut messages_buf = Vec::new();

let mut current_position = 0u32;
for msg in messages {
// Write index: position (u32) + length (u32)
indexes.extend_from_slice(&current_position.to_le_bytes());
indexes.extend_from_slice(&(msg.len() as u32).to_le_bytes());

// Append message
for (i, msg) in messages.iter().enumerate() {
let msg_total_len = (IGGY_MESSAGE_HEADER_SIZE + msg.len()) as u32;

// Index: offset(u32) + position(u32) + timestamp(u64)
indexes.extend_from_slice(&(i as u32).to_le_bytes()); // offset (relative)
indexes.extend_from_slice(&current_position.to_le_bytes()); // position
indexes.extend_from_slice(&0u64.to_le_bytes()); // timestamp (set in prepare)

// Message header (64 bytes)
messages_buf.extend_from_slice(&0u64.to_le_bytes()); // checksum
messages_buf.extend_from_slice(&0u128.to_le_bytes()); // id
messages_buf.extend_from_slice(&0u64.to_le_bytes()); // offset
messages_buf.extend_from_slice(&0u64.to_le_bytes()); // timestamp
messages_buf.extend_from_slice(&0u64.to_le_bytes()); // origin_timestamp
messages_buf.extend_from_slice(&0u32.to_le_bytes()); // user_headers_length
messages_buf.extend_from_slice(&(msg.len() as u32).to_le_bytes()); // payload_length
messages_buf.extend_from_slice(&0u64.to_le_bytes()); // reserved

// Payload
messages_buf.extend_from_slice(msg);
current_position += msg.len() as u32;
current_position += msg_total_len;
}

// Build payload: count | indexes | messages
let mut payload = Vec::with_capacity(4 + indexes.len() + messages_buf.len());
payload.extend_from_slice(&count.to_le_bytes());
payload.extend_from_slice(&indexes);
Expand All @@ -97,6 +108,21 @@ impl SimClient {
self.build_request_with_namespace(Operation::SendMessages, &payload, namespace)
}

/// Builds a `StoreConsumerOffset` request for `namespace`.
///
/// The request body is 13 bytes, laid out as:
/// `consumer_kind` (1 byte) | `consumer_id` (u32, little-endian) | `offset` (u64, little-endian).
pub fn store_consumer_offset(
    &self,
    namespace: IggyNamespace,
    consumer_kind: u8,
    consumer_id: u32,
    offset: u64,
) -> Message<RequestHeader> {
    let id_bytes = consumer_id.to_le_bytes();
    let offset_bytes = offset.to_le_bytes();

    // One byte for the kind plus the two fixed-width fields.
    let mut body = Vec::with_capacity(1 + id_bytes.len() + offset_bytes.len());
    body.push(consumer_kind);
    body.extend_from_slice(&id_bytes);
    body.extend_from_slice(&offset_bytes);

    self.build_request_with_namespace(Operation::StoreConsumerOffset, &body, namespace)
}

#[allow(clippy::cast_possible_truncation)]
fn build_request_with_namespace(
&self,
Expand Down
42 changes: 42 additions & 0 deletions core/simulator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ pub mod ready_queue;
pub mod replica;

use bus::MemBus;
use consensus::PartitionsHandle;
use iggy_common::header::ReplyHeader;
use iggy_common::message::Message;
use iggy_common::sharding::IggyNamespace;
use iggy_common::{IggyError, IggyMessagesBatchSet};
use message_bus::MessageBus;
use partitions::{Partition, PartitionOffsets, PollingArgs, PollingConsumer};
use replica::{Replica, new_replica};
use std::sync::Arc;

Expand Down Expand Up @@ -139,6 +143,44 @@ impl Simulator {
}
}

impl Simulator {
    /// Poll messages directly from a replica's partition.
    ///
    /// Looks up the partition for `namespace` on the replica at `replica_idx` and forwards
    /// `consumer` and `args` to that partition's `poll_messages`.
    ///
    /// # Errors
    /// Returns `IggyError::ResourceNotFound` if the namespace does not exist on this replica,
    /// or whatever error the partition's `poll_messages` returns.
    ///
    /// # Panics
    /// `self.replicas` is indexed directly with `replica_idx`; an out-of-range index is
    /// expected to panic (assuming `replicas` is a `Vec`/slice — confirm).
    #[allow(clippy::future_not_send)]
    pub async fn poll_messages(
        &self,
        replica_idx: usize,
        namespace: IggyNamespace,
        consumer: PollingConsumer,
        args: PollingArgs,
    ) -> Result<IggyMessagesBatchSet, IggyError> {
        let replica = &self.replicas[replica_idx];
        let partition = replica
            .plane
            .partitions()
            .get_by_ns(&namespace)
            // Build the error message lazily so the success path does not allocate.
            .ok_or_else(|| {
                IggyError::ResourceNotFound(format!(
                    "partition not found for namespace {namespace:?} on replica {replica_idx}"
                ))
            })?;
        partition.poll_messages(consumer, args).await
    }

    /// Get partition offsets from a replica.
    ///
    /// Returns `None` when the replica at `replica_idx` has no partition for `namespace`.
    ///
    /// # Panics
    /// `self.replicas` is indexed directly with `replica_idx`; an out-of-range index is
    /// expected to panic (assuming `replicas` is a `Vec`/slice — confirm).
    #[must_use]
    pub fn offsets(
        &self,
        replica_idx: usize,
        namespace: IggyNamespace,
    ) -> Option<PartitionOffsets> {
        let replica = &self.replicas[replica_idx];
        let partition = replica.plane.partitions().get_by_ns(&namespace)?;
        Some(partition.offsets())
    }
}

// TODO(IGGY-66): Add acceptance test for per-partition consensus independence.
// Setup: 3-replica simulator, two partitions (ns_a, ns_b).
// 1. Fill ns_a's pipeline to PIPELINE_PREPARE_QUEUE_MAX without delivering acks.
Expand Down
Loading
Loading