Skip to content

Commit b14e782

Browse files
authored
Merge pull request #18 from socutes:openraft
cluster is developed based on openraft
2 parents 72ce5e0 + 8beca11 commit b14e782

File tree

30 files changed

+2204
-55
lines changed

30 files changed

+2204
-55
lines changed

Cargo.lock

Lines changed: 454 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ members = [
1818
"src/placement-center",
1919
"src/cmd",
2020
"src/protocol",
21-
"src/clients"
21+
"src/clients",
2222
]
2323

2424
resolver = "2"
@@ -46,10 +46,18 @@ rocksdb = "0.22.0"
4646
bincode = "1.3.3"
4747
tokio-util = { version = "0.7.9", features = ["codec"] }
4848
mobc = "0.8.4"
49+
openraft = { git = "https://github.com/databendlabs/openraft.git", features = [
50+
"serde",
51+
"type-alias",
52+
] }
53+
byteorder = "1.5.0"
54+
tracing = "0.1.40"
55+
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
56+
4957

5058
## workspaces members
5159
placement-center = { path = "src/placement-center" }
5260
cmd = { path = "src/cmd" }
5361
protocol = { path = "src/protocol" }
5462
clients = { path = "src/clients" }
55-
common-base = { path = "src/common/base" }
63+
common-base = { path = "src/common/base" }

src/clients/src/placement/mod.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ use crate::{poll::ClientPool, retry_sleep_time, retry_times};
1616
use common_base::errors::RobustMQError;
1717
use kv::kv_interface_call;
1818
use log::error;
19+
use openraft::openraft_interface_call;
1920
use std::{sync::Arc, time::Duration};
2021
use tokio::time::sleep;
2122

2223
#[derive(Clone, Debug)]
2324
pub enum PlacementCenterService {
2425
Kv,
26+
OpenRaft,
2527
}
2628

2729
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
@@ -31,9 +33,15 @@ pub enum PlacementCenterInterface {
3133
Get,
3234
Delete,
3335
Exists,
36+
37+
// Open Raft
38+
Vote,
39+
Append,
40+
Snapshot,
3441
}
3542

3643
pub mod kv;
44+
pub mod openraft;
3745

3846
async fn retry_call(
3947
service: PlacementCenterService,
@@ -56,6 +64,16 @@ async fn retry_call(
5664
)
5765
.await
5866
}
67+
68+
PlacementCenterService::OpenRaft => {
69+
openraft_interface_call(
70+
interface.clone(),
71+
client_poll.clone(),
72+
addr.clone(),
73+
request.clone(),
74+
)
75+
.await
76+
}
5977
};
6078

6179
match result {
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Copyright 2023 RobustMQ Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use super::PlacementCenterInterface;
16+
use crate::{
17+
placement::{retry_call, PlacementCenterService},
18+
poll::ClientPool,
19+
};
20+
use common_base::errors::RobustMQError;
21+
use prost::Message as _;
22+
use protocol::openraft::{
23+
AppendReply, AppendRequest, SnapshotReply, SnapshotRequest, VoteReply, VoteRequest,
24+
};
25+
use std::sync::Arc;
26+
27+
pub async fn placement_openraft_vote(
28+
client_poll: Arc<ClientPool>,
29+
addrs: Vec<String>,
30+
request: VoteRequest,
31+
) -> Result<VoteReply, RobustMQError> {
32+
let request_data = VoteRequest::encode_to_vec(&request);
33+
match retry_call(
34+
PlacementCenterService::OpenRaft,
35+
PlacementCenterInterface::Vote,
36+
client_poll,
37+
addrs,
38+
request_data,
39+
)
40+
.await
41+
{
42+
Ok(data) => match VoteReply::decode(data.as_ref()) {
43+
Ok(da) => return Ok(da),
44+
Err(e) => return Err(RobustMQError::CommmonError(e.to_string())),
45+
},
46+
Err(e) => {
47+
return Err(e);
48+
}
49+
}
50+
}
51+
52+
pub async fn placement_openraft_append(
53+
client_poll: Arc<ClientPool>,
54+
addrs: Vec<String>,
55+
request: AppendRequest,
56+
) -> Result<AppendReply, RobustMQError> {
57+
let request_data = AppendRequest::encode_to_vec(&request);
58+
match retry_call(
59+
PlacementCenterService::OpenRaft,
60+
PlacementCenterInterface::Append,
61+
client_poll,
62+
addrs,
63+
request_data,
64+
)
65+
.await
66+
{
67+
Ok(data) => match AppendReply::decode(data.as_ref()) {
68+
Ok(da) => return Ok(da),
69+
Err(e) => return Err(RobustMQError::CommmonError(e.to_string())),
70+
},
71+
Err(e) => {
72+
return Err(e);
73+
}
74+
}
75+
}
76+
77+
pub async fn placement_openraft_snapshot(
78+
client_poll: Arc<ClientPool>,
79+
addrs: Vec<String>,
80+
request: SnapshotRequest,
81+
) -> Result<SnapshotReply, RobustMQError> {
82+
let request_data = SnapshotRequest::encode_to_vec(&request);
83+
match retry_call(
84+
PlacementCenterService::OpenRaft,
85+
PlacementCenterInterface::Snapshot,
86+
client_poll,
87+
addrs,
88+
request_data,
89+
)
90+
.await
91+
{
92+
Ok(data) => match SnapshotReply::decode(data.as_ref()) {
93+
Ok(da) => return Ok(da),
94+
Err(e) => return Err(RobustMQError::CommmonError(e.to_string())),
95+
},
96+
Err(e) => {
97+
return Err(e);
98+
}
99+
}
100+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright 2023 RobustMQ Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use super::OpenRaftServiceManager;
16+
use common_base::errors::RobustMQError;
17+
use mobc::Connection;
18+
use prost::Message;
19+
use protocol::openraft::{
20+
AppendReply, AppendRequest, SnapshotReply, SnapshotRequest, VoteReply, VoteRequest,
21+
};
22+
23+
pub(crate) async fn inner_vote(
24+
mut client: Connection<OpenRaftServiceManager>,
25+
request: Vec<u8>,
26+
) -> Result<Vec<u8>, RobustMQError> {
27+
match VoteRequest::decode(request.as_ref()) {
28+
Ok(request) => match client.vote(request).await {
29+
Ok(result) => {
30+
return Ok(VoteReply::encode_to_vec(&result.into_inner()));
31+
}
32+
Err(e) => return Err(RobustMQError::GrpcServerStatus(e)),
33+
},
34+
Err(e) => {
35+
return Err(RobustMQError::CommmonError(e.to_string()));
36+
}
37+
}
38+
}
39+
40+
pub(crate) async fn inner_append(
41+
mut client: Connection<OpenRaftServiceManager>,
42+
request: Vec<u8>,
43+
) -> Result<Vec<u8>, RobustMQError> {
44+
match AppendRequest::decode(request.as_ref()) {
45+
Ok(request) => match client.append(request).await {
46+
Ok(result) => {
47+
return Ok(AppendReply::encode_to_vec(&result.into_inner()));
48+
}
49+
Err(e) => return Err(RobustMQError::GrpcServerStatus(e)),
50+
},
51+
Err(e) => {
52+
return Err(RobustMQError::CommmonError(e.to_string()));
53+
}
54+
}
55+
}
56+
57+
pub(crate) async fn inner_snapshot(
58+
mut client: Connection<OpenRaftServiceManager>,
59+
request: Vec<u8>,
60+
) -> Result<Vec<u8>, RobustMQError> {
61+
match SnapshotRequest::decode(request.as_ref()) {
62+
Ok(request) => match client.snapshot(request).await {
63+
Ok(result) => {
64+
return Ok(SnapshotReply::encode_to_vec(&result.into_inner()));
65+
}
66+
Err(e) => return Err(RobustMQError::GrpcServerStatus(e)),
67+
},
68+
Err(e) => {
69+
return Err(RobustMQError::CommmonError(e.to_string()));
70+
}
71+
}
72+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Copyright 2023 RobustMQ Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use super::PlacementCenterInterface;
16+
use crate::poll::ClientPool;
17+
use common_base::errors::RobustMQError;
18+
use inner::{inner_append, inner_snapshot, inner_vote};
19+
use mobc::{Connection, Manager};
20+
use protocol::openraft::open_raft_service_client::OpenRaftServiceClient;
21+
use std::sync::Arc;
22+
use tonic::transport::Channel;
23+
24+
pub mod call;
25+
mod inner;
26+
27+
pub(crate) async fn openraft_interface_call(
28+
interface: PlacementCenterInterface,
29+
client_poll: Arc<ClientPool>,
30+
addr: String,
31+
request: Vec<u8>,
32+
) -> Result<Vec<u8>, RobustMQError> {
33+
match openraft_client(client_poll.clone(), addr.clone()).await {
34+
Ok(client) => {
35+
let result = match interface {
36+
PlacementCenterInterface::Vote => inner_vote(client, request.clone()).await,
37+
PlacementCenterInterface::Append => inner_append(client, request.clone()).await,
38+
PlacementCenterInterface::Snapshot => inner_snapshot(client, request.clone()).await,
39+
_ => {
40+
return Err(RobustMQError::CommmonError(format!(
41+
"openraft service does not support service interfaces [{:?}]",
42+
interface
43+
)))
44+
}
45+
};
46+
match result {
47+
Ok(data) => return Ok(data),
48+
Err(e) => {
49+
return Err(e);
50+
}
51+
}
52+
}
53+
Err(e) => {
54+
return Err(e);
55+
}
56+
}
57+
}
58+
59+
async fn openraft_client(
60+
client_poll: Arc<ClientPool>,
61+
addr: String,
62+
) -> Result<Connection<OpenRaftServiceManager>, RobustMQError> {
63+
match client_poll
64+
.placement_center_openraft_services_client(addr)
65+
.await
66+
{
67+
Ok(client) => {
68+
return Ok(client);
69+
}
70+
Err(e) => {
71+
return Err(e);
72+
}
73+
}
74+
}
75+
76+
#[derive(Clone)]
77+
pub struct OpenRaftServiceManager {
78+
pub addr: String,
79+
}
80+
81+
impl OpenRaftServiceManager {
82+
pub fn new(addr: String) -> Self {
83+
Self { addr }
84+
}
85+
}
86+
87+
#[tonic::async_trait]
88+
impl Manager for OpenRaftServiceManager {
89+
type Connection = OpenRaftServiceClient<Channel>;
90+
type Error = RobustMQError;
91+
92+
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
93+
match OpenRaftServiceClient::connect(format!("http://{}", self.addr.clone())).await {
94+
Ok(client) => {
95+
return Ok(client);
96+
}
97+
Err(err) => {
98+
return Err(RobustMQError::CommmonError(format!(
99+
"{},{}",
100+
err.to_string(),
101+
self.addr.clone()
102+
)))
103+
}
104+
};
105+
}
106+
107+
async fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
108+
Ok(conn)
109+
}
110+
}

0 commit comments

Comments
 (0)