6060import java .util .Base64 ;
6161import java .util .List ;
6262import java .util .concurrent .CompletableFuture ;
63- import java .util .concurrent .ExecutionException ;
63+ import java .util .concurrent .atomic . AtomicBoolean ;
6464
6565import static com .hivemq .adapter .sdk .api .state .ProtocolAdapterState .ConnectionStatus .ERROR ;
66+ import static com .hivemq .adapter .sdk .api .state .ProtocolAdapterState .ConnectionStatus .STATELESS ;
6667import static com .hivemq .edge .adapters .http .config .HttpSpecificAdapterConfig .JSON_MIME_TYPE ;
6768import static com .hivemq .edge .adapters .http .config .HttpSpecificAdapterConfig .PLAIN_MIME_TYPE ;
6869
@@ -73,7 +74,6 @@ public class HttpProtocolAdapter implements BatchPollingProtocolAdapter {
7374 private static final @ NotNull String CONTENT_TYPE_HEADER = "Content-Type" ;
7475 private static final @ NotNull String BASE64_ENCODED_VALUE = "data:%s;base64,%s" ;
7576 private static final @ NotNull String USER_AGENT_HEADER = "User-Agent" ;
76- private static final @ NotNull String RESPONSE_DATA = "httpResponseData" ;
7777
7878 private final @ NotNull ProtocolAdapterInformation adapterInformation ;
7979 private final @ NotNull HttpSpecificAdapterConfig adapterConfig ;
@@ -85,6 +85,7 @@ public class HttpProtocolAdapter implements BatchPollingProtocolAdapter {
8585 private final @ NotNull String adapterId ;
8686 private final @ Nullable ObjectMapper objectMapper ;
8787
88+ private final @ NotNull AtomicBoolean started ;
8889 private volatile @ Nullable HttpClient httpClient = null ;
8990
9091 public HttpProtocolAdapter (
@@ -93,12 +94,17 @@ public HttpProtocolAdapter(
9394 this .adapterId = input .getAdapterId ();
9495 this .adapterInformation = adapterInformation ;
9596 this .adapterConfig = input .getConfig ();
96- this .tags = input .getTags ().stream ().map (tag -> (HttpTag )tag ).toList ();
97+ this .tags = input .getTags ().stream ().map (tag -> (HttpTag ) tag ).toList ();
9798 this .version = input .getVersion ();
9899 this .protocolAdapterState = input .getProtocolAdapterState ();
99100 this .moduleServices = input .moduleServices ();
100101 this .adapterFactories = input .adapterFactories ();
101102 this .objectMapper = new ObjectMapper ();
103+ this .started = new AtomicBoolean (false );
104+ }
105+
106+ private static boolean isSuccessStatusCode (final int statusCode ) {
107+ return statusCode >= 200 && statusCode <= 299 ;
102108 }
103109
104110 @ Override
@@ -110,8 +116,13 @@ public HttpProtocolAdapter(
110116 public void start (
111117 final @ NotNull ProtocolAdapterStartInput input ,
112118 final @ NotNull ProtocolAdapterStartOutput output ) {
119+ if (!started .compareAndSet (false , true )) {
120+ // Already started, idempotent - just return success
121+ output .startedSuccessfully ();
122+ return ;
123+ }
113124 try {
114- protocolAdapterState .setConnectionStatus (ProtocolAdapterState . ConnectionStatus . STATELESS );
125+ protocolAdapterState .setConnectionStatus (STATELESS );
115126 if (httpClient == null ) {
116127 final HttpClient .Builder builder = HttpClient .newBuilder ();
117128 builder .version (HttpClient .Version .HTTP_1_1 )
@@ -124,12 +135,18 @@ public void start(
124135 }
125136 output .startedSuccessfully ();
126137 } catch (final Exception e ) {
138+ started .set (false );
127139 output .failStart (e , "Unable to start http protocol adapter." );
128140 }
129141 }
130142
131143 @ Override
132144 public void stop (final @ NotNull ProtocolAdapterStopInput input , final @ NotNull ProtocolAdapterStopOutput output ) {
145+ if (!started .compareAndSet (true , false )) {
146+ // Already stopped, idempotent - just return success
147+ output .stoppedSuccessfully ();
148+ return ;
149+ }
133150 httpClient = null ;
134151 output .stoppedSuccessfully ();
135152 }
@@ -147,9 +164,7 @@ public void destroy() {
147164 }
148165
149166 @ Override
150- public void poll (
151- final @ NotNull BatchPollingInput pollingInput , final @ NotNull BatchPollingOutput pollingOutput ) {
152-
167+ public void poll (final @ NotNull BatchPollingInput pollingInput , final @ NotNull BatchPollingOutput pollingOutput ) {
153168 final HttpClient httpClient = this .httpClient ;
154169 if (httpClient == null ) {
155170 pollingOutput .fail (new ProtocolAdapterException (), "No response was created, because the client is null." );
@@ -159,46 +174,46 @@ public void poll(
159174 final List <CompletableFuture <HttpData >> pollingFutures =
160175 tags .stream ().map (tag -> pollHttp (httpClient , tag )).toList ();
161176
162- CompletableFuture .allOf (pollingFutures .toArray (new CompletableFuture []{}))
163- .whenComplete ((result , throwable ) -> {
164- if (throwable != null ) {
165- pollingOutput .fail (throwable , "Error while polling tags." );
166- } else {
167- try {
168- for (final CompletableFuture <HttpData > future : pollingFutures ) {
169- final var data = future .get ();
170- // Update connection status to ERROR if HTTP request failed
171- if (!data .isSuccessStatusCode ()) {
172- protocolAdapterState .setConnectionStatus (ERROR );
173- }
174- // FSM manages STATELESS/CONNECTED status automatically
175- if (data .isSuccessStatusCode () ||
176- !adapterConfig .getHttpToMqttConfig ().isHttpPublishSuccessStatusCodeOnly ()) {
177- data .getDataPoints ().forEach (pollingOutput ::addDataPoint );
178- }
179- }
180- pollingOutput .finish ();
181- } catch (final InterruptedException | ExecutionException e ) {
182- pollingOutput .fail (e , "Exception while accessing data of completed future." );
177+ CompletableFuture .allOf (pollingFutures .toArray (new CompletableFuture []{})).whenComplete ((result , throwable ) -> {
178+ if (throwable != null ) {
179+ pollingOutput .fail (throwable , "Error while polling tags." );
180+ } else {
181+ try {
182+ for (final CompletableFuture <HttpData > future : pollingFutures ) {
183+ final var data = future .get ();
184+ // Update connection status to ERROR if HTTP request failed
185+ if (!data .isSuccessStatusCode ()) {
186+ protocolAdapterState .setConnectionStatus (ERROR );
187+ } else {
188+ protocolAdapterState .setConnectionStatus (STATELESS );
189+ }
190+ if (data .isSuccessStatusCode () ||
191+ !adapterConfig .getHttpToMqttConfig ().isHttpPublishSuccessStatusCodeOnly ()) {
192+ data .getDataPoints ().forEach (pollingOutput ::addDataPoint );
183193 }
184194 }
185- });
195+ pollingOutput .finish ();
196+ } catch (final InterruptedException e ) {
197+ Thread .currentThread ().interrupt ();
198+ pollingOutput .fail (e , "Interrupted while accessing data of completed future." );
199+ } catch (final Throwable e ) {
200+ pollingOutput .fail (e , "Exception while accessing data of completed future." );
201+ }
202+ }
203+ });
186204 }
187205
188- private CompletableFuture <HttpData > pollHttp (
206+ private @ NotNull CompletableFuture <HttpData > pollHttp (
189207 final @ NotNull HttpClient httpClient ,
190208 final @ NotNull HttpTag httpTag ) {
191209
192- final HttpRequest .Builder builder = HttpRequest .newBuilder ();
193- final String url = httpTag .getDefinition ().getUrl ();
194210 final HttpTagDefinition tagDef = httpTag .getDefinition ();
195- builder .uri (URI .create (url ));
196211
197- builder .timeout (Duration .ofSeconds (httpTag .getDefinition ().getHttpRequestTimeoutSeconds ()));
212+ final HttpRequest .Builder builder = HttpRequest .newBuilder ();
213+ builder .uri (URI .create (tagDef .getUrl ()));
214+ builder .timeout (Duration .ofSeconds (tagDef .getHttpRequestTimeoutSeconds ()));
198215 builder .setHeader (USER_AGENT_HEADER , String .format ("HiveMQ-Edge; %s" , version ));
199-
200216 tagDef .getHttpHeaders ().forEach (hv -> builder .setHeader (hv .getName (), hv .getValue ()));
201-
202217 switch (tagDef .getHttpRequestMethod ()) {
203218 case GET :
204219 builder .GET ();
@@ -220,31 +235,29 @@ private CompletableFuture<HttpData> pollHttp(
220235 builder .header (CONTENT_TYPE_HEADER , tagDef .getHttpRequestBodyContentType ().getMimeType ());
221236 break ;
222237 default :
223- return CompletableFuture
224- . failedFuture (
225- new IllegalStateException ( "There was an unexpected value present in the request config: " + tagDef .getHttpRequestMethod ()));
238+ return CompletableFuture . failedFuture ( new IllegalStateException (
239+ "There was an unexpected value present in the request config: " +
240+ tagDef .getHttpRequestMethod ()));
226241 }
227-
228- return httpClient
229- .sendAsync (builder .build (), HttpResponse .BodyHandlers .ofString ())
230- .thenApply (httpResponse -> getHttpData (httpResponse , url , httpTag .getName ()));
242+ return httpClient .sendAsync (builder .build (), HttpResponse .BodyHandlers .ofString ())
243+ .thenApply (httpResponse -> getHttpData (httpResponse , tagDef .getUrl (), httpTag .getName ()));
231244 }
232245
233- private @ NotNull HttpData getHttpData (final HttpResponse <String > httpResponse , final String url ,
234- final @ NotNull String tagName ) {
246+ private @ NotNull HttpData getHttpData (
247+ final @ NotNull HttpResponse <String > httpResponse ,
248+ final @ NotNull String url ,
249+ final @ NotNull String tagName ) {
235250 Object payloadData = null ;
236- String responseContentType = null ;
237-
251+ String contentType = null ;
238252 if (isSuccessStatusCode (httpResponse .statusCode ())) {
239253 final String bodyData = httpResponse .body ();
240254 //-- if the content type is json, then apply the JSON to the output data,
241255 //-- else encode using base64 (as we dont know what the content is).
242256 if (bodyData != null ) {
243- responseContentType = httpResponse .headers ().firstValue (CONTENT_TYPE_HEADER ).orElse (null );
244- responseContentType = adapterConfig .getHttpToMqttConfig ().isAssertResponseIsJson () ?
245- JSON_MIME_TYPE :
246- responseContentType ;
247- if (JSON_MIME_TYPE .equals (responseContentType )) {
257+ contentType = httpResponse .headers ().firstValue (CONTENT_TYPE_HEADER ).orElse (null );
258+ contentType =
259+ adapterConfig .getHttpToMqttConfig ().isAssertResponseIsJson () ? JSON_MIME_TYPE : contentType ;
260+ if (JSON_MIME_TYPE .equals (contentType )) {
248261 try {
249262 payloadData = objectMapper .readTree (bodyData );
250263 } catch (final Exception e ) {
@@ -261,23 +274,20 @@ private CompletableFuture<HttpData> pollHttp(
261274 throw new RuntimeException ("unable to parse JSON data from HTTP response" );
262275 }
263276 } else {
264- if (responseContentType == null ) {
265- responseContentType = PLAIN_MIME_TYPE ;
277+ if (contentType == null ) {
278+ contentType = PLAIN_MIME_TYPE ;
266279 }
267- final String base64 =
268- Base64 . getEncoder (). encodeToString ( bodyData . getBytes ( StandardCharsets . UTF_8 ));
269- payloadData = String . format ( BASE64_ENCODED_VALUE , responseContentType , base64 );
280+ payloadData = String . format ( BASE64_ENCODED_VALUE ,
281+ contentType ,
282+ Base64 . getEncoder (). encodeToString ( bodyData . getBytes ( StandardCharsets . UTF_8 )) );
270283 }
271284 }
272285 }
273286
274- final HttpData data = new HttpData (url ,
275- httpResponse .statusCode (),
276- responseContentType ,
277- adapterFactories .dataPointFactory ());
278- //When the body is empty, just include the metadata
287+ final HttpData data =
288+ new HttpData (url , httpResponse .statusCode (), contentType , adapterFactories .dataPointFactory ());
279289 if (payloadData != null ) {
280- data .addDataPoint (tagName , payloadData );
290+ data .addDataPoint (tagName , payloadData ); // when the body is empty, just include the metadata
281291 }
282292 return data ;
283293 }
@@ -294,26 +304,25 @@ public int getMaxPollingErrorsBeforeRemoval() {
294304
295305 @ Override
296306 public void createTagSchema (
297- final @ NotNull TagSchemaCreationInput input , final @ NotNull TagSchemaCreationOutput output ) {
307+ final @ NotNull TagSchemaCreationInput input ,
308+ final @ NotNull TagSchemaCreationOutput output ) {
298309 output .finish (JsonSchema .createJsonSchema ());
299310 }
300311
301- private static boolean isSuccessStatusCode (final int statusCode ) {
302- return statusCode >= 200 && statusCode <= 299 ;
303- }
304-
305- protected @ NotNull SSLContext createTrustAllContext () {
312+ private @ NotNull SSLContext createTrustAllContext () {
306313 try {
307- final SSLContext sslContext = SSLContext .getInstance ("TLS" );
308- final X509ExtendedTrustManager trustManager = new X509ExtendedTrustManager () {
314+ final @ NotNull SSLContext sslContext = SSLContext .getInstance ("TLS" );
315+ final @ NotNull X509ExtendedTrustManager trustManager = new X509ExtendedTrustManager () {
309316 @ Override
310317 public void checkClientTrusted (
311- final X509Certificate @ NotNull [] x509Certificates , final @ NotNull String s ) {
318+ final X509Certificate @ NotNull [] x509Certificates ,
319+ final @ NotNull String s ) {
312320 }
313321
314322 @ Override
315323 public void checkServerTrusted (
316- final X509Certificate @ NotNull [] x509Certificates , final @ NotNull String s ) {
324+ final X509Certificate @ NotNull [] x509Certificates ,
325+ final @ NotNull String s ) {
317326 }
318327
319328 @ Override
0 commit comments