Skip to content

Commit 31678f2

Browse files
authored
refactor(meta): replace manual struct instantiation with constructor methods (#18393)
* refactor(meta): replace manual LogEntry construction with constructor methods Replace manual LogEntry struct instantiation with LogEntry::new(cmd) for cases where time_ms is None, and introduce LogEntry::new_with_time(cmd, time_ms) for cases that need to preserve existing timestamps. * refactor(meta): replace manual struct instantiation with constructor methods Replace manual struct instantiation with constructor methods across meta types to improve code consistency and maintainability following the LogEntry pattern: - UpsertKV: Use ::new(), ::update(), ::delete() instead of manual instantiation - WatchRequest: Use ::new() with ::with_filter(), ::with_initial_flush() builders - RaftRequest: Use GrpcHelper::encode_raft_request() for consistent serialization - TxnPutRequest: Add ::new() constructor for (key, value, prev_value, expire_at, ttl_ms) - TxnGetRequest: Add ::new() constructor for (key) - TxnDeleteRequest: Add ::new() constructor for (key, prev_value, match_seq) This provides better type safety, readability, and ensures field changes only require updating constructor methods rather than scattered manual instantiations. * chore: fmt * chore(meta): resolve clippy warnings for unused imports and private module access - Remove unused MatchSeq and Operation imports from util.rs - Fix grpc_helper module access by using re-exported GrpcHelper - Ensure all constructor method refactoring passes clippy lints * chore: fix fmt
1 parent 1b09bb2 commit 31678f2

File tree

20 files changed

+135
-216
lines changed

20 files changed

+135
-216
lines changed

src/meta/api/src/schema_api_impl.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2188,9 +2188,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
21882188
txn.if_then
21892189
.push(txn_op_put(&tbid, serialize_struct(&new_table_meta)?));
21902190
txn.else_then.push(TxnOp {
2191-
request: Some(Request::Get(TxnGetRequest {
2192-
key: tbid.to_string_key(),
2193-
})),
2191+
request: Some(Request::Get(TxnGetRequest::new(tbid.to_string_key()))),
21942192
});
21952193

21962194
new_table_meta_map.insert(req.0.table_id, new_table_meta);

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@ use databend_common_meta_kvapi::kvapi;
135135
use databend_common_meta_kvapi::kvapi::Key;
136136
use databend_common_meta_types::MatchSeq;
137137
use databend_common_meta_types::MetaError;
138-
use databend_common_meta_types::Operation;
139138
use databend_common_meta_types::UpsertKV;
140139
use fastrace::func_name;
141140
use log::debug;
@@ -240,12 +239,7 @@ async fn upsert_test_data(
240239
value: Vec<u8>,
241240
) -> Result<u64, KVAppError> {
242241
let res = kv_api
243-
.upsert_kv(UpsertKV {
244-
key: key.to_string_key(),
245-
seq: MatchSeq::GE(0),
246-
value: Operation::Update(value),
247-
value_meta: None,
248-
})
242+
.upsert_kv(UpsertKV::update(key.to_string_key(), &value))
249243
.await?;
250244

251245
let seq_v = res.result.unwrap();
@@ -257,12 +251,7 @@ async fn delete_test_data(
257251
key: &impl kvapi::Key,
258252
) -> Result<(), KVAppError> {
259253
let _res = kv_api
260-
.upsert_kv(UpsertKV {
261-
key: key.to_string_key(),
262-
seq: MatchSeq::GE(0),
263-
value: Operation::Delete,
264-
value_meta: None,
265-
})
254+
.upsert_kv(UpsertKV::delete(key.to_string_key()))
266255
.await?;
267256

268257
Ok(())

src/meta/api/src/util.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,8 @@ use databend_common_meta_types::txn_condition::Target;
2828
use databend_common_meta_types::ConditionResult;
2929
use databend_common_meta_types::InvalidArgument;
3030
use databend_common_meta_types::InvalidReply;
31-
use databend_common_meta_types::MatchSeq;
3231
use databend_common_meta_types::MetaError;
3332
use databend_common_meta_types::MetaNetworkError;
34-
use databend_common_meta_types::Operation;
3533
use databend_common_meta_types::SeqV;
3634
use databend_common_meta_types::TxnCondition;
3735
use databend_common_meta_types::TxnGetResponse;
@@ -200,12 +198,7 @@ pub async fn fetch_id<T: kvapi::Key>(
200198
generator: T,
201199
) -> Result<u64, MetaError> {
202200
let res = kv_api
203-
.upsert_kv(UpsertKV {
204-
key: generator.to_string_key(),
205-
seq: MatchSeq::GE(0),
206-
value: Operation::Update(b"".to_vec()),
207-
value_meta: None,
208-
})
201+
.upsert_kv(UpsertKV::update(generator.to_string_key(), b""))
209202
.await?;
210203

211204
// seq: MatchSeq::Any always succeeds

src/meta/binaries/meta/entry.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -208,14 +208,11 @@ async fn do_register(meta_node: &Arc<MetaNode>, conf: &Config) -> Result<(), Met
208208
println!("Register this node: {{{}}}", node);
209209
println!();
210210

211-
let ent = LogEntry {
212-
time_ms: None,
213-
cmd: Cmd::AddNode {
214-
node_id,
215-
node,
216-
overriding: true,
217-
},
218-
};
211+
let ent = LogEntry::new(Cmd::AddNode {
212+
node_id,
213+
node,
214+
overriding: true,
215+
});
219216
info!("Raft log entry for updating node: {:?}", ent);
220217

221218
meta_node.write(ent).await?;

src/meta/client/src/grpc_action.rs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use databend_common_meta_types::protobuf::RaftRequest;
2929
use databend_common_meta_types::protobuf::StreamItem;
3030
use databend_common_meta_types::protobuf::WatchRequest;
3131
use databend_common_meta_types::protobuf::WatchResponse;
32+
use databend_common_meta_types::GrpcHelper;
3233
use databend_common_meta_types::InvalidArgument;
3334
use databend_common_meta_types::TxnReply;
3435
use databend_common_meta_types::TxnRequest;
@@ -71,10 +72,7 @@ impl TryInto<MetaGrpcReq> for Request<RaftRequest> {
7172

7273
impl From<MetaGrpcReq> for RaftRequest {
7374
fn from(v: MetaGrpcReq) -> Self {
74-
let raft_request = RaftRequest {
75-
// Safe unwrap(): serialize to string must be ok.
76-
data: serde_json::to_string(&v).unwrap(),
77-
};
75+
let raft_request = GrpcHelper::encode_raft_request(&v).expect("fail to serialize");
7876

7977
debug!(
8078
req :? =(&raft_request);
@@ -87,10 +85,8 @@ impl From<MetaGrpcReq> for RaftRequest {
8785

8886
impl MetaGrpcReq {
8987
pub fn to_raft_request(&self) -> Result<RaftRequest, InvalidArgument> {
90-
let raft_request = RaftRequest {
91-
data: serde_json::to_string(self)
92-
.map_err(|e| InvalidArgument::new(e, "fail to encode request"))?,
93-
};
88+
let raft_request = GrpcHelper::encode_raft_request(self)
89+
.map_err(|e| InvalidArgument::new(e, "fail to encode request"))?;
9490

9591
debug!(
9692
req :? =(&raft_request);
@@ -124,10 +120,7 @@ impl RequestFor for MetaGrpcReadReq {
124120

125121
impl From<MetaGrpcReadReq> for RaftRequest {
126122
fn from(v: MetaGrpcReadReq) -> Self {
127-
let raft_request = RaftRequest {
128-
// Safe unwrap(): serialize to string must be ok.
129-
data: serde_json::to_string(&v).unwrap(),
130-
};
123+
let raft_request = GrpcHelper::encode_raft_request(&v).expect("fail to serialize");
131124

132125
debug!(
133126
req :? =(&raft_request);
@@ -140,10 +133,8 @@ impl From<MetaGrpcReadReq> for RaftRequest {
140133

141134
impl MetaGrpcReadReq {
142135
pub fn to_raft_request(&self) -> Result<RaftRequest, InvalidArgument> {
143-
let raft_request = RaftRequest {
144-
data: serde_json::to_string(self)
145-
.map_err(|e| InvalidArgument::new(e, "fail to encode request"))?,
146-
};
136+
let raft_request = GrpcHelper::encode_raft_request(self)
137+
.map_err(|e| InvalidArgument::new(e, "fail to encode request"))?;
147138

148139
debug!(
149140
req :? =(&raft_request);

src/meta/control/src/import.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ async fn init_new_cluster(
251251

252252
let entry: Entry = Entry {
253253
log_id,
254-
payload: EntryPayload::Normal(LogEntry { time_ms: None, cmd }),
254+
payload: EntryPayload::Normal(LogEntry::new(cmd)),
255255
};
256256

257257
sto.blocking_append([entry]).await?;

src/meta/kvapi-test-suite/src/kvapi_test_suite.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -834,23 +834,19 @@ impl TestSuite {
834834
TxnOp::put(txn_key2.clone(), b("new_v2")),
835835
// get k1
836836
TxnOp {
837-
request: Some(txn_op::Request::Get(TxnGetRequest {
838-
key: txn_key1.clone(),
839-
})),
837+
request: Some(txn_op::Request::Get(TxnGetRequest::new(txn_key1.clone()))),
840838
},
841839
// delete k1
842840
TxnOp {
843-
request: Some(txn_op::Request::Delete(TxnDeleteRequest {
844-
key: txn_key1.clone(),
845-
prev_value: true,
846-
match_seq: None,
847-
})),
841+
request: Some(txn_op::Request::Delete(TxnDeleteRequest::new(
842+
txn_key1.clone(),
843+
true,
844+
None,
845+
))),
848846
},
849847
// get k1
850848
TxnOp {
851-
request: Some(txn_op::Request::Get(TxnGetRequest {
852-
key: txn_key1.clone(),
853-
})),
849+
request: Some(txn_op::Request::Get(TxnGetRequest::new(txn_key1.clone()))),
854850
},
855851
];
856852

src/meta/process/src/kv_processor.rs

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,10 @@ where F: Fn(&str, Vec<u8>) -> Result<Vec<u8>, anyhow::Error>
138138
Cmd::RemoveNode { .. } => Ok(None),
139139
Cmd::SetFeature { .. } => Ok(None),
140140
Cmd::UpsertKV(ups) => {
141-
let x = LogEntry {
142-
time_ms: log_entry.time_ms,
143-
cmd: Cmd::UpsertKV(unwrap_or_return!(self.proc_upsert_kv(ups)?)),
144-
};
141+
let x = LogEntry::new_with_time(
142+
Cmd::UpsertKV(unwrap_or_return!(self.proc_upsert_kv(ups)?)),
143+
log_entry.time_ms,
144+
);
145145
Ok(Some(x))
146146
}
147147
Cmd::Transaction(tx) => {
@@ -160,10 +160,10 @@ where F: Fn(&str, Vec<u8>) -> Result<Vec<u8>, anyhow::Error>
160160
else_then.push(self.proc_txop(op)?);
161161
}
162162

163-
Ok(Some(LogEntry {
164-
time_ms: log_entry.time_ms,
165-
cmd: Cmd::Transaction(TxnRequest::new(condition, if_then).with_else(else_then)),
166-
}))
163+
Ok(Some(LogEntry::new_with_time(
164+
Cmd::Transaction(TxnRequest::new(condition, if_then).with_else(else_then)),
165+
log_entry.time_ms,
166+
)))
167167
}
168168
}
169169
}
@@ -173,12 +173,12 @@ where F: Fn(&str, Vec<u8>) -> Result<Vec<u8>, anyhow::Error>
173173
Operation::Update(v) => {
174174
let buf = (self.process_pb)(&ups.key, v)?;
175175

176-
Ok(Some(UpsertKV {
177-
key: ups.key,
178-
seq: ups.seq,
179-
value: Operation::Update(buf),
180-
value_meta: ups.value_meta,
181-
}))
176+
Ok(Some(UpsertKV::new(
177+
ups.key,
178+
ups.seq,
179+
Operation::Update(buf),
180+
ups.value_meta,
181+
)))
182182
}
183183
Operation::Delete => Ok(None),
184184
#[allow(deprecated)]
@@ -220,13 +220,7 @@ where F: Fn(&str, Vec<u8>) -> Result<Vec<u8>, anyhow::Error>
220220
fn proc_tx_put_request(&self, p: TxnPutRequest) -> Result<TxnPutRequest, anyhow::Error> {
221221
let value = (self.process_pb)(&p.key, p.value)?;
222222

223-
let pr = TxnPutRequest {
224-
key: p.key,
225-
value,
226-
prev_value: p.prev_value,
227-
expire_at: p.expire_at,
228-
ttl_ms: p.ttl_ms,
229-
};
223+
let pr = TxnPutRequest::new(p.key, value, p.prev_value, p.expire_at, p.ttl_ms);
230224

231225
Ok(pr)
232226
}

src/meta/raft-store/src/state_machine/testing.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,9 @@ pub fn snapshot_logs() -> (Vec<Entry>, Vec<String>) {
3737
Entry::new_blank(new_log_id(1, 0, 3)),
3838
Entry {
3939
log_id: new_log_id(1, 0, 4),
40-
payload: EntryPayload::Normal(LogEntry {
41-
time_ms: None,
42-
cmd: Cmd::UpsertKV(UpsertKV::update("a", b"A")),
43-
}),
40+
payload: EntryPayload::Normal(LogEntry::new(Cmd::UpsertKV(UpsertKV::update(
41+
"a", b"A",
42+
)))),
4443
},
4544
Entry {
4645
log_id: new_log_id(1, 0, 5),
@@ -60,14 +59,11 @@ pub fn snapshot_logs() -> (Vec<Entry>, Vec<String>) {
6059
},
6160
Entry {
6261
log_id: new_log_id(1, 0, 9),
63-
payload: EntryPayload::Normal(LogEntry {
64-
time_ms: None,
65-
cmd: Cmd::AddNode {
66-
node_id: 5,
67-
node: Default::default(),
68-
overriding: false,
69-
},
70-
}),
62+
payload: EntryPayload::Normal(LogEntry::new(Cmd::AddNode {
63+
node_id: 5,
64+
node: Default::default(),
65+
overriding: false,
66+
})),
7167
},
7268
];
7369
let want = [ //

src/meta/service/src/message.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use databend_common_meta_types::protobuf::RaftRequest;
2424
use databend_common_meta_types::raft_types::NodeId;
2525
use databend_common_meta_types::AppliedState;
2626
use databend_common_meta_types::Endpoint;
27+
use databend_common_meta_types::GrpcHelper;
2728
use databend_common_meta_types::LogEntry;
2829
use databend_common_meta_types::MetaAPIError;
2930

@@ -150,18 +151,14 @@ pub enum ForwardResponse {
150151

151152
impl tonic::IntoRequest<RaftRequest> for ForwardRequest<ForwardRequestBody> {
152153
fn into_request(self) -> tonic::Request<RaftRequest> {
153-
let mes = RaftRequest {
154-
data: serde_json::to_string(&self).expect("fail to serialize"),
155-
};
154+
let mes = GrpcHelper::encode_raft_request(&self).expect("fail to serialize");
156155
tonic::Request::new(mes)
157156
}
158157
}
159158

160159
impl tonic::IntoRequest<RaftRequest> for ForwardRequest<MetaGrpcReadReq> {
161160
fn into_request(self) -> tonic::Request<RaftRequest> {
162-
let mes = RaftRequest {
163-
data: serde_json::to_string(&self).expect("fail to serialize"),
164-
};
161+
let mes = GrpcHelper::encode_raft_request(&self).expect("fail to serialize");
165162
tonic::Request::new(mes)
166163
}
167164
}

0 commit comments

Comments
 (0)