Commit a387952

feat(kafka): kafka api command.
1 parent 13bc839 commit a387952

File tree

14 files changed: +481 -22 lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default.

src/broker-server/src/lib.rs

Lines changed: 4 additions & 1 deletion
@@ -144,13 +144,14 @@ impl BrokerServer {
             broker_cache.clone(),
             rocksdb_engine_handler.clone(),
             connection_manager.clone(),
-            message_storage_adapter,
+            message_storage_adapter.clone(),
         );

         let kafka_params = BrokerServer::build_kafka_server_params(
             client_pool.clone(),
             broker_cache.clone(),
             connection_manager.clone(),
+            message_storage_adapter.clone(),
         );

         let journal_params = BrokerServer::build_journal_server(client_pool.clone());
@@ -404,6 +405,7 @@ impl BrokerServer {
         client_pool: Arc<ClientPool>,
         broker_cache: Arc<BrokerCacheManager>,
         connection_manager: Arc<NetworkConnectionManager>,
+        message_storage_adapter: ArcStorageAdapter,
     ) -> KafkaBrokerServerParams {
         let conf = broker_config();
         let proc_config = ProcessorConfig {
@@ -414,6 +416,7 @@ impl BrokerServer {
         };
         KafkaBrokerServerParams {
             connection_manager,
+            message_storage_adapter,
             client_pool,
             proc_config,
             broker_cache,

src/kafka-broker/Cargo.toml

Lines changed: 5 additions & 1 deletion
@@ -21,6 +21,7 @@ license.workspace = true

 [dependencies]
 axum.workspace = true
+common-base.workspace = true
 common-config.workspace = true
 tokio.workspace = true
 tokio-util.workspace = true
@@ -31,4 +32,7 @@ grpc-clients.workspace = true
 tracing.workspace = true
 protocol.workspace = true
 kafka-protocol.workspace = true
-thiserror.workspace = true
+thiserror.workspace = true
+storage-adapter.workspace = true
+bytes = "1.10.1"
+serde = { version = "1.0.228", features = ["derive"] }

src/kafka-broker/src/broker.rs

Lines changed: 3 additions & 1 deletion
@@ -24,10 +24,12 @@ use network_server::tcp::server::TcpServer;
 use std::sync::Arc;
 use tokio::sync::broadcast::{self};
 use tracing::{error, info};
+use storage_adapter::storage::ArcStorageAdapter;

 #[derive(Clone)]
 pub struct KafkaBrokerServerParams {
     pub connection_manager: Arc<ConnectionManager>,
+    pub message_storage_adapter: ArcStorageAdapter,
     pub client_pool: Arc<ClientPool>,
     pub proc_config: ProcessorConfig,
     pub broker_cache: Arc<BrokerCacheManager>,
@@ -43,7 +45,7 @@ pub struct KafkaBrokerServer {
 impl KafkaBrokerServer {
     pub fn new(params: KafkaBrokerServerParams, main_stop: broadcast::Sender<bool>) -> Self {
         let (inner_stop, _) = broadcast::channel(2);
-        let command = create_command();
+        let command = create_command(params.message_storage_adapter);
         let context = ServerContext {
             connection_manager: params.connection_manager.clone(),
             client_pool: params.client_pool.clone(),

src/kafka-broker/src/common/error.rs

Lines changed: 3 additions & 1 deletion
@@ -17,4 +17,6 @@ use thiserror::Error;
 #[derive(Error, Debug)]
 pub enum KafkaBrokerError {}

-pub type ResultKafkaBrokerError = Result<(), KafkaBrokerError>;
+pub type ResultKafkaBrokerError = Result<()>;
+
+pub type Result<T> = std::result::Result<T, KafkaBrokerError>;
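
For reference, a short sketch (not part of the commit) of how the two aliases read at call sites; both function names here are hypothetical:

use kafka_broker::common::error::{Result, ResultKafkaBrokerError};

// The generic alias fixes the error type to KafkaBrokerError while
// leaving the success type free...
fn next_offset_stub() -> Result<i64> {
    Ok(0)
}

// ...and the unit alias is shorthand for Result<()>.
fn flush_stub() -> ResultKafkaBrokerError {
    Ok(())
}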

src/kafka-broker/src/common/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -12,4 +12,4 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-pub(crate) mod error;
+pub mod error;

src/kafka-broker/src/handler/command.rs

Lines changed: 137 additions & 17 deletions
@@ -14,19 +14,41 @@

 use axum::async_trait;
 use kafka_protocol::messages::api_versions_response::ApiVersion;
-use kafka_protocol::messages::{ApiKey, ApiVersionsResponse};
+use kafka_protocol::messages::{ApiKey, ApiVersionsResponse, ProduceRequest, ProduceResponse};
+use kafka_protocol::messages::produce_response::{PartitionProduceResponse, TopicProduceResponse};
+use kafka_protocol::records::{RecordBatchDecoder, RecordBatchEncoder};
 use metadata_struct::connection::NetworkConnection;
 use network_server::{command::Command, common::packet::ResponsePackage};
 use protocol::kafka::packet::KafkaPacket;
 use protocol::robust::RobustMQPacket;
 use std::net::SocketAddr;
 use std::sync::Arc;
+use tracing::log::{info, warn};
+use storage_adapter::storage::ArcStorageAdapter;
+use crate::{
+    common::error::Result,
+    storage::message::{Message, TopicPartition},
+};
+use crate::manager::offset::OffsetManager;
+use crate::storage::log_reader::Reader;
+use crate::storage::log_writer::Writer;

-#[derive(Clone, Default)]
-pub struct KafkaCommand {}
+pub struct KafkaCommand {
+    pub writer: Writer,
+    pub reader: Reader,
+    pub offset_manager: OffsetManager,
+}

 impl KafkaCommand {
-    fn handle_api_versions(tcp_connection: &NetworkConnection) -> ResponsePackage {
+    pub fn new(storage_adapter: ArcStorageAdapter) -> Self {
+        Self {
+            writer: Writer::new(storage_adapter.clone()),
+            reader: Reader::new(storage_adapter.clone()),
+            offset_manager: OffsetManager::new(),
+        }
+    }
+
+    fn handle_api_versions() -> Result<KafkaPacket> {
         let api_versions: Vec<ApiVersion> = ApiKey::iter()
             .map(|k| {
                 let range = ApiKey::valid_versions(&k);
@@ -40,10 +62,79 @@ impl KafkaCommand {
             .with_error_code(0)
             .with_api_keys(api_versions)
             .with_throttle_time_ms(0);
-        ResponsePackage::build(
-            tcp_connection.connection_id,
-            RobustMQPacket::KAFKA(KafkaPacket::ApiVersionResponse(res)),
-        )
+        Ok(KafkaPacket::ApiVersionResponse(res))
+    }
+
+    async fn handle_produce(&self, request: ProduceRequest) -> Result<KafkaPacket> {
+        let mut topic_responses = vec![];
+        let mut message_batch = vec![];
+
+        for topic_data in &request.topic_data {
+            let topic_name = topic_data.name.as_str();
+            let mut partition_responses = vec![];
+            for partition_data in &topic_data.partition_data {
+                let partition_index = partition_data.index;
+                let mut base_offset = None;
+                let mut partition_error_code = 0i16;
+                if let Some(records_bytes) = &partition_data.records {
+                    let mut records_buf = records_bytes.clone();
+                    match RecordBatchDecoder::decode(&mut records_buf) {
+                        Ok(record_set) => {
+                            // Reserve the partition's next offset for the first
+                            // record; subsequent records get consecutive offsets.
+                            let mut next_offset = 0i64;
+                            if !record_set.records.is_empty() && base_offset.is_none() {
+                                next_offset = self
+                                    .offset_manager
+                                    .next_offset(topic_name, partition_index)
+                                    .await
+                                    .unwrap_or(0);
+                                base_offset = Some(next_offset);
+                            }
+                            for record in &record_set.records {
+                                message_batch.push(Message {
+                                    topic_partition: TopicPartition {
+                                        topic: topic_name.to_string(),
+                                        partition: partition_index,
+                                    },
+                                    offset: next_offset,
+                                    record: Message::encode(record.clone()),
+                                });
+                                next_offset += 1;
+                            }
+                        }
+                        Err(e) => {
+                            warn!("Failed to decode record batch: {:?}", e);
+                            partition_error_code = 1;
+                        }
+                    }
+                }
+
+                let partition_resp = PartitionProduceResponse::default()
+                    .with_index(partition_index)
+                    .with_error_code(partition_error_code)
+                    .with_base_offset(base_offset.unwrap_or_default())
+                    .with_log_append_time_ms(-1) // message.timestamp.type
+                    .with_log_start_offset(0);
+
+                partition_responses.push(partition_resp);
+            }
+
+            let topic_resp = TopicProduceResponse::default()
+                .with_name(topic_data.name.clone())
+                .with_partition_responses(partition_responses);

+            topic_responses.push(topic_resp);
+        }
+
+        for msg in message_batch.into_iter() {
+            self.writer.write(&msg).await?;
+        }
+
+        let response = ProduceResponse::default()
+            .with_responses(topic_responses)
+            .with_throttle_time_ms(0);
+        Ok(KafkaPacket::ProduceResponse(response))
     }
 }
@@ -55,17 +146,46 @@ impl Command for KafkaCommand {
         _addr: &SocketAddr,
         robust_packet: &RobustMQPacket,
     ) -> Option<ResponsePackage> {
-        let packet = robust_packet.get_kafka_packet().unwrap();
-        match packet.clone() {
-            KafkaPacket::ApiVersionReq(_) => Some(Self::handle_api_versions(tcp_connection)),
-            KafkaPacket::ProduceReq(_) => None,
-            KafkaPacket::FetchReq(_) => None,
-            _ => None,
-        }
+        let packet = match robust_packet.get_kafka_packet() {
+            Some(p) => p,
+            None => {
+                warn!("No Kafka packet found in RobustMQPacket");
+                return None;
+            }
+        };
+        let kafka_response = match packet {
+            KafkaPacket::ApiVersionReq(_) => match Self::handle_api_versions() {
+                Ok(resp) => resp,
+                Err(e) => {
+                    warn!("Failed to build ApiVersionsResponse: {:?}", e);
+                    return None;
+                }
+            },
+            KafkaPacket::ProduceReq(req) => match self.handle_produce(req).await {
+                Ok(resp) => resp,
+                Err(e) => {
+                    warn!("Produce handler failed: {:?}", e);
+                    return None;
+                }
+            },
+            KafkaPacket::FetchReq(_) => {
+                warn!("Received Fetch request but Fetch handling is not implemented");
+                return None;
+            }
+            other => {
+                warn!("Unsupported or unhandled Kafka packet: {:?}", other);
+                return None;
+            }
+        };
+
+        Some(ResponsePackage::build(
+            tcp_connection.connection_id,
+            RobustMQPacket::KAFKA(kafka_response),
+        ))
     }
 }

-pub fn create_command() -> Arc<Box<dyn Command + Send + Sync>> {
-    let storage: Box<dyn Command + Send + Sync> = Box::new(KafkaCommand::default());
+pub fn create_command(message_storage_adapter: ArcStorageAdapter) -> Arc<Box<dyn Command + Send + Sync>> {
+    let storage: Box<dyn Command + Send + Sync> = Box::new(KafkaCommand::new(message_storage_adapter));
     Arc::new(storage)
 }
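
The storage::message, storage::log_writer, and storage::log_reader modules used above are among the 14 changed files but are not rendered in this view. To make the offset bookkeeping in handle_produce concrete, here is a small self-contained sketch of the contract it follows (illustration only; assign_offsets is a hypothetical helper, not part of the commit):

// The first record of a partition batch takes the partition's next offset
// (reported as base_offset in the ProduceResponse); each subsequent record
// takes the next consecutive offset.
fn assign_offsets(next_offset: i64, record_count: usize) -> (i64, Vec<i64>) {
    let base_offset = next_offset;
    let offsets = (0..record_count as i64).map(|i| base_offset + i).collect();
    (base_offset, offsets)
}

fn main() {
    // A 3-record batch arriving when the partition's next offset is 42
    // occupies offsets 42, 43, and 44.
    let (base, offsets) = assign_offsets(42, 3);
    assert_eq!(base, 42);
    assert_eq!(offsets, vec![42, 43, 44]);
}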

src/kafka-broker/src/lib.rs

Lines changed: 2 additions & 0 deletions
@@ -16,3 +16,5 @@ pub mod broker;
 pub mod common;
 pub mod handler;
 pub mod server;
+pub mod storage;
+pub mod manager;

src/kafka-broker/src/manager/mod.rs

Lines changed: 14 additions & 0 deletions

@@ -0,0 +1,14 @@
+// Copyright 2023 RobustMQ Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+pub mod offset;

src/kafka-broker/src/manager/offset.rs

Lines changed: 69 additions & 0 deletions

@@ -0,0 +1,69 @@
+// Copyright 2023 RobustMQ Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+use tracing::debug;
+use crate::common::error::Result;
+
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
+pub struct LogOffsetMetadata {
+    pub message_offset: i64,
+    pub segment_start_offset: i64,
+}
+
+#[derive(Clone)]
+pub struct OffsetManager {
+    store_cache: Arc<Mutex<HashMap<(String, i32), LogOffsetMetadata>>>,
+}
+
+impl OffsetManager {
+    pub fn new() -> Self {
+        Self {
+            store_cache: Arc::new(Mutex::new(HashMap::new())),
+        }
+    }
+
+    pub async fn next_offset(&self, topic: &str, partition: i32) -> Result<i64> {
+        // tokio::sync::Mutex, so the guard may be held across .await points.
+        let mut cache = self.store_cache.lock().await;
+        let key = (topic.to_string(), partition);
+
+        let offsets = cache.entry(key).or_insert_with(LogOffsetMetadata::default);
+
+        let next_offset = offsets.message_offset;
+        offsets.message_offset += 1;
+
+        // Every 100 offsets, persist the counter in a background task.
+        if next_offset % 100 == 0 {
+            let manager = self.clone();
+            let topic_clone = topic.to_string();
+            tokio::spawn(async move {
+                if let Err(e) = manager
+                    .persist_offsets(&topic_clone, partition, next_offset)
+                    .await
+                {
+                    debug!(
+                        "Failed to persist offsets for topic {} partition {}: {:?}",
+                        topic_clone, partition, e
+                    );
+                }
+            });
+        }
+        Ok(next_offset)
+    }
+
+    // Persistence hook; currently a no-op placeholder.
+    pub async fn persist_offsets(&self, _topic: &str, _partition: i32, _offset: i64) -> Result<()> {
+        Ok(())
+    }
+}
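
A minimal usage sketch of the counter behavior (illustration only; assumes a Tokio runtime and the crate consumed as kafka_broker):

use kafka_broker::manager::offset::OffsetManager;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let manager = OffsetManager::new();

    // Offsets increase monotonically per (topic, partition),
    // starting at 0 for a partition the manager has not seen before.
    assert_eq!(manager.next_offset("orders", 0).await?, 0);
    assert_eq!(manager.next_offset("orders", 0).await?, 1);

    // A different partition of the same topic has an independent counter.
    assert_eq!(manager.next_offset("orders", 1).await?, 0);
    Ok(())
}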
