diff --git a/Dockerfile b/Dockerfile index f1574028..df3a4d09 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM gradle:6.6-jdk11 AS build +FROM gradle:7.1-jdk11 AS build ARG Prelease_version=0.0.0 COPY ./ . RUN gradle clean build dockerPrepare -Prelease_version=${Prelease_version} diff --git a/README.md b/README.md index f97ed1cb..379f226e 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Lightweight data provider (1.1.0) +# Lightweight data provider (1.1.1) # Overview This component serves as a data provider for [th2-data-services](https://github.com/th2-net/th2-data-services). It will connect to the cassandra database via [cradle api](https://github.com/th2-net/cradleapi) and expose the data stored in there as REST resources. @@ -29,6 +29,7 @@ This component is similar to [rpt-data-provider](https://github.com/th2-net/th2- - `resultCountLimit` - number - Sets the maximum amount of messages to return. Defaults to `null (unlimited)`. - `endTimestamp` - number, unix timestamp in milliseconds - Sets the timestamp to which the search will be performed, starting with `startTimestamp`. When `searchDirection` is `previous`, `endTimestamp` must be less then `startTimestamp`. Defaults to `null` (search can be stopped after reaching `resultCountLimit`). - `onlyRaw` - boolean - Disabling decoding messages. If it is true, message body will be empty in all messages. Default `false` +- `responseFormats` - text, accepts multiple values - sets response formats. Possible values: BASE_64, PARSED. default value - BASE_64 & PARSED. Elements in channel match the format sse: @@ -58,6 +59,8 @@ spec: # maxBufferDecodeQueue: 10000 # buffer size for messages that sent to decode but anwers hasn't been received # decodingTimeout: 60000 # timeout expecting answers from codec. # batchSize: 100 # batch size from codecs +# codecUsePinAttributes: true # send raw message to specified codec (true) or send to all codecs (false) +# responseFormats: string list # resolve data for selected formats only. (allowed values: BASE_64, PARSED) pins: # pins are used to communicate with codec components to parse message data @@ -92,4 +95,4 @@ spec: requests: memory: 300Mi cpu: 50m -``` \ No newline at end of file +``` diff --git a/build.gradle b/build.gradle index d881e83f..bf1c57f9 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright 2009-2020 Exactpro (Exactpro Systems Limited) + * Copyright 2009-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,12 +51,10 @@ repositories { } dependencies { - api platform('com.exactpro.th2:bom:3.0.0') + api platform('com.exactpro.th2:bom:3.2.0') - implementation 'org.slf4j:slf4j-api' - implementation 'org.slf4j:slf4j-log4j12' - implementation 'com.exactpro.th2:common:3.34.0' + implementation 'com.exactpro.th2:common:3.39.3' implementation 'org.eclipse.jetty:jetty-server:9.4.44.v20210927' implementation 'org.eclipse.jetty:jetty-servlet:9.4.44.v20210927' @@ -66,7 +64,7 @@ dependencies { implementation('net.jpountz.lz4:lz4:1.3.0') { because('cassandra driver requires lz4 impl in classpath for compression') } - implementation 'com.exactpro.th2:grpc-data-provider:1.1.0' + implementation 'com.exactpro.th2:grpc-data-provider:1.1.0-TS-1083-response-formats-2759013731-SNAPSHOT' implementation 'io.github.microutils:kotlin-logging:2.1.14' @@ -83,7 +81,7 @@ dependencies { } testImplementation 'org.junit.jupiter:junit-jupiter:5.8.2' - testImplementation 'org.assertj:assertj-core:3.12.2' + testImplementation 'org.assertj:assertj-core:3.23.1' } test { diff --git a/gradle.properties b/gradle.properties index 0eddf02a..621debbc 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ ################################################################################ -# Copyright 2009-2020 Exactpro (Exactpro Systems Limited) +# Copyright 2009-2022 Exactpro (Exactpro Systems Limited) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,6 +16,6 @@ kotlin.code.style=official -release_version=1.1.0 +release_version=1.1.1 docker_image_name= diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 0dbf8caa..e1607acb 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ ################################################################################ -# Copyright 2009-2020 Exactpro (Exactpro Systems Limited) +# Copyright 2009-2022 Exactpro (Exactpro Systems Limited) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,7 +15,7 @@ ################################################################################ #Tue Apr 14 11:21:33 MSK 2020 -distributionUrl=https\://services.gradle.org/distributions/gradle-6.6-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.1-bin.zip distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStorePath=wrapper/dists diff --git a/settings.gradle b/settings.gradle index ee3cc005..e9609528 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright 2009-2020 Exactpro (Exactpro Systems Limited) + * Copyright 2009-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/RabbitMqDecoder.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/RabbitMqDecoder.kt index 8e9d6d20..94cf4c01 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/RabbitMqDecoder.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/RabbitMqDecoder.kt @@ -16,9 +16,7 @@ package com.exactpro.th2.lwdataprovider -import com.exactpro.th2.common.grpc.MessageBatch import com.exactpro.th2.common.grpc.MessageGroupBatch -import com.exactpro.th2.common.grpc.RawMessageBatch import com.exactpro.th2.common.schema.message.MessageRouter import com.exactpro.th2.common.schema.message.QueueAttribute import com.exactpro.th2.lwdataprovider.configuration.Configuration @@ -37,9 +35,13 @@ class RabbitMqDecoder(private val configuration: Configuration, companion object { private val logger = KotlinLogging.logger { } } - - fun sendBatchMessage(batch: MessageGroupBatch, session: String) { - this.messageRouterRawBatch.send(batch, session, QueueAttribute.RAW.value) + + fun sendBatchMessage(batch: MessageGroupBatch, session: String, codecUsePinAttributes: Boolean) { + if (codecUsePinAttributes) { + this.messageRouterRawBatch.send(batch, session, QueueAttribute.RAW.value) + } else { + this.messageRouterRawBatch.sendAll(batch, QueueAttribute.RAW.value) + } } fun registerMessage(message: RequestedMessageDetails) { diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/RequestContext.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/RequestContext.kt index 1759fd7b..acfd6f07 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/RequestContext.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/RequestContext.kt @@ -20,6 +20,7 @@ import com.exactpro.cradle.messages.StoredMessage import com.exactpro.th2.common.grpc.Message import com.exactpro.th2.common.grpc.RawMessage import com.exactpro.th2.lwdataprovider.entities.responses.LastScannedObjectInfo +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat import io.prometheus.client.Histogram import mu.KotlinLogging import java.time.Instant @@ -85,7 +86,7 @@ abstract class MessageRequestContext ( fun allDataLoadedFromCradle() = allMessagesRequested.set(true) - abstract fun createMessageDetails(id: String, time: Long, storedMessage: StoredMessage, onResponse: () -> Unit = {}): RequestedMessageDetails; + abstract fun createMessageDetails(id: String, time: Long, storedMessage: StoredMessage, responseFormats: List, onResponse: () -> Unit = {}): RequestedMessageDetails; abstract fun addStreamInfo(); override fun onMessageSent() { @@ -137,6 +138,7 @@ abstract class RequestedMessageDetails ( @Volatile var time: Long, val storedMessage: StoredMessage, protected open val context: MessageRequestContext, + val responseFormats: List, var parsedMessage: List? = null, var rawMessage: RawMessage? = null, private val onResponse: () -> Unit = {} diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/configuration/Configuration.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/configuration/Configuration.kt index 23b3c180..a72979a6 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/configuration/Configuration.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/configuration/Configuration.kt @@ -16,6 +16,7 @@ package com.exactpro.th2.lwdataprovider.configuration +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat import java.util.* @@ -31,6 +32,8 @@ class CustomConfigurationClass { val mode: String? = null val grpcBackPressure : Boolean? = null val bufferPerQuery: Int? = null + val codecUsePinAttributes: Boolean = true + val defaultResponseFormats: List? = null } class Configuration(customConfiguration: CustomConfigurationClass) { @@ -47,6 +50,9 @@ class Configuration(customConfiguration: CustomConfigurationClass) { customConfiguration.mode?.let { Mode.valueOf(it.uppercase(Locale.getDefault())) }, Mode.HTTP) val grpcBackPressure: Boolean = VariableBuilder.getVariable("grpcBackPressure", customConfiguration.grpcBackPressure, false) val bufferPerQuery: Int = VariableBuilder.getVariable("bufferPerQuery", customConfiguration.bufferPerQuery, 0) + val codecUsePinAttributes: Boolean = VariableBuilder.getVariable("codecUsePinAttributes", customConfiguration.codecUsePinAttributes, true) + val defaultResponseFormats: List = VariableBuilder.getVariable("defaultResponseFormats", + customConfiguration.defaultResponseFormats, listOf(ResponseFormat.ALL)) } enum class Mode { diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/db/CradleMessageExtractor.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/db/CradleMessageExtractor.kt index d4fdf625..dadc6066 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/db/CradleMessageExtractor.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/db/CradleMessageExtractor.kt @@ -27,6 +27,7 @@ import com.exactpro.th2.lwdataprovider.MessageRequestContext import com.exactpro.th2.lwdataprovider.RabbitMqDecoder import com.exactpro.th2.lwdataprovider.RequestedMessageDetails import com.exactpro.th2.lwdataprovider.configuration.Configuration +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat import mu.KotlinLogging import kotlin.concurrent.withLock import kotlin.system.measureTimeMillis @@ -36,6 +37,7 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan private val storage = cradleManager.storage private val batchSize = configuration.batchSize + private val codecUsePinAttributes = configuration.codecUsePinAttributes companion object { private val logger = KotlinLogging.logger { } @@ -43,7 +45,7 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan fun getStreams(): Collection = storage.streams - fun getMessages(filter: StoredMessageFilter, requestContext: MessageRequestContext) { + fun getMessages(filter: StoredMessageFilter, requestContext: MessageRequestContext, responseFormats: List) { var msgCount = 0 val time = measureTimeMillis { @@ -65,7 +67,7 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan msgId = storedMessage.id val id = storedMessage.id.toString() val decodingStep = requestContext.startStep("decoding") - val tmp = requestContext.createMessageDetails(id, 0, storedMessage) { decodingStep.finish() } + val tmp = requestContext.createMessageDetails(id, 0, storedMessage, responseFormats) { decodingStep.finish() } messageBuffer.add(tmp) ++msgBufferCount tmp.rawMessage = requestContext.startStep("raw_message_parsing").use { RawMessage.parseFrom(storedMessage.content) }.also { @@ -79,8 +81,7 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan decoder.registerMessage(it) requestContext.registerMessage(it) } - decoder.sendBatchMessage(builder.build(), sessionName) - + decoder.sendBatchMessage(builder.build(), sessionName, codecUsePinAttributes) messageBuffer.clear() builder.clear() msgCount += msgBufferCount @@ -101,7 +102,7 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan } if (msgBufferCount > 0) { - decoder.sendBatchMessage(builder.build(), sessionName) + decoder.sendBatchMessage(builder.build(), sessionName, codecUsePinAttributes) val sendingTime = System.currentTimeMillis() messageBuffer.forEach { @@ -121,7 +122,7 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan } - fun getRawMessages(filter: StoredMessageFilter, requestContext: MessageRequestContext) { + fun getRawMessages(filter: StoredMessageFilter, requestContext: MessageRequestContext, responseFormats: List) { var msgCount = 0 val time = measureTimeMillis { @@ -136,7 +137,7 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan } msgId = storedMessageBatch.id val id = storedMessageBatch.id.toString() - val tmp = requestContext.createMessageDetails(id, time, storedMessageBatch) + val tmp = requestContext.createMessageDetails(id, time, storedMessageBatch, responseFormats) tmp.rawMessage = RawMessage.parseFrom(storedMessageBatch.content) tmp.responseMessage() msgCount++ @@ -166,7 +167,7 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan val time = System.currentTimeMillis() val decodingStep = if (onlyRaw) null else requestContext.startStep("decoding") - val tmp = requestContext.createMessageDetails(message.id.toString(), time, message) { decodingStep?.finish() } + val tmp = requestContext.createMessageDetails(message.id.toString(), time, message, emptyList()) { decodingStep?.finish() } tmp.rawMessage = RawMessage.parseFrom(message.content) requestContext.loadedMessages += 1 @@ -178,7 +179,7 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan .build() decoder.registerMessage(tmp) requestContext.registerMessage(tmp) - decoder.sendBatchMessage(msgBatch, message.streamName) + decoder.sendBatchMessage(msgBatch, message.streamName, codecUsePinAttributes) } } diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/internal/ResponseFormat.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/internal/ResponseFormat.kt new file mode 100644 index 00000000..14c299b8 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/internal/ResponseFormat.kt @@ -0,0 +1,24 @@ +/******************************************************************************* + * Copyright 2022 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +package com.exactpro.th2.lwdataprovider.entities.internal + +enum class ResponseFormat { + ALL, + BASE_64, + PARSED, + +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/GetMessageRequest.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/GetMessageRequest.kt index 9ef296cb..49c71ec5 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/GetMessageRequest.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/GetMessageRequest.kt @@ -17,17 +17,20 @@ package com.exactpro.th2.lwdataprovider.entities.requests import com.exactpro.th2.common.grpc.MessageID +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat import com.exactpro.th2.lwdataprovider.grpc.toStoredMessageId data class GetMessageRequest( val msgId: String, - val onlyRaw: Boolean + val onlyRaw: Boolean, + val responseFormats: List? = listOf(ResponseFormat.ALL) ) { constructor(msgId: String, parameters: Map>) : this( msgId = msgId, - onlyRaw = parameters["onlyRaw"]?.firstOrNull()?.toBoolean() ?: false + onlyRaw = parameters["onlyRaw"]?.firstOrNull()?.toBoolean() ?: false, + responseFormats = parameters["responseFormats"]?.map { x -> ResponseFormat.valueOf(x) } ) constructor(msgId: MessageID) : this( diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/SseMessageSearchRequest.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/SseMessageSearchRequest.kt index d995f402..a744941c 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/SseMessageSearchRequest.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/SseMessageSearchRequest.kt @@ -22,12 +22,13 @@ import com.exactpro.cradle.messages.StoredMessageId import com.exactpro.th2.dataprovider.grpc.MessageSearchRequest import com.exactpro.th2.dataprovider.grpc.MessageStreamPointer import com.exactpro.th2.lwdataprovider.entities.exceptions.InvalidRequestException +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat import com.exactpro.th2.lwdataprovider.grpc.toInstant +import com.exactpro.th2.lwdataprovider.grpc.toLocalResponseFormats import com.exactpro.th2.lwdataprovider.grpc.toProviderMessageStreams import com.exactpro.th2.lwdataprovider.grpc.toProviderRelation import com.exactpro.th2.lwdataprovider.grpc.toStoredMessageId import java.time.Instant -import kotlin.streams.toList data class SseMessageSearchRequest( val startTimestamp: Instant?, @@ -39,7 +40,9 @@ data class SseMessageSearchRequest( val attachedEvents: Boolean, val lookupLimitDays: Int?, val resumeFromIdsList: List?, - val onlyRaw: Boolean + @Deprecated("Use responseFormats instead") + val onlyRaw: Boolean, + val responseFormats: List? = listOf(ResponseFormat.ALL) ) { companion object { @@ -86,7 +89,8 @@ data class SseMessageSearchRequest( keepOpen = parameters["keepOpen"]?.firstOrNull()?.toBoolean() ?: false, attachedEvents = parameters["attachedEvents"]?.firstOrNull()?.toBoolean() ?: false, lookupLimitDays = parameters["lookupLimitDays"]?.firstOrNull()?.toInt(), - onlyRaw = parameters["onlyRaw"]?.firstOrNull()?.toBoolean() ?: false + onlyRaw = parameters["onlyRaw"]?.firstOrNull()?.toBoolean() ?: false, + responseFormats = parameters["responseFormats"]?.map { x -> ResponseFormat.valueOf(x) } ) constructor(grpcRequest: MessageSearchRequest) : this( @@ -99,7 +103,8 @@ data class SseMessageSearchRequest( keepOpen = if (grpcRequest.hasKeepOpen()) grpcRequest.keepOpen.value else false, attachedEvents = false, // disabled lookupLimitDays = null, - onlyRaw = false // NOT SUPPORTED in GRPC + onlyRaw = false, // NOT SUPPORTED in GRPC + responseFormats = grpcRequest.responseFormatsList.toLocalResponseFormats() ) private fun checkEndTimestamp() { diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImpl.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImpl.kt index 9380f567..25d5d1fa 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImpl.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImpl.kt @@ -146,7 +146,7 @@ open class GrpcDataProviderImpl( val grpcResponseHandler = GrpcResponseHandler(queue) val context = GrpcMessageRequestContext(grpcResponseHandler, maxMessagesPerRequest = configuration.bufferPerQuery) val loadingStep = context.startStep("messages_loading") - searchMessagesHandler.loadMessages(requestParams, context) + searchMessagesHandler.loadMessages(requestParams, context, configuration) try { processResponse(responseObserver, grpcResponseHandler, context, loadingStep::finish) { it.message } } catch (ex: Exception) { diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcMessageRequestContext.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcMessageRequestContext.kt index 3c0c60fd..c6cbf17a 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcMessageRequestContext.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcMessageRequestContext.kt @@ -24,6 +24,7 @@ import com.exactpro.th2.dataprovider.grpc.MessageStreamPointers import com.exactpro.th2.lwdataprovider.GrpcResponseHandler import com.exactpro.th2.lwdataprovider.MessageRequestContext import com.exactpro.th2.lwdataprovider.RequestedMessageDetails +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat import com.exactpro.th2.lwdataprovider.entities.responses.LastScannedObjectInfo import com.exactpro.th2.lwdataprovider.producers.GrpcMessageProducer import java.util.concurrent.ConcurrentHashMap @@ -42,8 +43,8 @@ class GrpcMessageRequestContext ( maxMessagesPerRequest = maxMessagesPerRequest) { - override fun createMessageDetails(id: String, time: Long, storedMessage: StoredMessage, onResponse: () -> Unit): GrpcRequestedMessageDetails { - return GrpcRequestedMessageDetails(id, time, storedMessage, this, onResponse) + override fun createMessageDetails(id: String, time: Long, storedMessage: StoredMessage, responseFormats: List, onResponse: () -> Unit): GrpcRequestedMessageDetails { + return GrpcRequestedMessageDetails(id, time, storedMessage, this, responseFormats, onResponse) } override fun addStreamInfo() { @@ -58,10 +59,11 @@ class GrpcRequestedMessageDetails( time: Long, storedMessage: StoredMessage, override val context: GrpcMessageRequestContext, + responseFormats: List, onResponse: () -> Unit, parsedMessage: List? = null, rawMessage: RawMessage? = null -) : RequestedMessageDetails(id, time, storedMessage, context, parsedMessage, rawMessage, onResponse) { +) : RequestedMessageDetails(id, time, storedMessage, context, responseFormats, parsedMessage, rawMessage, onResponse) { override fun responseMessageInternal() { val msg = GrpcMessageProducer.createMessage(this) diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcUtils.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcUtils.kt index f7d66ddd..925f3382 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcUtils.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcUtils.kt @@ -22,10 +22,32 @@ import com.exactpro.th2.common.grpc.Direction import com.exactpro.th2.common.grpc.MessageID import com.exactpro.th2.dataprovider.grpc.MessageStream import com.exactpro.th2.dataprovider.grpc.TimeRelation +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat import com.exactpro.th2.lwdataprovider.entities.requests.ProviderMessageStream +import com.exactpro.th2.dataprovider.grpc.MessageSearchRequest.ResponseFormat.ALL +import com.exactpro.th2.dataprovider.grpc.MessageSearchRequest.ResponseFormat.BASE_64 +import com.exactpro.th2.dataprovider.grpc.MessageSearchRequest.ResponseFormat.PARSED import com.google.protobuf.Timestamp import java.time.Instant +fun List?.toLocalResponseFormats() : List { + val list : MutableList = ArrayList() + if (this.isNullOrEmpty()) { + list.add(ResponseFormat.ALL) + } else + for (responseFormat in this) { + when (responseFormat) { + ALL -> list.add(ResponseFormat.ALL) + BASE_64 -> list.add(ResponseFormat.BASE_64) + PARSED -> list.add(ResponseFormat.PARSED) + else -> { + error("Unrecognized response format: $responseFormat") + } + } + } + return list.toList() +} + fun Timestamp.toInstant() : Instant = Instant.ofEpochSecond(this.seconds, this.nanos.toLong()) fun TimeRelation?.toProviderRelation(): com.exactpro.cradle.TimeRelation { diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/handlers/SearchMessagesHandler.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/handlers/SearchMessagesHandler.kt index 5dd7abef..4ea47cca 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/handlers/SearchMessagesHandler.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/handlers/SearchMessagesHandler.kt @@ -20,7 +20,9 @@ import com.exactpro.cradle.TimeRelation.AFTER import com.exactpro.cradle.messages.StoredMessageFilterBuilder import com.exactpro.cradle.messages.StoredMessageId import com.exactpro.th2.lwdataprovider.MessageRequestContext +import com.exactpro.th2.lwdataprovider.configuration.Configuration import com.exactpro.th2.lwdataprovider.db.CradleMessageExtractor +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat import com.exactpro.th2.lwdataprovider.entities.requests.GetMessageRequest import com.exactpro.th2.lwdataprovider.entities.requests.SseMessageSearchRequest import mu.KotlinLogging @@ -40,7 +42,7 @@ class SearchMessagesHandler( return cradleMsgExtractor.getStreams(); } - fun loadMessages(request: SseMessageSearchRequest, requestContext: MessageRequestContext) { + fun loadMessages(request: SseMessageSearchRequest, requestContext: MessageRequestContext, configuration: Configuration) { if (request.stream == null && request.resumeFromIdsList.isNullOrEmpty()) { return; @@ -72,10 +74,12 @@ class SearchMessagesHandler( }.build() - if (!request.onlyRaw) - cradleMsgExtractor.getMessages(filter, requestContext) - else - cradleMsgExtractor.getRawMessages(filter, requestContext) + val responseFormats = request.responseFormats ?: configuration.defaultResponseFormats + if (isOnlyRaw(request, responseFormats)) { + cradleMsgExtractor.getRawMessages(filter, requestContext, responseFormats) + } else { + cradleMsgExtractor.getMessages(filter, requestContext, responseFormats) + } limitReached = request.resultCountLimit != null && request.resultCountLimit <= requestContext.loadedMessages } } else { @@ -94,10 +98,12 @@ class SearchMessagesHandler( request.resultCountLimit?.let { limit(max(it - requestContext.loadedMessages, 0)) } }.build() - if (!request.onlyRaw) - cradleMsgExtractor.getMessages(filter, requestContext) - else - cradleMsgExtractor.getRawMessages(filter, requestContext) + val responseFormats = request.responseFormats ?: configuration.defaultResponseFormats + if (isOnlyRaw(request, responseFormats)) { + cradleMsgExtractor.getRawMessages(filter, requestContext, responseFormats) + } else { + cradleMsgExtractor.getMessages(filter, requestContext, responseFormats) + } limitReached = request.resultCountLimit != null && request.resultCountLimit <= requestContext.loadedMessages } @@ -132,6 +138,10 @@ class SearchMessagesHandler( } } } + + private fun isOnlyRaw(request: SseMessageSearchRequest, responseFormats: List) : Boolean{ + return request.onlyRaw || (responseFormats.contains(ResponseFormat.BASE_64) && responseFormats.size == 1) + } } diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/GetMessagesServlet.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/GetMessagesServlet.kt index cfd55d05..c1c507d9 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/GetMessagesServlet.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/GetMessagesServlet.kt @@ -57,7 +57,7 @@ class GetMessagesServlet ( val reqContext = MessageSseRequestContext(sseResponse, queryParametersMap, maxMessagesPerRequest = configuration.bufferPerQuery) reqContext.startStep("messages_loading").use { keepAliveHandler.addKeepAliveData(reqContext) - searchMessagesHandler.loadMessages(request, reqContext) + searchMessagesHandler.loadMessages(request, reqContext,configuration) this.waitAndWrite(queue, resp, reqContext) keepAliveHandler.removeKeepAliveData(reqContext) diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/SseRequestContext.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/SseRequestContext.kt index d7de678b..52d3e56d 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/SseRequestContext.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/SseRequestContext.kt @@ -20,6 +20,7 @@ import com.exactpro.cradle.messages.StoredMessage import com.exactpro.th2.common.grpc.Message import com.exactpro.th2.common.grpc.RawMessage import com.exactpro.th2.lwdataprovider.* +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat import com.exactpro.th2.lwdataprovider.entities.responses.Event import com.exactpro.th2.lwdataprovider.entities.responses.LastScannedObjectInfo import com.exactpro.th2.lwdataprovider.producers.MessageProducer53 @@ -39,8 +40,8 @@ class MessageSseRequestContext ( maxMessagesPerRequest = maxMessagesPerRequest) { - override fun createMessageDetails(id: String, time: Long, storedMessage: StoredMessage, onResponse: () -> Unit) : RequestedMessageDetails { - return SseRequestedMessageDetails(id, time, storedMessage, this, onResponse) + override fun createMessageDetails(id: String, time: Long, storedMessage: StoredMessage, responseFormats: List, onResponse: () -> Unit) : RequestedMessageDetails { + return SseRequestedMessageDetails(id, time, storedMessage, this, responseFormats, onResponse) } override fun addStreamInfo() { @@ -54,10 +55,11 @@ class SseRequestedMessageDetails( time: Long, storedMessage: StoredMessage, override val context: MessageSseRequestContext, + responseFormats: List, onResponse: () -> Unit, parsedMessage: List? = null, rawMessage: RawMessage? = null -) : RequestedMessageDetails(id, time, storedMessage, context, parsedMessage, rawMessage, onResponse) { +) : RequestedMessageDetails(id, time, storedMessage, context, responseFormats, parsedMessage, rawMessage, onResponse) { override fun responseMessageInternal() { val msg = MessageProducer53.createMessage(this, context.jsonFormatter) diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/producers/GrpcMessageProducer.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/producers/GrpcMessageProducer.kt index f47e7d6c..985f01c5 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/producers/GrpcMessageProducer.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/producers/GrpcMessageProducer.kt @@ -23,6 +23,9 @@ import com.exactpro.th2.common.grpc.MessageID import com.exactpro.th2.dataprovider.grpc.MessageGroupItem import com.exactpro.th2.dataprovider.grpc.MessageGroupResponse import com.exactpro.th2.lwdataprovider.RequestedMessageDetails +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat.ALL +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat.BASE_64 +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat.PARSED import com.google.protobuf.Timestamp import java.time.Instant @@ -32,14 +35,19 @@ class GrpcMessageProducer { fun createMessage(rawMessage: RequestedMessageDetails): MessageGroupResponse { val storedMessage = rawMessage.storedMessage + val responseFormats = rawMessage.responseFormats return MessageGroupResponse.newBuilder().apply { messageId = convertMessageId(storedMessage.id) timestamp = convertTimestamp(storedMessage.timestamp) - bodyRaw = rawMessage.rawMessage?.body - rawMessage.parsedMessage?.forEach { - addMessageItem(MessageGroupItem.newBuilder().setMessage(it).build()) + if (responseFormats.isEmpty() || responseFormats.contains(ALL) || responseFormats.contains(BASE_64)) { + bodyRaw = rawMessage.rawMessage?.body + } + if (responseFormats.isEmpty() || responseFormats.contains(ALL) || responseFormats.contains(PARSED)) { + rawMessage.parsedMessage?.forEach { + addMessageItem(MessageGroupItem.newBuilder().setMessage(it).build()) + } } }.build() } diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/producers/MessageProducer53.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/producers/MessageProducer53.kt index 5025b61a..36ab0770 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/producers/MessageProducer53.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/producers/MessageProducer53.kt @@ -21,23 +21,27 @@ import com.exactpro.th2.common.message.addFields import com.exactpro.th2.common.message.messageType import com.exactpro.th2.lwdataprovider.CustomJsonFormatter import com.exactpro.th2.lwdataprovider.RequestedMessageDetails +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat.ALL +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat.BASE_64 +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat.PARSED import com.exactpro.th2.lwdataprovider.entities.responses.ProviderMessage53 import java.util.Base64 + @Deprecated("for 5.3 messages") class MessageProducer53 { companion object { fun createMessage(rawMessage: RequestedMessageDetails, formatter: CustomJsonFormatter): ProviderMessage53 { - val convertToOneMessage = rawMessage.parsedMessage?.let { convertToOneMessage(it) } + val responseFormats = rawMessage.responseFormats + val convertToOneMessage = if (isParsedFormat(responseFormats)) rawMessage.parsedMessage?.let { convertToOneMessage(it) } else null return ProviderMessage53( rawMessage.storedMessage, convertToOneMessage?.let { formatter.print(it) } ?: "{}", - convertToOneMessage, - rawMessage.rawMessage?.let { - Base64.getEncoder().encodeToString(it.body.toByteArray()) - }, + if (isParsedFormat(responseFormats)) convertToOneMessage else null, + if (isBase64Format(responseFormats)) rawMessage.rawMessage?.let { Base64.getEncoder().encodeToString(it.body.toByteArray()) } else null, if (convertToOneMessage != null) convertToOneMessage.metadata.messageType else "" ) } @@ -54,7 +58,7 @@ class MessageProducer53 { ) } - fun convertToOneMessage (messages: List): Message { + fun convertToOneMessage(messages: List): Message { return when (messages.size) { 1 -> messages[0] else -> messages[0].toBuilder().run { @@ -85,6 +89,13 @@ class MessageProducer53 { } } + private fun isParsedFormat(responseFormats: List) : Boolean{ + return responseFormats.isEmpty() || responseFormats.contains(ALL) || responseFormats.contains(PARSED) + } + + private fun isBase64Format(responseFormats: List) : Boolean{ + return responseFormats.isEmpty() || responseFormats.contains(ALL) || responseFormats.contains(BASE_64) + } } } \ No newline at end of file