@@ -44,7 +44,11 @@ import com.exactpro.th2.read.db.impl.grpc.util.toModel
4444import com.google.protobuf.Empty
4545import com.google.protobuf.Message
4646import io.grpc.Status
47+ import io.grpc.StatusRuntimeException
48+ import io.grpc.stub.ServerCallStreamObserver
4749import io.grpc.stub.StreamObserver
50+ import kotlinx.coroutines.channels.Channel
51+ import kotlinx.coroutines.channels.ClosedReceiveChannelException
4852import mu.KotlinLogging
4953import java.time.Instant
5054import java.util.concurrent.TimeUnit
@@ -58,8 +62,8 @@ class DataBaseReaderGrpcServer(
5862 private val onEvent : OnEvent ,
5963) : ReadDbGrpc.ReadDbImplBase() {
6064 override fun execute (request : QueryRequest , responseObserver : StreamObserver <QueryResponse >) {
61- execute(" execute" , request, responseObserver) { event, parentEventId, _ ->
62- GrpcExecuteListener (responseObserver, event) {
65+ execute(" execute" , request, responseObserver) { event, parentEventId, executionId ->
66+ GrpcExecuteListener (responseObserver, executionId, event) {
6367 onEvent.accept(it, parentEventId)
6468 }
6569 }
@@ -155,8 +159,8 @@ class DataBaseReaderGrpcServer(
155159 event.bodyData(executeQueryRequest.toBody(executionId))
156160 app.executeQuery(
157161 executeQueryRequest,
158- createListener(event, parentEventId, executionId),
159- ) { row ->
162+ createListener(event, parentEventId, executionId)
163+ ) { row ->
160164 row.copy(associatedMessageType = associatedMessageType, executionId = executionId)
161165 }
162166 } catch (ex: Exception ) {
@@ -168,31 +172,64 @@ class DataBaseReaderGrpcServer(
168172 }
169173
170174 private class GrpcExecuteListener (
171- private val observer : StreamObserver <QueryResponse >,
175+ streamObserver : StreamObserver <QueryResponse >,
176+ private val executionId : Long ,
172177 private val event : Event ,
173178 private val onEvent : (Event ) -> Unit ,
174179 ) : ResultListener {
175- override fun onRow (sourceId : DataSourceId , row : TableRow ) {
176- requireNotNull(row.executionId) {
177- " 'Execution id' is null for row: $row "
180+ private val channel = Channel <Unit >(1 )
181+ private val observer = streamObserver as ServerCallStreamObserver <QueryResponse >
182+
183+ init {
184+ observer.setOnReadyHandler {
185+ channel.trySend(Unit )
186+ }
187+ observer.setOnCancelHandler {
188+ LOGGER .warn { " gRPC request is canceled for '$executionId ' execution" }
189+ channel.close()
190+ }
191+ }
192+
193+ override suspend fun onRow (sourceId : DataSourceId , row : TableRow ) {
194+ check(executionId == row.executionId) {
195+ " 'Execution id' isn't equal to '$executionId ', row: $row "
196+ }
197+ try {
198+ if (observer.isCancelled) {
199+ return
200+ }
201+ if (! observer.isReady) {
202+ channel.receive()
203+ }
204+
205+ observer.onNext(
206+ QueryResponse .newBuilder()
207+ .putRows(row)
208+ .setExecutionId(row.executionId)
209+ .build()
210+ )
211+ } catch (e: RuntimeException ) {
212+ if (e is ClosedReceiveChannelException || e is StatusRuntimeException ) {
213+ LOGGER .error(e) { " Couldn't send next gRPC message by gRPC connection problem for '$executionId ' execution" }
214+ } else {
215+ throw e
216+ }
178217 }
179- observer.onNext(
180- QueryResponse .newBuilder()
181- .putRows(row)
182- .setExecutionId(row.executionId)
183- .build()
184- )
185218 }
186219
187220 override fun onError (error : Throwable ) {
188- observer.onError(error)
221+ if (! observer.isCancelled) {
222+ observer.onError(error)
223+ }
189224 event.endTimestamp()
190225 .exception(error, true )
191226 .also (onEvent)
192227 }
193228
194229 override fun onComplete () {
195- observer.onCompleted()
230+ if (! observer.isCancelled) {
231+ observer.onCompleted()
232+ }
196233 event.endTimestamp()
197234 .also (onEvent)
198235 }
@@ -206,7 +243,7 @@ class DataBaseReaderGrpcServer(
206243 ) : ResultListener {
207244 private val counter = AtomicLong ()
208245
209- override fun onRow (sourceId : DataSourceId , row : TableRow ) {
246+ override suspend fun onRow (sourceId : DataSourceId , row : TableRow ) {
210247 check(report.executionId == row.executionId) {
211248 " 'Execution id' isn't equal to '${report.executionId} ', row: $row "
212249 }
0 commit comments