Skip to content

Commit e96f5bb

Browse files
authored
fix: Fix locking for pulse config (#1216)
1 parent 82a16a2 commit e96f5bb

File tree

8 files changed

+32
-36
lines changed

8 files changed

+32
-36
lines changed

hivemq-edge/src/main/java/com/hivemq/api/resources/impl/PulseApiImpl.java

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ public PulseApiImpl(
120120
if (optionalResponse.isPresent()) {
121121
return optionalResponse.get();
122122
}
123-
final PulseEntity existingPulseEntity = pulseExtractor.getPulseEntity();
124-
synchronized (existingPulseEntity.getLock()) {
123+
synchronized (pulseExtractor.getLock()) {
124+
final PulseEntity existingPulseEntity = pulseExtractor.getPulseEntity();
125125
final UUID id = combiner.getId();
126126
final @NotNull Optional<DataCombiner> optionalDataCombiner = assetMappingExtractor.getCombinerById(id);
127127
if (optionalDataCombiner.isPresent()) {
@@ -192,8 +192,8 @@ public PulseApiImpl(
192192
if (mappingId == null) {
193193
return ErrorResponseUtil.errorResponse(new InvalidManagedAssetMappingIdError("null"));
194194
}
195-
final PulseEntity existingPulseEntity = pulseExtractor.getPulseEntity();
196-
synchronized (existingPulseEntity.getLock()) {
195+
synchronized (pulseExtractor.getLock()) {
196+
final PulseEntity existingPulseEntity = pulseExtractor.getPulseEntity();
197197
final PulseAgentAssets assets =
198198
PulseAgentAssets.fromPersistence(existingPulseEntity.getPulseAssetsEntity());
199199
final OptionalInt optionalAssetIndex = IntStream.range(0, assets.size())
@@ -231,8 +231,8 @@ public PulseApiImpl(
231231
if (optionalResponse.isPresent()) {
232232
return optionalResponse.get();
233233
}
234-
final PulseEntity existingPulseEntity = pulseExtractor.getPulseEntity();
235-
synchronized (existingPulseEntity.getLock()) {
234+
synchronized (pulseExtractor.getLock()) {
235+
final PulseEntity existingPulseEntity = pulseExtractor.getPulseEntity();
236236
final @NotNull Optional<DataCombiner> optionalDataCombiner =
237237
assetMappingExtractor.getCombinerById(combinerId);
238238
if (optionalDataCombiner.isEmpty()) {
@@ -275,8 +275,8 @@ public PulseApiImpl(
275275
if (optionalResponse.isPresent()) {
276276
return optionalResponse.get();
277277
}
278-
final PulseEntity existingPulseEntity = pulseExtractor.getPulseEntity();
279-
synchronized (existingPulseEntity.getLock()) {
278+
synchronized (pulseExtractor.getLock()) {
279+
final PulseEntity existingPulseEntity = pulseExtractor.getPulseEntity();
280280
final PulseAgentAssets assets =
281281
PulseAgentAssets.fromPersistence(existingPulseEntity.getPulseAssetsEntity());
282282
final OptionalInt optionalAssetIndex = IntStream.range(0, assets.size())
@@ -329,8 +329,7 @@ public PulseApiImpl(
329329

330330
@Override
331331
public @NotNull Response getAssetMapper(final @NotNull UUID combinerId) {
332-
final PulseEntity existingPulseEntity = pulseExtractor.getPulseEntity();
333-
synchronized (existingPulseEntity.getLock()) {
332+
synchronized (pulseExtractor.getLock()) {
334333
final @NotNull Optional<DataCombiner> optionalDataCombiner =
335334
assetMappingExtractor.getCombinerById(combinerId);
336335
if (optionalDataCombiner.isEmpty()) {
@@ -342,8 +341,7 @@ public PulseApiImpl(
342341

343342
@Override
344343
public @NotNull Response getAssetMapperInstructions(final @NotNull UUID combinerId, final @NotNull UUID mappingId) {
345-
final PulseEntity existingPulseEntity = pulseExtractor.getPulseEntity();
346-
synchronized (existingPulseEntity.getLock()) {
344+
synchronized (pulseExtractor.getLock()) {
347345
final @NotNull Optional<DataCombiner> optionalDataCombiner =
348346
assetMappingExtractor.getCombinerById(combinerId);
349347
if (optionalDataCombiner.isEmpty()) {
@@ -362,8 +360,7 @@ public PulseApiImpl(
362360

363361
@Override
364362
public @NotNull Response getAssetMapperMappings(final @NotNull UUID combinerId) {
365-
final PulseEntity existingPulseEntity = pulseExtractor.getPulseEntity();
366-
synchronized (existingPulseEntity.getLock()) {
363+
synchronized (pulseExtractor.getLock()) {
367364
final @NotNull Optional<DataCombiner> optionalDataCombiner =
368365
assetMappingExtractor.getCombinerById(combinerId);
369366
if (optionalDataCombiner.isEmpty()) {
@@ -380,8 +377,7 @@ public PulseApiImpl(
380377

381378
@Override
382379
public @NotNull Response getAssetMappers() {
383-
final PulseEntity existingPulseEntity = pulseExtractor.getPulseEntity();
384-
synchronized (existingPulseEntity.getLock()) {
380+
synchronized (pulseExtractor.getLock()) {
385381
final List<DataCombiner> allCombiners = assetMappingExtractor.getAllCombiners();
386382
final List<Combiner> combiners = allCombiners.stream().map(DataCombiner::toModel).toList();
387383
final CombinerList combinerList = new CombinerList().items(combiners);
@@ -395,8 +391,8 @@ public PulseApiImpl(
395391
if (optionalResponse.isPresent()) {
396392
return optionalResponse.get();
397393
}
398-
final PulseEntity pulseEntity = pulseExtractor.getPulseEntity();
399-
synchronized (pulseEntity.getLock()) {
394+
synchronized (pulseExtractor.getLock()) {
395+
final PulseEntity pulseEntity = pulseExtractor.getPulseEntity();
400396
final PulseAgentAssets assets = PulseAgentAssets.fromPersistence(pulseEntity.getPulseAssetsEntity());
401397
final ManagedAssetList managedAssetList = PulseAgentAssetsConverter.INSTANCE.toRestEntity(assets);
402398
return Response.ok(managedAssetList).build();
@@ -422,8 +418,8 @@ public PulseApiImpl(
422418
if (optionalResponse.isPresent()) {
423419
return optionalResponse.get();
424420
}
425-
final PulseEntity existingPulseEntity = pulseExtractor.getPulseEntity();
426-
synchronized (existingPulseEntity.getLock()) {
421+
synchronized (pulseExtractor.getLock()) {
422+
final PulseEntity existingPulseEntity = pulseExtractor.getPulseEntity();
427423
final @NotNull Optional<DataCombiner> optionalDataCombiner =
428424
assetMappingExtractor.getCombinerById(combiner.getId());
429425
if (optionalDataCombiner.isEmpty()) {
@@ -488,8 +484,8 @@ public PulseApiImpl(
488484
if (optionalResponse.isPresent()) {
489485
return optionalResponse.get();
490486
}
491-
final PulseEntity existingPulseEntity = pulseExtractor.getPulseEntity();
492-
synchronized (existingPulseEntity.getLock()) {
487+
synchronized (pulseExtractor.getLock()) {
488+
final PulseEntity existingPulseEntity = pulseExtractor.getPulseEntity();
493489
final PulseAgentAssets assets =
494490
PulseAgentAssets.fromPersistence(existingPulseEntity.getPulseAssetsEntity());
495491
final OptionalInt optionalAssetIndex = IntStream.range(0, assets.size())

hivemq-edge/src/main/java/com/hivemq/configuration/entity/pulse/PulseEntity.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232
@XmlRootElement(name = "pulse")
3333
@XmlAccessorType(XmlAccessType.NONE)
3434
public class PulseEntity implements EntityValidatable {
35-
private final Object lock = new Object();
36-
3735
@JsonProperty(value = "managedAssets", required = true)
3836
@XmlElement(name = "managed-assets", required = true)
3937
private @NotNull PulseAssetsEntity pulseAssetsEntity;
@@ -46,10 +44,6 @@ public PulseEntity(final @NotNull PulseAssetsEntity pulseAssetsEntity) {
4644
this.pulseAssetsEntity = pulseAssetsEntity;
4745
}
4846

49-
public @NotNull Object getLock() {
50-
return lock;
51-
}
52-
5347
@Override
5448
public boolean equals(final @Nullable Object o) {
5549
if (!(o instanceof final PulseEntity that)) {

hivemq-edge/src/main/java/com/hivemq/configuration/reader/PulseExtractor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ public class PulseExtractor implements ReloadableExtractor<PulseEntity, PulseEnt
3030
private final static Consumer<PulseEntity> DEFAULT_CONSUMER =
3131
pulseEntity -> log.debug("No consumer registered for Pulse configuration changes.");
3232
private final @NotNull ConfigFileReaderWriter configFileReaderWriter;
33+
private final Object lock = new Object();
3334
private @NotNull Consumer<PulseEntity> consumer;
34-
3535
private @NotNull PulseEntity pulseEntity;
3636

3737
public PulseExtractor(final @NotNull ConfigFileReaderWriter configFileReaderWriter) {
@@ -40,6 +40,10 @@ public PulseExtractor(final @NotNull ConfigFileReaderWriter configFileReaderWrit
4040
this.pulseEntity = new PulseEntity();
4141
}
4242

43+
public @NotNull Object getLock() {
44+
return lock;
45+
}
46+
4347
public synchronized @NotNull PulseEntity getPulseEntity() {
4448
return pulseEntity;
4549
}

hivemq-edge/src/main/java/com/hivemq/pulse/messaging/AssetMapperManager.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ public void start() {
105105

106106
public void refresh(final @NotNull List<DataCombiner> assetMappers) {
107107
LOGGER.info("Refreshing Pulse Asset Mappers");
108-
final PulseEntity pulseEntity = pulseExtractor.getPulseEntity();
109-
synchronized (pulseEntity.getLock()) {
108+
synchronized (pulseExtractor.getLock()) {
109+
final PulseEntity pulseEntity = pulseExtractor.getPulseEntity();
110110
final Set<UUID> oldAssetMapperIdSet = idToAssetMapperTaskMap.keySet();
111111
// Let's filter out non-streaming asset mappers.
112112
final Map<String, PulseAssetEntity> assetEntityMap = PulseAgentAssetUtils.toAssetEntityMap(pulseEntity);
@@ -202,7 +202,7 @@ public void refresh(final @NotNull List<DataCombiner> assetMappers) {
202202
}
203203

204204
public @NotNull CompletableFuture<Void> stopAll() {
205-
synchronized (pulseExtractor.getPulseEntity().getLock()) {
205+
synchronized (pulseExtractor.getLock()) {
206206
return CompletableFuture.runAsync(() -> {
207207
idToAssetMapperTaskMap.values()
208208
.stream()
@@ -245,7 +245,7 @@ private void internalDeleteDataCombiner(final @NotNull UUID id) {
245245

246246
private @NotNull CompletableFuture<Void> stop(
247247
final @NotNull AssetMapperManager.AssetMapperTask assetMapperTask) {
248-
synchronized (pulseExtractor.getPulseEntity().getLock()) {
248+
synchronized (pulseExtractor.getLock()) {
249249
// stopping is fast no reason for async
250250
assetMapperTask.dataCombiningRuntimes().forEach(DataCombiningRuntime::stop);
251251
}

hivemq-edge/src/main/java/com/hivemq/pulse/utils/PulseAgentAssetUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ public static void resolveDiff(
6060
final @NotNull AssetMappingExtractor assetMappingExtractor,
6161
final @NotNull PulseExtractor pulseExtractor,
6262
final @NotNull List<Asset> remoteAssets) {
63-
final PulseEntity oldPulseEntity = pulseExtractor.getPulseEntity();
64-
synchronized (oldPulseEntity.getLock()) {
63+
synchronized (pulseExtractor.getLock()) {
64+
final PulseEntity oldPulseEntity = pulseExtractor.getPulseEntity();
6565
final List<PulseAssetEntity> localAssets = oldPulseEntity.getPulseAssetsEntity().getPulseAssetEntities();
6666
final List<PulseAssetEntity> newLocalAssets = new ArrayList<>();
6767
final Map<String, Asset> remoteAssetMap = remoteAssets.stream()

hivemq-edge/src/test/java/com/hivemq/api/resources/impl/pulse/AbstractPulseApiImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public abstract class AbstractPulseApiImplTest {
134134
public void setUp() {
135135
when(systemInformation.isConfigWriteable()).thenReturn(true);
136136
when(pulseExtractor.getPulseEntity()).thenReturn(pulseEntity);
137-
when(pulseEntity.getLock()).thenReturn(new Object());
137+
when(pulseExtractor.getLock()).thenReturn(new Object());
138138
when(pulseEntity.getPulseAssetsEntity()).thenReturn(pulseAssetsEntity);
139139
when(assetProviderRegistry.getAssetProviders()).thenReturn(Set.of(assetProvider));
140140
when(statusProviderRegistry.getStatusProviders()).thenReturn(Set.of(statusProvider));

hivemq-edge/src/test/java/com/hivemq/pulse/messaging/AssetMapperManagerTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public void setUp() {
133133
assetMappingExtractor,
134134
pulseExtractor);
135135
pulseEntity = new PulseEntity();
136+
when(pulseExtractor.getLock()).thenReturn(new Object());
136137
when(pulseExtractor.getPulseEntity()).thenReturn(pulseEntity);
137138
}
138139

hivemq-edge/src/test/java/com/hivemq/pulse/utils/PulseAgentAssetUtilsTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class PulseAgentAssetUtilsTest {
6565

6666
@BeforeEach
6767
public void setUp() {
68+
when(pulseExtractor.getLock()).thenReturn(new Object());
6869
pulseEntityArgumentCaptor = ArgumentCaptor.forClass(PulseEntity.class);
6970
localAssets = IntStream.range(0, STATUSES.size()).mapToObj(this::createLocalAsset).toList();
7071
remoteAssets = IntStream.range(0, STATUSES.size()).mapToObj(this::createRemoteAsset).toList();

0 commit comments

Comments
 (0)