Skip to content

Commit 82611b1

Browse files
authored
fix: vacuum index not work if index is dropped by create or replace (#17263)
* fix: vacuum index not work if index is dropped by create or replace * fix test * remove Any * add comment
1 parent 5b372c1 commit 82611b1

File tree

5 files changed

+130
-34
lines changed

5 files changed

+130
-34
lines changed

src/meta/api/src/name_id_value_api.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,17 +71,22 @@ where
7171
/// Such operations do not have any condition constraints.
7272
/// For example, a `name -> id` mapping can have a reverse `id -> name` mapping.
7373
///
74+
/// `mark_delete_records` is used to generate additional key-values for implementing `mark_delete` operation.
75+
/// For example, when an index is dropped by `override_exist`, `__fd_marked_deleted_index/<table_id>/<index_id> -> marked_deleted_index_meta` will be added.
76+
///
7477
/// If there is already a `name_ident` exists, return the existing id in a `Ok(Err(exist))`.
7578
/// Otherwise, create `name -> id -> value` and returns the created id in a `Ok(Ok(created))`.
76-
async fn create_id_value<A>(
79+
async fn create_id_value<A, M>(
7780
&self,
7881
name_ident: &K,
7982
value: &IdRsc::ValueType,
8083
override_exist: bool,
8184
associated_records: A,
85+
mark_delete_records: M,
8286
) -> Result<Result<DataId<IdRsc>, SeqV<DataId<IdRsc>>>, MetaTxnError>
8387
where
8488
A: Fn(DataId<IdRsc>) -> Vec<(String, Vec<u8>)> + Send,
89+
M: Fn(DataId<IdRsc>, &IdRsc::ValueType) -> Result<Vec<(String, Vec<u8>)>, MetaError> + Send,
8590
{
8691
debug!(name_ident :? =name_ident; "NameIdValueApi: {}", func_name!());
8792

@@ -114,6 +119,11 @@ where
114119
for (k, _v) in kvs {
115120
txn.if_then.push(TxnOp::delete(k));
116121
}
122+
123+
let kvs = mark_delete_records(seq_id.data, &seq_meta.data)?;
124+
for (k, v) in kvs {
125+
txn.if_then.push(TxnOp::put(k, v));
126+
}
117127
} else {
118128
return Ok(Err(seq_id));
119129
}

src/meta/api/src/schema_api_impl.rs

Lines changed: 58 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -753,12 +753,21 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
753753
let name_ident_raw = serialize_struct(&IndexNameIdentRaw::from(name_ident))?;
754754

755755
let create_res = self
756-
.create_id_value(name_ident, meta, overriding, |id| {
757-
vec![(
758-
IndexIdToNameIdent::new_generic(name_ident.tenant(), id).to_string_key(),
759-
name_ident_raw.clone(),
760-
)]
761-
})
756+
.create_id_value(
757+
name_ident,
758+
meta,
759+
overriding,
760+
|id| {
761+
vec![(
762+
IndexIdToNameIdent::new_generic(name_ident.tenant(), id).to_string_key(),
763+
name_ident_raw.clone(),
764+
)]
765+
},
766+
|index_id, value| {
767+
mark_index_as_deleted(name_ident.tenant(), value.table_id, *index_id)
768+
.map(|(k, v)| vec![(k, v)])
769+
},
770+
)
762771
.await?;
763772

764773
match create_res {
@@ -803,20 +812,9 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
803812
IndexIdToNameIdent::new_generic(name_ident.tenant(), seq_id.data).to_string_key(),
804813
));
805814

806-
// add __fd_marked_deleted_index/<table_id>/<index_id> -> marked_deleted_index_meta
807-
let marked_deleted_index_id_ident = MarkedDeletedIndexIdIdent::new_generic(
808-
name_ident.tenant(),
809-
MarkedDeletedIndexId::new(seq_meta.data.table_id, *seq_id.data),
810-
);
811-
let marked_deleted_index_meta = MarkedDeletedIndexMeta {
812-
dropped_on: Utc::now(),
813-
index_type: MarkedDeletedIndexType::AGGREGATING,
814-
};
815-
816-
txn.if_then.push(TxnOp::put(
817-
marked_deleted_index_id_ident.to_string_key(),
818-
serialize_struct(&marked_deleted_index_meta)?,
819-
));
815+
let (key, value) =
816+
mark_index_as_deleted(name_ident.tenant(), seq_meta.data.table_id, *seq_id.data)?;
817+
txn.if_then.push(TxnOp::put(key, value));
820818

821819
let (succ, _responses) = send_txn(self, txn).await?;
822820
debug!(key :? =name_ident, id :? =&id_ident,succ = succ; "{}", func_name!());
@@ -2901,12 +2899,18 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
29012899
let name_ident_raw = serialize_struct(&CatalogNameIdentRaw::from(name_ident))?;
29022900

29032901
let res = self
2904-
.create_id_value(name_ident, meta, false, |id| {
2905-
vec![(
2906-
CatalogIdToNameIdent::new_generic(name_ident.tenant(), id).to_string_key(),
2907-
name_ident_raw.clone(),
2908-
)]
2909-
})
2902+
.create_id_value(
2903+
name_ident,
2904+
meta,
2905+
false,
2906+
|id| {
2907+
vec![(
2908+
CatalogIdToNameIdent::new_generic(name_ident.tenant(), id).to_string_key(),
2909+
name_ident_raw.clone(),
2910+
)]
2911+
},
2912+
|_, _| Ok(vec![]),
2913+
)
29102914
.await?;
29112915

29122916
Ok(res)
@@ -3020,7 +3024,13 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
30203024
let name_ident = &req.dictionary_ident;
30213025

30223026
let create_res = self
3023-
.create_id_value(name_ident, &req.dictionary_meta, false, |_| vec![])
3027+
.create_id_value(
3028+
name_ident,
3029+
&req.dictionary_meta,
3030+
false,
3031+
|_| vec![],
3032+
|_, _| Ok(vec![]),
3033+
)
30243034
.await?;
30253035

30263036
match create_res {
@@ -4177,3 +4187,24 @@ fn typ<K>() -> &'static str {
41774187
.next()
41784188
.unwrap_or("UnknownType")
41794189
}
4190+
4191+
/// add __fd_marked_deleted_index/<table_id>/<index_id> -> marked_deleted_index_meta
4192+
pub fn mark_index_as_deleted(
4193+
tenant: &Tenant,
4194+
table_id: u64,
4195+
index_id: u64,
4196+
) -> Result<(String, Vec<u8>), MetaError> {
4197+
let marked_deleted_index_id_ident = MarkedDeletedIndexIdIdent::new_generic(
4198+
tenant,
4199+
MarkedDeletedIndexId::new(table_id, index_id),
4200+
);
4201+
let marked_deleted_index_meta = MarkedDeletedIndexMeta {
4202+
dropped_on: Utc::now(),
4203+
index_type: MarkedDeletedIndexType::AGGREGATING,
4204+
};
4205+
4206+
Ok((
4207+
marked_deleted_index_id_ident.to_string_key(),
4208+
serialize_struct(&marked_deleted_index_meta)?,
4209+
))
4210+
}

src/query/management/src/procedure/procedure_mgr.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,19 @@ impl ProcedureMgr {
6666

6767
let create_res = self
6868
.kv_api
69-
.create_id_value(name_ident, meta, overriding, |id| {
70-
vec![(
71-
ProcedureIdToNameIdent::new_generic(name_ident.tenant(), id).to_string_key(),
72-
name_ident_raw.clone(),
73-
)]
74-
})
69+
.create_id_value(
70+
name_ident,
71+
meta,
72+
overriding,
73+
|id| {
74+
vec![(
75+
ProcedureIdToNameIdent::new_generic(name_ident.tenant(), id)
76+
.to_string_key(),
77+
name_ident_raw.clone(),
78+
)]
79+
},
80+
|_, _| Ok(vec![]),
81+
)
7582
.await?;
7683

7784
match create_res {

tests/suites/5_ee/01_vacuum/01_004_vacuum_drop_aggregating_index.result

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,17 @@ before vacuum, should be 1 index dir
3737
after vacuum, should be 0 index dir
3838
0
3939
0
40+
>>>> create or replace database test_vacuum_drop_aggregating_index
41+
>>>> create or replace table test_vacuum_drop_aggregating_index.agg(a int, b int,c int) 'fs:///tmp/test_vacuum_drop_aggregating_index/'
42+
>>>> insert into test_vacuum_drop_aggregating_index.agg values (1,1,4), (1,2,1), (1,2,4)
43+
3
44+
>>>> CREATE OR REPLACE AGGREGATING INDEX index AS SELECT MIN(a), MAX(b) FROM test_vacuum_drop_aggregating_index.agg;
45+
>>>> insert into test_vacuum_drop_aggregating_index.agg values (2,2,5)
46+
1
47+
>>>> REFRESH AGGREGATING INDEX index;
48+
before vacuum, should be 1 index dir
49+
1
50+
>>>> create or replace aggregating index index AS SELECT MIN(a), MAX(b) FROM test_vacuum_drop_aggregating_index.agg;
51+
after vacuum, should be 0 index dir
52+
0
53+
>>>> drop aggregating index index

tests/suites/5_ee/01_vacuum/01_004_vacuum_drop_aggregating_index.sh

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,37 @@ echo "after vacuum, should be 0 index dir"
8686

8787
find /tmp/test_vacuum_drop_aggregating_index/"$PREFIX_1"/_i_a/ -type f | wc -l
8888
find /tmp/test_vacuum_drop_aggregating_index/"$PREFIX_2"/_i_a/ -type f | wc -l
89+
90+
91+
### create or replace index
92+
93+
stmt "create or replace database test_vacuum_drop_aggregating_index"
94+
95+
mkdir -p /tmp/test_vacuum_drop_aggregating_index/
96+
97+
stmt "create or replace table test_vacuum_drop_aggregating_index.agg(a int, b int,c int) 'fs:///tmp/test_vacuum_drop_aggregating_index/'"
98+
99+
100+
stmt "insert into test_vacuum_drop_aggregating_index.agg values (1,1,4), (1,2,1), (1,2,4)"
101+
102+
stmt "CREATE OR REPLACE AGGREGATING INDEX index AS SELECT MIN(a), MAX(b) FROM test_vacuum_drop_aggregating_index.agg;"
103+
104+
stmt "insert into test_vacuum_drop_aggregating_index.agg values (2,2,5)"
105+
106+
stmt "REFRESH AGGREGATING INDEX index;"
107+
108+
SNAPSHOT_LOCATION=$(echo "select snapshot_location from fuse_snapshot('test_vacuum_drop_aggregating_index','agg') limit 1" | $BENDSQL_CLIENT_CONNECT)
109+
PREFIX=$(echo "$SNAPSHOT_LOCATION" | cut -d'/' -f1-2)
110+
111+
echo "before vacuum, should be 1 index dir"
112+
113+
ls /tmp/test_vacuum_drop_aggregating_index/"$PREFIX"/_i_a/ | wc -l
114+
115+
stmt "create or replace aggregating index index AS SELECT MIN(a), MAX(b) FROM test_vacuum_drop_aggregating_index.agg;"
116+
117+
stmt "set data_retention_time_in_days=0; select * from fuse_vacuum_drop_aggregating_index('test_vacuum_drop_aggregating_index','agg')" > /dev/null
118+
119+
echo "after vacuum, should be 0 index dir"
120+
find /tmp/test_vacuum_drop_aggregating_index/"$PREFIX"/_i_a/ -type f | wc -l
121+
122+
stmt "drop aggregating index index"

0 commit comments

Comments
 (0)