Skip to content

Commit ff5fb71

Browse files
Nikita-Smirnov-ExactproOleg
andauthored
[th2-5149] Read-db should create event with query details, unique query id and start/end times when user execute query (#16)
* gRPC execute method generates unique id for each execution and puts it into related event and messages. * added readme for oracle redo log * [TH2-5152] Catch exceptions during query execution to notify the listener * gRPC server implementation skips columns with null value after fix. --------- Co-authored-by: Oleg <oleg.smirnov@exactprosystems.com>
1 parent 5a979f4 commit ff5fb71

26 files changed

+1333
-52
lines changed

README.md

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# th2-read-db 0.6.0
1+
# th2-read-db 0.7.0
22

33
The read-db is a component for extracting data from databases using JDBC technology. If database has JDBC driver the read can work with the database
44

@@ -44,6 +44,9 @@ publication:
4444
queueSize: 1000
4545
maxDelayMillis: 1000
4646
maxBatchSize: 100
47+
eventPublication:
48+
maxBatchSizeInItems: 100
49+
maxFlushTime: 1000
4750
```
4851
4952
## Parameters
@@ -65,6 +68,7 @@ The list of queries that can be executed by read-db.
6568
It might contain parameters in the following format: `${<name>[:<type>]}`.
6669
The **type** part can be omitted if the type is `varchar`.
6770
Examples: `${id:integer}`, `${registration_time:timestamp}`, `${first_name}`
71+
[Types](https://docs.oracle.com/javase/8/docs/api/java/sql/JDBCType.html): bit, tinyint, smallint, integer, bigint, float, real, double, numeric, decimal, char, varchar, longvarchar, date, time, timestamp, binary, varbinary, longvarbinary, null, other, java_object, distinct, struct, array, blob, clob, ref, datalink, boolean, rowid, nchar, nvarchar, longnvarchar, nclob, sqlxml, ref_cursor, time_with_timezone, timestamp_with_timezone
6872
+ defaultParameters - the default values for parameters. They will be used if the parameter was not specified in the request
6973
+ messageType - the message type that should be associated with this query.
7074
If it is set the read-db will set a property `th2.csv.override_message_type` with specified value
@@ -126,7 +130,10 @@ You can interact with read-db via gRPC. It supports executing direct queries and
126130
# Publication
127131

128132
The read-db publishes all extracted data to MQ as raw messages in CSV format. The alias matches the **data source id**.
129-
Message might contain property `th2.csv.override_message_type` with value that should be used as message type for the row message
133+
Message might contain properties
134+
* `th2.csv.override_message_type` with value that should be used as message type for the row message
135+
* `th2.read-db.execute.uid` with unique identifier of query execution
136+
* `th2.pull_task.update_hash` with hash of source and query configuration used pull query execution
130137

131138
# gRPC
132139

@@ -135,6 +142,39 @@ Message might contain property `th2.csv.override_message_type` with value that s
135142
Pull task tries to load the last message published to Cradle instead of initialise from the start
136143
if you connect read-db to a data-provider using `com.exactpro.th2.dataprovider.lw.grpc.DataProviderService`.
137144

145+
## Server
146+
147+
### Execute method
148+
149+
User can trigger a query execution on a data source using this method. the method includes the activities:
150+
* generation of growing unique id.
151+
* query execution.
152+
* publication results of the query execution to MQ where each message has `th2.read-db.execute.uid` property with the unique id
153+
* publication event with data source, query, request parameters and the unique id.
154+
Start/End even times correspond to the beginning/ending the query execution.
155+
Body example:
156+
```json
157+
[
158+
{
159+
"dataSource": {
160+
"url":"jdbc url for data base connection",
161+
"username":"user name"
162+
},
163+
"query": {
164+
"query":"SQL query text"
165+
},
166+
"parameters": {
167+
"parameter": [
168+
"parameter value"
169+
]
170+
},
171+
"executionId": 123
172+
}
173+
]
174+
```
175+
NOTE: the event hasn't got attached message because the query can produce a lot of rows.
176+
* streaming results of the query execution with the unique id as gRPC response.
177+
138178
# CR example
139179
## infra 1
140180
```yaml
@@ -144,7 +184,7 @@ metadata:
144184
name: read-db
145185
spec:
146186
image-name: ghcr.io/th2-net/th2-read-db
147-
image-version: 0.0.1
187+
image-version: 0.7.0-dev
148188
type: th2-read
149189
custom-config:
150190
dataSources:
@@ -182,6 +222,9 @@ spec:
182222
queueSize: 1000
183223
maxDelayMillis: 1000
184224
maxBatchSize: 100
225+
eventPublication:
226+
maxBatchSizeInItems: 100
227+
maxFlushTime: 1000
185228
useTransport: true
186229
pins:
187230
- name: client
@@ -217,7 +260,7 @@ metadata:
217260
name: read-db
218261
spec:
219262
imageName: ghcr.io/th2-net/th2-read-db
220-
imageVersion: 0.0.1
263+
imageVersion: 0.7.0-dev
221264
type: th2-read
222265
customConfig:
223266
dataSources:
@@ -256,6 +299,9 @@ spec:
256299
queueSize: 1000
257300
maxDelayMillis: 1000
258301
maxBatchSize: 100
302+
eventPublication:
303+
maxBatchSizeInItems: 100
304+
maxFlushTime: 1000
259305
useTransport: true
260306
pins:
261307
mq:
@@ -288,8 +334,21 @@ spec:
288334
cpu: 50m
289335
```
290336

337+
### Oracle redo logs
338+
[How to configure th2-read-db to pull data from redo log](oracle-log-miner.md)
339+
291340
## Changes
292341

342+
### 0.7.0
343+
344+
#### Feature:
345+
346+
+ gRPC execute method generates unique id for each execution and puts it into related event and messages.
347+
348+
#### Fix:
349+
350+
+ gRPC Execute method doesn't respond rows with null values. gRPC server implementation skips columns with null value after fix.
351+
293352
### 0.6.0
294353

295354
#### Feature:

app/gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
kotlin.code.style=official
2-
release_version=0.6.0
2+
release_version=0.7.0
33
description=read-db component for extracting data from databases using JDBC technology

core/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ dependencies {
5858
implementation "com.fasterxml.jackson.core:jackson-databind"
5959

6060
testImplementation "org.junit.jupiter:junit-jupiter:5.10.0"
61+
testImplementation "org.jetbrains.kotlin:kotlin-test-junit"
6162
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:$coroutines_version"
6263
testImplementation "org.mockito.kotlin:mockito-kotlin:5.1.0"
6364
testImplementation "io.strikt:strikt-core:0.34.1"
@@ -66,6 +67,9 @@ dependencies {
6667
testImplementation "org.testcontainers:testcontainers"
6768
testImplementation "org.testcontainers:mysql"
6869
testImplementation "org.testcontainers:oracle-xe"
70+
testImplementation "io.grpc:grpc-testing"
71+
72+
testImplementation 'com.exactpro.th2:junit-jupiter-integration:0.0.1-master-6956603819-5241ee5-SNAPSHOT'
6973

7074
testRuntimeOnly("com.mysql:mysql-connector-j:8.1.0") {
7175
because("mysql support")

core/gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
kotlin.code.style=official
2-
release_version=0.6.0
2+
release_version=0.7.0
33
description=core part of read db to create an application with required JDBC drivers in the classpath

core/src/main/kotlin/com/exactpro/th2/read/db/app/DataBaseReader.kt

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,16 +97,21 @@ class DataBaseReader(
9797
) {
9898
scope.launch {
9999
with(request) {
100-
dataBaseService.executeQuery(sourceId, before, queryId, after, parameters)
101-
.onCompletion {
102-
if (it == null) {
103-
listener.onComplete()
104-
} else {
105-
listener.onError(it)
100+
try {
101+
dataBaseService.executeQuery(sourceId, before, queryId, after, parameters)
102+
.onCompletion {
103+
if (it == null) {
104+
listener.onComplete()
105+
} else {
106+
listener.onError(it)
107+
}
108+
}.collect {
109+
rowTransformer(it).transferTo(sourceId, listener, rowListener)
106110
}
107-
}.collect {
108-
rowTransformer(it).transferTo(sourceId, listener, rowListener)
109-
}
111+
} catch (ex: Exception) {
112+
LOGGER.error(ex) { "cannot execute query '${request.queryId}' for '${request.sourceId}'" }
113+
listener.onError(ex)
114+
}
110115
}
111116
}
112117
}

core/src/main/kotlin/com/exactpro/th2/read/db/app/DataBaseReaderConfiguration.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class DataBaseReaderConfiguration(
3131
val queries: Map<QueryId, QueryConfiguration>,
3232
val startupTasks: List<StartupTaskConfiguration> = emptyList(),
3333
val publication: PublicationConfiguration = PublicationConfiguration(),
34+
val eventPublication: EventPublicationConfiguration = EventPublicationConfiguration(),
3435
val useTransport: Boolean = false
3536
)
3637

@@ -40,6 +41,11 @@ class PublicationConfiguration(
4041
val maxBatchSize: Int = 100,
4142
)
4243

44+
class EventPublicationConfiguration(
45+
val maxBatchSizeInItems: Int = 100,
46+
val maxFlushTime: Long = 1000,
47+
)
48+
4349
@JsonTypeInfo(
4450
use = JsonTypeInfo.Id.NAME,
4551
include = JsonTypeInfo.As.PROPERTY,

core/src/main/kotlin/com/exactpro/th2/read/db/app/Requests.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,16 @@ package com.exactpro.th2.read.db.app
1919
import com.exactpro.th2.read.db.core.DataSourceId
2020
import com.exactpro.th2.read.db.core.QueryId
2121
import com.exactpro.th2.read.db.core.QueryParametersValues
22+
import com.fasterxml.jackson.annotation.JsonInclude
2223
import java.time.Duration
2324

25+
@JsonInclude(JsonInclude.Include.NON_EMPTY)
2426
class ExecuteQueryRequest(
2527
val sourceId: DataSourceId,
26-
val before: List<QueryId>,
28+
val before: List<QueryId> = emptyList(),
2729
val queryId: QueryId,
28-
val after: List<QueryId>,
29-
val parameters: QueryParametersValues,
30+
val after: List<QueryId> = emptyList(),
31+
val parameters: QueryParametersValues = emptyMap(),
3032
)
3133

3234
class PullTableRequest(

core/src/main/kotlin/com/exactpro/th2/read/db/bootstrap/Main.kt

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.exactpro.th2.read.db.bootstrap
1919

2020
import com.exactpro.th2.common.grpc.Direction.FIRST
21+
import com.exactpro.th2.common.grpc.EventBatch
2122
import com.exactpro.th2.common.message.direction
2223
import com.exactpro.th2.common.message.plusAssign
2324
import com.exactpro.th2.common.message.sequence
@@ -33,7 +34,9 @@ import com.exactpro.th2.common.schema.message.QueueAttribute
3334
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Direction
3435
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch
3536
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage
37+
import com.exactpro.th2.common.utils.event.EventBatcher
3638
import com.exactpro.th2.common.utils.message.transport.toGroup
39+
import com.exactpro.th2.common.utils.shutdownGracefully
3740
import com.exactpro.th2.dataprovider.lw.grpc.DataProviderService
3841
import com.exactpro.th2.lwdataprovider.MessageSearcher
3942
import com.exactpro.th2.read.db.app.DataBaseReader
@@ -132,6 +135,9 @@ internal fun setupApp(
132135

133136
val appScope = createScope(closeResource)
134137
val componentBookName = factory.boxConfiguration.bookName
138+
val rootEventId = factory.rootEventId
139+
val maxEventBatchSize = factory.cradleManager.storage.entitiesFactory.maxTestEventBatchSize
140+
135141
val messageLoader: MessageLoader = createMessageLoader(factory, componentBookName)
136142
val reader = if (cfg.useTransport) {
137143
val messageRouter: MessageRouter<GroupBatch> = factory.transportGroupBatchRouter
@@ -174,7 +180,17 @@ internal fun setupApp(
174180
createReader(cfg, appScope, messageQueue, closeResource, TableRow::toProtoMessage, messageLoader)
175181
}
176182

177-
val handler = DataBaseReaderGrpcServer(reader)
183+
val eventBatcher = configureEventStoring(cfg, maxEventBatchSize, closeResource, factory.eventBatchRouter::send)
184+
185+
val handler = DataBaseReaderGrpcServer(
186+
reader,
187+
{ cfg.dataSources[it] ?: error("'$it' data source isn't found in custom config") },
188+
{ cfg.queries[it] ?: error("'$it' query isn't found in custom config") },
189+
) { event, parentEventId ->
190+
eventBatcher.onEvent(
191+
event.toProto(parentEventId ?: rootEventId)
192+
)
193+
}
178194

179195
val server = factory.grpcRouter.startServer(handler)
180196
.start()
@@ -271,6 +287,32 @@ private fun createScope(closeResource: (name: String, resource: () -> Unit) -> U
271287
return appScope
272288
}
273289

290+
private fun configureEventStoring(
291+
cfg: DataBaseReaderConfiguration,
292+
maxEventBatchSize: Int,
293+
closeResource: (name: String, resource: () -> Unit) -> Unit,
294+
send: (EventBatch) -> Unit
295+
): EventBatcher {
296+
val executor = Executors.newSingleThreadScheduledExecutor(
297+
ThreadFactoryBuilder()
298+
.setNameFormat("event-saver-%d")
299+
.build()
300+
)
301+
302+
closeResource("event storing") {
303+
LOGGER.info { "Shutdown executor" }
304+
executor.shutdownGracefully(1, TimeUnit.MINUTES)
305+
}
306+
307+
return EventBatcher(
308+
maxEventBatchSize.toLong(),
309+
cfg.eventPublication.maxBatchSizeInItems,
310+
cfg.eventPublication.maxFlushTime,
311+
executor,
312+
send
313+
)
314+
}
315+
274316
private fun <BUILDER, DIRECTION> configureTransportMessageStoring(
275317
cfg: DataBaseReaderConfiguration,
276318
keyExtractor: (BUILDER) -> SessionKey<DIRECTION>,
@@ -305,12 +347,7 @@ private fun <BUILDER, DIRECTION> configureTransportMessageStoring(
305347
LOGGER.error(ex) { "cannot complete drain task in specified timeout" }
306348
}
307349
LOGGER.info { "Shutdown executor" }
308-
executor.shutdown()
309-
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
310-
LOGGER.error { "executor was not shutdown during specified timeout. Force shutdown" }
311-
val runnables = executor.shutdownNow()
312-
LOGGER.error { "${runnables.size} task(s) left" }
313-
}
350+
executor.shutdownGracefully(1, TimeUnit.MINUTES)
314351
}
315352
}
316353
return messagesQueue

core/src/main/kotlin/com/exactpro/th2/read/db/bootstrap/Utils.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 Exactpro (Exactpro Systems Limited)
2+
* Copyright 2023-2024 Exactpro (Exactpro Systems Limited)
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,6 +38,7 @@ import java.math.BigDecimal
3838
import com.exactpro.th2.common.grpc.RawMessage as ProtoRawMessage
3939

4040
internal const val TH2_CSV_OVERRIDE_MESSAGE_TYPE_PROPERTY = "th2.csv.override_message_type"
41+
private const val TH2_READ_DB_UNIQUE_ID = "th2.read-db.execute.uid"
4142
private const val SEPARATOR = ','
4243

4344
internal fun TableRow.toProtoMessage(dataSourceId: DataSourceId, properties: Map<String, String>): ProtoRawMessage.Builder {
@@ -65,6 +66,9 @@ internal fun TableRow.toTransportMessage(dataSourceId: DataSourceId, properties:
6566
if (associatedMessageType != null) {
6667
builder.addMetadataProperty(TH2_CSV_OVERRIDE_MESSAGE_TYPE_PROPERTY, associatedMessageType)
6768
}
69+
if (executionId != null) {
70+
builder.addMetadataProperty(TH2_READ_DB_UNIQUE_ID, executionId.toString())
71+
}
6872
properties.forEach(builder::addMetadataProperty)
6973

7074
return builder
@@ -124,7 +128,7 @@ internal fun MessageSearchResponse.toTableRow(): TableRow {
124128
}
125129
}
126130

127-
private fun Any.toStringValue(): String = when (this) {
131+
internal fun Any.toStringValue(): String = when (this) {
128132
is BigDecimal -> stripTrailingZeros().toPlainString()
129133
is Double -> toBigDecimal().toStringValue()
130134
is Float -> toBigDecimal().toStringValue()

core/src/main/kotlin/com/exactpro/th2/read/db/core/DataSourceConfiguration.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,14 @@
1616

1717
package com.exactpro.th2.read.db.core
1818

19+
import com.fasterxml.jackson.annotation.JsonInclude
20+
import com.fasterxml.jackson.annotation.JsonProperty
21+
22+
@JsonInclude(JsonInclude.Include.NON_EMPTY)
1923
data class DataSourceConfiguration(
2024
val url: String,
2125
val username: String? = null,
26+
@JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
2227
val password: String? = null,
2328
val properties: Map<String, String> = emptyMap(),
2429
)

0 commit comments

Comments
 (0)