@@ -68,6 +68,7 @@ public class ProtocolAdapterManager {
6868 private final @ NotNull ModuleLoader moduleLoader ;
6969 private final @ NotNull HiveMQEdgeRemoteService remoteService ;
7070 private final @ NotNull EventService eventService ;
71+ private final @ NotNull ProtocolAdapterMetrics protocolAdapterMetrics ;
7172
7273 private final @ NotNull Object lock = new Object ();
7374
@@ -79,22 +80,25 @@ public ProtocolAdapterManager(
7980 final @ NotNull ObjectMapper objectMapper ,
8081 final @ NotNull ModuleLoader moduleLoader ,
8182 final @ NotNull HiveMQEdgeRemoteService remoteService ,
82- final @ NotNull EventService eventService ) {
83+ final @ NotNull EventService eventService ,
84+ final @ NotNull ProtocolAdapterMetrics protocolAdapterMetrics ) {
8385 this .configurationService = configurationService ;
8486 this .metricRegistry = metricRegistry ;
8587 this .moduleServices = moduleServices ;
8688 this .objectMapper = ProtocolAdapterUtils .createProtocolAdapterMapper (objectMapper );
8789 this .moduleLoader = moduleLoader ;
8890 this .remoteService = remoteService ;
8991 this .eventService = eventService ;
92+ this .protocolAdapterMetrics = protocolAdapterMetrics ;
9093 }
9194
95+
9296 @ SuppressWarnings ({"unchecked" , "rawtypes" })
9397 public @ NotNull ListenableFuture <Void > start () {
9498
9599 findAllAdapters ();
96100
97- if (log .isInfoEnabled ()){
101+ if (log .isInfoEnabled ()) {
98102 log .info ("Discovered {} protocol adapter-type(s): [{}]." ,
99103 factoryMap .size (),
100104 factoryMap .values ()
@@ -112,14 +116,13 @@ public ProtocolAdapterManager(
112116 if (allConfigs .size () < 1 ) {
113117 return Futures .immediateFuture (null );
114118 }
115- final ImmutableList .Builder <CompletableFuture <Void >> adapterFutures =
116- ImmutableList .builder ();
119+ final ImmutableList .Builder <CompletableFuture <Void >> adapterFutures = ImmutableList .builder ();
117120
118121 for (Map .Entry <String , Object > configSection : allConfigs .entrySet ()) {
119122 final String adapterType = configSection .getKey ();
120123 final ProtocolAdapterFactory <?> protocolAdapterFactory = getProtocolAdapterFactory (adapterType );
121124 if (protocolAdapterFactory == null ) {
122- if (log .isWarnEnabled ()){
125+ if (log .isWarnEnabled ()) {
123126 log .warn ("Protocol adapter for config {} not found." , adapterType );
124127 }
125128 continue ;
@@ -132,14 +135,17 @@ public ProtocolAdapterManager(
132135 adapterConfigs = List .of ((Map <String , Object >) adapterXmlElement );
133136 } else {
134137 //unknown data structure - continue (bad config)
135- if (log .isWarnEnabled ()){
138+ if (log .isWarnEnabled ()) {
136139 log .warn ("Found invalid configuration element for adapter {}, skipping." , adapterType );
137140 }
138141 continue ;
139142 }
140143
141144 for (Map <String , Object > adapterConfig : adapterConfigs ) {
142145 ProtocolAdapterWrapper instance = createAdapterInstance (adapterType , adapterConfig );
146+ protocolAdapterMetrics .increaseProtocolAdapterMetric (instance .getAdapter ()
147+ .getProtocolAdapterInformation ()
148+ .getProtocolId ());
143149 CompletableFuture <Void > future = start (instance .getAdapter ());
144150 adapterFutures .add (future );
145151 }
@@ -160,7 +166,7 @@ private void findAllAdapters() {
160166 try {
161167 final ProtocolAdapterFactory <?> protocolAdapterFactory =
162168 facroryClass .getDeclaredConstructor ().newInstance ();
163- if (log .isDebugEnabled ()){
169+ if (log .isDebugEnabled ()) {
164170 log .debug ("Discovered protocol adapter implementation {}." , facroryClass .getName ());
165171 }
166172 final ProtocolAdapterInformation information = protocolAdapterFactory .getInformation ();
@@ -181,7 +187,7 @@ public ProtocolAdapterFactory getProtocolAdapterFactory(final @NotNull String pr
181187 public CompletableFuture <Void > start (final @ NotNull String protocolAdapterId ) {
182188 Preconditions .checkNotNull (protocolAdapterId );
183189 Optional <ProtocolAdapterWrapper > adapterOptional = getAdapterById (protocolAdapterId );
184- if (!adapterOptional .isPresent ()){
190+ if (!adapterOptional .isPresent ()) {
185191 return CompletableFuture .failedFuture (null );
186192 } else {
187193 return start (adapterOptional .get ().getAdapter ());
@@ -191,7 +197,7 @@ public CompletableFuture<Void> start(final @NotNull String protocolAdapterId) {
191197 public CompletableFuture <Void > stop (final @ NotNull String protocolAdapterId ) {
192198 Preconditions .checkNotNull (protocolAdapterId );
193199 Optional <ProtocolAdapterWrapper > adapterOptional = getAdapterById (protocolAdapterId );
194- if (!adapterOptional .isPresent ()){
200+ if (!adapterOptional .isPresent ()) {
195201 return CompletableFuture .failedFuture (null );
196202 } else {
197203 return stop (adapterOptional .get ().getAdapter ());
@@ -200,7 +206,7 @@ public CompletableFuture<Void> stop(final @NotNull String protocolAdapterId) {
200206
201207 public CompletableFuture <Void > start (final @ NotNull ProtocolAdapter protocolAdapter ) {
202208 Preconditions .checkNotNull (protocolAdapter );
203- if (log .isInfoEnabled ()){
209+ if (log .isInfoEnabled ()) {
204210 log .info ("Starting protocol-adapter \" {}\" ." , protocolAdapter .getId ());
205211 }
206212 CompletableFuture <ProtocolAdapterStartOutput > startFuture ;
@@ -210,15 +216,15 @@ public CompletableFuture<Void> start(final @NotNull ProtocolAdapter protocolAdap
210216 } else {
211217 startFuture = protocolAdapter .start (new ProtocolAdapterStartInputImpl (protocolAdapter ), output );
212218 }
213- return startFuture .<Void > thenApply (input -> {
219+ return startFuture .<Void >thenApply (input -> {
214220 if (!output .startedSuccessfully ) {
215- handleStartupError (protocolAdapter , output );
221+ handleStartupError (protocolAdapter , output );
216222 } else if (output .message != null ) {
217- if (log .isTraceEnabled ()){
218- log .trace ("Protocol-adapter \" {}\" started: {}." ,
219- protocolAdapter .getId (), output .message );
223+ if (log .isTraceEnabled ()) {
224+ log .trace ("Protocol-adapter \" {}\" started: {}." , protocolAdapter .getId (), output .message );
220225 }
221- HiveMQEdgeRemoteEvent adapterCreatedEvent = new HiveMQEdgeRemoteEvent (HiveMQEdgeRemoteEvent .EVENT_TYPE .ADAPTER_STARTED );
226+ HiveMQEdgeRemoteEvent adapterCreatedEvent =
227+ new HiveMQEdgeRemoteEvent (HiveMQEdgeRemoteEvent .EVENT_TYPE .ADAPTER_STARTED );
222228 adapterCreatedEvent .addUserData ("adapterType" ,
223229 protocolAdapter .getProtocolAdapterInformation ().getProtocolId ());
224230 remoteService .fireUsageEvent (adapterCreatedEvent );
@@ -233,7 +239,7 @@ public CompletableFuture<Void> start(final @NotNull ProtocolAdapter protocolAdap
233239
234240 public CompletableFuture <Void > stop (final @ NotNull ProtocolAdapter protocolAdapter ) {
235241 Preconditions .checkNotNull (protocolAdapter );
236- if (log .isInfoEnabled ()){
242+ if (log .isInfoEnabled ()) {
237243 log .info ("Stopping protocol-adapter \" {}\" ." , protocolAdapter .getId ());
238244 }
239245 CompletableFuture <Void > stopFuture ;
@@ -243,29 +249,28 @@ public CompletableFuture<Void> stop(final @NotNull ProtocolAdapter protocolAdapt
243249 stopFuture = protocolAdapter .stop ();
244250 }
245251 stopFuture .thenApply (input -> {
246- if (log .isTraceEnabled ()){
252+ if (log .isTraceEnabled ()) {
247253 log .trace ("Protocol-adapter \" {}\" stopped." , protocolAdapter .getId ());
248254 }
249255 return null ;
250256 }).exceptionally (throwable -> {
251- if (log .isWarnEnabled ()){
252- log .warn ("Protocol-adapter \" {}\" was unable to stop cleanly" ,
253- protocolAdapter .getId (), throwable );
257+ if (log .isWarnEnabled ()) {
258+ log .warn ("Protocol-adapter \" {}\" was unable to stop cleanly" , protocolAdapter .getId (), throwable );
254259 }
255260 return null ;
256261 });
257262 return stopFuture ;
258263 }
259264
260- protected void handleStartupError (final @ NotNull ProtocolAdapter protocolAdapter , @ NotNull final ProtocolAdapterStartOutputImpl output ){
261- if (log .isWarnEnabled ()){
265+ protected void handleStartupError (
266+ final @ NotNull ProtocolAdapter protocolAdapter , @ NotNull final ProtocolAdapterStartOutputImpl output ) {
267+ if (log .isWarnEnabled ()) {
262268 log .warn ("Protocol-adapter \" {}\" could not be started, reason: {}" ,
263- protocolAdapter .getId (),
264- output .message , output .getThrowable ());
269+ protocolAdapter .getId (), output .message , output .getThrowable ());
265270 }
266- HiveMQEdgeRemoteEvent adapterCreatedEvent = new HiveMQEdgeRemoteEvent ( HiveMQEdgeRemoteEvent . EVENT_TYPE . ADAPTER_ERROR );
267- adapterCreatedEvent . addUserData ( "adapterType" ,
268- protocolAdapter .getProtocolAdapterInformation ().getProtocolId ());
271+ HiveMQEdgeRemoteEvent adapterCreatedEvent =
272+ new HiveMQEdgeRemoteEvent ( HiveMQEdgeRemoteEvent . EVENT_TYPE . ADAPTER_ERROR );
273+ adapterCreatedEvent . addUserData ( "adapterType" , protocolAdapter .getProtocolAdapterInformation ().getProtocolId ());
269274 remoteService .fireUsageEvent (adapterCreatedEvent );
270275 }
271276
@@ -281,50 +286,54 @@ public synchronized CompletableFuture<Void> addAdapter(
281286 if (getAdapterById (adapterId ).isPresent ()) {
282287 throw new IllegalArgumentException ("adapter already exists by id '" + adapterId + "'" );
283288 }
284-
289+ protocolAdapterMetrics . increaseProtocolAdapterMetric ( adapterType );
285290 return addAdapterAndStartInRuntime (adapterType , config );
286291 }
287292
288293 public boolean deleteAdapter (final String id ) {
289294 Preconditions .checkNotNull (id );
290295 Optional <ProtocolAdapterWrapper > adapterInstance = getAdapterById (id );
291296 if (adapterInstance .isPresent ()) {
297+ protocolAdapterMetrics .decreaseProtocolAdapterMetric (adapterInstance .get ()
298+ .getAdapterInformation ()
299+ .getProtocolId ());
292300 adapterInstance .get ().getAdapter ().stop ();
293301 if (protocolAdapters .remove (id ) != null ) {
294302 try {
295- synchronized (lock ){
303+ synchronized (lock ) {
296304 //ensure the instance releases any hard state
297305 adapterInstance .get ().getAdapter ().destroy ();
298306 Map <String , Object > mainMap =
299307 configurationService .protocolAdapterConfigurationService ().getAllConfigs ();
300- List <Map > adapterList = getAdapterListForType (adapterInstance .get ().getAdapterInformation ().getProtocolId ());
308+ List <Map > adapterList =
309+ getAdapterListForType (adapterInstance .get ().getAdapterInformation ().getProtocolId ());
301310 if (adapterList != null ) {
302- if (adapterList .removeIf (instance -> id .equals (instance .get ("id" )))){
311+ if (adapterList .removeIf (instance -> id .equals (instance .get ("id" )))) {
303312 configurationService .protocolAdapterConfigurationService ().setAllConfigs (mainMap );
304313 }
305314 }
306315 }
307316 return true ;
308317 } finally {
309- eventService .fireEvent (
310- eventBuilder ( Event . SEVERITY . WARN , adapterInstance .get ().getAdapter ()).
311- withMessage ( String . format ( "Adapter \" %s\" was deleted from the system permanently." ,
312- adapterInstance .get ().getAdapter ().getId ())).build ());
318+ eventService .fireEvent (eventBuilder ( Event . SEVERITY . WARN ,
319+ adapterInstance .get ().getAdapter ()).withMessage ( String . format (
320+ "Adapter \" %s\" was deleted from the system permanently." ,
321+ adapterInstance .get ().getAdapter ().getId ())).build ());
313322 }
314323 }
315324 }
316325 return false ;
317326 }
318327
319- public boolean updateAdapter (final @ NotNull String adapterId ,
320- final @ NotNull Map <String , Object > config ) {
328+ public boolean updateAdapter (final @ NotNull String adapterId , final @ NotNull Map <String , Object > config ) {
321329 Preconditions .checkNotNull (adapterId );
322330 Optional <ProtocolAdapterWrapper > adapterInstance = getAdapterById (adapterId );
323331 if (adapterInstance .isPresent ()) {
324332 ProtocolAdapterWrapper oldInstance = adapterInstance .get ();
325333 deleteAdapter (oldInstance .getAdapter ().getId ());
326334 addAdapter (oldInstance .getAdapter ().getProtocolAdapterInformation ().getProtocolId (),
327- oldInstance .getAdapter ().getId (), config );
335+ oldInstance .getAdapter ().getId (),
336+ config );
328337 return true ;
329338 }
330339 return false ;
@@ -353,7 +362,8 @@ public Optional<ProtocolAdapterInformation> getAdapterTypeById(final String type
353362 return protocolAdapters ;
354363 }
355364
356- protected ProtocolAdapterWrapper createAdapterInstance (final String adapterType , final @ NotNull Map <String , Object > config ){
365+ protected ProtocolAdapterWrapper createAdapterInstance (
366+ final String adapterType , final @ NotNull Map <String , Object > config ) {
357367
358368 ProtocolAdapterFactory <?> protocolAdapterFactory = getProtocolAdapterFactory (adapterType );
359369 final ClassLoader contextClassLoader = Thread .currentThread ().getContextClassLoader ();
@@ -375,9 +385,10 @@ protected ProtocolAdapterWrapper createAdapterInstance(final String adapterType,
375385 }
376386 }
377387
378- protected CompletableFuture <Void > addAdapterAndStartInRuntime (final String adapterType , final @ NotNull Map <String , Object > config ){
388+ protected CompletableFuture <Void > addAdapterAndStartInRuntime (
389+ final String adapterType , final @ NotNull Map <String , Object > config ) {
379390
380- synchronized (lock ){
391+ synchronized (lock ) {
381392 ProtocolAdapterWrapper instance = createAdapterInstance (adapterType , config );
382393
383394 //-- Write the protocol adapter back to the main config (through the proxy)
@@ -390,7 +401,7 @@ protected CompletableFuture<Void> addAdapterAndStartInRuntime(final String adapt
390401 }
391402 }
392403
393- protected List <Map > getAdapterListForType (final String adapterType ){
404+ protected List <Map > getAdapterListForType (final String adapterType ) {
394405
395406 Map <String , Object > mainMap = configurationService .protocolAdapterConfigurationService ().getAllConfigs ();
396407 List <Map > adapterList = null ;
@@ -409,6 +420,7 @@ protected List<Map> getAdapterListForType(final String adapterType){
409420 return adapterList ;
410421 }
411422
423+
412424 public static class ProtocolAdapterInputImpl <T extends CustomConfig > implements ProtocolAdapterInput <T > {
413425 private final @ NotNull T configObject ;
414426 private final @ NotNull MetricRegistry metricRegistry ;
@@ -478,7 +490,8 @@ private ProtocolAdapterStartInputImpl(final @NotNull ProtocolAdapter protocolAda
478490 }
479491 }
480492
481- protected Event .Builder eventBuilder (final @ NotNull Event .SEVERITY severity , final @ NotNull ProtocolAdapter adapter ){
493+ protected Event .Builder eventBuilder (
494+ final @ NotNull Event .SEVERITY severity , final @ NotNull ProtocolAdapter adapter ) {
482495 Preconditions .checkNotNull (severity );
483496 Preconditions .checkNotNull (adapter );
484497 Event .Builder builder = new Event .Builder ();
0 commit comments