@@ -23,6 +23,7 @@ use tracing::{Instrument, Span};
2323
2424use rumqttd_protocol:: { QoS , RetainForwardRule , SubscribeReasonCode , UnsubAckReason } ;
2525
26+ use crate :: config:: permissions:: PermissionsConfig ;
2627use crate :: map_join_error;
2728use crate :: mqtt:: mailbox:: MailSender ;
2829use crate :: mqtt:: packets:: PacketId ;
@@ -77,7 +78,11 @@ pub struct TceState {
7778}
7879
7980impl MqttRouter {
80- pub fn start ( tce : Option < TceState > , token : CancellationToken ) -> Self {
81+ pub fn start (
82+ tce : Option < TceState > ,
83+ token : CancellationToken ,
84+ permissions : PermissionsConfig ,
85+ ) -> Self {
8186 let ( command_tx, command_rx) = mpsc:: channel ( COMMAND_CAPACITY ) ;
8287
8388 let ( system_tx, system_rx) = mpsc:: unbounded_channel ( ) ;
@@ -88,6 +93,7 @@ impl MqttRouter {
8893
8994 let state = RouterState {
9095 token,
96+ permissions,
9197 clients : SecondaryMap :: new ( ) ,
9298 dead_clients : HashSet :: default ( ) ,
9399 subscriptions : Subscriptions :: default ( ) ,
@@ -159,6 +165,7 @@ impl RouterHandle {
159165 connection_id : ConnectionId ,
160166 client_index : ClientIndex ,
161167 client_id : ClientId ,
168+ user : String ,
162169 mail_tx : MailSender ,
163170 clean_session : bool ,
164171 ) -> crate :: Result < RouterConnection > {
@@ -170,6 +177,7 @@ impl RouterHandle {
170177 RouterCommand :: NewConnection {
171178 connection_id,
172179 client_id,
180+ user,
173181 message_tx,
174182 mail_tx,
175183 clean_session,
@@ -284,6 +292,7 @@ enum RouterCommand {
284292 connection_id : ConnectionId ,
285293 client_id : ClientId ,
286294 mail_tx : MailSender ,
295+ user : String ,
287296 message_tx : mpsc:: UnboundedSender < RouterMessage > ,
288297 clean_session : bool ,
289298 } ,
@@ -334,6 +343,8 @@ struct RouterState {
334343 clients : SecondaryMap < ClientIndex , ClientState > ,
335344 dead_clients : HashSet < ClientIndex > ,
336345
346+ permissions : PermissionsConfig ,
347+
337348 subscriptions : Subscriptions ,
338349 command_rx : mpsc:: Receiver < ( ClientIndex , RouterCommand ) > ,
339350 system_rx : mpsc:: UnboundedReceiver < SystemCommand > ,
@@ -351,6 +362,7 @@ struct RouterState {
351362struct ClientState {
352363 client_id : ClientId ,
353364 mail_tx : MailSender ,
365+ user : String ,
354366 subscriptions : ClientSubscriptions ,
355367 current_connection : Option < ConnectionState > ,
356368 clean_session : bool ,
@@ -414,6 +426,15 @@ enum PublishOrigin<'a> {
414426 Consensus ( & ' a CreatorId ) ,
415427}
416428
429+ impl PublishOrigin < ' _ > {
430+ pub fn get_client_index ( & self ) -> Option < ClientIndex > {
431+ match self {
432+ PublishOrigin :: Local ( client_index) => Some ( * client_index) ,
433+ _ => None ,
434+ }
435+ }
436+ }
437+
417438impl Index < SubscriptionKind > for Subscriptions {
418439 type Output = SubscriptionMap ;
419440
@@ -567,6 +588,7 @@ fn handle_command(state: &mut RouterState, client_idx: ClientIndex, command: Rou
567588 RouterCommand :: NewConnection {
568589 connection_id,
569590 client_id,
591+ user,
570592 message_tx,
571593 mail_tx,
572594 clean_session,
@@ -593,6 +615,7 @@ fn handle_command(state: &mut RouterState, client_idx: ClientIndex, command: Rou
593615 client_idx,
594616 ClientState {
595617 client_id,
618+ user,
596619 mail_tx,
597620 subscriptions : Default :: default ( ) ,
598621 current_connection : Some ( ConnectionState {
@@ -663,9 +686,11 @@ fn handle_subscribe(state: &mut RouterState, client_idx: ClientIndex, request: S
663686 publish : Arc < PublishTrasaction > ,
664687 }
665688
666- if ! state. clients . contains_key ( client_idx) {
689+ let Some ( client ) = state. clients . get ( client_idx) else {
667690 return ;
668- }
691+ } ;
692+
693+ let permissions = state. permissions . get_topics_acl_config ( & client. user ) ;
669694
670695 // if state.connections[conn_id].message_tx.is_closed() {
671696 // return;
@@ -683,6 +708,14 @@ fn handle_subscribe(state: &mut RouterState, client_idx: ClientIndex, request: S
683708 // as they would have failed validation on the frontend.
684709 . ok_or ( SubscribeReasonCode :: Unspecified )
685710 . and_then ( |( filter, props) | {
711+ if !state. permissions . check_acl_config (
712+ permissions,
713+ filter. as_str ( ) ,
714+ crate :: config:: permissions:: TransactionType :: Subscribe ,
715+ ) {
716+ Err ( SubscribeReasonCode :: NotAuthorized ) ?
717+ }
718+
686719 let sub_kind = SubscriptionKind :: from_filter ( & filter)
687720 . map_err ( |_| SubscribeReasonCode :: NotAuthorized ) ?;
688721
@@ -974,6 +1007,23 @@ fn dispatch(state: &mut RouterState, publish: Arc<PublishTrasaction>, origin: Pu
9741007 } ,
9751008 } ;
9761009
1010+ // Check if user has permission to publish
1011+ if let Some ( client_index) = origin. get_client_index ( ) {
1012+ let Some ( client) = state. clients . get ( client_index) else {
1013+ return ;
1014+ } ;
1015+
1016+ let topics_config = state. permissions . get_topics_acl_config ( & client. user ) ;
1017+
1018+ if !state. permissions . check_acl_config (
1019+ topics_config,
1020+ & publish. topic ,
1021+ crate :: config:: permissions:: TransactionType :: Publish ,
1022+ ) {
1023+ return ;
1024+ }
1025+ }
1026+
9771027 // Only run this if TCE is not available.
9781028 if state. tce . is_none ( ) && publish. meta . retain ( ) {
9791029 let time_now = state. startup_time + state. startup_instant . elapsed ( ) ;
0 commit comments