@@ -64,6 +64,8 @@ void SendAclSecurityEvents(const AclLog::LogEntry& entry, facade::RedisReplyBuil
64
64
65
65
string AclDbToString (size_t db);
66
66
67
+ template <typename P>
68
+ void TraverseEvictImpl (P predicate, facade::Listener* main_listener, util::ProactorPool* pool);
67
69
} // namespace
68
70
69
71
AclFamily::AclFamily (UserRegistry* registry, util::ProactorPool* pool)
@@ -164,36 +166,17 @@ void AclFamily::SetUser(CmdArgList args, const CommandContext& cmd_cntx) {
164
166
}
165
167
166
168
void AclFamily::EvictOpenConnectionsOnAllProactors (const absl::flat_hash_set<string_view>& users) {
167
- auto close_cb = [&users]([[maybe_unused]] size_t id, util::Connection* conn) {
168
- CHECK (conn);
169
- auto connection = static_cast <facade::Connection*>(conn);
170
- auto ctx = static_cast <ConnectionContext*>(connection->cntx ());
171
- if (ctx && users.contains (ctx->authed_username )) {
172
- connection->ShutdownSelfBlocking ();
173
- }
174
- };
175
-
176
- if (main_listener_) {
177
- // TODO(kostas) fix this it might preempt and TraverseConnection shall be atomic
178
- main_listener_->TraverseConnections (close_cb);
179
- }
169
+ return TraverseEvictImpl ([&](auto * ctx) { return ctx && users.contains (ctx->authed_username ); },
170
+ main_listener_, pool_);
180
171
}
181
172
182
173
void AclFamily::EvictOpenConnectionsOnAllProactorsWithRegistry (
183
174
const UserRegistry::RegistryType& registry) {
184
- auto close_cb = [®istry]([[maybe_unused]] size_t id, util::Connection* conn) {
185
- CHECK (conn);
186
- auto connection = static_cast <facade::Connection*>(conn);
187
- auto ctx = static_cast <ConnectionContext*>(connection->cntx ());
188
- if (ctx && ctx->authed_username != " default" && registry.contains (ctx->authed_username )) {
189
- connection->ShutdownSelfBlocking ();
190
- }
191
- };
192
-
193
- if (main_listener_) {
194
- // TODO(kostas) fix this it might preempt and TraverseConnection shall be atomic
195
- main_listener_->TraverseConnections (close_cb);
196
- }
175
+ return TraverseEvictImpl (
176
+ [&](auto * ctx) {
177
+ return ctx && ctx->authed_username != " default" && registry.contains (ctx->authed_username );
178
+ },
179
+ main_listener_, pool_);
197
180
}
198
181
199
182
void AclFamily::DelUser (CmdArgList args, const CommandContext& cmd_cntx) {
@@ -933,6 +916,34 @@ std::string AclDbToString(size_t db) {
933
916
return std::numeric_limits<size_t >::max () == db ? " all" : absl::StrCat (db);
934
917
}
935
918
919
+ // Fetches the connections that predicate P evaluates to true and shuts them
920
+ // down gracefully.
921
+ template <typename P>
922
+ void TraverseEvictImpl (P predicate, facade::Listener* main_listener, util::ProactorPool* pool) {
923
+ auto close_cb = [&](unsigned idx, util::ProactorBase* p) {
924
+ std::vector<facade::Connection::WeakRef> connections;
925
+ auto traverse_cb = [&](unsigned id, util::Connection* conn) {
926
+ auto connection = static_cast <facade::Connection*>(conn);
927
+ auto ctx = connection->cntx ();
928
+ if (predicate (ctx)) {
929
+ connections.push_back (connection->Borrow ());
930
+ }
931
+ };
932
+
933
+ main_listener->TraverseConnectionsOnThread (traverse_cb, UINT32_MAX, nullptr );
934
+
935
+ for (auto & tcon : connections) {
936
+ facade::Connection* conn = tcon.Get ();
937
+ if (conn && conn->socket ()->proactor ()->GetPoolIndex () == p->GetPoolIndex ()) {
938
+ // preemptive for TlsSocket
939
+ conn->ShutdownSelfBlocking ();
940
+ }
941
+ }
942
+ };
943
+
944
+ pool->AwaitFiberOnAll (close_cb);
945
+ }
946
+
936
947
} // namespace
937
948
938
949
std::string AclFamily::AclCatAndCommandToString (const User::CategoryChanges& cat,
0 commit comments