Skip to content

Commit a602432

Browse files
committed
issue #1255 Add message length into execution plan extra info
1 parent e33da3f commit a602432

File tree

6 files changed

+156
-26
lines changed

6 files changed

+156
-26
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2020 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.spline.producer.rest.controller
18+
19+
import org.aspectj.lang.ProceedingJoinPoint
20+
import org.aspectj.lang.annotation.{Around, Aspect, Pointcut}
21+
import org.slf4s.Logging
22+
import org.springframework.stereotype.Component
23+
import org.springframework.web.context.request.{RequestContextHolder, ServletRequestAttributes}
24+
import za.co.absa.spline.producer.model.{ExecutionPlan, v1_1}
25+
import za.co.absa.spline.producer.rest.filter.MessageLengthCapturingFilter
26+
import za.co.absa.spline.producer.rest.filter.MessageLengthCapturingFilter.ReadOnlyCounter
27+
28+
@Aspect
29+
@Component
30+
class ExecutionPlansControllerMessageLengthCapturingAspect extends Logging {
31+
32+
@Pointcut("execution(public * za.co.absa.spline.producer.rest.controller.*Controller.*(..))")
33+
def publicControllerMethods(): Unit = {}
34+
35+
@Pointcut("execution(* *(.., za.co.absa.spline.producer.model.ExecutionPlan, ..))")
36+
def acceptingExecutionPlanV1(): Unit = {}
37+
38+
@Pointcut("execution(* *(.., za.co.absa.spline.producer.model.v1_1.ExecutionPlan, ..))")
39+
def acceptingExecutionPlan(): Unit = {}
40+
41+
@Around("publicControllerMethods() && (acceptingExecutionPlan() || acceptingExecutionPlanV1())")
42+
def aroundAdvice(jp: ProceedingJoinPoint): AnyRef = {
43+
val origArgs = jp.getArgs
44+
val fixedArgs = origArgs.map {
45+
case ep: ExecutionPlan =>
46+
ep.copy(extraInfo = withMessageLengthInfo(ep.extraInfo))
47+
case ep: v1_1.ExecutionPlan =>
48+
ep.copy(extraInfo = withMessageLengthInfo(ep.extraInfo))
49+
case x => x
50+
}
51+
jp.proceed(fixedArgs)
52+
}
53+
54+
private def withMessageLengthInfo(m: Map[String, Any]): Map[String, Any] = {
55+
val req = RequestContextHolder.getRequestAttributes.asInstanceOf[ServletRequestAttributes].getRequest
56+
val counters = MessageLengthCapturingFilter.getCounters(req).toArray
57+
m + ("__spline_msg_size" -> counters.map(_.count))
58+
}
59+
}
60+
61+

rest-gateway/src/main/scala/za/co/absa/spline/gateway/rest/filter/GzipFilter.scala renamed to producer-rest-core/src/main/scala/za/co/absa/spline/producer/rest/filter/GzipFilter.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
/*
2-
* Copyright 2020 ABSA Group Limited
3-
*
2+
* Copyright 2023 ABSA Group Limited
43
* Licensed under the Apache License, Version 2.0 (the "License");
54
* you may not use this file except in compliance with the License.
65
* You may obtain a copy of the License at
@@ -14,13 +13,15 @@
1413
* limitations under the License.
1514
*/
1615

17-
package za.co.absa.spline.gateway.rest.filter
16+
package za.co.absa.spline.producer.rest.filter
1817

19-
import javax.servlet._
20-
import javax.servlet.http.HttpServletRequest
2118
import org.springframework.http.HttpHeaders
2219
import za.co.absa.spline.producer.rest.HttpConstants.Encoding
2320

21+
import java.util.zip.GZIPInputStream
22+
import javax.servlet._
23+
import javax.servlet.http.HttpServletRequest
24+
2425
/**
2526
* Filter for decompressing gziped Http requests
2627
*
@@ -30,7 +31,7 @@ class GzipFilter extends Filter {
3031
override def doFilter(request: ServletRequest, response: ServletResponse, chain: FilterChain): Unit = {
3132

3233
val newRequest = request match {
33-
case r: HttpServletRequest if isCompressed(r) => new GZIPRequestWrapper(r)
34+
case r: HttpServletRequest if isCompressed(r) => new HttpRequestWrapper(r, new GZIPInputStream(r.getInputStream))
3435
case _ => request
3536
}
3637

rest-gateway/src/main/scala/za/co/absa/spline/gateway/rest/filter/GZIPRequestWrapper.scala renamed to producer-rest-core/src/main/scala/za/co/absa/spline/producer/rest/filter/HttpRequestWrapper.scala

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
/*
2-
* Copyright 2020 ABSA Group Limited
3-
*
2+
* Copyright 2023 ABSA Group Limited
43
* Licensed under the Apache License, Version 2.0 (the "License");
54
* you may not use this file except in compliance with the License.
65
* You may obtain a copy of the License at
@@ -14,20 +13,19 @@
1413
* limitations under the License.
1514
*/
1615

17-
package za.co.absa.spline.gateway.rest.filter
18-
19-
import java.io.{BufferedReader, InputStreamReader}
16+
package za.co.absa.spline.producer.rest.filter
2017

18+
import java.io.{BufferedReader, InputStream, InputStreamReader}
2119
import javax.servlet.ServletInputStream
2220
import javax.servlet.http.{HttpServletRequest, HttpServletRequestWrapper}
2321

2422

25-
final class GZIPRequestWrapper(val request: HttpServletRequest) extends HttpServletRequestWrapper(request) {
23+
final class HttpRequestWrapper(request: HttpServletRequest, stream: InputStream)
24+
extends HttpServletRequestWrapper(request) {
2625

27-
val stream = new GZIPServletInputStream(request.getInputStream)
28-
val reader = new BufferedReader(new InputStreamReader(stream))
26+
private val reader = new BufferedReader(new InputStreamReader(stream))
2927

28+
override def getInputStream: ServletInputStream = new ServletInputStreamAdapter(stream)
3029

31-
override def getInputStream: ServletInputStream = stream
3230
override def getReader: BufferedReader = reader
33-
}
31+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2020 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.spline.producer.rest.filter
18+
19+
import za.co.absa.spline.producer.rest.filter.MessageLengthCapturingFilter.{LengthCountingInputStreamWrapper, getCounters}
20+
21+
import java.io.InputStream
22+
import javax.servlet._
23+
import javax.servlet.http.HttpServletRequest
24+
import scala.collection.mutable
25+
26+
class MessageLengthCapturingFilter extends Filter {
27+
override def doFilter(request: ServletRequest, response: ServletResponse, chain: FilterChain): Unit = {
28+
val newRequest = request match {
29+
case r: HttpServletRequest =>
30+
val inputStreamWrapper = new LengthCountingInputStreamWrapper(r.getInputStream)
31+
getCounters(r) += inputStreamWrapper.lengthCounter
32+
new HttpRequestWrapper(r, inputStreamWrapper)
33+
case _ => request
34+
}
35+
chain.doFilter(newRequest, response)
36+
}
37+
38+
override def init(config: FilterConfig): Unit = {
39+
// nothing to do here
40+
}
41+
42+
override def destroy(): Unit = {
43+
// nothing to do here
44+
}
45+
}
46+
47+
object MessageLengthCapturingFilter {
48+
private val CountersRequestAttributeKey: String = s"${classOf[MessageLengthCapturingFilter].getName}.counters"
49+
50+
def getCounters(r: ServletRequest): mutable.Buffer[ReadOnlyCounter] = {
51+
val countersAttrOrNull = r.getAttribute(CountersRequestAttributeKey).asInstanceOf[mutable.Buffer[ReadOnlyCounter]]
52+
val counters = Option(countersAttrOrNull).getOrElse(mutable.Buffer.empty[ReadOnlyCounter])
53+
if (counters.isEmpty) r.setAttribute(CountersRequestAttributeKey, counters)
54+
counters
55+
}
56+
57+
trait ReadOnlyCounter {
58+
def count: Int
59+
}
60+
61+
class LengthCountingInputStreamWrapper(r: InputStream) extends InputStream {
62+
private var _bytesReadCount: Int = 0
63+
64+
val lengthCounter: ReadOnlyCounter = new ReadOnlyCounter {
65+
override def count: Int = _bytesReadCount
66+
}
67+
68+
override def read(): Int = {
69+
_bytesReadCount += 1
70+
r.read()
71+
}
72+
}
73+
}

rest-gateway/src/main/scala/za/co/absa/spline/gateway/rest/filter/GZIPServletInputStream.scala renamed to producer-rest-core/src/main/scala/za/co/absa/spline/producer/rest/filter/ServletInputStreamAdapter.scala

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
/*
2-
* Copyright 2020 ABSA Group Limited
3-
*
2+
* Copyright 2023 ABSA Group Limited
43
* Licensed under the Apache License, Version 2.0 (the "License");
54
* you may not use this file except in compliance with the License.
65
* You may obtain a copy of the License at
@@ -14,16 +13,12 @@
1413
* limitations under the License.
1514
*/
1615

17-
package za.co.absa.spline.gateway.rest.filter
18-
19-
import java.util.zip.GZIPInputStream
16+
package za.co.absa.spline.producer.rest.filter
2017

18+
import java.io.InputStream
2119
import javax.servlet.{ReadListener, ServletInputStream}
2220

23-
final class GZIPServletInputStream(val inputStream: ServletInputStream) extends ServletInputStream {
24-
25-
val gzipStream = new GZIPInputStream(inputStream)
26-
21+
final class ServletInputStreamAdapter(val gzipStream: InputStream) extends ServletInputStream {
2722

2823
override def read: Int = gzipStream.read
2924

rest-gateway/src/main/scala/za/co/absa/spline/gateway/rest/AppInitializer.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ import za.co.absa.spline.common.webmvc.cors.PermissiveCorsFilter
2424
import za.co.absa.spline.common.webmvc.diagnostics.{DiagnosticsRESTConfig, RootWebContextConfig}
2525
import za.co.absa.spline.consumer.rest.ConsumerRESTConfig
2626
import za.co.absa.spline.consumer.service.ConsumerServicesConfig
27-
import za.co.absa.spline.gateway.rest.filter.GzipFilter
2827
import za.co.absa.spline.persistence.ArangoRepoConfig
2928
import za.co.absa.spline.producer.rest.ProducerRESTConfig
29+
import za.co.absa.spline.producer.rest.filter.{GzipFilter, MessageLengthCapturingFilter}
3030
import za.co.absa.spline.producer.service.ProducerServicesConfig
3131

3232
import javax.servlet.ServletContext
@@ -43,7 +43,9 @@ object AppInitializer extends WebApplicationInitializer {
4343
}))
4444

4545
registerFilter[PermissiveCorsFilter](container, "CORSFilter", "/*")
46+
registerFilter[MessageLengthCapturingFilter](container, "MessageSizeCapturingFilter_before_gzip", "/*")
4647
registerFilter[GzipFilter](container, "GzipFilter", "/*")
48+
registerFilter[MessageLengthCapturingFilter](container, "MessageSizeCapturingFilter_after_gzip", "/*")
4749

4850
registerRESTDispatcher[ConsumerRESTConfig](container, "consumer")
4951
registerRESTDispatcher[ProducerRESTConfig](container, "producer")

0 commit comments

Comments
 (0)