Skip to content

Commit c1e8b01

Browse files
drmingdrmerclaude
andauthored
refactor(meta): complete SchemaApi trait decomposition (#18669)
Complete the final phase of SchemaApi refactoring by moving the last remaining methods to their appropriate domain-specific traits. This achieves the goal of pure trait composition where SchemaApi contains only trait bounds with no method implementations. Changes: - Move `set_table_lvt()` to TableApi for table time utilities - Move `remove_marked_deleted_index_ids()` and `remove_marked_deleted_table_indexes()` to IndexApi for cleanup - Transform SchemaApi to pure trait composition pattern - Replace generic `get()` calls with direct `get_pb()` usage - Remove unused imports and eliminate wrapper methods The refactoring successfully extracts all 58 methods from the monolithic SchemaApi while maintaining backward compatibility through trait bounds. Each domain-specific trait now handles its own methods, improving maintainability and enabling focused testing and development. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: Claude <[email protected]>
1 parent 40190cb commit c1e8b01

File tree

6 files changed

+102
-129
lines changed

6 files changed

+102
-129
lines changed

src/meta/api/src/index_api.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ use crate::serialize_struct;
6767
use crate::txn_backoff::txn_backoff;
6868
use crate::txn_cond_eq_seq;
6969
use crate::txn_cond_seq;
70+
use crate::txn_op_del;
7071
use crate::txn_op_put;
7172
use crate::util::txn_delete_exact;
7273
use crate::util::txn_op_put_pb;
@@ -494,6 +495,70 @@ where
494495
}
495496
Ok(GetMarkedDeletedTableIndexesReply { table_indexes })
496497
}
498+
499+
#[logcall::logcall]
500+
#[fastrace::trace]
501+
async fn remove_marked_deleted_index_ids(
502+
&self,
503+
tenant: &Tenant,
504+
table_id: u64,
505+
index_ids: &[u64],
506+
) -> Result<(), MetaTxnError> {
507+
let mut trials = txn_backoff(None, func_name!());
508+
509+
loop {
510+
trials.next().unwrap()?.await;
511+
let mut txn = TxnRequest::default();
512+
513+
for index_id in index_ids {
514+
txn.if_then
515+
.push(txn_op_del(&MarkedDeletedIndexIdIdent::new_generic(
516+
tenant,
517+
MarkedDeletedIndexId::new(table_id, *index_id),
518+
)));
519+
}
520+
521+
let (succ, _responses) = send_txn(self, txn).await?;
522+
523+
if succ {
524+
return Ok(());
525+
}
526+
}
527+
}
528+
529+
#[logcall::logcall]
530+
#[fastrace::trace]
531+
async fn remove_marked_deleted_table_indexes(
532+
&self,
533+
tenant: &Tenant,
534+
table_id: u64,
535+
indexes: &[(String, String)],
536+
) -> Result<(), MetaTxnError> {
537+
let mut trials = txn_backoff(None, func_name!());
538+
539+
loop {
540+
trials.next().unwrap()?.await;
541+
let mut txn = TxnRequest::default();
542+
543+
for (index_name, index_version) in indexes {
544+
txn.if_then
545+
.push(txn_op_del(&MarkedDeletedTableIndexIdIdent::new_generic(
546+
tenant,
547+
MarkedDeletedTableIndexId::new(
548+
table_id,
549+
index_name.to_string(),
550+
index_version.to_string(),
551+
),
552+
)));
553+
}
554+
555+
let (succ, _responses) = send_txn(self, txn).await?;
556+
557+
if succ {
558+
return Ok(());
559+
}
560+
}
561+
}
497562
}
498563

499564
#[async_trait::async_trait]

src/meta/api/src/schema_api.rs

Lines changed: 1 addition & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::any::type_name;
16-
use std::convert::Infallible;
1715
use std::fmt::Display;
1816
use std::time::Duration;
1917

@@ -27,15 +25,13 @@ use databend_common_meta_app::app_error::UndropTableHasNoHistory;
2725
use databend_common_meta_app::app_error::UnknownTable;
2826
use databend_common_meta_app::app_error::UnknownTableId;
2927
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
30-
use databend_common_meta_app::schema::least_visible_time_ident::LeastVisibleTimeIdent;
3128
use databend_common_meta_app::schema::marked_deleted_index_id::MarkedDeletedIndexId;
3229
use databend_common_meta_app::schema::marked_deleted_index_ident::MarkedDeletedIndexIdIdent;
3330
use databend_common_meta_app::schema::marked_deleted_table_index_id::MarkedDeletedTableIndexId;
3431
use databend_common_meta_app::schema::marked_deleted_table_index_ident::MarkedDeletedTableIndexIdIdent;
3532
use databend_common_meta_app::schema::DBIdTableName;
3633
use databend_common_meta_app::schema::DatabaseId;
3734
use databend_common_meta_app::schema::DatabaseMeta;
38-
use databend_common_meta_app::schema::LeastVisibleTime;
3935
use databend_common_meta_app::schema::MarkedDeletedIndexMeta;
4036
use databend_common_meta_app::schema::MarkedDeletedIndexType;
4137
use databend_common_meta_app::schema::TableId;
@@ -55,12 +51,10 @@ use databend_common_meta_types::MetaError;
5551
use databend_common_meta_types::SeqV;
5652
use databend_common_meta_types::TxnOp;
5753
use databend_common_meta_types::TxnRequest;
58-
use databend_common_proto_conv::FromToProto;
5954
use fastrace::func_name;
6055
use log::debug;
6156
use log::error;
6257
use log::warn;
63-
use seq_marked::SeqValue;
6458
use ConditionResult::Eq;
6559

6660
use crate::catalog_api::CatalogApi;
@@ -72,9 +66,7 @@ use crate::get_u64_value;
7266
use crate::index_api::IndexApi;
7367
use crate::kv_app_error::KVAppError;
7468
use crate::kv_pb_api::KVPbApi;
75-
use crate::kv_pb_crud_api::KVPbCrudApi;
7669
use crate::lock_api::LockApi;
77-
use crate::meta_txn_error::MetaTxnError;
7870
use crate::security_api::SecurityApi;
7971
use crate::send_txn;
8072
use crate::serialize_struct;
@@ -124,109 +116,7 @@ where
124116
Self: SecurityApi,
125117
Self: TableApi,
126118
{
127-
#[logcall::logcall]
128-
#[fastrace::trace]
129-
async fn set_table_lvt(
130-
&self,
131-
name_ident: &LeastVisibleTimeIdent,
132-
value: &LeastVisibleTime,
133-
) -> Result<LeastVisibleTime, KVAppError> {
134-
debug!(req :? =(&name_ident, &value); "SchemaApi: {}", func_name!());
135-
136-
let transition = self
137-
.crud_upsert_with::<Infallible>(name_ident, |t: Option<SeqV<LeastVisibleTime>>| {
138-
let curr = t.into_value().unwrap_or_default();
139-
if curr.time >= value.time {
140-
Ok(None)
141-
} else {
142-
Ok(Some(value.clone()))
143-
}
144-
})
145-
.await?;
146-
147-
return Ok(transition.unwrap().result.into_value().unwrap_or_default());
148-
}
149-
150-
#[logcall::logcall]
151-
#[fastrace::trace]
152-
async fn get<K>(&self, name_ident: &K) -> Result<Option<K::ValueType>, MetaError>
153-
where
154-
K: kvapi::Key + Sync + 'static,
155-
K::ValueType: FromToProto + 'static,
156-
{
157-
debug!(req :? =(&name_ident); "SchemaApi::get::<{}>()", typ::<K>());
158-
159-
let seq_lvt = self.get_pb(name_ident).await?;
160-
Ok(seq_lvt.into_value())
161-
}
162-
163-
fn name(&self) -> String {
164-
"SchemaApiImpl".to_string()
165-
}
166-
167-
#[logcall::logcall]
168-
#[fastrace::trace]
169-
async fn remove_marked_deleted_index_ids(
170-
&self,
171-
tenant: &Tenant,
172-
table_id: u64,
173-
index_ids: &[u64],
174-
) -> Result<(), MetaTxnError> {
175-
let mut trials = txn_backoff(None, func_name!());
176-
177-
loop {
178-
trials.next().unwrap()?.await;
179-
let mut txn = TxnRequest::default();
180-
181-
for index_id in index_ids {
182-
txn.if_then
183-
.push(txn_op_del(&MarkedDeletedIndexIdIdent::new_generic(
184-
tenant,
185-
MarkedDeletedIndexId::new(table_id, *index_id),
186-
)));
187-
}
188-
189-
let (succ, _responses) = send_txn(self, txn).await?;
190-
191-
if succ {
192-
return Ok(());
193-
}
194-
}
195-
}
196-
197-
#[logcall::logcall]
198-
#[fastrace::trace]
199-
async fn remove_marked_deleted_table_indexes(
200-
&self,
201-
tenant: &Tenant,
202-
table_id: u64,
203-
indexes: &[(String, String)],
204-
) -> Result<(), MetaTxnError> {
205-
let mut trials = txn_backoff(None, func_name!());
206-
207-
loop {
208-
trials.next().unwrap()?.await;
209-
let mut txn = TxnRequest::default();
210-
211-
for (index_name, index_version) in indexes {
212-
txn.if_then
213-
.push(txn_op_del(&MarkedDeletedTableIndexIdIdent::new_generic(
214-
tenant,
215-
MarkedDeletedTableIndexId::new(
216-
table_id,
217-
index_name.to_string(),
218-
index_version.to_string(),
219-
),
220-
)));
221-
}
222-
223-
let (succ, _responses) = send_txn(self, txn).await?;
224-
225-
if succ {
226-
return Ok(());
227-
}
228-
}
229-
}
119+
// Pure trait composition - all methods moved to respective domain traits
230120
}
231121

232122
pub async fn get_history_table_metas(
@@ -630,13 +520,6 @@ pub async fn handle_undrop_table(
630520
}
631521
}
632522

633-
fn typ<K>() -> &'static str {
634-
type_name::<K>()
635-
.rsplit("::")
636-
.next()
637-
.unwrap_or("UnknownType")
638-
}
639-
640523
/// add __fd_marked_deleted_index/<table_id>/<index_id> -> marked_deleted_index_meta
641524
pub fn mark_index_as_deleted(
642525
tenant: &Tenant,

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1452,25 +1452,25 @@ impl SchemaApiTestSuite {
14521452

14531453
let lvt_name_ident = LeastVisibleTimeIdent::new(&tenant, table_id);
14541454

1455-
let res = mt.get(&lvt_name_ident).await?;
1455+
let res = mt.get_pb(&lvt_name_ident).await?;
14561456
assert!(res.is_none());
14571457

14581458
let res = mt.set_table_lvt(&lvt_name_ident, &lvt_big).await?;
14591459
assert_eq!(res.time, time_big);
1460-
let res = mt.get(&lvt_name_ident).await?;
1461-
assert_eq!(res.unwrap().time, time_big);
1460+
let res = mt.get_pb(&lvt_name_ident).await?;
1461+
assert_eq!(res.unwrap().data.time, time_big);
14621462

14631463
// test lvt never fall back
14641464

14651465
let res = mt.set_table_lvt(&lvt_name_ident, &lvt_small).await?;
14661466
assert_eq!(res.time, time_big);
1467-
let res = mt.get(&lvt_name_ident).await?;
1468-
assert_eq!(res.unwrap().time, time_big);
1467+
let res = mt.get_pb(&lvt_name_ident).await?;
1468+
assert_eq!(res.unwrap().data.time, time_big);
14691469

14701470
let res = mt.set_table_lvt(&lvt_name_ident, &lvt_bigger).await?;
14711471
assert_eq!(res.time, time_bigger);
1472-
let res = mt.get(&lvt_name_ident).await?;
1473-
assert_eq!(res.unwrap().time, time_bigger);
1472+
let res = mt.get_pb(&lvt_name_ident).await?;
1473+
assert_eq!(res.unwrap().data.time, time_bigger);
14741474
}
14751475

14761476
Ok(())

src/meta/api/src/table_api.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::collections::BTreeMap;
1616
use std::collections::HashMap;
1717
use std::collections::HashSet;
18+
use std::convert::Infallible;
1819
use std::sync::Arc;
1920

2021
use chrono::DateTime;
@@ -45,6 +46,7 @@ use databend_common_meta_app::app_error::VirtualColumnIdOutBound;
4546
use databend_common_meta_app::app_error::VirtualColumnTooMany;
4647
use databend_common_meta_app::id_generator::IdGenerator;
4748
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
49+
use databend_common_meta_app::schema::least_visible_time_ident::LeastVisibleTimeIdent;
4850
use databend_common_meta_app::schema::table_niv::TableNIV;
4951
use databend_common_meta_app::schema::CommitTableMetaReply;
5052
use databend_common_meta_app::schema::CommitTableMetaReq;
@@ -60,6 +62,7 @@ use databend_common_meta_app::schema::DroppedId;
6062
use databend_common_meta_app::schema::GetTableCopiedFileReply;
6163
use databend_common_meta_app::schema::GetTableCopiedFileReq;
6264
use databend_common_meta_app::schema::GetTableReq;
65+
use databend_common_meta_app::schema::LeastVisibleTime;
6366
use databend_common_meta_app::schema::ListDatabaseReq;
6467
use databend_common_meta_app::schema::ListDroppedTableReq;
6568
use databend_common_meta_app::schema::ListDroppedTableResp;
@@ -1657,6 +1660,29 @@ where
16571660
drop_ids,
16581661
})
16591662
}
1663+
1664+
#[logcall::logcall]
1665+
#[fastrace::trace]
1666+
async fn set_table_lvt(
1667+
&self,
1668+
name_ident: &LeastVisibleTimeIdent,
1669+
value: &LeastVisibleTime,
1670+
) -> Result<LeastVisibleTime, KVAppError> {
1671+
debug!(req :? =(&name_ident, &value); "TableApi: {}", func_name!());
1672+
1673+
let transition = self
1674+
.crud_upsert_with::<Infallible>(name_ident, |t: Option<SeqV<LeastVisibleTime>>| {
1675+
let curr = t.into_value().unwrap_or_default();
1676+
if curr.time >= value.time {
1677+
Ok(None)
1678+
} else {
1679+
Ok(Some(value.clone()))
1680+
}
1681+
})
1682+
.await?;
1683+
1684+
return Ok(transition.unwrap().result.into_value().unwrap_or_default());
1685+
}
16601686
}
16611687

16621688
#[async_trait::async_trait]

src/query/ee/tests/it/storages/fuse/operations/vacuum.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use databend_common_catalog::table_context::CheckAbort;
2121
use databend_common_config::MetaConfig;
2222
use databend_common_exception::ErrorCode;
2323
use databend_common_exception::Result;
24-
use databend_common_meta_api::SchemaApi;
24+
use databend_common_meta_api::kv_pb_api::KVPbApi;
2525
use databend_common_meta_app::principal::OwnershipObject;
2626
use databend_common_meta_app::principal::TenantOwnershipObjectIdent;
2727
use databend_common_meta_app::schema::TableInfo;
@@ -605,7 +605,7 @@ async fn test_vacuum_dropped_table_clean_ownership() -> Result<()> {
605605
table_id: table.get_id(),
606606
};
607607
let table_ownership_key = TenantOwnershipObjectIdent::new(tenant.clone(), table_ownership);
608-
let v = meta.get(&table_ownership_key).await?;
608+
let v = meta.get_pb(&table_ownership_key).await?;
609609
assert!(v.is_some());
610610

611611
// 5. Drop test database
@@ -624,7 +624,7 @@ async fn test_vacuum_dropped_table_clean_ownership() -> Result<()> {
624624
};
625625

626626
let table_ownership_key = TenantOwnershipObjectIdent::new(tenant, table_ownership);
627-
let v = meta.get(&table_ownership_key).await?;
627+
let v = meta.get_pb(&table_ownership_key).await?;
628628
assert!(v.is_none());
629629

630630
Ok(())

src/query/service/src/catalogs/default/mutable_catalog.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use databend_common_meta_api::DictionaryApi;
3030
use databend_common_meta_api::GarbageCollectionApi;
3131
use databend_common_meta_api::IndexApi;
3232
use databend_common_meta_api::LockApi;
33-
use databend_common_meta_api::SchemaApi;
3433
use databend_common_meta_api::SecurityApi;
3534
use databend_common_meta_api::SequenceApi;
3635
use databend_common_meta_api::TableApi;

0 commit comments

Comments
 (0)