Skip to content

Commit 3bfdbbb

Browse files
authored
Add list_index_stats endpoint (quickwit-oss#6035)
* add list_index_size_info endpoint and migration 25 * add filtering, change endpoint name, nits * add breakdown by split state * lints * lints * test_get_stats and nits * use futuresunordered
1 parent 1b9dac1 commit 3bfdbbb

File tree

11 files changed

+730
-27
lines changed

11 files changed

+730
-27
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
DROP INDEX IF EXISTS idx_splits_stats;
2+
3+
ALTER TABLE splits DROP COLUMN IF EXISTS split_size_bytes;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
ALTER TABLE splits ADD COLUMN IF NOT EXISTS split_size_bytes BIGINT NOT NULL GENERATED ALWAYS AS ((split_metadata_json::json->'footer_offsets'->>'end')::bigint) STORED;
2+
3+
CREATE INDEX IF NOT EXISTS idx_splits_stats ON splits (index_uid, split_state) INCLUDE (split_size_bytes);

quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,14 @@ use quickwit_proto::metastore::{
2626
GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse,
2727
IndexMetadataRequest, IndexMetadataResponse, IndexesMetadataRequest, IndexesMetadataResponse,
2828
LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest,
29-
ListDeleteTasksResponse, ListIndexTemplatesRequest, ListIndexTemplatesResponse,
30-
ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse,
31-
ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest,
32-
MetastoreResult, MetastoreService, MetastoreServiceClient, MetastoreServiceStream,
33-
OpenShardsRequest, OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest,
34-
ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest,
35-
UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
29+
ListDeleteTasksResponse, ListIndexStatsRequest, ListIndexStatsResponse,
30+
ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest,
31+
ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest,
32+
ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult,
33+
MetastoreService, MetastoreServiceClient, MetastoreServiceStream, OpenShardsRequest,
34+
OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest,
35+
StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest,
36+
UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
3637
};
3738

3839
/// A [`MetastoreService`] implementation that proxies some requests to the control plane so it can
@@ -162,6 +163,13 @@ impl MetastoreService for ControlPlaneMetastore {
162163
self.metastore.list_splits(request).await
163164
}
164165

166+
async fn list_index_stats(
167+
&self,
168+
request: ListIndexStatsRequest,
169+
) -> MetastoreResult<ListIndexStatsResponse> {
170+
self.metastore.list_index_stats(request).await
171+
}
172+
165173
async fn list_stale_splits(
166174
&self,
167175
request: ListStaleSplitsRequest,

quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ use quickwit_config::{
3030
};
3131
use quickwit_proto::metastore::{
3232
AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest,
33-
DeleteShardsResponse, DeleteTask, EntityKind, ListShardsSubrequest, ListShardsSubresponse,
34-
MetastoreError, MetastoreResult, OpenShardSubrequest, OpenShardSubresponse, PruneShardsRequest,
33+
DeleteShardsResponse, DeleteTask, EntityKind, IndexStats, ListShardsSubrequest,
34+
ListShardsSubresponse, MetastoreError, MetastoreResult, OpenShardSubrequest,
35+
OpenShardSubresponse, PruneShardsRequest, SplitStats,
3536
};
3637
use quickwit_proto::types::{IndexUid, PublishToken, SourceId, SplitId};
3738
use serde::{Deserialize, Serialize};
@@ -497,6 +498,34 @@ impl FileBackedIndex {
497498
Ok(())
498499
}
499500

501+
/// Gets IndexStats for this index
502+
pub(crate) fn get_stats(&self) -> MetastoreResult<IndexStats> {
503+
let mut staged_stats = SplitStats::default();
504+
let mut published_stats = SplitStats::default();
505+
let mut marked_for_deletion_stats = SplitStats::default();
506+
507+
for split in self.splits.values() {
508+
match split.split_state {
509+
SplitState::Staged => {
510+
staged_stats.add_split(split.split_metadata.footer_offsets.end)
511+
}
512+
SplitState::Published => {
513+
published_stats.add_split(split.split_metadata.footer_offsets.end)
514+
}
515+
SplitState::MarkedForDeletion => {
516+
marked_for_deletion_stats.add_split(split.split_metadata.footer_offsets.end)
517+
}
518+
}
519+
}
520+
521+
Ok(IndexStats {
522+
index_uid: Some(self.index_uid().clone()),
523+
staged: Some(staged_stats),
524+
published: Some(published_stats),
525+
marked_for_deletion: Some(marked_for_deletion_stats),
526+
})
527+
}
528+
500529
/// Adds a source.
501530
pub(crate) fn add_source(&mut self, source_config: SourceConfig) -> MetastoreResult<()> {
502531
let index_uid = self.index_uid().clone();
@@ -764,16 +793,16 @@ fn split_query_predicate(split: &&Split, query: &ListSplitsQuery) -> bool {
764793

765794
#[cfg(test)]
766795
mod tests {
767-
use std::collections::BTreeSet;
796+
use std::collections::{BTreeSet, HashMap};
768797

769798
use quickwit_doc_mapper::tag_pruning::TagFilterAst;
770799
use quickwit_proto::ingest::Shard;
771-
use quickwit_proto::metastore::ListShardsSubrequest;
800+
use quickwit_proto::metastore::{ListShardsSubrequest, SplitStats};
772801
use quickwit_proto::types::{IndexUid, SourceId};
773802

774803
use super::FileBackedIndex;
775804
use crate::file_backed::file_backed_index::split_query_predicate;
776-
use crate::{ListSplitsQuery, Split, SplitMetadata, SplitState};
805+
use crate::{IndexMetadata, ListSplitsQuery, Split, SplitMetadata, SplitState};
777806

778807
impl FileBackedIndex {
779808
pub(crate) fn insert_shards(&mut self, source_id: &SourceId, shards: Vec<Shard>) {
@@ -804,6 +833,7 @@ mod tests {
804833
time_range: Some(32..=40),
805834
tags: BTreeSet::from(["tag-1".to_string()]),
806835
create_timestamp: 12,
836+
footer_offsets: 0..2048,
807837
..Default::default()
808838
},
809839
split_state: SplitState::Staged,
@@ -817,6 +847,7 @@ mod tests {
817847
time_range: None,
818848
tags: BTreeSet::from(["tag-2".to_string(), "tag-3".to_string()]),
819849
create_timestamp: 5,
850+
footer_offsets: 0..1024,
820851
..Default::default()
821852
},
822853
split_state: SplitState::MarkedForDeletion,
@@ -830,6 +861,7 @@ mod tests {
830861
time_range: Some(0..=90),
831862
tags: BTreeSet::from(["tag-2".to_string(), "tag-4".to_string()]),
832863
create_timestamp: 64,
864+
footer_offsets: 0..512,
833865
..Default::default()
834866
},
835867
split_state: SplitState::Published,
@@ -944,4 +976,30 @@ mod tests {
944976
assert!(!split_query_predicate(&&split_2, &query));
945977
assert!(!split_query_predicate(&&split_3, &query));
946978
}
979+
980+
#[test]
981+
fn test_get_stats() {
982+
let index_id = "test-index";
983+
let index_metadata = IndexMetadata::for_test(index_id, "file:///qwdata/indexes/test-index");
984+
let index =
985+
FileBackedIndex::new(index_metadata, make_splits().into(), HashMap::new(), vec![]);
986+
987+
let expected_staged = Some(SplitStats {
988+
num_splits: 1,
989+
total_size_bytes: 2048,
990+
});
991+
let expected_published = Some(SplitStats {
992+
num_splits: 1,
993+
total_size_bytes: 512,
994+
});
995+
let expected_marked_for_deletion = Some(SplitStats {
996+
num_splits: 1,
997+
total_size_bytes: 1024,
998+
});
999+
let stats = index.get_stats().unwrap();
1000+
1001+
assert_eq!(stats.staged, expected_staged);
1002+
assert_eq!(stats.published, expected_published);
1003+
assert_eq!(stats.marked_for_deletion, expected_marked_for_deletion);
1004+
}
9471005
}

quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,14 @@ use quickwit_proto::metastore::{
4949
IndexMetadataFailure, IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataResponse,
5050
IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest,
5151
LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse,
52-
ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest,
53-
ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest,
54-
ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError,
55-
MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest,
56-
OpenShardsRequest, OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest,
57-
ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest,
58-
UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
59-
serde_utils,
52+
ListIndexStatsRequest, ListIndexStatsResponse, ListIndexTemplatesRequest,
53+
ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse,
54+
ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse,
55+
ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult,
56+
MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardsRequest,
57+
OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest,
58+
StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest,
59+
UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, serde_utils,
6060
};
6161
use quickwit_proto::types::{IndexId, IndexUid};
6262
use quickwit_storage::Storage;
@@ -809,6 +809,51 @@ impl MetastoreService for FileBackedMetastore {
809809
Ok(ServiceStream::new(splits_responses_stream))
810810
}
811811

812+
async fn list_index_stats(
813+
&self,
814+
request: ListIndexStatsRequest,
815+
) -> MetastoreResult<ListIndexStatsResponse> {
816+
let index_id_matcher =
817+
IndexIdMatcher::try_from_index_id_patterns(&request.index_id_patterns)?;
818+
let index_ids: Vec<IndexId> = {
819+
let inner_rlock_guard = self.state.read().await;
820+
inner_rlock_guard
821+
.indexes
822+
.iter()
823+
.filter_map(|(index_id, index_state)| match index_state {
824+
LazyIndexStatus::Active(_) if index_id_matcher.is_match(index_id) => {
825+
Some(index_id)
826+
}
827+
_ => None,
828+
})
829+
.cloned()
830+
.collect()
831+
};
832+
833+
let mut index_read_futures = FuturesUnordered::new();
834+
for index_id in index_ids {
835+
let index_read_future = async move {
836+
self.read_any(&index_id, None, |index| index.get_stats())
837+
.await
838+
};
839+
index_read_futures.push(index_read_future);
840+
}
841+
842+
let mut index_stats = Vec::new();
843+
while let Some(index_read_result) = index_read_futures.next().await {
844+
match index_read_result {
845+
Ok(stats) => index_stats.push(stats),
846+
Err(MetastoreError::NotFound(_)) => {
847+
// If the index does not exist, we just skip it.
848+
continue;
849+
}
850+
Err(error) => return Err(error),
851+
}
852+
}
853+
854+
Ok(ListIndexStatsResponse { index_stats })
855+
}
856+
812857
async fn list_stale_splits(
813858
&self,
814859
request: ListStaleSplitsRequest,

quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs

Lines changed: 82 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::collections::HashMap;
1616
use std::fmt::{self, Write};
17+
use std::str::FromStr;
1718
use std::time::Duration;
1819

1920
use async_trait::async_trait;
@@ -34,15 +35,16 @@ use quickwit_proto::metastore::{
3435
FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetClusterIdentityRequest,
3536
GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse,
3637
IndexMetadataFailure, IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataResponse,
37-
IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest,
38-
LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse,
38+
IndexStats, IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse,
39+
LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest,
40+
ListDeleteTasksResponse, ListIndexStatsRequest, ListIndexStatsResponse,
3941
ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest,
4042
ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListShardsSubresponse,
4143
ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest,
4244
MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest,
4345
OpenShardSubresponse, OpenShardsRequest, OpenShardsResponse, PruneShardsRequest,
44-
PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest,
45-
UpdateIndexRequest, UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest,
46+
PublishSplitsRequest, ResetSourceCheckpointRequest, SplitStats, StageSplitsRequest,
47+
ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest,
4648
UpdateSplitsDeleteOpstampResponse, serde_utils,
4749
};
4850
use quickwit_proto::types::{IndexId, IndexUid, Position, PublishToken, ShardId, SourceId};
@@ -904,6 +906,82 @@ impl MetastoreService for PostgresqlMetastore {
904906
Ok(service_stream)
905907
}
906908

909+
async fn list_index_stats(
910+
&self,
911+
request: ListIndexStatsRequest,
912+
) -> MetastoreResult<ListIndexStatsResponse> {
913+
let index_pattern_sql = build_index_id_patterns_sql_query(&request.index_id_patterns)
914+
.map_err(|error| MetastoreError::Internal {
915+
message: "failed to build `list_index_stats` SQL query".to_string(),
916+
cause: error.to_string(),
917+
})?;
918+
let sql = format!(
919+
"SELECT
920+
i.index_uid,
921+
s.split_state,
922+
COUNT(s.split_state) AS num_splits,
923+
COALESCE(SUM(s.split_size_bytes)::BIGINT, 0) AS total_size_bytes
924+
FROM ({index_pattern_sql}) i
925+
LEFT JOIN splits s ON s.index_uid = i.index_uid
926+
GROUP BY i.index_uid, s.split_state"
927+
);
928+
929+
let rows: Vec<(String, Option<String>, i64, i64)> = sqlx::query_as(&sql)
930+
.fetch_all(&self.connection_pool)
931+
.await?;
932+
933+
let mut index_stats = HashMap::new();
934+
for (index_uid_str, split_state, num_splits, total_size_bytes) in rows {
935+
let Ok(index_uid) = IndexUid::from_str(&index_uid_str) else {
936+
return Err(MetastoreError::Internal {
937+
message: "failed to parse index_uid".to_string(),
938+
cause: index_uid_str.to_string(),
939+
});
940+
};
941+
let stats = index_stats
942+
.entry(index_uid_str)
943+
.or_insert_with(|| IndexStats {
944+
index_uid: Some(index_uid),
945+
staged: Some(SplitStats::default()),
946+
published: Some(SplitStats::default()),
947+
marked_for_deletion: Some(SplitStats::default()),
948+
});
949+
let num_splits = num_splits as u64;
950+
let total_size_bytes = total_size_bytes as u64;
951+
match split_state.as_deref() {
952+
Some("Staged") => {
953+
stats.staged = Some(SplitStats {
954+
num_splits,
955+
total_size_bytes,
956+
});
957+
}
958+
Some("Published") => {
959+
stats.published = Some(SplitStats {
960+
num_splits,
961+
total_size_bytes,
962+
});
963+
}
964+
Some("MarkedForDeletion") => {
965+
stats.marked_for_deletion = Some(SplitStats {
966+
num_splits,
967+
total_size_bytes,
968+
});
969+
}
970+
None => {} // if an index has no splits, we can keep the defaults
971+
Some(split_state) => {
972+
return Err(MetastoreError::Internal {
973+
message: "invalid split state".to_string(),
974+
cause: split_state.to_string(),
975+
});
976+
}
977+
}
978+
}
979+
980+
Ok(ListIndexStatsResponse {
981+
index_stats: index_stats.into_values().collect(),
982+
})
983+
}
984+
907985
#[instrument(skip(self))]
908986
async fn mark_splits_for_deletion(
909987
&self,

0 commit comments

Comments
 (0)