Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Lightweight data provider (2.11.0)
# Lightweight data provider (2.11.1)

# Overview
This component serves as a data provider for [th2-data-services](https://github.com/th2-net/th2-data-services). It will connect to the cassandra database via [cradle api](https://github.com/th2-net/cradleapi) and expose the data stored in there as REST resources.
Expand Down Expand Up @@ -224,6 +224,11 @@ spec:

# Release notes:

## 2.11.1

+ Support `keepOpen` option for `searchMessageGroups` gRPC request
+ th2 gradle plugin `0.1.3`

## 2.11.0

+ Updated:
Expand Down
2 changes: 2 additions & 0 deletions app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ dependencies {
testImplementation("org.testcontainers:cassandra")

testImplementation("com.datastax.oss:java-driver-core")
testImplementation("io.grpc:grpc-testing")
testImplementation("io.grpc:grpc-inprocess")
}

application {
Expand Down
2 changes: 1 addition & 1 deletion app/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
kotlin.code.style=official
release_version=2.11.0
release_version=2.11.1
description='th2 Lightweight data provider component'
kapt.include.compile.classpath=false
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2022-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,7 +60,7 @@ data class MessagesGroupRequest(
},
if (hasStartTimestamp()) startTimestamp.toInstant() else error("missing start timestamp"),
if (hasEndTimestamp()) endTimestamp.toInstant() else error("missing end timestamp"),
false, // FIXME: update gRPC
request.keepOpen,
if (hasBookId()) bookId.toCradle() else error("parameter '$BOOK_ID_PARAM' is required"),
request.responseFormatsList.takeIf { it.isNotEmpty() }
?.mapTo(hashSetOf(), ResponseFormat.Companion::fromString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import java.util.concurrent.BlockingQueue
import java.util.concurrent.Future
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

Expand All @@ -42,9 +43,9 @@ class GrpcDataProviderBackPressure(
dataMeasurement: DataMeasurement,
private val scheduler: ScheduledExecutorService,
) : GrpcDataProviderImpl(configuration, searchMessagesHandler, searchEventsHandler, generalCradleHandler, dataMeasurement) {

companion object {
private val logger = KotlinLogging.logger { }
private const val EVENT_POLLING_TIMEOUT = 100L
}

override fun <T> processResponse(
Expand All @@ -57,50 +58,90 @@ class GrpcDataProviderBackPressure(
val servCallObs = responseObserver as ServerCallStreamObserver<T>
val lock = ReentrantLock()
var future: Future<*>? = null
val isCancelled = AtomicBoolean(false)

fun cleanBuffer() {
while (buffer.poll() != null) {
buffer.clear()
}
}

fun cancel() {
handler.cancel()
onClose(handler)
cleanBuffer()
onFinished()
if (isCancelled.compareAndSet(false, true)) {
handler.cancel()
onClose(handler)
cleanBuffer()
onFinished()
logger.info { "Stream cancelled and cleaned up" }
}
}

servCallObs.setOnCancelHandler {
logger.warn { "Execution cancelled" }
lock.withLock {
future?.cancel(true)
future = null
}
cancel()
}

servCallObs.setOnReadyHandler {
if (!handler.isAlive)
if (!handler.isAlive || isCancelled.get()) {
logger.debug { "Handler no longer alive or already cancelled, skipping processing" }
return@setOnReadyHandler
}

lock.withLock {
future?.cancel(false)
future = null
}

var inProcess = true
while (servCallObs.isReady && inProcess) {
while (servCallObs.isReady && inProcess && !isCancelled.get()) {
if (servCallObs.isCancelled) {
logger.warn { "Request is canceled during processing" }
handler.cancel()
cancel()
return@setOnReadyHandler
}
val event = buffer.take()
if (event.close) {
servCallObs.onCompleted()
inProcess = false
onFinished()
onClose(handler)
logger.info { "Executing finished successfully" }
} else if (event.error != null) {
servCallObs.onError(event.error)
inProcess = false
onFinished()
handler.complete()
logger.warn(event.error) { "Executing finished with error" }
} else {
converter.invoke(event)?.let { servCallObs.onNext(it) }

try {
// We need to poll because if we will use take and keepOpen option it is possible that we will have to wait here indefinitely
val event = buffer.poll(EVENT_POLLING_TIMEOUT, TimeUnit.MILLISECONDS) ?: continue
when {
event.close -> {
servCallObs.onCompleted()
inProcess = false
onFinished()
onClose(handler)
logger.info { "Executing finished successfully" }
}
event.error != null -> {
servCallObs.onError(event.error)
inProcess = false
onFinished()
handler.complete()
logger.warn(event.error) { "Executing finished with error" }
}
else -> {
converter.invoke(event)?.let { servCallObs.onNext(it) }
}
}
} catch (e: InterruptedException) {
logger.warn(e) { "Processing interrupted" }
cancel()
return@setOnReadyHandler
} catch (e: Exception) {
logger.error(e) { "Error processing event" }
servCallObs.onError(Status.INTERNAL
.withDescription("Internal error during processing")
.withCause(e)
.asRuntimeException())
cancel()
return@setOnReadyHandler
}
}
if (inProcess) {

if (inProcess && !isCancelled.get()) {
lock.withLock {
future = scheduler.schedule({
runCatching {
Expand All @@ -118,20 +159,10 @@ class GrpcDataProviderBackPressure(
}, configuration.grpcBackPressureReadinessTimeoutMls, TimeUnit.MILLISECONDS)
}
}

if (!servCallObs.isReady) {
logger.trace { "Suspending processing because the opposite side is not ready to receive more messages. In queue: ${buffer.size}" }
}
}

servCallObs.setOnCancelHandler {
logger.warn{ "Execution cancelled" }
lock.withLock {
future?.cancel(true)
future = null
}
cancel()
}


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ open class GrpcDataProviderImpl(
}
}

protected open fun <T> processResponse(
open fun <T> processResponse(
responseObserver: StreamObserver<T>,
buffer: BlockingQueue<GrpcEvent>,
handler: CancelableResponseHandler,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright 2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.exactpro.th2.lwdataprovider.grpc

import com.exactpro.cradle.messages.StoredMessage
import com.exactpro.th2.common.message.toTimestamp
import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupsSearchRequest
import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat
import com.exactpro.th2.lwdataprovider.util.ImmutableListCradleResult
import com.exactpro.th2.lwdataprovider.util.createBatches
import com.exactpro.th2.lwdataprovider.util.validateMessagesOrderGrpc
import org.junit.jupiter.api.Assertions.assertEquals
import org.mockito.kotlin.argThat
import org.mockito.kotlin.whenever
import java.time.Instant
import java.time.temporal.ChronoUnit

abstract class GRPCBaseTests : GrpcImplTestBase() {

protected fun stopsPullingDataWhenOutOfRangeExists(offsetNewData: Boolean) {
val startTimestamp = Instant.now()
val firstEndTimestamp = startTimestamp.plus(10L, ChronoUnit.MINUTES)
val endTimestamp = firstEndTimestamp.plus(10L, ChronoUnit.MINUTES)
val aliasesCount = 5
val increase = 5L
val firstBatchMessagesCount = (firstEndTimestamp.epochSecond - startTimestamp.epochSecond) / increase
val firstMessagesPerAlias = firstBatchMessagesCount / aliasesCount

val lastBatchMessagesCount = (endTimestamp.epochSecond - firstEndTimestamp.epochSecond) / increase
val lastMessagesPerAlias = lastBatchMessagesCount / aliasesCount

val firstBatches = createBatches(
firstMessagesPerAlias,
aliasesCount,
overlapCount = 0,
increase,
startTimestamp,
firstEndTimestamp,
)
val lastBatches = createBatches(
lastMessagesPerAlias,
aliasesCount,
overlapCount = 0,
increase,
firstEndTimestamp,
endTimestamp,
aliasIndexOffset = if (offsetNewData) aliasesCount else 0
)
val outsideBatches = createBatches(
10,
1,
0,
increase,
endTimestamp.plusNanos(1),
endTimestamp.plus(5, ChronoUnit.MINUTES),
)
val group = "test"
val firstRequestMessagesCount = firstBatches.sumOf { it.messageCount }
val secondRequestMessagesCount = lastBatches.sumOf { it.messageCount }
val messagesCount = firstRequestMessagesCount + secondRequestMessagesCount

whenever(storage.getGroupedMessageBatches(argThat {
groupName == group && from.value == startTimestamp && to.value == endTimestamp
})).thenReturn(ImmutableListCradleResult(firstBatches))
whenever(storage.getGroupedMessageBatches(argThat {
groupName == group && from.value == firstBatches.maxOf { it.lastTimestamp } && to.value == endTimestamp
})).thenReturn(ImmutableListCradleResult(lastBatches))
whenever(storage.getGroupedMessageBatches(argThat {
limit == 1 && groupName == group
})).thenReturn(ImmutableListCradleResult(outsideBatches))

val request = MessageGroupsSearchRequest.newBuilder().apply {
addMessageGroupBuilder().setName("test")
addResponseFormats(ResponseFormat.BASE_64.name)
bookIdBuilder.setName("test")
this.startTimestamp = startTimestamp.toTimestamp()
this.endTimestamp = endTimestamp.toTimestamp()
this.keepOpen = true
}.build()

val grpcDataProvider = createGrpcDataProvider()
GrpcTestHolder(grpcDataProvider).use { (stub) ->
val responses = stub.searchMessageGroups(request).asSequence().toList()

assertEquals(messagesCount + 1, responses.size) {
val missing: List<StoredMessage> =
(firstBatches.asSequence() + lastBatches.asSequence()).flatMap { it.messages }.filter { stored ->
responses.none {
val messageId = it.message.messageId
messageId.connectionId.sessionAlias == stored.sessionAlias
&& messageId.sequence == stored.sequence
&& messageId.direction.toCradleDirection() == stored.direction
}
}.toList()
"Missing ${missing.size} message(s): $missing"
}

validateMessagesOrderGrpc(responses, messagesCount)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.lwdataprovider.grpc

import com.exactpro.th2.dataprovider.lw.grpc.DataProviderGrpc
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.util.concurrent.Executors

class GrpcDataProviderBackPressureTest : GRPCBaseTests() {

val backPressureExecutor = Executors.newSingleThreadScheduledExecutor()

@ParameterizedTest
@ValueSource(booleans = [true, false])
fun `stops pulling if data out of range exist`(offsetNewData: Boolean) {
this.stopsPullingDataWhenOutOfRangeExists(offsetNewData)
}

override fun createGrpcDataProvider(): DataProviderGrpc.DataProviderImplBase = GrpcDataProviderBackPressure(
configuration,
searchHandler,
searchEventsHandler,
generalCradleHandler,
measurement,
backPressureExecutor
)
}
Loading