Skip to content

Commit 0e53804

Browse files
committed
refactor: Let startWriting() return CompletableFuture<Boolean> instead of boolean
1 parent 30565fe commit 0e53804

14 files changed

+457
-112
lines changed

ext/hivemq-edge-openapi-2025.19-SNAPSHOT.yaml

Lines changed: 74 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ info:
1414
## OpenAPI
1515
HiveMQ's REST API provides an OpenAPI 3.0 schema definition that can imported into popular API tooling (e.g. Postman) or can be used to generate client-code for multiple programming languages.
1616
title: HiveMQ Edge REST API
17-
version: 2025.19-SNAPSHOT
17+
version: 2025.14-SNAPSHOT
1818
x-logo:
1919
url: https://www.hivemq.com/img/svg/hivemq-bee.svg
2020
tags:
@@ -94,6 +94,9 @@ tags:
9494
These endpoints can be used to retrieve states of clients for the Data Hub.
9595
9696
name: Data Hub - State
97+
- description: |
98+
These endpoints can be used to retrieve the different elements of the Edge topology
99+
name: Domain
97100
- description: |
98101
These endpoints can be used to manage Pulse and its assets.
99102
name: Pulse
@@ -3697,7 +3700,7 @@ paths:
36973700
summary: Add a new Adapter
36983701
tags:
36993702
- Protocol Adapters
3700-
/api/v1/management/protocol-adapters/northboundMappings:
3703+
/api/v1/management/protocol-adapters/mappings/northboundMappings:
37013704
get:
37023705
description: Get all northbound mappings
37033706
operationId: get-northboundMappings
@@ -3706,12 +3709,13 @@ paths:
37063709
content:
37073710
application/json:
37083711
schema:
3709-
$ref: '#/components/schemas/NorthboundMappingList'
3712+
$ref: '#/components/schemas/NorthboundMappingOwnerList'
37103713
description: Success
3711-
summary: Get the mappings for northbound messages.
3714+
summary: Get all the northbound mappings.
37123715
tags:
37133716
- Protocol Adapters
3714-
/api/v1/management/protocol-adapters/southboundMappings:
3717+
- Domain
3718+
/api/v1/management/protocol-adapters/mappings/southboundMappings:
37153719
get:
37163720
description: Get all southbound mappings.
37173721
operationId: get-southboundMappings
@@ -3720,11 +3724,12 @@ paths:
37203724
content:
37213725
application/json:
37223726
schema:
3723-
$ref: '#/components/schemas/SouthboundMappingList'
3727+
$ref: '#/components/schemas/SouthboundMappingOwnerList'
37243728
description: Success
3725-
summary: Get all southbound mappings.
3729+
summary: Get all the southbound mappings.
37263730
tags:
37273731
- Protocol Adapters
3732+
- Domain
37283733
/api/v1/management/protocol-adapters/status:
37293734
get:
37303735
description: Obtain the details.
@@ -3796,11 +3801,12 @@ paths:
37963801
node: ns=2;i=test2
37973802
name: tag2
37983803
schema:
3799-
$ref: '#/components/schemas/DomainTagList'
3804+
$ref: '#/components/schemas/DomainTagOwnerList'
38003805
description: Success
3801-
summary: Get the list of all domain tags created in this Edge instance
3806+
summary: Get the list of all tags created in this Edge instance
38023807
tags:
38033808
- Protocol Adapters
3809+
- Domain
38043810
/api/v1/management/protocol-adapters/tags/{tagName}:
38053811
get:
38063812
description: Get a domain tag created in this Edge instance
@@ -4060,6 +4066,7 @@ paths:
40604066
summary: Get the list of all topic filters created in this Edge instance
40614067
tags:
40624068
- Topic Filters
4069+
- Domain
40634070
post:
40644071
description: Add a new topic filter.
40654072
operationId: add-topicFilters
@@ -4583,6 +4590,7 @@ paths:
45834590
operationId: get-managed-assets
45844591
tags:
45854592
- Pulse
4593+
- Domain
45864594
responses:
45874595
'200':
45884596
content:
@@ -6450,6 +6458,44 @@ components:
64506458
$ref: '#/components/schemas/DomainTag'
64516459
required:
64526460
- items
6461+
NorthboundMappingOwnerList:
6462+
type: object
6463+
properties:
6464+
items:
6465+
type: array
6466+
description: List of result items that are returned by this endpoint
6467+
items:
6468+
type: object
6469+
required:
6470+
- adapterId
6471+
- mapping
6472+
properties:
6473+
adapterId:
6474+
type: string
6475+
description: The id of the adapter owning the mapping
6476+
mapping:
6477+
$ref: '#/components/schemas/NorthboundMapping'
6478+
required:
6479+
- items
6480+
SouthboundMappingOwnerList:
6481+
type: object
6482+
properties:
6483+
items:
6484+
type: array
6485+
description: List of result items that are returned by this endpoint
6486+
items:
6487+
type: object
6488+
required:
6489+
- adapterId
6490+
- mapping
6491+
properties:
6492+
adapterId:
6493+
type: string
6494+
description: The id of the adapter owning the mapping
6495+
mapping:
6496+
$ref: '#/components/schemas/SouthboundMapping'
6497+
required:
6498+
- items
64536499
TagSchema:
64546500
type: object
64556501
properties:
@@ -6458,6 +6504,25 @@ components:
64586504
protocolId:
64596505
type: string
64606506
description: The id assigned to the protocol adapter type
6507+
DomainTagOwnerList:
6508+
type: object
6509+
properties:
6510+
items:
6511+
type: array
6512+
description: List of result items that are returned by this endpoint
6513+
items:
6514+
type: object
6515+
required:
6516+
- adapterId
6517+
- mapping
6518+
properties:
6519+
adapterId:
6520+
type: string
6521+
description: The id of the adapter owning the tag
6522+
mapping:
6523+
$ref: '#/components/schemas/DomainTag'
6524+
required:
6525+
- items
64616526
ProtocolAdapterCategory:
64626527
type: object
64636528
description: The category of the adapter

hivemq-edge/src/main/java/com/hivemq/bootstrap/factories/WritingServiceProvider.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.slf4j.LoggerFactory;
3131

3232
import java.util.List;
33+
import java.util.concurrent.CompletableFuture;
3334

3435
@Singleton
3536
public class WritingServiceProvider {
@@ -78,12 +79,13 @@ public boolean writingEnabled() {
7879

7980

8081
@Override
81-
public boolean startWriting(
82+
public CompletableFuture<Boolean> startWriting(
8283
final @NotNull WritingProtocolAdapter writingProtocolAdapter,
8384
final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService,
8485
final @NotNull List<InternalWritingContext> southboundMappings) {
8586
log.warn("No bidirectional module is currently installed. Writing to PLCs is currently not supported.");
86-
return true; }
87+
return CompletableFuture.completedFuture(true);
88+
}
8789

8890
@Override
8991
public void stopWriting(

hivemq-edge/src/main/java/com/hivemq/protocols/InternalProtocolAdapterWritingService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,17 @@
1919
import com.hivemq.adapter.sdk.api.services.ProtocolAdapterWritingService;
2020
import com.hivemq.adapter.sdk.api.writing.WritingContext;
2121
import com.hivemq.adapter.sdk.api.writing.WritingProtocolAdapter;
22+
import com.hivemq.persistence.SingleWriterService;
2223
import org.jetbrains.annotations.NotNull;
2324

2425
import java.util.List;
2526
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.Future;
2628

2729
public interface InternalProtocolAdapterWritingService extends ProtocolAdapterWritingService {
2830

2931

30-
boolean startWriting(
32+
@NotNull CompletableFuture<Boolean> startWriting(
3133
@NotNull WritingProtocolAdapter writingProtocolAdapter,
3234
@NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService,
3335
@NotNull List<InternalWritingContext> writingContexts);

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterWrapper.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.List;
4949
import java.util.Optional;
5050
import java.util.concurrent.CompletableFuture;
51+
import java.util.concurrent.ExecutionException;
5152
import java.util.concurrent.atomic.AtomicBoolean;
5253
import java.util.concurrent.atomic.AtomicReference;
5354
import java.util.function.Function;
@@ -200,21 +201,25 @@ private void stopAfterFailedStart() {
200201
}
201202
}
202203

203-
private Optional<Throwable> attemptStartingConsumers(final boolean writingEnabled, final @NotNull EventService eventService) {
204+
private @NotNull Optional<Throwable> attemptStartingConsumers(final boolean writingEnabled, final @NotNull EventService eventService) {
204205
try {
205206
//Adapter started successfully, now start the consumers
206207
createAndSubscribeTagConsumer();
207208
startPolling(protocolAdapterPollingService, eventService);
208209

209-
if(writingEnabled && isWriting()) {
210+
if (writingEnabled && isWriting()) {
210211
final var started = new AtomicBoolean(false);
211212
protocolAdapterState.setConnectionStatusListener(status -> {
212-
if(status == ProtocolAdapterState.ConnectionStatus.CONNECTED) {
213-
if(started.compareAndSet(false, true)) {
214-
if(startWriting(protocolAdapterWritingService)) {
215-
log.info("Successfully started adapter with id {}", adapter.getId());
216-
} else {
217-
log.error("Protocol adapter start failed as data hub is not available.");
213+
if (status == ProtocolAdapterState.ConnectionStatus.CONNECTED) {
214+
if (started.compareAndSet(false, true)) {
215+
try {
216+
if (startWriting(protocolAdapterWritingService).get()) {
217+
log.info("Successfully started adapter with id {}", adapter.getId());
218+
} else {
219+
log.error("Protocol adapter start failed as data hub is not available.");
220+
}
221+
} catch (final Exception e) {
222+
log.error("Failed to start writing for adapter with id {}.", adapter.getId(), e);
218223
}
219224
}
220225
}
@@ -443,7 +448,7 @@ private void stopPolling(
443448
}
444449
}
445450

446-
private @NotNull boolean startWriting(final @NotNull InternalProtocolAdapterWritingService protocolAdapterWritingService) {
451+
private @NotNull CompletableFuture<Boolean> startWriting(final @NotNull InternalProtocolAdapterWritingService protocolAdapterWritingService) {
447452
log.debug("Start writing for protocol adapter with id '{}'", getId());
448453

449454
final var southboundMappings = getSouthboundMappings();

hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterWrapper.java renamed to hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterInstance.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@
1818

1919
import org.jetbrains.annotations.NotNull;
2020

21-
public class ProtocolAdapterWrapper {
21+
public class ProtocolAdapterInstance {
2222
protected @NotNull ProtocolAdapterState state;
2323
protected @NotNull ProtocolAdapterConnectionState connectionState;
2424

25-
public ProtocolAdapterWrapper() {
26-
this.state = ProtocolAdapterState.Stopped;
27-
this.connectionState = ProtocolAdapterConnectionState.Closed;
25+
public ProtocolAdapterInstance() {
26+
connectionState = ProtocolAdapterConnectionState.Closed;
27+
state = ProtocolAdapterState.Stopped;
2828
}
2929

3030
public @NotNull ProtocolAdapterState getState() {
@@ -35,15 +35,18 @@ public ProtocolAdapterWrapper() {
3535
return connectionState;
3636
}
3737

38-
public @NotNull ProtocolAdapterTransitionStatus transitionTo(final @NotNull ProtocolAdapterState protocolAdapterState) {
39-
final ProtocolAdapterTransitionResult result = state.transition(protocolAdapterState, this);
40-
state = result.state();
41-
// Handle error (logging, throwing exception, etc.)
42-
switch (result.status()) {
43-
default -> {
44-
// TODO
38+
public synchronized @NotNull ProtocolAdapterTransitionResponse transitionTo(final @NotNull ProtocolAdapterState newState) {
39+
final ProtocolAdapterTransitionResponse response = state.transition(newState, this);
40+
if (response.status() == ProtocolAdapterTransitionStatus.Success) {
41+
this.state = response.state();
42+
} else {
43+
// Handle error (logging, throwing exception, etc.)
44+
switch (response.status()) {
45+
default -> {
46+
// TODO
47+
}
4548
}
4649
}
47-
return result.status();
50+
return response;
4851
}
4952
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2019-present HiveMQ GmbH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hivemq.protocols.fsm;
18+
19+
public class ProtocolAdapterOperator {
20+
}

0 commit comments

Comments
 (0)