Skip to content

Commit aedd967

Browse files
authored
Sprint 26 master (#45)
* Add Rebound * Add new TimeStamp Provider * Create Execute Custom Query action
1 parent 3c1a0ef commit aedd967

19 files changed

+1512
-11
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
11
# JDBC Component
2+
## 2.3.2 (October 21, 2019)
3+
4+
* Add rebound mechanism in case of deadlocks for actions: Insert, UpsertByPK, DeleteByPK
5+
6+
## 2.4.0 (october 17, 2019)
7+
8+
* Add `Custom Query` action
29

310
## 2.3.1 (September 30, 2019)
411

README.md

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
This is an open source component for working with object-relational database management systems on [elastic.io platform](http://www.elastic.io "elastic.io platform").
55

66
### Completeness Matrix
7-
![JDBC Component Completeness Matrix](https://user-images.githubusercontent.com/16806832/65326373-f6f20680-dbb9-11e9-9ad2-0900f68fd6e3.png)
7+
![JDBC Component Completeness Matrix](https://user-images.githubusercontent.com/22715422/67289390-38dad900-f4e7-11e9-9a45-1c7775c9c7d5.png)
88

99
[JDBC Component Completeness Matrix](https://docs.google.com/spreadsheets/d/1sZr9ydJbMK8v-TguctmFDiqgjRKcrpbdj4CeFuZEkQU/edit?usp=sharing)
1010

@@ -117,10 +117,56 @@ The format of ``Start Polling From (optional)`` field should be like ``yyyy-mm-d
117117

118118
*Please Note: Component Snapshot will not be overwritten in Real-Time flows due to platform behaviour, so we strongly recommend to use Get Rows Polling trigger in Keen Flows only*
119119

120+
#### Input fields description
121+
![image](https://user-images.githubusercontent.com/16806832/67293348-f5836900-f4ec-11e9-8e6a-e91b9417ff9d.png)
122+
123+
##### Tables List
124+
125+
Dropdown list with available table names, required field
126+
127+
##### Timestamp (or similar) Column
128+
129+
Dropdown list with available Column names, that have a type like `java.sql.Date` or `java.sql.Timestamp`, required field
130+
131+
##### Start Polling From (optional)
132+
133+
Optional field, indicates the beginning time to start polling from (defaults to the current time)
134+
135+
120136
### SELECT trigger (Deprecated)
121137
This action exists in JDBC component only for backward compatibility. New [**Select trigger**](#select-trigger) is recommended to use.
122138

123139
## Actions
140+
### Execute custom query action
141+
Action to execute custom SQL query from provided request string.
142+
143+
**Note:** SQL request will be executed according to chosen database JDBC specification.
144+
145+
Execution result returns as array of objects. If request contains multiple sql statements - them will execute inside one transaction.
146+
If one of statements fails, transaction will be rollbacked.
147+
148+
#### Input fields description
149+
150+
As input metadata, you will get one field named `query` to provide request string
151+
152+
#### Query Samples:
153+
154+
Select:
155+
```sql
156+
SELECT name, size FROM stars
157+
```
158+
159+
Update:
160+
```sql
161+
INSERT INTO stars values (1,'Taurus', '2015-02-19 10:10:10.0', 123, 5, 'true', '2015-02-19')
162+
```
163+
164+
Posgresql batch multiple statements request:
165+
```sql
166+
DELETE FROM stars WHERE id = 1;
167+
UPDATE stars SET radius = 5 WHERE id = 2;
168+
```
169+
124170
### Select action
125171
![image](https://user-images.githubusercontent.com/40201204/43592439-39ec5738-967e-11e8-8632-3655b08982d3.png)
126172
The action will execute an [SQL](https://en.wikipedia.org/wiki/SQL "SQL") query that can return multiple results, it has limitations on the query and suited only for SELECT type of queries.
@@ -175,7 +221,7 @@ As an input metadata you will get a Primary Key field to provide the data inside
175221
The action will execute ``INSERT`` command into the table from ``Table`` dropdown list the values specified in the body.
176222

177223
#### List of Expected Config fields
178-
224+
* `Enable Rebound` if `Yes` in case of deadlocks rebound message using Sailor rebound mechanism, number of rebound can be specified via environment variable: `ELASTICIO_REBOUND_LIMIT` recommended value 3
179225
#### Input fields description
180226
##### Table
181227

@@ -199,6 +245,7 @@ As output metadata, you will get execution insert result like:
199245
![image](https://user-images.githubusercontent.com/40201204/43592505-5b6bbfe8-967e-11e8-845e-2ce8ac707357.png)
200246
The action will execute delete query from a ``Table`` dropdown field, as criteria can be used only [PRIMARY KEY](https://en.wikipedia.org/wiki/Primary_key "PRIMARY KEY"). The action returns count of affected rows.
201247
Checkbox ``Don't throw Error on an Empty Result`` allows to emit an empty response, otherwise you will get an error on empty response.
248+
`Enable Rebound` if `Yes` in case of deadlocks rebound message using Sailor rebound mechanism, number of rebound can be specified via environment variable: `ELASTICIO_REBOUND_LIMIT` recommended value 3
202249
#### Input fields description
203250
![image](https://user-images.githubusercontent.com/40201204/43644579-f593d1c8-9737-11e8-9b97-ee9e575a19f7.png)
204251
As an input metadata you will get a Primary Key field to provide the data inside as a clause value.
@@ -260,19 +307,23 @@ The action will execute ``SELECT`` command from a ``Tables`` dropdown field, as
260307
5. Specify input data (field with red asterisk is Primary key), and click "Continue"
261308
![image](https://user-images.githubusercontent.com/16806832/44981854-83fcfa00-af7c-11e8-9ef2-8c06e77fed1e.png)
262309

263-
6. Retrieving sample
310+
6. Enable rebound mechanism if needed
311+
![image](https://user-images.githubusercontent.com/18464641/67211608-b76e4280-f423-11e9-85f6-f7ec58cc24f1.png)
312+
313+
7. Retrieving sample
264314
![image](https://user-images.githubusercontent.com/16806832/44983059-86f9e980-af80-11e8-8178-77e463488c7a.png)
265315

266-
7. Retrieve sample result
316+
8. Retrieve sample result
267317
![image](https://user-images.githubusercontent.com/16806832/44982952-2ec2e780-af80-11e8-98b1-58c3adbc15b9.png)
268318

269-
8. Click "Continue"
319+
9. Click "Continue"
270320
![image](https://user-images.githubusercontent.com/16806832/44983101-b0b31080-af80-11e8-82d8-0e70e4b4ff97.png)
271321

272-
9. Finish component configuration
322+
10. Finish component configuration
273323
![image](https://user-images.githubusercontent.com/16806832/44983365-90378600-af81-11e8-9be4-4dbb39af0fdc.png)
274324

275325
#### Input fields description
326+
* `Enable Rebound` if `Yes` in case of deadlocks rebound message using Sailor rebound mechanism, number of rebound can be specified via environment variable: `ELASTICIO_REBOUND_LIMIT` recommended value 3
276327
As an input metadata you will get all fields of selected table. [PRIMARY KEY](https://en.wikipedia.org/wiki/Primary_key "PRIMARY KEY") is required field (will mark as asterisk) and other input fields are optional.
277328
![image](https://user-images.githubusercontent.com/16806832/44397461-1a76f780-a549-11e8-8247-9a6f9aa3f3b4.png)
278329

@@ -294,6 +345,11 @@ Please use [**Upsert row by primary key**](#upsert-row-by-primary-key-action) in
294345
4. The current implementation of the action ``Execute stored procedure`` doesn't support ResultSet MSSQL output.
295346
5. The current implementation of the action ``Execute stored procedure`` doesn't support any array types parameters.
296347
(MySQL does not have schemas by definition)
348+
6. Rebound mechanism only works for this SQL State:
349+
- ``MySQL``: 40001, XA102
350+
- ``Oracle``: 61000
351+
- ``MSSQL``: 40001
352+
- ``PostgreSQL``: 40P01
297353

298354
## License
299355
Apache-2.0 © [elastic.io GmbH](https://www.elastic.io "elastic.io GmbH")

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
group = 'io.elastic'
2-
version = '2.3.1'
2+
version = '2.4.0'
33
apply plugin: 'java'
44
apply plugin: 'idea'
55
apply plugin: 'eclipse'

component.json

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{
22
"title": "Database",
33
"description": "Database JDBC connector",
4+
"buildType": "docker",
45
"credentials": {
56
"verifier": "io.elastic.jdbc.JdbcCredentialsVerifier",
67
"fields": {
@@ -72,7 +73,7 @@
7273
"viewClass": "SelectView",
7374
"prompt": "Select a Timestamp (or similar) Column",
7475
"label": "Timestamp (or similar) Column",
75-
"model": "io.elastic.jdbc.providers.ColumnNamesProvider",
76+
"model": "io.elastic.jdbc.providers.TimeStampColumnNamesProvider",
7677
"required": true,
7778
"require": [
7879
"tableName"
@@ -138,6 +139,37 @@
138139
}
139140
},
140141
"actions": {
142+
"customQuery": {
143+
"main": "io.elastic.jdbc.actions.CustomQuery",
144+
"title": "Execute custom query",
145+
"description": "Executes provided sql query string as is",
146+
"metadata": {
147+
"in": {
148+
"type": "object",
149+
"required": true,
150+
"properties": {
151+
"query": {
152+
"type": "string",
153+
"required": true
154+
}
155+
}
156+
},
157+
"out":{
158+
"type": "object",
159+
"required": false,
160+
"properties": {
161+
"result": {
162+
"required": false,
163+
"type": "array"
164+
},
165+
"updated": {
166+
"required": false,
167+
"type": "number"
168+
}
169+
}
170+
}
171+
}
172+
},
141173
"lookupRowByPrimaryKey": {
142174
"main": "io.elastic.jdbc.actions.LookupRowByPrimaryKey",
143175
"title": "Lookup Row By Primary Key",
@@ -158,7 +190,7 @@
158190
"dynamicMetadata": "io.elastic.jdbc.providers.PrimaryColumnNamesProvider"
159191
},
160192
"upsertRowByPrimaryKey": {
161-
"main": "io.elastic.jdbc.actions.providers.UpsertRowByPrimaryKey",
193+
"main": "io.elastic.jdbc.actions.UpsertRowByPrimaryKey",
162194
"title": "Upsert Row By Primary Key",
163195
"description": "Executes upsert by primary key",
164196
"fields": {
@@ -168,6 +200,16 @@
168200
"label": "Table",
169201
"required": true,
170202
"model": "io.elastic.jdbc.providers.TableNameProvider"
203+
},
204+
"reboundEnabled" : {
205+
"viewClass": "SelectView",
206+
"prompt": "Default is No",
207+
"label": "Enable Rebound",
208+
"required": false,
209+
"model" : {
210+
"Yes" : "Yes",
211+
"No" : "No"
212+
}
171213
}
172214
},
173215
"dynamicMetadata": "io.elastic.jdbc.providers.ColumnNamesWithPrimaryKeyProvider"
@@ -183,6 +225,16 @@
183225
"label": "Table",
184226
"required": true,
185227
"model": "io.elastic.jdbc.providers.TableNameProvider"
228+
},
229+
"reboundEnabled" : {
230+
"viewClass": "SelectView",
231+
"prompt": "Default is No",
232+
"label": "Enable Rebound",
233+
"required": false,
234+
"model" : {
235+
"Yes" : "Yes",
236+
"No" : "No"
237+
}
186238
}
187239
},
188240
"dynamicMetadata": "io.elastic.jdbc.providers.ColumnNamesForInsertProvider"
@@ -202,6 +254,16 @@
202254
"nullableResult": {
203255
"label": "Don`t throw Error on an Empty Result",
204256
"viewClass": "CheckBoxView"
257+
},
258+
"reboundEnabled" : {
259+
"viewClass": "SelectView",
260+
"prompt": "Default is No",
261+
"label": "Enable Rebound",
262+
"required": false,
263+
"model" : {
264+
"Yes" : "Yes",
265+
"No" : "No"
266+
}
205267
}
206268
},
207269
"dynamicMetadata": "io.elastic.jdbc.providers.PrimaryColumnNamesProvider"
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package io.elastic.jdbc.actions;
2+
3+
import io.elastic.api.ExecutionParameters;
4+
import io.elastic.api.Message;
5+
import io.elastic.api.Module;
6+
import io.elastic.jdbc.utils.Utils;
7+
import java.sql.Connection;
8+
import java.sql.ResultSet;
9+
import java.sql.ResultSetMetaData;
10+
import java.sql.SQLException;
11+
import java.sql.Statement;
12+
import java.util.ArrayList;
13+
import java.util.List;
14+
import javax.json.Json;
15+
import javax.json.JsonArray;
16+
import javax.json.JsonArrayBuilder;
17+
import javax.json.JsonObject;
18+
import javax.json.JsonObjectBuilder;
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
22+
public class CustomQuery implements Module {
23+
24+
private static final Logger LOGGER = LoggerFactory.getLogger(CustomQuery.class);
25+
26+
private static final String JSON_RESULT_ARRAY_NAME = "result";
27+
private static final String JSON_RESULT_COUNT_NAME = "updated";
28+
29+
@Override
30+
public void execute(ExecutionParameters parameters) {
31+
LOGGER.info("Starting execute custom query action");
32+
final JsonObject configuration = parameters.getConfiguration();
33+
final JsonObject body = parameters.getMessage().getBody();
34+
final String dbEngine = Utils.getDbEngine(configuration);
35+
final String queryString = body.getString("query");
36+
LOGGER.info("Found dbEngine: '{}' and query: '{}'", dbEngine, queryString);
37+
38+
List<Message> messages = new ArrayList<>();
39+
try (Connection connection = Utils.getConnection(configuration)) {
40+
connection.setAutoCommit(false);
41+
try (Statement statement = connection.createStatement()) {
42+
boolean status = statement.execute(queryString);
43+
if (status) {
44+
ResultSet resultSet = statement.getResultSet();
45+
messages.add(this.processResultSetToMessage(resultSet));
46+
} else {
47+
messages.add(this.processUpdateCountToMessage(statement.getUpdateCount()));
48+
}
49+
50+
while (statement.getMoreResults() || statement.getUpdateCount() != -1) {
51+
if (statement.getUpdateCount() != -1) {
52+
messages.add(this.processUpdateCountToMessage(statement.getUpdateCount()));
53+
} else {
54+
messages.add(this.processResultSetToMessage(statement.getResultSet()));
55+
}
56+
}
57+
58+
connection.commit();
59+
} catch (Exception e) {
60+
connection.rollback();
61+
connection.setAutoCommit(true);
62+
throw e;
63+
}
64+
} catch (SQLException e) {
65+
throw new RuntimeException(e);
66+
}
67+
68+
messages.forEach(message -> {
69+
LOGGER.trace("Emit data= {}", message.getBody());
70+
parameters.getEventEmitter().emitData(message);
71+
});
72+
73+
LOGGER.info("Custom query action is successfully executed");
74+
}
75+
76+
private Message processResultSetToMessage(ResultSet resultSet) throws SQLException {
77+
JsonArray result = customResultSetToJsonArray(resultSet);
78+
return new Message.Builder()
79+
.body(Json.createObjectBuilder()
80+
.add(JSON_RESULT_ARRAY_NAME, result)
81+
.build()
82+
).build();
83+
}
84+
85+
private Message processUpdateCountToMessage(int updateCount) {
86+
return new Message.Builder()
87+
.body(Json.createObjectBuilder()
88+
.add(JSON_RESULT_COUNT_NAME, updateCount)
89+
.build()
90+
).build();
91+
}
92+
93+
public static JsonArray customResultSetToJsonArray(ResultSet resultSet) throws SQLException {
94+
JsonArrayBuilder jsonBuilder = Json.createArrayBuilder();
95+
96+
if (resultSet == null) {
97+
return jsonBuilder.build();
98+
}
99+
100+
ResultSetMetaData metaData = resultSet.getMetaData();
101+
102+
while (resultSet.next()) {
103+
JsonObjectBuilder entry = Json.createObjectBuilder();
104+
for (int i = 1; i <= metaData.getColumnCount(); i++) {
105+
Utils.getColumnDataByType(resultSet, metaData, i, entry);
106+
}
107+
jsonBuilder.add(entry.build());
108+
}
109+
110+
return jsonBuilder.build();
111+
}
112+
}

0 commit comments

Comments
 (0)