11/*
2- * Copyright 2023 -present HiveMQ GmbH
2+ * Copyright 2024 -present HiveMQ GmbH
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
1818import com .fasterxml .jackson .databind .ObjectMapper ;
1919import com .fasterxml .jackson .databind .node .ObjectNode ;
2020import com .hivemq .adapter .sdk .api .ProtocolAdapterInformation ;
21+ import com .hivemq .adapter .sdk .api .config .PollingContext ;
2122import com .hivemq .adapter .sdk .api .model .*;
2223import com .hivemq .adapter .sdk .api .polling .PollingInput ;
2324import com .hivemq .adapter .sdk .api .polling .PollingOutput ;
2425import com .hivemq .adapter .sdk .api .polling .PollingProtocolAdapter ;
2526import com .hivemq .adapter .sdk .api .state .ProtocolAdapterState ;
27+ import com .hivemq .adapter .sdk .api .tag .Tag ;
2628import com .hivemq .edge .adapters .postgresql .config .PostgreSQLAdapterConfig ;
27- import com .hivemq .edge .adapters .postgresql .config .PostgreSQLPollingContext ;
29+ import com .hivemq .edge .adapters .postgresql .config .PostgreSQLAdapterTagDefinition ;
2830import org .jetbrains .annotations .NotNull ;
31+ import org .slf4j .Logger ;
32+ import org .slf4j .LoggerFactory ;
2933
30- import java .sql .*;
34+ import java .sql .Connection ;
35+ import java .sql .ResultSet ;
36+ import java .sql .ResultSetMetaData ;
37+ import java .sql .SQLException ;
3138import java .util .ArrayList ;
3239import java .util .List ;
3340import java .util .Objects ;
3441
35- public class PostgreSQLPollingProtocolAdapter implements PollingProtocolAdapter <PostgreSQLPollingContext > {
42+
43+ public class PostgreSQLPollingProtocolAdapter implements PollingProtocolAdapter {
44+ private static final @ NotNull Logger log = LoggerFactory .getLogger (PostgreSQLPollingProtocolAdapter .class );
3645 private final @ NotNull PostgreSQLAdapterConfig adapterConfig ;
3746 private final @ NotNull ProtocolAdapterInformation adapterInformation ;
3847 private final @ NotNull ProtocolAdapterState protocolAdapterState ;
39- private final @ NotNull List <PostgreSQLPollingContext > pollingContext ;
48+ private final @ NotNull PostgreSQLHelpers postgreSQLHelpers = new PostgreSQLHelpers ();
49+ private final @ NotNull String adapterId ;
50+ private final @ NotNull List <Tag > tags ;
4051 private Connection databaseConnection ;
41- private String compiledUri ;
42- private String username ;
43- private String password ;
52+ private final String compiledUri ;
53+ private final String username ;
54+ private final String password ;
4455
4556 public PostgreSQLPollingProtocolAdapter (final @ NotNull ProtocolAdapterInformation adapterInformation , final @ NotNull ProtocolAdapterInput <PostgreSQLAdapterConfig > input ) {
57+ this .adapterId = input .getAdapterId ();
4658 this .adapterInformation = adapterInformation ;
4759 this .adapterConfig = input .getConfig ();
4860 this .protocolAdapterState = input .getProtocolAdapterState ();
49- this .pollingContext = adapterConfig .getPollingContexts ();
61+ this .tags = input .getTags ();
62+ this .compiledUri = String .format ("jdbc:postgresql://%s:%s/%s" , adapterConfig .getServer (), adapterConfig .getPort (), adapterConfig .getDatabase ());
63+ this .username = adapterConfig .getUsername ();
64+ this .password = adapterConfig .getPassword ();
5065 }
5166
5267 @ Override
5368 public @ NotNull String getId () {
54- return adapterConfig . getId () ;
69+ return adapterId ;
5570 }
5671
5772 @ Override
@@ -64,11 +79,8 @@ public void start(final @NotNull ProtocolAdapterStartInput input, final @NotNull
6479
6580 /* Test connection to the database when starting the adapter. */
6681 try {
67- compiledUri = String .format ("jdbc:postgresql://%s:%s/%s" , adapterConfig .getServer (), adapterConfig .getPort (), adapterConfig .getDatabase ());
68- username = adapterConfig .getUsername ();
69- password = adapterConfig .getPassword ();
70- databaseConnection = connectDatabase ();
71-
82+ log .debug ("Starting connection to the database instance" );
83+ databaseConnection = postgreSQLHelpers .connectDatabase (compiledUri , username , password );
7284 if (databaseConnection .isValid (0 )){
7385 output .startedSuccessfully ();
7486 protocolAdapterState .setConnectionStatus (ProtocolAdapterState .ConnectionStatus .CONNECTED );
@@ -84,69 +96,94 @@ public void start(final @NotNull ProtocolAdapterStartInput input, final @NotNull
8496
8597 @ Override
8698 public void stop (final @ NotNull ProtocolAdapterStopInput protocolAdapterStopInput , final @ NotNull ProtocolAdapterStopOutput protocolAdapterStopOutput ) {
87- protocolAdapterState .setConnectionStatus (ProtocolAdapterState .ConnectionStatus .DISCONNECTED );
99+ try {
100+ log .debug ("Closing database connection" );
101+ databaseConnection .close ();
102+ } catch (SQLException e ) {
103+ throw new RuntimeException (e );
104+ }
88105 protocolAdapterStopOutput .stoppedSuccessfully ();
89106 }
90107
108+
91109 @ Override
92110 public @ NotNull ProtocolAdapterInformation getProtocolAdapterInformation () {
93111 return adapterInformation ;
94112 }
95113
96114 @ Override
97- public void poll (final @ NotNull PollingInput <PostgreSQLPollingContext > pollingInput , final @ NotNull PollingOutput pollingOutput ) {
98- ResultSet result ;
99- ObjectMapper om = new ObjectMapper ();
100-
101- /* Rework the query to protect against big data volumes (basically removing possible LIMIT XX in the query and replacing with defined limit in the sub setting. */
102- String query = removeLimitFromQuery (Objects .requireNonNull (pollingInput .getPollingContext ().getQuery ()), "LIMIT" ) + " LIMIT " + pollingInput .getPollingContext ().getRowLimit () + ";" ;
115+ public void poll (final @ NotNull PollingInput pollingInput , final @ NotNull PollingOutput pollingOutput ) {
116+ log .debug ("Getting polling context" );
117+ final PollingContext pollingContext = pollingInput .getPollingContext ();
103118
104119 /* Connect to the database and execute the query */
105120 try {
121+ log .debug ("Checking database connection state" );
106122 if (!databaseConnection .isValid (0 )){
107- databaseConnection = connectDatabase ();
108- }
109- result = (databaseConnection .createStatement ()).executeQuery (query );
110- ArrayList <ObjectNode > resultObject = new ArrayList <>();
111- ResultSetMetaData resultSetMD = result .getMetaData ();
112- while (result .next ()) {
113- int numColumns = resultSetMD .getColumnCount ();
114- ObjectNode node = om .createObjectNode ();
115- for (int i =1 ; i <=numColumns ; i ++) {
116- String column_name = resultSetMD .getColumnName (i );
117- node .put (column_name , result .getString (column_name ));
118- }
119- /* Publish datapoint with a single line if split is required */
120- if (pollingInput .getPollingContext ().getSpiltLinesInIndividualMessages ()){
121- pollingOutput .addDataPoint ("queryResult" , node );
122- } else {
123- resultObject .add (node );
124- }
125- }
126-
127- /* Publish datapoint with all lines if no split is required */
128- if (!pollingInput .getPollingContext ().getSpiltLinesInIndividualMessages ()) {
129- pollingOutput .addDataPoint ("queryResult" , resultObject );
123+ log .debug ("Connecting to the database" );
124+ databaseConnection = postgreSQLHelpers .connectDatabase (compiledUri , username , password );
130125 }
131- databaseConnection .close ();
132126
127+ log .debug ("Handling tags for the adapter" );
128+ tags .stream ()
129+ .filter (tag -> tag .getName ().equals (pollingContext .getTagName ()))
130+ .findFirst ()
131+ .ifPresentOrElse (
132+ def -> {
133+ try {
134+ ResultSet result ;
135+ ObjectMapper om = new ObjectMapper ();
136+ log .debug ("Getting tag definition" );
137+ /* Get the tag definition (Query, RowLimit and Split Lines)*/
138+ PostgreSQLAdapterTagDefinition definition = (PostgreSQLAdapterTagDefinition ) def .getDefinition ();
139+
140+ log .debug ("Cleaning query" );
141+ /* Rework the query to protect against big data volumes (basically removing possible LIMIT XX in the query and replacing with defined limit in the sub setting). */
142+ String query = postgreSQLHelpers .removeLimitFromQuery (Objects .requireNonNull (definition .getQuery ()), "LIMIT" ) + " LIMIT " + definition .getRowLimit () + ";" ;
143+ log .debug ("Cleaned Tag Query : {}" , query );
144+
145+ /* Execute query and handle result */
146+ result = (databaseConnection .createStatement ()).executeQuery (query );
147+ assert result != null ;
148+ ArrayList <ObjectNode > resultObject = new ArrayList <>();
149+ ResultSetMetaData resultSetMD = result .getMetaData ();
150+ while (result .next ()) {
151+ int numColumns = resultSetMD .getColumnCount ();
152+ ObjectNode node = om .createObjectNode ();
153+ for (int i =1 ; i <=numColumns ; i ++) {
154+ String column_name = resultSetMD .getColumnName (i );
155+ node .put (column_name , result .getString (column_name ));
156+ }
157+
158+ /* Publish datapoint with a single line if split is required */
159+ if (definition .getSpiltLinesInIndividualMessages ()){
160+ log .debug ("Splitting lines in multiple messages" );
161+ pollingOutput .addDataPoint ("queryResult" , node );
162+ } else {
163+ resultObject .add (node );
164+ }
165+ }
166+
167+ /* Publish datapoint with all lines if no split is required */
168+ if (!definition .getSpiltLinesInIndividualMessages ()) {
169+ log .debug ("Publishing all lines in a single message" );
170+ pollingOutput .addDataPoint ("queryResult" , resultObject );
171+ }
172+ } catch (final Exception e ) {
173+ pollingOutput .fail (e , null );
174+ }
175+ },
176+ () -> pollingOutput .fail ("Polling for PostgreSQL protocol adapter failed because the used tag '" +
177+ pollingInput .getPollingContext ().getTagName () +
178+ "' was not found. For the polling to work the tag must be created via REST API or the UI." )
179+ );
133180 } catch (SQLException e ) {
134- try {
135- databaseConnection .close ();
136- } catch (SQLException ex ) {
137- throw new RuntimeException (ex );
138- }
181+ log .debug (e .getMessage ());
139182 throw new RuntimeException (e );
140183 }
141-
142184 pollingOutput .finish ();
143185 }
144186
145- @ Override
146- public @ NotNull List <PostgreSQLPollingContext > getPollingContexts () {
147- return pollingContext ;
148- }
149-
150187 @ Override
151188 public int getPollingIntervalMillis () {
152189 return adapterConfig .getPollingIntervalMillis ();
@@ -157,23 +194,4 @@ public int getMaxPollingErrorsBeforeRemoval() {
157194 return adapterConfig .getMaxPollingErrorsBeforeRemoval ();
158195 }
159196
160- // Database connection method
161- public Connection connectDatabase () throws SQLException {
162- return DriverManager .getConnection (compiledUri , username , password );
163- }
164-
165- // Query cleaning method
166- public String removeLimitFromQuery (final @ NotNull String query , final @ NotNull String toRemove ) {
167- var words = query .split (" " );
168- StringBuilder newStr = new StringBuilder ();
169- var wasPreviousWord = false ;
170- for (String word : words ) {
171- if (!Objects .equals (word , toRemove ) && !wasPreviousWord ) {
172- newStr .append (word ).append (" " );
173- } else {
174- wasPreviousWord = !wasPreviousWord ;
175- }
176- }
177- return newStr .toString ();
178- }
179197}
0 commit comments