Skip to content

Commit 27f1e0d

Browse files
authored
Merge pull request #42 from ilikepi63/feat/make-partition-key-array-of-bytes
feat: Make Partition key array of bytes
2 parents 551ee28 + 2920d91 commit 27f1e0d

File tree

14 files changed

+85
-75
lines changed

14 files changed

+85
-75
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "riskless"
3-
version = "0.6.3"
3+
version = "0.7.1"
44
edition = "2024"
55
description = "A pure Rust implementation of Diskless Topics"
66
license = "MIT / Apache-2.0"

examples/concurrent.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ async fn main() {
2424
.collect(ProduceRequest {
2525
request_id: 1,
2626
topic: "example-topic".to_string(),
27-
partition: 1,
27+
partition: Vec::from(&1_u8.to_be_bytes()),
2828
data: "hello".as_bytes().to_vec(),
2929
})
3030
.expect("");
@@ -55,7 +55,7 @@ async fn main() {
5555
let consume_response = consume(
5656
ConsumeRequest {
5757
topic: "example-topic".to_string(),
58-
partition: 1,
58+
partition: Vec::from(&1_u8.to_be_bytes()),
5959
offset: 0,
6060
max_partition_fetch_bytes: 0,
6161
},

examples/simple.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ async fn main() {
1717
col.collect(ProduceRequest {
1818
request_id: 1,
1919
topic: "example-topic".to_string(),
20-
partition: 1,
20+
partition: Vec::from(&1_u8.to_be_bytes()),
2121
data: "hello".as_bytes().to_vec(),
2222
})
2323
.expect("");
@@ -31,7 +31,7 @@ async fn main() {
3131
let consume_response = consume(
3232
ConsumeRequest {
3333
topic: "example-topic".to_string(),
34-
partition: 1,
34+
partition: Vec::from(&1_u8.to_be_bytes()),
3535
offset: 0,
3636
max_partition_fetch_bytes: 0,
3737
},

src/batch_coordinator/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::messages::CommitBatchRequest;
1414

1515
/// Merged Topic/Partition identification struc .
1616
#[derive(Debug, Hash, PartialEq, Eq, Clone, Default)]
17-
pub struct TopicIdPartition(pub String, pub u64);
17+
pub struct TopicIdPartition(pub String, pub Vec<u8>);
1818

1919
/// The type of the timestamp given.
2020
#[derive(Debug, Default, Clone)]

src/batch_coordinator/simple/mod.rs

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,14 @@ impl SimpleBatchCoordinator {
5555
}
5656

5757
/// Retrieves the directory of the given topic/partition.
58-
fn partition_index_file_from_topic_dir(
59-
topic_dir: &mut PathBuf,
60-
partition: u64,
61-
) -> &mut PathBuf {
62-
topic_dir.push(format!("{:0>20}.index", partition.to_string()));
58+
fn partition_index_file_from_topic_dir<'a>(
59+
topic_dir: &'a mut PathBuf,
60+
partition: &'a [u8],
61+
) -> &'a mut PathBuf {
62+
topic_dir.push(format!(
63+
"{:0>20}.index",
64+
partition.iter().map(|b| b.to_string()).collect::<String>()
65+
));
6366

6467
(topic_dir) as _
6568
}
@@ -105,7 +108,7 @@ impl CommitFile for SimpleBatchCoordinator {
105108

106109
let current_partition_file = Self::partition_index_file_from_topic_dir(
107110
&mut current_topic_dir,
108-
batch.topic_id_partition.1,
111+
&batch.topic_id_partition.1,
109112
);
110113

111114
let file = Self::open_or_create_file(current_partition_file);
@@ -162,7 +165,7 @@ impl FindBatches for SimpleBatchCoordinator {
162165

163166
let current_partition_file = Self::partition_index_file_from_topic_dir(
164167
&mut current_topic_dir,
165-
request.topic_id_partition.1,
168+
&request.topic_id_partition.1,
166169
);
167170

168171
let file = Self::open_file(current_partition_file);
@@ -383,10 +386,10 @@ mod tests {
383386
#[test]
384387
fn test_partition_index_file_from_topic_dir() {
385388
let mut topic_dir = PathBuf::from("test_topic");
386-
let partition = 42;
389+
let partition = Vec::from(&42_u8.to_be_bytes());
387390

388391
let result =
389-
SimpleBatchCoordinator::partition_index_file_from_topic_dir(&mut topic_dir, partition);
392+
SimpleBatchCoordinator::partition_index_file_from_topic_dir(&mut topic_dir, &partition);
390393

391394
assert_eq!(
392395
result.to_str().expect(""),
@@ -461,11 +464,11 @@ mod tests {
461464
let whole_dir = temp_dir.clone();
462465

463466
let topic = "test_topic".to_string();
464-
let partition = 1;
467+
let partition = Vec::from(&1_u8.to_be_bytes());
465468

466469
let object_key = Uuid::new_v4().into_bytes();
467470
let batches = vec![CommitBatchRequest {
468-
topic_id_partition: TopicIdPartition(topic.clone(), partition),
471+
topic_id_partition: TopicIdPartition(topic.clone(), partition.clone()),
469472
byte_offset: 0,
470473
size: 100,
471474
request_id: 1,
@@ -479,9 +482,10 @@ mod tests {
479482
last_sequence: 0,
480483
}];
481484

482-
let expected_file_path = temp_dir
483-
.join(&topic)
484-
.join(format!("{:0>20}.index", partition));
485+
let expected_file_path = temp_dir.join(&topic).join(format!(
486+
"{:0>20}.index",
487+
partition.iter().map(|b| b.to_string()).collect::<String>()
488+
));
485489

486490
// File shouldn't exist yet
487491
assert!(!expected_file_path.exists());
@@ -518,15 +522,15 @@ mod tests {
518522
let _ = std::fs::create_dir(index_path);
519523

520524
let topic = "test_topic".to_string();
521-
let partition = 1;
525+
let partition = Vec::from(&1_u8.to_be_bytes());
522526

523527
// First, create an index file with some data
524528
let object_key = Uuid::new_v4().into_bytes();
525529
let offset = 0;
526530
let size = 100;
527531

528532
let batches = vec![CommitBatchRequest {
529-
topic_id_partition: TopicIdPartition(topic.clone(), partition),
533+
topic_id_partition: TopicIdPartition(topic.clone(), partition.clone()),
530534
byte_offset: offset,
531535
size,
532536
request_id: 1,
@@ -582,7 +586,7 @@ mod tests {
582586
let _ = std::fs::create_dir(&index_path);
583587

584588
let topic = "test_topic".to_string();
585-
let partition = 1;
589+
let partition = Vec::from(&1_u8.to_be_bytes());
586590

587591
// First, create an index file with some data
588592
let object_key = Uuid::new_v4().into_bytes();
@@ -591,7 +595,7 @@ mod tests {
591595
let size = 100;
592596

593597
let batches = vec![CommitBatchRequest {
594-
topic_id_partition: TopicIdPartition(topic.clone(), partition),
598+
topic_id_partition: TopicIdPartition(topic.clone(), partition.clone()),
595599
byte_offset: offset,
596600
size,
597601
request_id: 1,
@@ -608,7 +612,7 @@ mod tests {
608612
coordinator.commit_file(object_key, 1, 100, batches).await;
609613

610614
let batches = vec![CommitBatchRequest {
611-
topic_id_partition: TopicIdPartition(topic.clone(), partition),
615+
topic_id_partition: TopicIdPartition(topic.clone(), partition.clone()),
612616
byte_offset: 1,
613617
size,
614618
request_id: 1,
@@ -627,7 +631,7 @@ mod tests {
627631
.await;
628632

629633
let batches = vec![CommitBatchRequest {
630-
topic_id_partition: TopicIdPartition(topic.clone(), partition),
634+
topic_id_partition: TopicIdPartition(topic.clone(), partition.clone()),
631635
byte_offset: 1,
632636
size,
633637
request_id: 2,
@@ -649,15 +653,18 @@ mod tests {
649653

650654
index_path.push(&topic);
651655

652-
index_path.push(format!("{:0>20}.index", partition.to_string()));
656+
index_path.push(format!(
657+
"{:0>20}.index",
658+
partition.iter().map(|b| b.to_string()).collect::<String>()
659+
));
653660

654661
let data = std::fs::read(index_path)?;
655662

656663
assert_eq!(data.len(), Index::packed_size() * 3);
657664

658665
// Now try to find the batch
659666
let find_requests = vec![FindBatchRequest {
660-
topic_id_partition: TopicIdPartition(topic.clone(), partition),
667+
topic_id_partition: TopicIdPartition(topic.clone(), partition.clone()),
661668
offset,
662669
max_partition_fetch_bytes: 1024,
663670
}];
@@ -675,7 +682,7 @@ mod tests {
675682
assert_eq!(batch.metadata.byte_size, size);
676683

677684
let find_requests = vec![FindBatchRequest {
678-
topic_id_partition: TopicIdPartition(topic.clone(), partition),
685+
topic_id_partition: TopicIdPartition(topic.clone(), partition.clone()),
679686
offset: 1,
680687
max_partition_fetch_bytes: 1024,
681688
}];
@@ -706,7 +713,10 @@ mod tests {
706713
let coordinator = SimpleBatchCoordinator::new(temp_dir.to_str().expect("").to_string());
707714

708715
let find_requests = vec![FindBatchRequest {
709-
topic_id_partition: TopicIdPartition("nonexistent_topic".to_string(), 1),
716+
topic_id_partition: TopicIdPartition(
717+
"nonexistent_topic".to_string(),
718+
Vec::from(&1_u8.to_be_bytes()),
719+
),
710720
offset: 0,
711721
max_partition_fetch_bytes: 1024,
712722
}];

src/messages/batch_coordinate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ pub struct BatchCoordinate {
66
/// The topic for this record.
77
pub topic: String,
88
/// The partition for this record.
9-
pub partition: u64,
9+
pub partition: Vec<u8>,
1010
/// The base_offset inside of the file at which this record exists.
1111
pub base_offset: u64,
1212
/// The offset at which this record exists.

src/messages/commit_batch_request.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ impl From<&BatchCoordinate> for CommitBatchRequest {
3939
// Everything that is defaulted is unknown for now.
4040
CommitBatchRequest {
4141
request_id: value.request.request_id,
42-
topic_id_partition: TopicIdPartition(value.topic.clone(), value.partition),
42+
topic_id_partition: TopicIdPartition(value.topic.clone(), value.partition.clone()),
4343
byte_offset: value.offset,
4444
size: value.size,
4545
base_offset: value.base_offset,

src/messages/consume_request.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ pub struct ConsumeRequest {
99
/// The topic that this request consumes from.
1010
pub topic: String,
1111
/// The partition that this request consumes from.
12-
pub partition: u64,
12+
pub partition: Vec<u8>,
1313
/// The offset from which this request consumes from.
1414
pub offset: u64,
1515
/// The maximum amount of bytes to retrieve from a partition.

src/messages/consume_response.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ pub struct ConsumeBatch {
1111
/// The topic of the record.
1212
pub topic: String,
1313
/// Partition of a record.
14-
pub partition: u64,
14+
pub partition: Vec<u8>,
1515
/// The offset of this record.
1616
pub offset: u64,
1717
/// The maximum amount of bytes to retrieve from each partition.
@@ -46,7 +46,7 @@ impl TryFrom<(FindBatchResponse, &BatchInfo, &Bytes)> for ConsumeBatch {
4646

4747
let batch = ConsumeBatch {
4848
topic: batch_info.metadata.topic_id_partition.0.clone(),
49-
partition: batch_info.metadata.topic_id_partition.1,
49+
partition: batch_info.metadata.topic_id_partition.1.clone(),
5050
offset: find_batch_response.log_start_offset,
5151
max_partition_fetch_bytes: 0,
5252
data,

0 commit comments

Comments
 (0)