Skip to content

Commit efe215d

Browse files
authored
Merge pull request #29530 from michael-redpanda/sr/contexts/core-15182
[CORE-15182] - Add context labels to schema registry metrics
2 parents 103c225 + bb10549 commit efe215d

File tree

2 files changed

+806
-44
lines changed

2 files changed

+806
-44
lines changed

src/v/pandaproxy/schema_registry/store.h

Lines changed: 196 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,9 @@ class store {
474474
return soft_deleted(sub);
475475
}
476476

477+
// Track the subject's deleted status before modification
478+
is_deleted was_deleted = sub_it->second.deleted;
479+
477480
sub_it->second.written_at.push_back(marker);
478481
sub_it->second.deleted = is_deleted::yes;
479482

@@ -487,8 +490,21 @@ class store {
487490
}
488491

489492
if (permanent) {
493+
// Permanent delete - decrement appropriate counter and erase
494+
auto ctx_it = _context_stores.find(sub.ctx);
495+
if (ctx_it != _context_stores.end()) {
496+
// The subject was marked deleted above, so decrement the deleted counter
497+
ctx_it->second.decrement_subject_count(is_deleted::yes);
498+
}
490499
_subjects.erase(sub_it);
491500
} else {
501+
// Soft delete - move from not-deleted to deleted if it wasn't
502+
// already deleted
503+
if (!was_deleted) {
504+
get_or_create_context_store(sub.ctx)
505+
.update_subject_deleted_status(
506+
is_deleted::no, is_deleted::yes);
507+
}
492508
// Mark all versions within the store deleted too: this matters
493509
// if someone revives the subject with new versions later, as
494510
// these older versions should remain deleted.
@@ -563,8 +579,9 @@ class store {
563579
result<bool>
564580
set_mode(seq_marker marker, const context& ctx, mode m, force f) {
565581
BOOST_OUTCOME_TRYX(check_mode_mutability(f));
566-
_context_stores[ctx]._mode_written_at.emplace_back(marker);
567-
return std::exchange(_context_stores[ctx]._mode, m) != m;
582+
auto& context = get_or_create_context_store(ctx);
583+
context._mode_written_at.emplace_back(marker);
584+
return std::exchange(context._mode, m) != m;
568585
}
569586

570587
///\brief Set the mode for a subject.
@@ -590,9 +607,9 @@ class store {
590607
///\brief Clear the mode for a context.
591608
result<bool> clear_mode(const context& ctx, force f) {
592609
BOOST_OUTCOME_TRYX(check_mode_mutability(f));
593-
_context_stores[ctx]._mode_written_at.clear();
594-
return std::exchange(_context_stores[ctx]._mode, std::nullopt)
595-
!= std::nullopt;
610+
auto& context = get_or_create_context_store(ctx);
611+
context._mode_written_at.clear();
612+
return std::exchange(context._mode, std::nullopt) != std::nullopt;
596613
}
597614

598615
/// \brief Return the seq_marker write history of a context, but only
@@ -639,8 +656,9 @@ class store {
639656
seq_marker marker,
640657
const context& ctx,
641658
compatibility_level compatibility) {
642-
_context_stores[ctx]._config_written_at.push_back(marker);
643-
return std::exchange(_context_stores[ctx]._compatibility, compatibility)
659+
auto& context = get_or_create_context_store(ctx);
660+
context._config_written_at.push_back(marker);
661+
return std::exchange(context._compatibility, compatibility)
644662
!= compatibility;
645663
}
646664

@@ -657,8 +675,9 @@ class store {
657675

658676
///\brief Clear the compatibility level of a context.
659677
result<bool> clear_compatibility(const context& ctx) {
660-
_context_stores[ctx]._config_written_at.clear();
661-
return std::exchange(_context_stores[ctx]._compatibility, std::nullopt)
678+
auto& context = get_or_create_context_store(ctx);
679+
context._config_written_at.clear();
680+
return std::exchange(context._compatibility, std::nullopt)
662681
!= std::nullopt;
663682
}
664683

@@ -699,6 +718,11 @@ class store {
699718
: std::prev(_schemas.end())->first.id + 1;
700719
auto [_, inserted] = _schemas.try_emplace(
701720
context_schema_id{ctx, id}, std::move(def));
721+
722+
if (inserted) {
723+
get_or_create_context_store(ctx).increment_schema_count();
724+
}
725+
702726
return {id, inserted};
703727
}
704728

@@ -707,12 +731,26 @@ class store {
707731
if (mark_schema) {
708732
_marked_schemas.push_back(id);
709733
}
710-
return _schemas
711-
.insert_or_assign(std::move(id), schema_entry(std::move(def)))
712-
.second;
734+
auto [it, inserted] = _schemas.insert_or_assign(
735+
std::move(id), schema_entry(std::move(def)));
736+
737+
if (inserted) {
738+
get_or_create_context_store(it->first.ctx).increment_schema_count();
739+
}
740+
741+
return inserted;
713742
}
714743

715-
void delete_schema(const context_schema_id& id) { _schemas.erase(id); }
744+
void delete_schema(const context_schema_id& id) {
745+
auto it = _schemas.find(id);
746+
if (it != _schemas.end()) {
747+
auto ctx_it = _context_stores.find(id.ctx);
748+
if (ctx_it != _context_stores.end()) {
749+
ctx_it->second.decrement_schema_count();
750+
}
751+
_schemas.erase(it);
752+
}
753+
}
716754

717755
// This function returns and unmarks all marked schemas.
718756
chunked_vector<context_schema_id> extract_marked_schemas() {
@@ -724,16 +762,32 @@ class store {
724762
bool inserted;
725763
};
726764
insert_subject_result insert_subject(context_subject sub, schema_id id) {
727-
auto& subject_entry = get_or_create_subject_entry(std::move(sub));
765+
auto [it, inserted] = _subjects.try_emplace(sub, sub);
766+
auto& subject_entry = it->second;
767+
768+
// Track the previous deleted status
769+
is_deleted was_deleted = subject_entry.deleted;
728770
subject_entry.deleted = is_deleted::no;
771+
772+
// Update counters based on whether this is a new subject or revival
773+
if (inserted) {
774+
// New subject - increment not-deleted counter
775+
get_or_create_context_store(sub.ctx).increment_subject_count(
776+
is_deleted::no);
777+
} else if (was_deleted) {
778+
// Reviving a deleted subject - move from deleted to not-deleted
779+
get_or_create_context_store(sub.ctx).update_subject_deleted_status(
780+
is_deleted::yes, is_deleted::no);
781+
}
782+
729783
auto& versions = subject_entry.versions;
730784
const auto v_it = std::find_if(
731785
versions.begin(), versions.end(), [id](auto v) {
732786
return v.id == id;
733787
});
734788
if (v_it != versions.cend()) {
735-
auto inserted = std::exchange(v_it->deleted, is_deleted::no);
736-
return {v_it->version, bool(inserted)};
789+
auto was_deleted = std::exchange(v_it->deleted, is_deleted::no);
790+
return {v_it->version, bool(was_deleted)};
737791
}
738792

739793
const auto version = versions.empty() ? schema_version{1}
@@ -748,7 +802,12 @@ class store {
748802
schema_version version,
749803
schema_id id,
750804
is_deleted deleted) {
751-
auto& subject_entry = get_or_create_subject_entry(std::move(sub));
805+
auto [it, inserted] = _subjects.try_emplace(sub, sub);
806+
auto& subject_entry = it->second;
807+
808+
// Track if subject deletion status changed
809+
is_deleted old_deleted = subject_entry.deleted;
810+
752811
auto& versions = subject_entry.versions;
753812
subject_entry.written_at.push_back(marker);
754813

@@ -784,6 +843,17 @@ class store {
784843
subject_entry.deleted = deleted;
785844
}
786845

846+
// Update counters based on whether this is new or status changed
847+
if (inserted) {
848+
// New subject - increment appropriate counter
849+
get_or_create_context_store(sub.ctx).increment_subject_count(
850+
deleted);
851+
} else if (old_deleted != subject_entry.deleted) {
852+
// Deletion status changed - move between counters
853+
get_or_create_context_store(sub.ctx).update_subject_deleted_status(
854+
old_deleted, subject_entry.deleted);
855+
}
856+
787857
return !found;
788858
}
789859

@@ -799,24 +869,6 @@ class store {
799869

800870
void setup_metrics() {
801871
namespace sm = ss::metrics;
802-
const auto make_schema_count = [this]() {
803-
return sm::make_gauge(
804-
"schema_count",
805-
[this] { return _schemas.size(); },
806-
sm::description("The number of schemas in the store"));
807-
};
808-
const auto make_subject_count = [this](is_deleted deleted) {
809-
return sm::make_gauge(
810-
"subject_count",
811-
[this, deleted] {
812-
return std::ranges::count_if(
813-
_subjects, [deleted](const auto& entry) {
814-
return entry.second.deleted == deleted;
815-
});
816-
},
817-
sm::description("The number of subjects in the store"),
818-
{sm::label{"deleted"}(deleted)});
819-
};
820872
const auto make_schema_bytes = [this]() {
821873
return sm::make_gauge(
822874
"schema_memory_bytes",
@@ -837,10 +889,7 @@ class store {
837889
_metrics.add_group(
838890
group_name,
839891
{
840-
make_schema_count(),
841892
make_schema_bytes(),
842-
make_subject_count(is_deleted::no),
843-
make_subject_count(is_deleted::yes),
844893
},
845894
{},
846895
agg);
@@ -850,16 +899,13 @@ class store {
850899
_public_metrics.add_group(
851900
group_name,
852901
{
853-
make_schema_count().aggregate(agg),
854902
make_schema_bytes().aggregate(agg),
855-
make_subject_count(is_deleted::no).aggregate(agg),
856-
make_subject_count(is_deleted::yes).aggregate(agg),
857903
});
858904
}
859905
};
860906

861907
void maybe_update_max_schema_id(const context_schema_id& id) {
862-
auto& nsi = _context_stores[id.ctx]._next_schema_id;
908+
auto& nsi = get_or_create_context_store(id.ctx)._next_schema_id;
863909
auto old = nsi;
864910
nsi = std::max(nsi, id.id + schema_id{1});
865911
vlog(
@@ -873,7 +919,7 @@ class store {
873919
// operations if needed. _next_schema_id gets updated
874920
// if the operation was successful, as a side effect
875921
// of applying the write to the store.
876-
return _context_stores[ctx]._next_schema_id;
922+
return get_or_create_context_store(ctx)._next_schema_id;
877923
}
878924

879925
chunked_vector<context> get_materialized_contexts() const {
@@ -896,7 +942,7 @@ class store {
896942
}
897943

898944
void set_context_materialized(const context& ctx, bool materialized) {
899-
_context_stores[ctx]._materialized = materialized;
945+
get_or_create_context_store(ctx)._materialized = materialized;
900946
}
901947

902948
private:
@@ -1030,11 +1076,117 @@ class store {
10301076
schema_id _next_schema_id{1};
10311077
bool _materialized{false};
10321078

1079+
void increment_schema_count() { _schema_count++; }
1080+
1081+
void decrement_schema_count() {
1082+
if (_schema_count > 0) {
1083+
_schema_count--;
1084+
}
1085+
}
1086+
1087+
void clear_schema_count() { _schema_count = 0; }
1088+
1089+
void increment_subject_count(is_deleted deleted) {
1090+
if (deleted == is_deleted::yes) {
1091+
_subject_count_deleted++;
1092+
} else {
1093+
_subject_count_not_deleted++;
1094+
}
1095+
}
1096+
1097+
void decrement_subject_count(is_deleted deleted) {
1098+
if (deleted == is_deleted::yes) {
1099+
if (_subject_count_deleted > 0) {
1100+
_subject_count_deleted--;
1101+
}
1102+
} else {
1103+
if (_subject_count_not_deleted > 0) {
1104+
_subject_count_not_deleted--;
1105+
}
1106+
}
1107+
}
1108+
1109+
void update_subject_deleted_status(
1110+
is_deleted old_status, is_deleted new_status) {
1111+
if (old_status != new_status) {
1112+
decrement_subject_count(old_status);
1113+
increment_subject_count(new_status);
1114+
}
1115+
}
1116+
1117+
void clear_subject_counts() {
1118+
_subject_count_deleted = 0;
1119+
_subject_count_not_deleted = 0;
1120+
}
1121+
10331122
chunked_vector<seq_marker> _config_written_at;
10341123
chunked_vector<seq_marker> _mode_written_at;
1124+
1125+
private:
1126+
metrics::internal_metric_groups _metrics;
1127+
metrics::public_metric_groups _public_metrics;
1128+
size_t _schema_count{0};
1129+
size_t _subject_count_not_deleted{0};
1130+
size_t _subject_count_deleted{0};
1131+
1132+
void setup_metrics(const context& ctx) {
1133+
namespace sm = ss::metrics;
1134+
auto group_name = prometheus_sanitize::metrics_name(
1135+
"schema_registry_cache");
1136+
1137+
const auto make_schema_count = [this, &ctx]() {
1138+
return sm::make_gauge(
1139+
"schema_count",
1140+
[this] { return _schema_count; },
1141+
sm::description("The number of schemas in the store"),
1142+
{sm::label{"context"}(ctx)});
1143+
};
1144+
1145+
const auto make_subject_count = [this, &ctx](is_deleted deleted) {
1146+
return sm::make_gauge(
1147+
"subject_count",
1148+
[this, deleted] {
1149+
return deleted == is_deleted::yes
1150+
? _subject_count_deleted
1151+
: _subject_count_not_deleted;
1152+
},
1153+
sm::description("The number of subjects in the store"),
1154+
{sm::label{"context"}(ctx), sm::label{"deleted"}(deleted)});
1155+
};
1156+
1157+
if (!config::shard_local_cfg().disable_metrics()) {
1158+
_metrics.add_group(
1159+
group_name,
1160+
{make_schema_count(),
1161+
make_subject_count(is_deleted::no),
1162+
make_subject_count(is_deleted::yes)},
1163+
{},
1164+
{sm::shard_label});
1165+
}
1166+
1167+
if (!config::shard_local_cfg().disable_public_metrics()) {
1168+
_public_metrics.add_group(
1169+
group_name,
1170+
{make_schema_count().aggregate({sm::shard_label}),
1171+
make_subject_count(is_deleted::no)
1172+
.aggregate({sm::shard_label}),
1173+
make_subject_count(is_deleted::yes)
1174+
.aggregate({sm::shard_label})});
1175+
}
1176+
}
1177+
1178+
friend class store;
10351179
};
10361180
using context_store_map = absl::node_hash_map<context, context_store>;
10371181

1182+
context_store& get_or_create_context_store(const context& ctx) {
1183+
auto [it, inserted] = _context_stores.try_emplace(ctx);
1184+
if (inserted) {
1185+
it->second.setup_metrics(ctx);
1186+
}
1187+
return it->second;
1188+
}
1189+
10381190
// NOTE: sharded_store shards data into multiple store instances, so some
10391191
// fields are only present on certain shards.
10401192
// _schemas: sharded by (context, schema_id)

0 commit comments

Comments
 (0)