Skip to content

Commit 18588f1

Browse files
authored
refactor(journal): refactor partition journal utilize storage trait (#2909)
Refactor the `PartitionJournal` to use the `Strorage` trait as backing storage, rather than storing data inline.
1 parent 0e435f0 commit 18588f1

File tree

6 files changed

+430
-115
lines changed

6 files changed

+430
-115
lines changed

core/journal/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ pub trait Storage {
3939
type Buffer;
4040

4141
fn write(&self, buf: Self::Buffer) -> impl Future<Output = usize>;
42-
fn read(&self, offset: usize, buffer: Self::Buffer) -> impl Future<Output = Self::Buffer>;
42+
// TODO: Get rid of the `len` usize, we need to do changes in `Simulator` in order to support that.
43+
// Maybe we should go back to passing in the `Buffer` again, but I am not sure how to handle it in the `Partitions Journal`, since we use in-memory impl
44+
// which extracts the buffer out of the `Vec<Message>` and we don't need to allocate a new buffer.
45+
fn read(&self, offset: usize, len: usize) -> impl Future<Output = Self::Buffer>;
4346
}
4447

4548
pub trait JournalHandle {

core/partitions/src/iggy_partition.rs

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::journal::{Noop, PartitionJournal};
18+
use crate::journal::{PartitionJournal, PartitionJournalMemStorage};
1919
use crate::log::SegmentedLog;
20-
use crate::{AppendResult, Partition};
20+
use crate::{AppendResult, Partition, decode_send_messages_batch};
2121
use iggy_common::{
2222
ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError, IggyMessagesBatchMut,
2323
IggyTimestamp, PartitionStats,
24+
header::{Operation, PrepareHeader},
25+
message::Message,
2426
};
2527
use journal::Journal as _;
2628
use std::sync::Arc;
@@ -30,7 +32,7 @@ use tokio::sync::Mutex as TokioMutex;
3032
// This struct aliases in terms of the code contained the `LocalPartition from `core/server/src/streaming/partitions/local_partition.rs`.
3133
#[derive(Debug)]
3234
pub struct IggyPartition {
33-
pub log: SegmentedLog<PartitionJournal, Noop>,
35+
pub log: SegmentedLog<PartitionJournal<PartitionJournalMemStorage>, PartitionJournalMemStorage>,
3436
/// Committed offset — advanced only after quorum ack.
3537
pub offset: Arc<AtomicU64>,
3638
/// Dirty offset — advanced on every prepare (before commit).
@@ -46,6 +48,36 @@ pub struct IggyPartition {
4648
}
4749

4850
impl IggyPartition {
51+
fn prepare_message_from_batch(
52+
mut header: PrepareHeader,
53+
batch: &IggyMessagesBatchMut,
54+
) -> Message<PrepareHeader> {
55+
let indexes = batch.indexes();
56+
let count = batch.count();
57+
let body_len = 4 + indexes.len() + batch.len();
58+
let total_size = std::mem::size_of::<PrepareHeader>() + body_len;
59+
header.size = u32::try_from(total_size)
60+
.expect("prepare_message_from_batch: batch size exceeds u32::MAX");
61+
62+
let message = Message::<PrepareHeader>::new(total_size).transmute_header(|_old, new| {
63+
*new = header;
64+
});
65+
66+
let mut bytes = message
67+
.into_inner()
68+
.try_into_mut()
69+
.expect("prepare_message_from_batch: expected unique bytes buffer");
70+
let header_size = std::mem::size_of::<PrepareHeader>();
71+
bytes[header_size..header_size + 4].copy_from_slice(&count.to_le_bytes());
72+
let mut position = header_size + 4;
73+
bytes[position..position + indexes.len()].copy_from_slice(indexes);
74+
position += indexes.len();
75+
bytes[position..position + batch.len()].copy_from_slice(batch);
76+
77+
Message::<PrepareHeader>::from_bytes(bytes.freeze())
78+
.expect("prepare_message_from_batch: invalid prepared message bytes")
79+
}
80+
4981
pub fn new(stats: Arc<PartitionStats>) -> Self {
5082
Self {
5183
log: SegmentedLog::default(),
@@ -65,8 +97,16 @@ impl IggyPartition {
6597
impl Partition for IggyPartition {
6698
async fn append_messages(
6799
&mut self,
68-
mut batch: IggyMessagesBatchMut,
100+
message: Message<PrepareHeader>,
69101
) -> Result<AppendResult, IggyError> {
102+
let header = *message.header();
103+
if header.operation != Operation::SendMessages {
104+
return Err(IggyError::CannotAppendMessage);
105+
}
106+
107+
let mut batch = decode_send_messages_batch(message.body_bytes())
108+
.ok_or(IggyError::CannotAppendMessage)?;
109+
70110
if batch.count() == 0 {
71111
return Ok(AppendResult::new(0, 0, 0));
72112
}
@@ -116,7 +156,8 @@ impl Partition for IggyPartition {
116156
journal.info.end_timestamp = ts;
117157
}
118158

119-
journal.inner.append(batch).await;
159+
let message = Self::prepare_message_from_batch(header, &batch);
160+
journal.inner.append(message).await;
120161

121162
Ok(AppendResult::new(
122163
dirty_offset,

core/partitions/src/iggy_partitions.rs

Lines changed: 16 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ use consensus::{
3030
};
3131
use iggy_common::header::Command2;
3232
use iggy_common::{
33-
INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, PartitionStats, PooledBuffer,
34-
Segment, SegmentStorage,
33+
IggyByteSize, PartitionStats, Segment, SegmentStorage,
3534
header::{
3635
ConsensusHeader, GenericHeader, Operation, PrepareHeader, PrepareOkHeader, RequestHeader,
3736
},
@@ -355,13 +354,13 @@ where
355354
}
356355

357356
async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::Message<PrepareHeader>) {
358-
let header = message.header();
357+
let header = *message.header();
359358
let namespace = IggyNamespace::from_raw(header.namespace);
360359
let consensus = self
361360
.consensus()
362361
.expect("on_replicate: consensus not initialized");
363362

364-
let current_op = match replicate_preflight(consensus, header) {
363+
let current_op = match replicate_preflight(consensus, &header) {
365364
Ok(current_op) => current_op,
366365
Err(reason) => {
367366
warn!(
@@ -372,7 +371,7 @@ where
372371
}
373372
};
374373

375-
let is_old_prepare = fence_old_prepare_by_commit(consensus, header);
374+
let is_old_prepare = fence_old_prepare_by_commit(consensus, &header);
376375
if is_old_prepare {
377376
warn!("received old prepare, not replicating");
378377
} else {
@@ -387,9 +386,9 @@ where
387386
// TODO: Figure out the flow of the partition operations.
388387
// In metadata layer we assume that when an `on_request` or `on_replicate` is called, it's called from correct shard.
389388
// I think we need to do the same here, which means that the code from below is unfallable, the partition should always exist by now!
390-
self.apply_replicated_operation(&namespace, &message).await;
389+
self.apply_replicated_operation(&namespace, message).await;
391390

392-
self.send_prepare_ok(header).await;
391+
self.send_prepare_ok(&header).await;
393392

394393
if consensus.is_follower() {
395394
self.commit_journal(namespace);
@@ -539,37 +538,21 @@ where
539538
.register_namespace(ns);
540539
}
541540

542-
// TODO: Move this elsewhere, also do not reallocate, we do reallocationg now becauise we use PooledBuffer for the batch body
543-
// but `Bytes` for `Message` payload.
544-
fn batch_from_body(body: &[u8]) -> IggyMessagesBatchMut {
545-
assert!(body.len() >= 4, "prepare body too small for batch header");
546-
let count = u32::from_le_bytes(body[0..4].try_into().unwrap());
547-
let indexes_len = count as usize * INDEX_SIZE;
548-
let indexes_end = 4 + indexes_len;
549-
assert!(
550-
body.len() >= indexes_end,
551-
"prepare body too small for {count} indexes",
552-
);
553-
554-
let indexes = IggyIndexesMut::from_bytes(PooledBuffer::from(&body[4..indexes_end]), 0);
555-
let messages = PooledBuffer::from(&body[indexes_end..]);
556-
IggyMessagesBatchMut::from_indexes_and_messages(indexes, messages)
557-
}
558-
559541
async fn apply_replicated_operation(
560542
&self,
561543
namespace: &IggyNamespace,
562-
message: &Message<PrepareHeader>,
544+
message: Message<PrepareHeader>,
563545
) {
564546
let consensus = self
565547
.consensus()
566548
.expect("apply_replicated_operation: consensus not initialized");
567-
let header = message.header();
549+
let header = *message.header();
568550

551+
// TODO: WE have to distinguish between an `message` recv by leader and follower.
552+
// In the follower path, we have to skip the `prepare_for_persistence` path, just append to journal.
569553
match header.operation {
570554
Operation::SendMessages => {
571-
let body = message.body_bytes();
572-
self.append_send_messages_to_journal(namespace, body.as_ref())
555+
self.append_send_messages_to_journal(namespace, message)
573556
.await;
574557
debug!(
575558
replica = consensus.replica(),
@@ -598,28 +581,23 @@ where
598581
}
599582
}
600583

601-
async fn append_send_messages_to_journal(&self, namespace: &IggyNamespace, body: &[u8]) {
602-
let batch = Self::batch_from_body(body);
603-
self.append_messages_to_journal(namespace, batch).await;
604-
}
605-
606-
/// Append a batch to a partition's journal with offset assignment.
584+
/// Append a prepare message to a partition's journal with offset assignment.
607585
///
608586
/// Updates `segment.current_position` (logical position for indexing) but
609587
/// not `segment.end_offset` or `segment.end_timestamp` (committed state).
610588
/// Those are updated during commit.
611589
///
612590
/// Uses `dirty_offset` for offset assignment so that multiple prepares
613591
/// can be pipelined before any commit.
614-
async fn append_messages_to_journal(
592+
async fn append_send_messages_to_journal(
615593
&self,
616594
namespace: &IggyNamespace,
617-
batch: IggyMessagesBatchMut,
595+
message: Message<PrepareHeader>,
618596
) {
619597
let partition = self
620598
.get_mut_by_ns(namespace)
621-
.expect("append_messages_to_journal: partition not found for namespace");
622-
let _ = partition.append_messages(batch).await;
599+
.expect("append_send_messages_to_journal: partition not found for namespace");
600+
let _ = partition.append_messages(message).await;
623601
}
624602

625603
/// Replicate a prepare message to the next replica in the chain.

0 commit comments

Comments
 (0)