@@ -70,7 +70,9 @@ public class BridgeResourceImpl extends AbstractApi implements BridgesApi {
7070 private final @ NotNull ConfigurationService configurationService ;
7171 private final @ NotNull BridgeService bridgeService ;
7272 private final @ NotNull SystemInformation systemInformation ;
73- private final @ NotNull Object bridgeUpdateLock = new Object ();
73+ // Lock for all bridge CRUD operations to prevent race conditions between concurrent
74+ // add/update/remove operations (e.g., one thread removing while another is updating)
75+ private final @ NotNull Object bridgeUpdateLock ;
7476
7577 @ Inject
7678 public BridgeResourceImpl (
@@ -80,16 +82,20 @@ public BridgeResourceImpl(
8082 this .configurationService = configurationService ;
8183 this .bridgeService = bridgeService ;
8284 this .systemInformation = systemInformation ;
85+ this .bridgeUpdateLock = new Object ();
8386 }
8487
8588 @ Override
8689 public @ NotNull Response getBridges () {
8790 logger .trace ("Bridge API listing events at {}" , System .currentTimeMillis ());
88- final List <MqttBridge > bridges = configurationService .bridgeExtractor ().getBridges ();
89- final BridgeList list = new BridgeList ().items (bridges .stream ()
90- .map (m -> BridgeUtils .convert (m , getStatusInternal (m .getId ())))
91- .collect (Collectors .toList ()));
92- return Response .ok (list ).build ();
91+ // Synchronize to get consistent snapshot during concurrent modifications
92+ synchronized (bridgeUpdateLock ) {
93+ final List <MqttBridge > bridges = configurationService .bridgeExtractor ().getBridges ();
94+ final BridgeList list = new BridgeList ().items (bridges .stream ()
95+ .map (m -> BridgeUtils .convert (m , getStatusInternal (m .getId ())))
96+ .collect (Collectors .toList ()));
97+ return Response .ok (list ).build ();
98+ }
9399 }
94100
95101 @ Override
@@ -99,18 +105,22 @@ public BridgeResourceImpl(
99105 return ErrorResponseUtil .errorResponse (new ConfigWritingDisabled ());
100106 }
101107 final ApiErrorMessages errorMessages = ApiErrorUtils .createErrorContainer ();
102- if (checkBridgeExists (bridge .getId ())) {
103- return ErrorResponseUtil .errorResponse (new BridgeFailedSchemaValidationError (List .of (new Error (
104- "Bridge already existed" ,
105- "id" ))));
106- }
107- validateBridge (errorMessages , bridge );
108- if (ApiErrorUtils .hasRequestErrors (errorMessages )) {
109- return ErrorResponseUtil .errorResponse (new BridgeFailedSchemaValidationError (errorMessages .toErrorList ()));
110- } else {
111- final MqttBridge mqttBridge = unconvert (bridge );
112- configurationService .bridgeExtractor ().addBridge (mqttBridge );
113- return Response .ok ().build ();
108+
109+ // Synchronize to prevent race conditions with concurrent update/remove operations
110+ synchronized (bridgeUpdateLock ) {
111+ if (checkBridgeExists (bridge .getId ())) {
112+ return ErrorResponseUtil .errorResponse (new BridgeFailedSchemaValidationError (List .of (new Error (
113+ "Bridge already existed" ,
114+ "id" ))));
115+ }
116+ validateBridge (errorMessages , bridge );
117+ if (ApiErrorUtils .hasRequestErrors (errorMessages )) {
118+ return ErrorResponseUtil .errorResponse (new BridgeFailedSchemaValidationError (errorMessages .toErrorList ()));
119+ } else {
120+ final MqttBridge mqttBridge = unconvert (bridge );
121+ configurationService .bridgeExtractor ().addBridge (mqttBridge );
122+ return Response .ok ().build ();
123+ }
114124 }
115125 }
116126
@@ -120,26 +130,30 @@ public BridgeResourceImpl(
120130 final ApiErrorMessages errorMessages = ApiErrorUtils .createErrorContainer ();
121131 ApiErrorUtils .validateRequiredField (errorMessages , "id" , bridgeId , false );
122132 ApiErrorUtils .validateRequiredFieldRegex (errorMessages , "id" , bridgeId , HiveMQEdgeConstants .ID_REGEX );
123- if (!checkBridgeExists (bridgeId )) {
124- return ErrorResponseUtil .errorResponse (new BridgeNotFoundError (String .format ("Bridge not found by id '%s'" ,
125- bridgeId )));
126- }
127- if (ApiErrorUtils .hasRequestErrors (errorMessages )) {
128- return ErrorResponseUtil .errorResponse (new InvalidQueryParameterErrors (errorMessages .toErrorList ()));
129- } else {
130- final Optional <MqttBridge > bridge = configurationService .bridgeExtractor ()
131- .getBridges ()
132- .stream ()
133- .filter (b -> b .getId ().equals (bridgeId ))
134- .findFirst ();
135- if (bridge .isPresent ()) {
136- final MqttBridge mqttBridge = bridge .get ();
137- return Response .ok (BridgeUtils .convert (mqttBridge , getStatusInternal (bridgeId ))).build ();
138- } else {
139- return ErrorResponseUtil .errorResponse (new BridgeNotFoundError (String .format (
140- "Bridge not found by id '%s'" ,
133+
134+ // Synchronize to get consistent view during concurrent modifications
135+ synchronized (bridgeUpdateLock ) {
136+ if (!checkBridgeExists (bridgeId )) {
137+ return ErrorResponseUtil .errorResponse (new BridgeNotFoundError (String .format ("Bridge not found by id '%s'" ,
141138 bridgeId )));
142139 }
140+ if (ApiErrorUtils .hasRequestErrors (errorMessages )) {
141+ return ErrorResponseUtil .errorResponse (new InvalidQueryParameterErrors (errorMessages .toErrorList ()));
142+ } else {
143+ final Optional <MqttBridge > bridge = configurationService .bridgeExtractor ()
144+ .getBridges ()
145+ .stream ()
146+ .filter (b -> b .getId ().equals (bridgeId ))
147+ .findFirst ();
148+ if (bridge .isPresent ()) {
149+ final MqttBridge mqttBridge = bridge .get ();
150+ return Response .ok (BridgeUtils .convert (mqttBridge , getStatusInternal (bridgeId ))).build ();
151+ } else {
152+ return ErrorResponseUtil .errorResponse (new BridgeNotFoundError (String .format (
153+ "Bridge not found by id '%s'" ,
154+ bridgeId )));
155+ }
156+ }
143157 }
144158 }
145159
@@ -151,15 +165,19 @@ public BridgeResourceImpl(
151165 final ApiErrorMessages errorMessages = ApiErrorUtils .createErrorContainer ();
152166 ApiErrorUtils .validateRequiredField (errorMessages , "id" , bridgeId , false );
153167 ApiErrorUtils .validateRequiredFieldRegex (errorMessages , "id" , bridgeId , HiveMQEdgeConstants .ID_REGEX );
154- if (!checkBridgeExists (bridgeId )) {
155- return ErrorResponseUtil .errorResponse (new BridgeNotFoundError (String .format ("Bridge not found by id '%s'" ,
156- bridgeId )));
157- }
158- if (ApiErrorUtils .hasRequestErrors (errorMessages )) {
159- return ErrorResponseUtil .errorResponse (new InvalidQueryParameterErrors (errorMessages .toErrorList ()));
160- } else {
161- configurationService .bridgeExtractor ().removeBridge (bridgeId );
162- return Response .ok ().build ();
168+
169+ // Synchronize to prevent race conditions with concurrent add/update operations
170+ synchronized (bridgeUpdateLock ) {
171+ if (!checkBridgeExists (bridgeId )) {
172+ return ErrorResponseUtil .errorResponse (new BridgeNotFoundError (String .format ("Bridge not found by id '%s'" ,
173+ bridgeId )));
174+ }
175+ if (ApiErrorUtils .hasRequestErrors (errorMessages )) {
176+ return ErrorResponseUtil .errorResponse (new InvalidQueryParameterErrors (errorMessages .toErrorList ()));
177+ } else {
178+ configurationService .bridgeExtractor ().removeBridge (bridgeId );
179+ return Response .ok ().build ();
180+ }
163181 }
164182 }
165183
@@ -171,30 +189,39 @@ public BridgeResourceImpl(
171189 ApiErrorUtils .validateRequiredField (errorMessages , "id" , bridgeId , false );
172190 ApiErrorUtils .validateRequiredFieldRegex (errorMessages , "id" , bridgeId , HiveMQEdgeConstants .ID_REGEX );
173191 ApiErrorUtils .validateRequiredEntity (errorMessages , "command" , command );
174- if (!checkBridgeExists (bridgeId )) {
175- return ErrorResponseUtil .errorResponse (new BridgeNotFoundError (String .format ("Bridge not found by id '%s'" ,
176- bridgeId )));
177- }
178- if (ApiErrorUtils .hasRequestErrors (errorMessages )) {
179- return ErrorResponseUtil .errorResponse (new InvalidQueryParameterErrors (errorMessages .toErrorList ()));
180- } else {
181- switch (command .getCommand ()) {
182- case START :
183- bridgeService .startBridge (bridgeId );
184- break ;
185- case STOP :
186- bridgeService .stopBridgeAndRemoveQueues (bridgeId );
187- break ;
188- case RESTART :
189- bridgeService .restartBridge (bridgeId , getBridge (bridgeId ));
190- break ;
191- }
192192
193- return Response .ok (new StatusTransitionResult ().status (StatusTransitionResult .StatusEnum .PENDING )
194- .type (ApiConstants .BRIDGE_TYPE )
195- .identifier (bridgeId )
196- .callbackTimeoutMillis (ApiConstants .DEFAULT_TRANSITION_WAIT_TIMEOUT )).build ();
193+ // Synchronize to prevent race conditions where bridge is removed between check and action
194+ synchronized (bridgeUpdateLock ) {
195+ if (!checkBridgeExists (bridgeId )) {
196+ return ErrorResponseUtil .errorResponse (new BridgeNotFoundError (String .format ("Bridge not found by id '%s'" ,
197+ bridgeId )));
198+ }
199+ if (ApiErrorUtils .hasRequestErrors (errorMessages )) {
200+ return ErrorResponseUtil .errorResponse (new InvalidQueryParameterErrors (errorMessages .toErrorList ()));
201+ } else {
202+ switch (command .getCommand ()) {
203+ case START :
204+ bridgeService .startBridge (bridgeId );
205+ break ;
206+ case STOP :
207+ bridgeService .stopBridgeAndRemoveQueues (bridgeId );
208+ break ;
209+ case RESTART :
210+ final MqttBridge bridgeConfig = getBridge (bridgeId );
211+ if (bridgeConfig == null ) {
212+ return ErrorResponseUtil .errorResponse (new BridgeNotFoundError (String .format ("Bridge not found by id '%s'" ,
213+ bridgeId )));
214+ }
215+ bridgeService .restartBridge (bridgeId , bridgeConfig );
216+ break ;
217+ }
218+
219+ return Response .ok (new StatusTransitionResult ().status (StatusTransitionResult .StatusEnum .PENDING )
220+ .type (ApiConstants .BRIDGE_TYPE )
221+ .identifier (bridgeId )
222+ .callbackTimeoutMillis (ApiConstants .DEFAULT_TRANSITION_WAIT_TIMEOUT )).build ();
197223
224+ }
198225 }
199226 }
200227
@@ -245,26 +272,33 @@ public BridgeResourceImpl(
245272 final ApiErrorMessages errorMessages = ApiErrorUtils .createErrorContainer ();
246273 ApiErrorUtils .validateRequiredField (errorMessages , "id" , bridgeId , false );
247274 ApiErrorUtils .validateRequiredFieldRegex (errorMessages , "id" , bridgeId , HiveMQEdgeConstants .ID_REGEX );
248- if (!checkBridgeExists (bridgeId )) {
249- return ErrorResponseUtil .errorResponse (new BridgeNotFoundError (String .format ("Bridge not found by id '%s'" ,
250- bridgeId )));
251- }
252- if (ApiErrorUtils .hasRequestErrors (errorMessages )) {
253- return ErrorResponseUtil .errorResponse (new InvalidQueryParameterErrors (errorMessages .toErrorList ()));
254- } else {
255- return Response .ok (getStatusInternal (bridgeId )).build ();
275+
276+ // Synchronize to get consistent view during concurrent modifications
277+ synchronized (bridgeUpdateLock ) {
278+ if (!checkBridgeExists (bridgeId )) {
279+ return ErrorResponseUtil .errorResponse (new BridgeNotFoundError (String .format ("Bridge not found by id '%s'" ,
280+ bridgeId )));
281+ }
282+ if (ApiErrorUtils .hasRequestErrors (errorMessages )) {
283+ return ErrorResponseUtil .errorResponse (new InvalidQueryParameterErrors (errorMessages .toErrorList ()));
284+ } else {
285+ return Response .ok (getStatusInternal (bridgeId )).build ();
286+ }
256287 }
257288 }
258289
259290 @ Override
260291 public @ NotNull Response getBridgesStatus () {
261292 //-- Bridges
262- final ImmutableList .Builder <Status > builder = new ImmutableList .Builder <>();
263- final List <MqttBridge > bridges = configurationService .bridgeExtractor ().getBridges ();
264- for (final MqttBridge bridge : bridges ) {
265- builder .add (getStatusInternal (bridge .getId ()));
293+ // Synchronize to get consistent snapshot during concurrent modifications
294+ synchronized (bridgeUpdateLock ) {
295+ final ImmutableList .Builder <Status > builder = new ImmutableList .Builder <>();
296+ final List <MqttBridge > bridges = configurationService .bridgeExtractor ().getBridges ();
297+ for (final MqttBridge bridge : bridges ) {
298+ builder .add (getStatusInternal (bridge .getId ()));
299+ }
300+ return Response .ok (new StatusList ().items (builder .build ())).build ();
266301 }
267- return Response .ok (new StatusList ().items (builder .build ())).build ();
268302 }
269303
270304 protected @ NotNull Status getStatusInternal (final @ NotNull String bridgeId ) {
0 commit comments