Skip to content

Commit a4a9423

Browse files
authored
Fixed calls to REST-API being blocked (#1052)
* Fixed calls to REST-API being blocked * Test relied on an exception which doesn't exist anymore
1 parent fef5ec2 commit a4a9423

File tree

5 files changed

+83
-80
lines changed

5 files changed

+83
-80
lines changed

hivemq-edge/src/main/java/com/hivemq/configuration/reader/ProtocolAdapterExtractor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Optional;
3030
import java.util.Set;
3131
import java.util.concurrent.CopyOnWriteArraySet;
32+
import java.util.concurrent.Executors;
3233
import java.util.concurrent.atomic.AtomicBoolean;
3334
import java.util.function.Consumer;
3435
import java.util.stream.Collectors;

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java

Lines changed: 71 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import java.util.concurrent.ConcurrentHashMap;
5656
import java.util.concurrent.ExecutionException;
5757
import java.util.concurrent.ExecutorService;
58+
import java.util.concurrent.Executors;
5859
import java.util.concurrent.Future;
5960
import java.util.concurrent.atomic.AtomicBoolean;
6061
import java.util.function.Function;
@@ -83,7 +84,7 @@ public class ProtocolAdapterManager {
8384
private final @NotNull NorthboundConsumerFactory northboundConsumerFactory;
8485
private final @NotNull TagManager tagManager;
8586
private final @NotNull ProtocolAdapterExtractor protocolAdapterConfig;
86-
private final @NotNull AtomicBoolean refreshing = new AtomicBoolean(false);
87+
private final @NotNull ExecutorService executorService;
8788

8889
@Inject
8990
public ProtocolAdapterManager(
@@ -113,6 +114,8 @@ public ProtocolAdapterManager(
113114
this.northboundConsumerFactory = northboundConsumerFactory;
114115
this.tagManager = tagManager;
115116
this.protocolAdapterConfig = protocolAdapterConfig;
117+
this.executorService = Executors.newSingleThreadExecutor();
118+
Runtime.getRuntime().addShutdownHook(new Thread(executorService::shutdown));
116119
protocolAdapterWritingService.addWritingChangedCallback(() -> protocolAdapterFactoryManager.writingEnabledChanged(
117120
protocolAdapterWritingService.writingEnabled()));
118121
}
@@ -216,90 +219,86 @@ public void start() {
216219
}
217220

218221
public void refresh(final @NotNull List<ProtocolAdapterEntity> configs) {
219-
if(refreshing.compareAndSet(false, true)) {
220-
try {
221-
log.info("Refreshing adapters");
222+
executorService.submit(() -> {
223+
log.info("Refreshing adapters");
222224

223-
final Map<String, ProtocolAdapterConfig> protocolAdapterConfigs = configs
224-
.stream()
225-
.map(configConverter::fromEntity)
226-
.collect(Collectors.toMap(ProtocolAdapterConfig::getAdapterId, Function.identity()));
225+
final Map<String, ProtocolAdapterConfig> protocolAdapterConfigs = configs
226+
.stream()
227+
.map(configConverter::fromEntity)
228+
.collect(Collectors.toMap(ProtocolAdapterConfig::getAdapterId, Function.identity()));
227229

228-
final List<String> loadListOfAdapterNames = new ArrayList<>(protocolAdapterConfigs.keySet());
230+
final List<String> loadListOfAdapterNames = new ArrayList<>(protocolAdapterConfigs.keySet());
229231

230-
final List<String> adaptersToBeDeleted = new ArrayList<>(protocolAdapters.keySet());
231-
adaptersToBeDeleted.removeAll(loadListOfAdapterNames);
232+
final List<String> adaptersToBeDeleted = new ArrayList<>(protocolAdapters.keySet());
233+
adaptersToBeDeleted.removeAll(loadListOfAdapterNames);
232234

233-
final List<String> adaptersToBeCreated = new ArrayList<>(loadListOfAdapterNames);
234-
adaptersToBeCreated.removeAll(protocolAdapters.keySet());
235+
final List<String> adaptersToBeCreated = new ArrayList<>(loadListOfAdapterNames);
236+
adaptersToBeCreated.removeAll(protocolAdapters.keySet());
235237

236-
final List<String> adaptersToBeUpdated = new ArrayList<>(protocolAdapters.keySet());
237-
adaptersToBeUpdated.removeAll(adaptersToBeCreated);
238-
adaptersToBeUpdated.removeAll(adaptersToBeDeleted);
238+
final List<String> adaptersToBeUpdated = new ArrayList<>(protocolAdapters.keySet());
239+
adaptersToBeUpdated.removeAll(adaptersToBeCreated);
240+
adaptersToBeUpdated.removeAll(adaptersToBeDeleted);
239241

240-
final List<String> failedAdapters = new ArrayList<>();
242+
final List<String> failedAdapters = new ArrayList<>();
241243

242-
adaptersToBeDeleted.forEach(name -> {
243-
try {
244-
log.debug("Deleting adapter '{}'", name);
245-
stop(name, true).whenComplete((ignored, t) -> deleteAdapterInternal(name)).get();
246-
} catch (final InterruptedException | ExecutionException e) {
247-
failedAdapters.add(name);
248-
log.error("Failed deleting adapter {}", name, e);
249-
}
250-
});
244+
adaptersToBeDeleted.forEach(name -> {
245+
try {
246+
log.debug("Deleting adapter '{}'", name);
247+
stop(name, true).whenComplete((ignored, t) -> {
248+
deleteAdapterInternal(name);
249+
}).get();
250+
} catch (final InterruptedException | ExecutionException e) {
251+
failedAdapters.add(name);
252+
log.error("Failed deleting adapter {}", name, e);
253+
}
254+
});
251255

252-
adaptersToBeCreated.forEach(name -> {
253-
try {
254-
log.debug("Creating adapter '{}'", name);
255-
start(createAdapterInternal(protocolAdapterConfigs.get(name), versionProvider.getVersion())).get();
256-
} catch (final InterruptedException | ExecutionException e) {
257-
failedAdapters.add(name);
258-
log.error("Failed adding adapter {}", name, e);
259-
}
260-
});
256+
adaptersToBeCreated.forEach(name -> {
257+
try {
258+
log.debug("Creating adapter '{}'", name);
259+
start(createAdapterInternal(protocolAdapterConfigs.get(name), versionProvider.getVersion())).get();
260+
} catch (final InterruptedException | ExecutionException e) {
261+
failedAdapters.add(name);
262+
log.error("Failed adding adapter {}", name, e);
263+
}
264+
});
261265

262-
adaptersToBeUpdated.forEach(name -> {
263-
try {
264-
if(!protocolAdapterConfigs.get(name).equals(knownConfigs.get(name))) {
265-
log.debug("Updating adapter '{}'", name);
266-
stop(name, true)
267-
.thenApply(v -> {
268-
deleteAdapterInternal(name);
269-
return null;
270-
})
271-
.thenCompose(ignored ->
272-
start(
273-
createAdapterInternal(protocolAdapterConfigs.get(name),
274-
versionProvider.getVersion())))
275-
.get();
276-
} else {
277-
log.debug("Not-updating adapter '{}' since the config is unchanged", name);
278-
}
279-
} catch (final InterruptedException | ExecutionException e) {
280-
failedAdapters.add(name);
281-
log.error("Failed updating adapter {}", name, e);
266+
adaptersToBeUpdated.forEach(name -> {
267+
try {
268+
if(!protocolAdapterConfigs.get(name).equals(knownConfigs.get(name))) {
269+
log.debug("Updating adapter '{}'", name);
270+
stop(name, true)
271+
.thenApply(v -> {
272+
deleteAdapterInternal(name);
273+
return null;
274+
})
275+
.thenCompose(ignored ->
276+
start(
277+
createAdapterInternal(protocolAdapterConfigs.get(name),
278+
versionProvider.getVersion())))
279+
.get();
280+
} else {
281+
log.debug("Not-updating adapter '{}' since the config is unchanged", name);
282282
}
283+
} catch (final InterruptedException | ExecutionException e) {
284+
failedAdapters.add(name);
285+
log.error("Failed updating adapter {}", name, e);
286+
}
283287

284-
});
288+
});
285289

286-
if (failedAdapters.isEmpty()) {
287-
eventService.configurationEvent()
288-
.withSeverity(Event.SEVERITY.INFO)
289-
.withMessage("Configuration has been successfully updated")
290-
.fire();
291-
} else {
292-
eventService.configurationEvent()
293-
.withSeverity(Event.SEVERITY.CRITICAL)
294-
.withMessage("Reloading of configuration failed")
295-
.fire();
296-
}
297-
} finally {
298-
refreshing.set(false);
290+
if (failedAdapters.isEmpty()) {
291+
eventService.configurationEvent()
292+
.withSeverity(Event.SEVERITY.INFO)
293+
.withMessage("Configuration has been successfully updated")
294+
.fire();
295+
} else {
296+
eventService.configurationEvent()
297+
.withSeverity(Event.SEVERITY.CRITICAL)
298+
.withMessage("Reloading of configuration failed")
299+
.fire();
299300
}
300-
} else {
301-
log.warn("Refresh already in progress, skipping this refresh call");
302-
}
301+
});
303302
}
304303

305304
//legacy handling, hardcoded here, to not add legacy stuff into the adapter-sdk

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterWrapper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ public ProtocolAdapterWrapper(
136136
.thenApply(v -> {
137137
log.error("Error starting adapter with id {}", adapter.getId(), t);
138138
setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED);
139+
setErrorConnectionStatus(t, t.getMessage());
139140
startFutureRef.get().complete(false);
140141
startFutureRef.set(null);
141142
return false;

hivemq-edge/src/test/java/com/hivemq/protocols/ProtocolAdapterManagerTest.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,13 +182,15 @@ void test_startWriting_adapterFailedStart_resourcesCleanedUp() throws Exception{
182182
}
183183

184184
@Test
185-
void test_startWriting_eventServiceFailedStart_resourcesCleanedUp() {
185+
void test_startWriting_eventServiceFailedStart_resourcesCleanedUp() throws Exception {
186+
187+
final EventBuilder eventBuilder = new EventBuilderImpl(mock());
188+
when(eventService.createAdapterEvent(anyString(), anyString())).thenReturn(eventBuilder);
189+
186190
when(protocolAdapterWritingService.writingEnabled()).thenReturn(true);
187191
when(protocolAdapterWritingService.startWriting(any(),
188192
any(),
189193
any())).thenReturn(CompletableFuture.completedFuture(null));
190-
when(eventService.createAdapterEvent(anyString(),
191-
anyString())).thenThrow(new RuntimeException("we failed start"));
192194
when(protocolAdapterWritingService.stopWriting(any(),
193195
any())).thenReturn(CompletableFuture.completedFuture(null));
194196

@@ -203,7 +205,7 @@ void test_startWriting_eventServiceFailedStart_resourcesCleanedUp() {
203205
northboundConsumerFactory,
204206
tagManager);
205207

206-
assertThrows(ExecutionException.class, () -> protocolAdapterManager.start(adapterWrapper).get());
208+
protocolAdapterManager.start(adapterWrapper).get();
207209

208210
assertEquals(ProtocolAdapterState.RuntimeStatus.STOPPED, adapterWrapper.getRuntimeStatus());
209211
verify(protocolAdapterWritingService).stopWriting(eq((WritingProtocolAdapter) adapterWrapper.getAdapter()),

modules/hivemq-edge-module-plc4x/src/main/java/com/hivemq/edge/adapters/plc4x/impl/Plc4xConnection.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,16 +91,16 @@ protected void initConnection() throws Plc4xException {
9191
plcConnection = CompletableFuture.supplyAsync(() -> {
9292
try {
9393
return Optional.of(plcDriverManager.getConnectionManager().getConnection(connectionString));
94-
} catch (Throwable e) {
94+
} catch (final Throwable e) {
9595
log.info("Error encountered connecting to external device", e);
9696
}
9797
return Optional.<PlcConnection>empty();
9898
})
9999
.get(2_000, TimeUnit.MILLISECONDS)
100-
.orElseThrow(() -> new Plc4xException("Unable to connect to device."));
101-
} catch (InterruptedException | ExecutionException | TimeoutException e) {
100+
.orElseThrow(() -> new Plc4xException("Error encountered connecting to external device"));
101+
} catch (final Throwable e) {
102102
log.error("Error encountered connecting to external device", e);
103-
throw new Plc4xException(e);
103+
throw new Plc4xException("Error encountered connecting to external device");
104104
}
105105
}
106106
}

0 commit comments

Comments
 (0)