@@ -26,7 +26,6 @@ import com.exactpro.th2.common.schema.message.MessageRouter
2626import com.exactpro.th2.common.schema.message.QueueAttribute.RAW
2727import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch
2828import com.exactpro.th2.common.utils.event.EventBatcher
29- import com.exactpro.th2.common.utils.event.storeEvent
3029import com.exactpro.th2.common.utils.event.transport.toProto
3130import com.exactpro.th2.common.utils.message.RAW_GROUP_SELECTOR
3231import com.exactpro.th2.common.utils.message.RawMessageBatcher
@@ -42,6 +41,8 @@ import com.exactpro.th2.http.client.api.IStateManager
4241import com.exactpro.th2.http.client.api.IStateManager.StateManagerContext
4342import com.exactpro.th2.http.client.api.impl.BasicRequestHandler
4443import com.exactpro.th2.http.client.api.impl.BasicStateManager
44+ import com.exactpro.th2.http.client.util.publishSentEvents
45+ import com.exactpro.th2.http.client.util.storeEvent
4546import com.exactpro.th2.http.client.util.toPrettyString
4647import com.exactpro.th2.http.client.util.toProtoMessage
4748import com.exactpro.th2.http.client.util.toTransportMessage
@@ -91,29 +92,6 @@ class Application(
9192 onBatch = eventRouter::send
9293 ).also { registerResource(" event batcher" , it::close) }
9394
94- val onError: (Throwable ) -> Unit = {
95- eventBatcher.storeEvent(rootEventId, " Batching problem: ${it.message} " , " Message batching problem" , it)
96- }
97-
98- lateinit var transportMB: MessageBatcher
99- lateinit var protoMB: RawMessageBatcher
100- if (useTransport) {
101- transportMB =
102- MessageBatcher (
103- maxBatchSize,
104- maxFlushTime,
105- book,
106- GROUP_SELECTOR ,
107- executor,
108- onError,
109- transportMR::send
110- ).also { registerResource(" transport message batcher" , it::close) }
111- } else {
112- protoMB = RawMessageBatcher (maxBatchSize, maxFlushTime, RAW_GROUP_SELECTOR , executor, onError) {
113- protoMR.send(it, RAW .value)
114- }.also { registerResource(" proto message batcher" , it::close) }
115- }
116-
11795 val aliasToService = mutableMapOf<String , Holder >()
11896 sessions.forEach { sessionAlias, sessionSettings ->
11997 val stateManager = load<IStateManager >(BasicStateManager ::class .java)
@@ -138,18 +116,30 @@ class Application(
138116 val onRequest: (RawHttpRequest ) -> Unit
139117 val onResponse: (RawHttpRequest , RawHttpResponse <* >) -> Unit
140118
119+ val onError: (Throwable ) -> Unit = {
120+ eventBatcher.storeEvent(clientEventId, " Batching problem: ${it.message} " , " Message batching problem" , it)
121+ }
122+
141123 if (useTransport) {
124+ val transportMB = MessageBatcher (
125+ maxBatchSize,
126+ maxFlushTime,
127+ book,
128+ GROUP_SELECTOR ,
129+ executor,
130+ onError
131+ ) { batch ->
132+ transportMR.send(batch)
133+ if (! sessionSettings.publishSentEvents) return @MessageBatcher
134+ eventBatcher.publishSentEvents(clientEventId, batch)
135+ }.also { registerResource(" transport message batcher $sessionAlias " , it::close) }
136+
142137 onRequest = { request: RawHttpRequest ->
143- val rawMessage = outgoingLock.withLock {
138+ outgoingLock.withLock {
144139 request.toTransportMessage(sessionAlias, outgoingSequence()).also {
145140 transportMB.onMessage(it, sessionGroup)
146141 }
147142 }
148- eventBatcher.storeEvent(
149- rawMessage.eventId?.toProto() ? : rootEventId,
150- " Sent HTTP request" ,
151- " Send message"
152- )
153143 }
154144 onResponse = { request: RawHttpRequest , response: RawHttpResponse <* > ->
155145 incomingLock.withLock {
@@ -161,21 +151,27 @@ class Application(
161151 stateManager.onResponse(response)
162152 }
163153 } else {
154+ val protoMB = RawMessageBatcher (
155+ maxBatchSize,
156+ maxFlushTime,
157+ RAW_GROUP_SELECTOR ,
158+ executor,
159+ onError
160+ ) { batch ->
161+ protoMR.send(batch, RAW .value)
162+ if (! sessionSettings.publishSentEvents) return @RawMessageBatcher
163+ eventBatcher.publishSentEvents(clientEventId, batch)
164+ }.also { registerResource(" proto message batcher $sessionAlias " , it::close) }
164165 val connectionId = com.exactpro.th2.common.grpc.ConnectionID .newBuilder()
165166 .setSessionAlias(sessionAlias)
166167 .setSessionGroup(sessionGroup)
167168 .build()
168169
169170 onRequest = { request: RawHttpRequest ->
170- val rawMessage = outgoingLock.withLock {
171+ outgoingLock.withLock {
171172 request.toProtoMessage(connectionId, outgoingSequence())
172173 .also (protoMB::onMessage)
173174 }
174- eventBatcher.storeEvent(
175- if (rawMessage.hasParentEventId()) rawMessage.parentEventId else rootEventId,
176- " Sent HTTP request" ,
177- " Send message"
178- )
179175 }
180176 onResponse = { request: RawHttpRequest , response: RawHttpResponse <* > ->
181177 incomingLock.withLock {
0 commit comments