Skip to content

Commit 004649e

Browse files
committed
feat(kafka): kafka protocol support.
1 parent 210a038 commit 004649e

File tree

25 files changed

+485
-107
lines changed

25 files changed

+485
-107
lines changed

Cargo.lock

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ incremental = true # Enable incremental compilation (auto-cleaned peri
302302
opt-level = 0 # No optimization (fastest compile time)
303303
split-debuginfo = "unpacked" # Reduce disk usage on macOS/Linux
304304
codegen-units = 16 # Balanced parallelism (was 256, reduced to prevent cache explosion)
305-
debug = 1 # Reduced debug info (line tables only) - faster linking
305+
debug = 2
306306

307307
# Optimize dependencies even in dev mode for better runtime performance
308308
[profile.dev.package."*"]

src/broker-server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ axum.workspace = true
2525
thiserror.workspace = true
2626
mqtt-broker.workspace = true
2727
meta-service.workspace = true
28+
kafka-broker.workspace = true
2829
tonic.workspace = true
2930
tower-http = { workspace = true, features = ["cors"] }
3031
tonic-web.workspace = true

src/broker-server/src/lib.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use journal_server::{
3131
server::connection_manager::ConnectionManager as JournalConnectionManager, JournalServer,
3232
JournalServerParams,
3333
};
34+
use kafka_broker::broker::{KafkaBrokerServer, KafkaBrokerServerParams};
3435
use meta_service::{
3536
controller::{
3637
journal::call_node::JournalInnerCallManager, mqtt::call_broker::MQTTInnerCallManager,
@@ -47,6 +48,7 @@ use mqtt_broker::{
4748
subscribe::manager::SubscribeManager,
4849
};
4950
use network_server::common::connection_manager::ConnectionManager as NetworkConnectionManager;
51+
use network_server::context::ProcessorConfig;
5052
use pprof_monitor::pprof_monitor::start_pprof_monitor;
5153
use rate_limit::RateLimiterManager;
5254
use rocksdb_engine::{
@@ -80,6 +82,7 @@ pub struct BrokerServer {
8082
main_runtime: Runtime,
8183
place_params: MetaServiceServerParams,
8284
mqtt_params: MqttBrokerServerParams,
85+
kafka_params: KafkaBrokerServerParams,
8386
journal_params: JournalServerParams,
8487
client_pool: Arc<ClientPool>,
8588
rocksdb_engine_handler: Arc<RocksDBEngine>,
@@ -144,6 +147,12 @@ impl BrokerServer {
144147
message_storage_adapter,
145148
);
146149

150+
let kafka_params = BrokerServer::build_kafka_server_params(
151+
client_pool.clone(),
152+
broker_cache.clone(),
153+
connection_manager.clone(),
154+
);
155+
147156
let journal_params = BrokerServer::build_journal_server(client_pool.clone());
148157

149158
BrokerServer {
@@ -153,6 +162,7 @@ impl BrokerServer {
153162
journal_params,
154163
config: config.clone(),
155164
mqtt_params,
165+
kafka_params,
156166
client_pool,
157167
rocksdb_engine_handler,
158168
rate_limiter_manager,
@@ -226,6 +236,7 @@ impl BrokerServer {
226236
let mut place_stop_send = None;
227237
let mut mqtt_stop_send = None;
228238
let mut journal_stop_send = None;
239+
let mut kafka_stop_send = None;
229240

230241
let config = broker_config();
231242
// start meta service
@@ -266,12 +277,20 @@ impl BrokerServer {
266277
let (stop_send, _) = broadcast::channel(2);
267278
let mqtt_runtime =
268279
create_runtime("mqtt-runtime", self.config.runtime.runtime_worker_threads);
280+
let kafka_runtime =
281+
create_runtime("kafka-runtime", self.config.runtime.runtime_worker_threads);
269282
if config.is_start_broker() {
270283
mqtt_stop_send = Some(stop_send.clone());
271284
let server = MqttBrokerServer::new(self.mqtt_params.clone(), stop_send.clone());
272285
mqtt_runtime.spawn(async move {
273286
server.start().await;
274287
});
288+
289+
kafka_stop_send = Some(stop_send.clone());
290+
let kafka_server = KafkaBrokerServer::new(self.kafka_params.clone(), stop_send.clone());
291+
kafka_runtime.spawn(async move {
292+
kafka_server.start().await;
293+
});
275294
}
276295

277296
// register node
@@ -293,7 +312,12 @@ impl BrokerServer {
293312
});
294313

295314
// awaiting stop
296-
self.awaiting_stop(place_stop_send, mqtt_stop_send, journal_stop_send);
315+
self.awaiting_stop(
316+
place_stop_send,
317+
mqtt_stop_send,
318+
journal_stop_send,
319+
kafka_stop_send,
320+
);
297321
}
298322

299323
async fn build_meta_service(
@@ -376,6 +400,26 @@ impl BrokerServer {
376400
}
377401
}
378402

403+
fn build_kafka_server_params(
404+
client_pool: Arc<ClientPool>,
405+
broker_cache: Arc<BrokerCacheManager>,
406+
connection_manager: Arc<NetworkConnectionManager>,
407+
) -> KafkaBrokerServerParams {
408+
let conf = broker_config();
409+
let proc_config = ProcessorConfig {
410+
accept_thread_num: conf.network.accept_thread_num,
411+
handler_process_num: conf.network.handler_thread_num,
412+
response_process_num: conf.network.response_thread_num,
413+
channel_size: conf.network.queue_size,
414+
};
415+
KafkaBrokerServerParams {
416+
connection_manager,
417+
client_pool,
418+
proc_config,
419+
broker_cache,
420+
}
421+
}
422+
379423
fn build_journal_server(client_pool: Arc<ClientPool>) -> JournalServerParams {
380424
let config = broker_config();
381425
let connection_manager = Arc::new(JournalConnectionManager::new());
@@ -403,6 +447,7 @@ impl BrokerServer {
403447
place_stop: Option<broadcast::Sender<bool>>,
404448
mqtt_stop: Option<broadcast::Sender<bool>>,
405449
journal_stop: Option<broadcast::Sender<bool>>,
450+
kafka_stop: Option<broadcast::Sender<bool>>,
406451
) {
407452
self.main_runtime.block_on(async {
408453
self.broker_cache
@@ -426,6 +471,13 @@ impl BrokerServer {
426471
sleep(Duration::from_secs(3));
427472
}
428473

474+
if let Some(sx) = kafka_stop {
475+
if let Err(e) = sx.send(true) {
476+
error!("kafka stop signal, error message:{}", e);
477+
}
478+
sleep(Duration::from_secs(3));
479+
}
480+
429481
if let Some(sx) = journal_stop {
430482
if let Err(e) = sx.send(true) {
431483
error!("journal stop signal, error message{}", e);

src/common/config/src/config.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
use super::default::{
1616
default_broker_id, default_cluster_name, default_flapping_detect, default_grpc_port,
1717
default_http_port, default_journal_runtime, default_journal_server, default_journal_storage,
18-
default_message_storage, default_meta_addrs, default_mqtt_auth_config, default_mqtt_keep_alive,
19-
default_mqtt_offline_message, default_mqtt_protocol_config, default_mqtt_runtime,
20-
default_mqtt_schema, default_mqtt_security, default_mqtt_server,
18+
default_kafka_config, default_message_storage, default_meta_addrs, default_mqtt_auth_config,
19+
default_mqtt_keep_alive, default_mqtt_offline_message, default_mqtt_protocol_config,
20+
default_mqtt_runtime, default_mqtt_schema, default_mqtt_security, default_mqtt_server,
2121
default_mqtt_slow_subscribe_config, default_mqtt_system_monitor, default_network,
2222
default_place_runtime, default_rocksdb, default_roles, default_runtime,
2323
};
@@ -122,6 +122,10 @@ pub struct BrokerConfig {
122122

123123
#[serde(default = "default_mqtt_system_monitor")]
124124
pub mqtt_system_monitor: MqttSystemMonitor,
125+
126+
// KAFKA
127+
#[serde(default = "default_kafka_config")]
128+
pub kafka_config: KafkaConfig,
125129
}
126130

127131
impl BrokerConfig {
@@ -352,3 +356,8 @@ pub struct JournalStorage {
352356
pub struct JournalServer {
353357
pub tcp_port: u32,
354358
}
359+
360+
#[derive(Serialize, Deserialize, Default, Clone, Debug)]
361+
pub struct KafkaConfig {
362+
pub port: u32,
363+
}

src/common/config/src/default.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414

1515
use super::security::{AuthnConfig, AuthzConfig};
1616
use crate::config::{
17-
JournalRuntime, JournalServer, JournalStorage, MetaRuntime, MqttAuthConfig, MqttFlappingDetect,
18-
MqttKeepAlive, MqttOfflineMessage, MqttProtocolConfig, MqttRuntime, MqttSchema, MqttSecurity,
19-
MqttServer, MqttSlowSubscribeConfig, MqttSystemMonitor, Network, Rocksdb, Runtime,
20-
SchemaFailedOperation, SchemaStrategy,
17+
JournalRuntime, JournalServer, JournalStorage, KafkaConfig, MetaRuntime, MqttAuthConfig,
18+
MqttFlappingDetect, MqttKeepAlive, MqttOfflineMessage, MqttProtocolConfig, MqttRuntime,
19+
MqttSchema, MqttSecurity, MqttServer, MqttSlowSubscribeConfig, MqttSystemMonitor, Network,
20+
Rocksdb, Runtime, SchemaFailedOperation, SchemaStrategy,
2121
};
2222
use crate::storage::{StorageAdapterConfig, StorageAdapterType};
2323
use common_base::enum_type::delay_type::DelayType;
@@ -208,3 +208,7 @@ pub fn default_journal_storage() -> JournalStorage {
208208
rocksdb_max_open_files: 10000,
209209
}
210210
}
211+
212+
pub fn default_kafka_config() -> KafkaConfig {
213+
KafkaConfig { port: 9092 }
214+
}

src/common/network-server/src/common/connection_manager.rs

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ use common_base::tools::now_second;
2121
use dashmap::DashMap;
2222
use futures::stream::SplitSink;
2323
use futures::SinkExt;
24+
use kafka_protocol::messages::ResponseHeader;
2425
use metadata_struct::connection::{NetworkConnection, NetworkConnectionType};
2526
use protocol::codec::{RobustMQCodec, RobustMQCodecWrapper};
27+
use protocol::kafka::packet::{KafkaHeader, KafkaPacketWrapper};
2628
use protocol::mqtt::codec::MqttPacketWrapper;
2729
use protocol::robust::{RobustMQPacket, RobustMQPacketWrapper, RobustMQProtocol};
2830
use std::time::Duration;
@@ -219,18 +221,31 @@ impl ConnectionManager {
219221
};
220222

221223
if packet_wrapper.protocol.is_mqtt() {
222-
if let RobustMQPacket::MQTT(pack) = packet_wrapper.packet {
224+
if let RobustMQPacket::MQTT(pack) = &packet_wrapper.packet {
223225
let mqtt_packet = MqttPacketWrapper {
224226
protocol_version: packet_wrapper.protocol.to_u8(),
225-
packet: pack,
227+
packet: pack.clone(),
226228
};
227229
self.write_mqtt_tcp_frame(connection_id, mqtt_packet)
228230
.await?;
229231
}
230232
}
231233

232234
if packet_wrapper.protocol.is_kafka() {
233-
// todo
235+
if let RobustMQPacket::KAFKA(pack) = &packet_wrapper.packet {
236+
let request_header = packet_wrapper.extend.get_kafka_header();
237+
let kafka_packet = KafkaPacketWrapper {
238+
api_key: request_header.request_api_key,
239+
api_version: request_header.request_api_version,
240+
header: KafkaHeader::Response(
241+
ResponseHeader::default()
242+
.with_correlation_id(request_header.correlation_id),
243+
),
244+
packet: pack.clone(),
245+
};
246+
self.write_kafka_tcp_frame(connection_id, kafka_packet)
247+
.await?;
248+
}
234249
}
235250
Ok(())
236251
}
@@ -486,4 +501,47 @@ impl ConnectionManager {
486501
};
487502
}
488503
}
504+
505+
pub async fn write_kafka_tcp_frame(
506+
&self,
507+
connection_id: u64,
508+
pack: KafkaPacketWrapper,
509+
) -> ResultCommonError {
510+
let mut times = 0;
511+
loop {
512+
match self.tcp_write_list.try_get_mut(&connection_id) {
513+
dashmap::try_result::TryResult::Present(mut da) => {
514+
match da.send(RobustMQCodecWrapper::KAFKA(pack.clone())).await {
515+
Ok(_) => {
516+
break;
517+
}
518+
Err(e) => {
519+
if broker_not_available(&e.to_string()) {
520+
return Err(CommonError::CommonError(e.to_string()));
521+
}
522+
523+
if times > self.lock_max_try_mut_times {
524+
return Err(CommonError::FailedToWriteClient(
525+
"tcp".to_string(),
526+
e.to_string(),
527+
));
528+
}
529+
}
530+
}
531+
}
532+
dashmap::try_result::TryResult::Absent => {
533+
if times > self.lock_max_try_mut_times {
534+
return Err(CommonError::NotObtainAvailableConnection(
535+
"tcp".to_string(),
536+
connection_id,
537+
));
538+
}
539+
}
540+
dashmap::try_result::TryResult::Locked => {}
541+
}
542+
times += 1;
543+
sleep(Duration::from_millis(self.lock_try_mut_sleep_time_ms)).await
544+
}
545+
Ok(())
546+
}
489547
}

0 commit comments

Comments
 (0)