@@ -42,22 +42,21 @@ For more information, please refer to <http://unlicense.org/>
4242#include < openssl/evp.h>
4343#include < string>
4444#include " parallel_hashmap/phmap.h"
45- #include " jwt-cpp/jwt.h"
4645#include < shared_mutex>
4746
4847/*
49- * creates a p hashmap with 4 shards with shared mutex so read/write is thread safe. Read is lock free.
48+ * creates a p hashmap with shared mutex so read/write is thread safe. Read is lock free.
5049 * this is a global variable, so it is shared between all threads because acl check can happen in any thread.
51- * 2**4 = 16 sub maps, so lock is applied to sub map so concurrency is intrinsinc to the map.
50+ * 2**N so 2** 4 = 16 sub maps, so lock is applied to sub map so concurrency is intrinsinc to the map.
5251 */
53- const int SHARDS = 4 ;
52+ const int N = 4 ;
5453using TokenCache = phmap::parallel_flat_hash_map<
5554 std::string,
56- long long ,
55+ std:: int64_t ,
5756 phmap::priv::hash_default_hash<std::string>,
5857 phmap::priv::hash_default_eq<std::string>,
59- phmap::priv::Allocator<phmap::priv::Pair<const std::string, long long >>,
60- SHARDS ,
58+ phmap::priv::Allocator<phmap::priv::Pair<const std::string, std:: int64_t >>,
59+ N ,
6160 std::shared_mutex>;
6261
6362static TokenCache token_expiry_cache;
@@ -222,8 +221,8 @@ AuthResult flashmq_plugin_login_check(void *thread_data, const std::string &clie
222221 const std::string rsa_pub_key = base64_decode (rsa_pub_env_key);
223222
224223 const std::string token = password;
225- if (token. empty ())
226- {
224+
225+ if (token. empty ()) {
227226 flashmq_logf (LOG_ERR, " No token found for username: %s" , username.c_str ());
228227 return AuthResult::error;
229228 }
@@ -246,7 +245,8 @@ AuthResult flashmq_plugin_login_check(void *thread_data, const std::string &clie
246245 [&](auto & kv) { kv.second = exp_epoch; },
247246 exp_epoch // construct if missing
248247 );
249- flashmq_logf (LOG_INFO, " Verified JWT token successfully with public key" );
248+
249+ flashmq_logf (LOG_INFO, " Verified JWT token successfully for user: %s" , username.c_str ());
250250 return AuthResult::success;
251251 } catch (const std::exception &e) {
252252 flashmq_logf (LOG_ERR, " Failed to decode JWT token: %s" , e.what ());
@@ -265,13 +265,10 @@ AuthResult flashmq_plugin_acl_check(void *thread_data, const AclAccess access, c
265265 const std::vector<std::pair<std::string, std::string>> *userProperties)
266266{
267267 (void )thread_data;
268- (void )access;
269- (void )clientid;
270268 (void )subtopics;
271269 (void )qos;
272270 (void )(retain);
273271 (void )userProperties;
274- (void )topic;
275272 (void )payload;
276273 (void )shareName;
277274 (void )correlationData;
@@ -280,28 +277,28 @@ AuthResult flashmq_plugin_acl_check(void *thread_data, const AclAccess access, c
280277 // SYS topics are published every 10 seconds, this allow broker internal $SYS topics to be published
281278 bool is_broker_internal_topic = (username.empty () && clientid.empty ()) && topic.rfind (" $SYS" , 0 ) == 0 && access == AclAccess::write;
282279 bool is_allowed_user = allow_user_access (username);
283- if (is_broker_internal_topic || is_allowed_user)
284- {
280+
281+ if (is_broker_internal_topic || is_allowed_user) {
285282 return AuthResult::success;
286283 }
287284
288- long long exp_epoch = 0 ;
285+ std:: int64_t exp_epoch = 0 ;
289286 // thread safe read with if_contains
290287 bool cache_hit = token_expiry_cache.if_contains (clientid, [&](const TokenCache::value_type &kv) {
291288 exp_epoch = kv.second ;
292289 });
293290
294291
295292
296- if (cache_hit)
297- {
293+ if (cache_hit) {
298294 flashmq_logf (LOG_DEBUG, " JWT verification cache hit for user: %s and exp: %lld" , username.c_str (), exp_epoch);
299295 // jwt expiry is in epoch seconds
300- long long now_epoch = std::chrono::duration_cast<std::chrono::seconds>(
296+ std:: int64_t now_epoch = std::chrono::duration_cast<std::chrono::seconds>(
301297 std::chrono::system_clock::now ().time_since_epoch ()).count ();
302298
303299 bool token_expired = now_epoch > exp_epoch;
304- if (token_expired){
300+
301+ if (token_expired) {
305302 flashmq_logf (LOG_DEBUG, " JWT verification cache expired for user: %s" , username.c_str ());
306303 token_expiry_cache.erase (clientid);
307304 return AuthResult::acl_denied;
0 commit comments