Skip to content

Commit 17b7d07

Browse files
committed
admin: implement OIDC session revocation in v2 API
Implement the RevokeOidcSessions RPC, which refreshes OIDC keys and revokes all active OIDC sessions across the cluster. The receiving broker refreshes OIDC keys on all of its local shards, revokes OIDC SASL sessions via kafka::server, and, unless the request was itself proxied from another node, broadcasts the request to all other nodes in the cluster so that each broker performs its own local key refresh and session revocation. The originating node aggregates errors across all brokers and returns an aggregated error if any node fails; otherwise it returns success. Add comprehensive test coverage validating session revocation under various authentication scenarios. This enables administrators to immediately invalidate all OIDC sessions when security policies change or credentials are compromised.
1 parent 45b8136 commit 17b7d07

File tree

5 files changed

+189
-23
lines changed

5 files changed

+189
-23
lines changed

src/v/redpanda/admin/services/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ redpanda_cc_library(
5959
deps = [
6060
"//proto/redpanda/core/admin/v2:security_redpanda_proto",
6161
"//src/v/cluster",
62+
"//src/v/kafka/server",
6263
"//src/v/redpanda/admin/proxy:client",
6364
"//src/v/serde/protobuf:rpc",
6465
"@seastar",

src/v/redpanda/admin/services/security.cc

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#include "redpanda/admin/services/security.h"
1313

14+
#include "kafka/server/server.h"
1415
#include "redpanda/admin/proxy/context.h"
1516
#include "security/oidc_authenticator.h"
1617
#include "security/oidc_service.h"
@@ -26,9 +27,12 @@ ss::logger securitylog{"admin_api_server/security_service"};
2627
} // namespace
2728

2829
security_service_impl::security_service_impl(
29-
admin::proxy::client proxy_client, cluster::controller* controller)
30+
admin::proxy::client proxy_client,
31+
cluster::controller* controller,
32+
ss::sharded<kafka::server>& kafka_server)
3033
: _proxy_client(std::move(proxy_client))
31-
, _controller(controller) {}
34+
, _controller(controller)
35+
, _kafka_server(kafka_server) {}
3236

3337
seastar::future<proto::admin::resolve_oidc_identity_response>
3438
security_service_impl::resolve_oidc_identity(
@@ -108,9 +112,33 @@ security_service_impl::refresh_oidc_keys(
108112

109113
seastar::future<proto::admin::revoke_oidc_sessions_response>
110114
security_service_impl::revoke_oidc_sessions(
111-
serde::pb::rpc::context, proto::admin::revoke_oidc_sessions_request) {
112-
throw serde::pb::rpc::unimplemented_exception(
113-
"revoke_oidc_sessions is not implemented");
115+
serde::pb::rpc::context ctx, proto::admin::revoke_oidc_sessions_request) {
116+
vlog(securitylog.debug, "Refreshing OIDC keys and revoking OIDC sessions");
117+
118+
co_await _controller->get_oidc_service().invoke_on_all(
119+
[](security::oidc::service& s) { return s.refresh_keys(); });
120+
121+
co_await _kafka_server.invoke_on_all([](kafka::server& ks) {
122+
return ks.revoke_credentials(security::oidc::sasl_authenticator::name);
123+
});
124+
125+
if (!proxy::is_proxied(ctx)) {
126+
vlog(securitylog.debug, "Broadcasting request to other nodes");
127+
128+
auto clients = _proxy_client.make_clients_for_other_nodes<
129+
proto::admin::security_service_client>();
130+
131+
for (auto& client_pair : clients) {
132+
auto& [node_id, client] = client_pair;
133+
vlog(
134+
securitylog.trace,
135+
"Proxying revoke_oidc_sessions to {}",
136+
node_id);
137+
co_await client.revoke_oidc_sessions(ctx, {});
138+
}
139+
}
140+
141+
co_return proto::admin::revoke_oidc_sessions_response{};
114142
}
115143

116144
} // namespace admin

src/v/redpanda/admin/services/security.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#pragma once
1313

1414
#include "cluster/controller.h"
15+
#include "kafka/server/fwd.h"
1516
#include "proto/redpanda/core/admin/v2/security.proto.h"
1617
#include "redpanda/admin/proxy/client.h"
1718

@@ -20,7 +21,9 @@ namespace admin {
2021
class security_service_impl : public proto::admin::security_service {
2122
public:
2223
security_service_impl(
23-
admin::proxy::client proxy_client, cluster::controller* controller);
24+
admin::proxy::client proxy_client,
25+
cluster::controller* controller,
26+
ss::sharded<kafka::server>& kafka_server);
2427

2528
seastar::future<proto::admin::resolve_oidc_identity_response>
2629
resolve_oidc_identity(
@@ -37,6 +40,7 @@ class security_service_impl : public proto::admin::security_service {
3740
private:
3841
admin::proxy::client _proxy_client;
3942
cluster::controller* _controller;
43+
ss::sharded<kafka::server>& _kafka_server;
4044
};
4145

4246
} // namespace admin

src/v/redpanda/application_admin.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ void application::configure_admin_server(model::node_id node_id) {
108108
create_client(), &_cluster_link_service, &metadata_cache));
109109
s.add_service(
110110
std::make_unique<admin::security_service_impl>(
111-
create_client(), controller.get()));
111+
create_client(), controller.get(), _kafka_server.ref()));
112112
})
113113
.get();
114114
}

tests/rptest/tests/redpanda_oauth_test.py

Lines changed: 149 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,20 @@ def get_idp_request_count(self, nodes: list[ClusterNode]):
203203
)
204204
return result["security_idp_latency_seconds_count"]
205205

206+
def get_sasl_session_revoked_total(self):
207+
metrics = [
208+
"kafka_rpc_sasl_session_revoked_total",
209+
]
210+
samples = self.redpanda.metrics_samples(
211+
metrics, self.redpanda.nodes, MetricsEndpoint.METRICS
212+
)
213+
result = {}
214+
for k in samples.keys():
215+
result[k] = result.get(k, 0) + sum(
216+
[int(s.value) for s in samples[k].samples]
217+
)
218+
return result["kafka_rpc_sasl_session_revoked_total"]
219+
206220

207221
class RedpandaOIDCTestMethods(RedpandaOIDCTestBase):
208222
def __init__(self, test_context, **kwargs):
@@ -541,20 +555,6 @@ def test_admin_revoke(self):
541555

542556
kc_node = self.keycloak.nodes[0]
543557

544-
def get_sasl_session_revoked_total():
545-
metrics = [
546-
"kafka_rpc_sasl_session_revoked_total",
547-
]
548-
samples = self.redpanda.metrics_samples(
549-
metrics, self.redpanda.nodes, MetricsEndpoint.METRICS
550-
)
551-
result = {}
552-
for k in samples.keys():
553-
result[k] = result.get(k, 0) + sum(
554-
[int(s.value) for s in samples[k].samples]
555-
)
556-
return result["kafka_rpc_sasl_session_revoked_total"]
557-
558558
client_id = CLIENT_ID
559559
service_user_id = self.create_service_user(client_id)
560560
cfg = self.keycloak.generate_oauth_config(kc_node, client_id)
@@ -642,7 +642,7 @@ def has_group():
642642
t1 = threading.Thread(target=consume_one)
643643
t1.start()
644644

645-
revoked_total = get_sasl_session_revoked_total()
645+
revoked_total = self.get_sasl_session_revoked_total()
646646

647647
time.sleep(5)
648648

@@ -659,9 +659,142 @@ def has_group():
659659
self.redpanda.logger.debug("joined consumer thread")
660660

661661
wait_until(
662-
lambda: revoked_total < get_sasl_session_revoked_total(),
662+
lambda: revoked_total < self.get_sasl_session_revoked_total(),
663+
timeout_sec=10,
664+
backoff_sec=1,
665+
)
666+
667+
consumer.close()
668+
669+
@cluster(num_nodes=4)
670+
def test_admin_v2_revoke_oidc_sessions(self):
671+
FETCH_TIMEOUT_SEC = 10
672+
GROUP_ID = "test_admin_revoke"
673+
674+
kc_node = self.keycloak.nodes[0]
675+
676+
client_id = CLIENT_ID
677+
service_user_id = self.create_service_user(client_id)
678+
cfg = self.keycloak.generate_oauth_config(kc_node, client_id)
679+
token = self.get_client_credentials_token(cfg)
680+
681+
def request_revoke_oidc_sessions(
682+
with_auth: bool,
683+
) -> security_pb2.RevokeOidcSessionsResponse:
684+
admin_v2 = AdminV2(self.redpanda)
685+
req = security_pb2.RevokeOidcSessionsRequest()
686+
return admin_v2.security().revoke_oidc_sessions(
687+
req,
688+
extra_headers={"Authorization": f"Bearer {token['access_token']}"}
689+
if with_auth
690+
else None,
691+
)
692+
693+
# At this point, admin API does not require auth and service_user_id is not a superuser
694+
# Both calls should succeed
695+
request_revoke_oidc_sessions(with_auth=False)
696+
request_revoke_oidc_sessions(with_auth=True)
697+
698+
# Require Auth for Admin
699+
self.redpanda.set_cluster_config({"admin_api_require_auth": True})
700+
701+
with expect_exception(
702+
ConnectError,
703+
lambda e: e.code == ConnectErrorCode.PERMISSION_DENIED,
704+
):
705+
request_revoke_oidc_sessions(with_auth=False)
706+
707+
with expect_exception(
708+
ConnectError,
709+
lambda e: e.code == ConnectErrorCode.PERMISSION_DENIED,
710+
):
711+
request_revoke_oidc_sessions(with_auth=True)
712+
713+
# Add service_user_id as a superuser
714+
self.redpanda.set_cluster_config(
715+
{
716+
"superusers": [
717+
self.redpanda.SUPERUSER_CREDENTIALS.username,
718+
service_user_id,
719+
]
720+
}
721+
)
722+
723+
self.rpk.create_topic(EXAMPLE_TOPIC)
724+
expected_topics = set([EXAMPLE_TOPIC])
725+
wait_until(
726+
lambda: set(self.rpk.list_topics()) == expected_topics,
727+
timeout_sec=10,
728+
err_msg=f"Expected topics: {expected_topics}, got: {self.rpk.list_topics()}",
729+
)
730+
731+
cfg = self.keycloak.generate_oauth_config(kc_node, client_id)
732+
k_client = PythonLibrdkafka(
733+
self.redpanda,
734+
algorithm="OAUTHBEARER",
735+
oauth_config=cfg,
736+
tls_cert=self.client_cert,
737+
)
738+
739+
consumer = k_client.get_consumer(extra_config={"group.id": GROUP_ID})
740+
producer = k_client.get_producer()
741+
742+
self.redpanda.logger.debug("starting producer")
743+
producer.poll(1.0)
744+
wait_until(
745+
lambda: set(producer.list_topics(timeout=10).topics.keys())
746+
== expected_topics,
747+
timeout_sec=5,
748+
err_msg=f"Producer topics do not match expected topics: {expected_topics}",
749+
)
750+
751+
def consume_one():
752+
self.redpanda.logger.debug("starting consumer")
753+
rec = consumer.poll(FETCH_TIMEOUT_SEC)
754+
self.redpanda.logger.debug(f"consumed: {rec}")
755+
return rec
756+
757+
def has_group():
758+
groups = self.rpk.group_describe(group=GROUP_ID, summary=True)
759+
return groups.members == 1 and groups.state == "Stable"
760+
761+
self.redpanda.logger.debug("starting consumer.subscribe")
762+
consumer.subscribe([EXAMPLE_TOPIC])
763+
self.redpanda.logger.debug("consumer.subscribed")
764+
rec = consumer.poll(1.0)
765+
assert rec is None, f"Expected no record, got: {rec}"
766+
767+
wait_until(
768+
has_group,
769+
timeout_sec=30,
770+
backoff_sec=1,
771+
err_msg="Consumer group did not reach expected state",
772+
)
773+
774+
t1 = threading.Thread(target=consume_one)
775+
t1.start()
776+
777+
revoked_total = self.get_sasl_session_revoked_total()
778+
779+
time.sleep(5)
780+
781+
self.redpanda.logger.debug("starting final revoke")
782+
request_revoke_oidc_sessions(with_auth=True)
783+
784+
self.redpanda.logger.debug("starting producer")
785+
producer.produce(topic=EXAMPLE_TOPIC, key="bar", value="23")
786+
producer.flush(timeout=5)
787+
self.redpanda.logger.debug("produced 1")
788+
789+
self.redpanda.logger.debug("joining consumer thread")
790+
t1.join()
791+
self.redpanda.logger.debug("joined consumer thread")
792+
793+
wait_until(
794+
lambda: revoked_total < self.get_sasl_session_revoked_total(),
663795
timeout_sec=10,
664796
backoff_sec=1,
797+
err_msg="Expected sasl_session_revoked_total to increase after OIDC sessions revoke",
665798
)
666799

667800
consumer.close()

0 commit comments

Comments
 (0)