Skip to content

Commit a1a5f88

Browse files
authored
Merge pull request #81 from ExpediaDotCom/spanSizeValidation
Updating spanSize validation logic to skip certain tags and services
2 parents 51b1a19 + e9ebd26 commit a1a5f88

File tree

18 files changed

+184
-101
lines changed

18 files changed

+184
-101
lines changed

commons/src/main/scala/com/expedia/www/haystack/collector/commons/ProtoSpanExtractor.scala

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@ import java.util.concurrent.ConcurrentHashMap
2424

2525
import com.expedia.open.tracing.{Span, Tag}
2626
import com.expedia.www.haystack.collector.commons.ProtoSpanExtractor._
27-
import com.expedia.www.haystack.collector.commons.config.{ExtractorConfiguration, Format, SpanMaxSize}
27+
import com.expedia.www.haystack.collector.commons.config.{ExtractorConfiguration, Format}
2828
import com.expedia.www.haystack.collector.commons.record.{KeyValueExtractor, KeyValuePair}
2929
import com.expedia.www.haystack.span.decorators.SpanDecorator
3030
import com.google.protobuf.util.JsonFormat
3131
import org.slf4j.Logger
3232

33-
import scala.util.{Failure, Success, Try}
3433
import scala.collection.JavaConverters._
34+
import scala.util.{Failure, Success, Try}
3535

3636
object ProtoSpanExtractor {
3737
private val DaysInYear1970 = 365
@@ -45,10 +45,9 @@ object ProtoSpanExtractor {
4545
val TraceIdIsRequired = "Trace ID is required: serviceName=[%s] operationName=[%s]"
4646
val StartTimeIsInvalid = "Start time [%d] is invalid: serviceName=[%s] operationName=[%s]"
4747
val DurationIsInvalid = "Duration [%d] is invalid: serviceName=[%s] operationName=[%s]"
48-
val SpanSizeLimitExceeded = "Span Size Limit Exceeded: serviceName=[%s] operationName=[%s] traceId=[%s] spanSize=[%d]"
48+
val SpanSizeLimitExceeded = "Span Size Limit Exceeded: serviceName=[%s] operationName=[%s] traceId=[%s] spanSize=[%d] probableTags=[%s]"
4949

5050
val ServiceNameVsTtlAndOperationNames = new ConcurrentHashMap[String, TtlAndOperationNames]
51-
val MaximumOperationNameCount = 1000
5251
val OperationNameCountExceededMeterName = "operation.name.count.exceeded"
5352
}
5453

@@ -60,7 +59,7 @@ class ProtoSpanExtractor(extractorConfiguration: ExtractorConfiguration,
6059

6160
private val invalidSpanMeter = metricRegistry.meter("invalid.span")
6261
private val validSpanMeter = metricRegistry.meter("valid.span")
63-
private val spanSizeLimitExceeded = metricRegistry.meter("sizeLimitExceeded.span")
62+
private val spanSizeLimitExceededMeter = metricRegistry.meter("sizeLimitExceeded.span")
6463

6564
override def configure(): Unit = ()
6665

@@ -89,12 +88,12 @@ class ProtoSpanExtractor(extractorConfiguration: ExtractorConfiguration,
8988
}
9089

9190
def validateSpanSize(span: Span): Try[Span] = {
92-
if (extractorConfiguration.spanValidation.spanMaxSize.enable)
93-
{
94-
val spanSize = span.toByteArray.length
95-
val maxSizeLimit = extractorConfiguration.spanValidation.spanMaxSize.maxSizeLimit
96-
validate(span, spanSize, SpanSizeLimitExceeded, maxSizeLimit)
97-
}
91+
if (extractorConfiguration.spanValidation.spanMaxSize.enable
92+
&& !extractorConfiguration.spanValidation.spanMaxSize.skipServices.contains(span.getServiceName.toLowerCase)) {
93+
val spanSize = span.toByteArray.length
94+
val maxSizeLimit = extractorConfiguration.spanValidation.spanMaxSize.maxSizeLimit
95+
validate(span, spanSize, SpanSizeLimitExceeded, maxSizeLimit)
96+
}
9897
else
9998
Success(span)
10099
}
@@ -141,24 +140,39 @@ class ProtoSpanExtractor(extractorConfiguration: ExtractorConfiguration,
141140
highestValidValue: Int): Try[Span] = {
142141

143142
if (valueToValidate > highestValidValue) {
144-
spanSizeLimitExceeded.mark()
145-
LOGGER.debug(msg.format(span.getServiceName, span.getOperationName, span.getTraceId, valueToValidate))
146-
Success(truncateTags(span))
143+
spanSizeLimitExceededMeter.mark()
144+
LOGGER.debug(msg.format(span.getServiceName, span.getOperationName, span.getTraceId, valueToValidate, getProbableTagsExceedingSizeLimit(span)))
145+
if (extractorConfiguration.spanValidation.spanMaxSize.logOnly) {
146+
Success(span)
147+
} else {
148+
Success(truncateTags(span))
149+
}
147150
}
148151
else {
149152
Success(span)
150153
}
151154
}
152155

153-
private def truncateTags(span : Span): Span = {
154-
val errorTag = span.getTagsList.asScala.filter(tag => tag.getKey.equalsIgnoreCase("error"))
155-
val spanBuilder = span.toBuilder
156-
val messsageTagKey = extractorConfiguration.spanValidation.spanMaxSize.infoTagKey
157-
val messageTagValue = extractorConfiguration.spanValidation.spanMaxSize.infoTagValue
156+
private def getProbableTagsExceedingSizeLimit(span: Span): String = {
157+
span.getTagsList.asScala
158+
.filter(tag => tag.getVStrBytes.size > extractorConfiguration.spanValidation.spanMaxSize.maxSizeLimit)
159+
.map(_.getKey)
160+
.mkString(", ")
161+
}
162+
163+
private def truncateTags(span: Span): Span = {
164+
val skippedTags = span.getTagsList.asScala
165+
.filter(tag => extractorConfiguration.spanValidation.spanMaxSize.skipTags.contains(tag.getKey.toLowerCase))
158166

167+
val spanBuilder = span.toBuilder
159168
spanBuilder.clearTags()
160-
errorTag.foreach(tag => spanBuilder.addTags(tag))
161-
spanBuilder.addTags(Tag.newBuilder().setKey(messsageTagKey).setVStr(messageTagValue))
169+
170+
skippedTags.foreach(spanBuilder.addTags)
171+
172+
val truncateTagKey = extractorConfiguration.spanValidation.spanMaxSize.infoTagKey
173+
val truncateTagValue = extractorConfiguration.spanValidation.spanMaxSize.infoTagValue
174+
spanBuilder.addTags(Tag.newBuilder().setKey(truncateTagKey).setVStr(truncateTagValue))
175+
162176
spanBuilder.build()
163177
}
164178

commons/src/main/scala/com/expedia/www/haystack/collector/commons/config/ConfigurationLoader.scala

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,15 @@ object ConfigurationLoader {
4242
* if overrides_config_path env variable exists, then we load that config file and use base conf as fallback,
4343
* else we load the config from env variables (prefixed with haystack) and use base conf as fallback
4444
*
45-
* @param resourceName name of the resource file to be loaded. Default value is `config/base.conf`
45+
* @param resourceName name of the resource file to be loaded. Default value is `config/base.conf`
4646
* @param envNamePrefix env variable prefix to override config values. Default is `HAYSTACK_PROP_`
47-
*
4847
* @return an instance of com.typesafe.Config
4948
*/
50-
def loadConfigFileWithEnvOverrides(resourceName : String = "config/base.conf",
51-
envNamePrefix : String = ENV_NAME_PREFIX) : Config = {
49+
def loadConfigFileWithEnvOverrides(resourceName: String = "config/base.conf",
50+
envNamePrefix: String = ENV_NAME_PREFIX): Config = {
5251

53-
require(resourceName != null && resourceName.length > 0 , "resourceName is required")
54-
require(envNamePrefix != null && envNamePrefix.length > 0 , "envNamePrefix is required")
52+
require(resourceName != null && resourceName.length > 0, "resourceName is required")
53+
require(envNamePrefix != null && envNamePrefix.length > 0, "envNamePrefix is required")
5554

5655
val baseConfig = ConfigFactory.load(resourceName)
5756

@@ -84,7 +83,7 @@ object ConfigurationLoader {
8483
}
8584

8685
/**
87-
* @return new config object with haystack specific environment variables
86+
* @return new config object with haystack specific environment variables
8887
*/
8988
private[haystack] def parsePropertiesFromMap(envVars: Map[String, String],
9089
keysWithArrayValues: Set[String],
@@ -101,6 +100,7 @@ object ConfigurationLoader {
101100
/**
102101
* converts the env variable to HOCON format
103102
* e.g. the env variable HAYSTACK_KAFKA_STREAMS_NUM_STREAM_THREADS gets converted to kafka.streams.num.stream.threads
103+
*
104104
* @param env environment variable name
105105
* @return variable name that complies with hocon key
106106
*/
@@ -110,6 +110,7 @@ object ConfigurationLoader {
110110

111111
/**
112112
* converts the env variable value to an iterable object if it starts with '[' and ends with ']'.
113+
*
113114
* @param env environment variable value
114115
* @return string or iterable object
115116
*/
@@ -153,9 +154,12 @@ object ConfigurationLoader {
153154
outputFormat = if (extractor.hasPath("output.format")) Format.withName(extractor.getString("output.format")) else Format.PROTO,
154155
spanValidation = SpanValidation(SpanMaxSize(
155156
maxSizeValidationConfig.getBoolean("enable"),
157+
maxSizeValidationConfig.getBoolean("log.only"),
156158
maxSizeValidationConfig.getInt("max.size.limit"),
157159
maxSizeValidationConfig.getString("message.tag.key"),
158-
maxSizeValidationConfig.getString("message.tag.value"))
160+
maxSizeValidationConfig.getString("message.tag.value"),
161+
maxSizeValidationConfig.getStringList("skip.tags").map(_.toLowerCase),
162+
maxSizeValidationConfig.getStringList("skip.services").map(_.toLowerCase))
159163
))
160164
}
161165

@@ -169,7 +173,7 @@ object ConfigurationLoader {
169173
val props = new Properties()
170174
val cfg = ConfigFactory.parseMap(c._2.asInstanceOf[util.HashMap[String, Object]])
171175
val topic = cfg.getString("config.topic")
172-
val tags = cfg.getConfig("tags").entrySet().foldRight(Map[String, String]())((t, tMap) => {
176+
val tags = cfg.getConfig("tags").entrySet().foldRight(Map[String, String]())((t, tMap) => {
173177
tMap + (t.getKey -> t.getValue.unwrapped().toString)
174178
})
175179
val temp = cfg.getConfig("config.props").entrySet() foreach {

commons/src/main/scala/com/expedia/www/haystack/collector/commons/config/ExtractorConfiguration.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,13 @@ object Format extends Enumeration {
2828

2929
case class SpanValidation(spanMaxSize: SpanMaxSize)
3030

31-
case class SpanMaxSize( enable: Boolean,
32-
maxSizeLimit: Int,
33-
infoTagKey: String,
34-
infoTagValue: String )
31+
case class SpanMaxSize(enable: Boolean,
32+
logOnly: Boolean,
33+
maxSizeLimit: Int,
34+
infoTagKey: String,
35+
infoTagValue: String,
36+
skipTags: Seq[String],
37+
skipServices: Seq[String])
3538

3639
case class ExtractorConfiguration(outputFormat: Format,
3740
spanValidation: SpanValidation)

commons/src/test/scala/com/expedia/www/haystack/collector/commons/unit/KeyExtractorSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class KeyExtractorSpec extends FunSpec with Matchers with MetricsSupport {
3636
"trace-id-1" -> createSpan("trace-id-1", "spanId_1", "service_1", "operation", StartTimeMicros, DurationMicros),
3737
"trace-id-2" -> createSpan("trace-id-2", "spanId_2", "service_2", "operation", StartTimeMicros, DurationMicros))
3838

39-
val spanValidationConfig = SpanValidation(SpanMaxSize(enable = false, 5000, "", ""))
39+
val spanValidationConfig = SpanValidation(SpanMaxSize(enable = false, logOnly = false, 5000, "", "", Seq(), Seq()))
4040

4141
spanMap.foreach(sp => {
4242
val kvPairs = new ProtoSpanExtractor(ExtractorConfiguration(Format.PROTO, spanValidationConfig), LoggerFactory.getLogger(classOf[ProtoSpanExtractor]), List()).extractKeyValuePairs(sp._2.toByteArray)
@@ -54,7 +54,7 @@ class KeyExtractorSpec extends FunSpec with Matchers with MetricsSupport {
5454
"trace-id-1" -> createSpan("trace-id-1", "spanId_1", "service_1", "operation", StartTimeMicros, 1),
5555
"trace-id-2" -> createSpan("trace-id-2", "spanId_2", "service_2", "operation", StartTimeMicros, 1))
5656

57-
val spanValidationConfig = SpanValidation(SpanMaxSize(enable = false, 5000, "", ""))
57+
val spanValidationConfig = SpanValidation(SpanMaxSize(enable = false, logOnly = false, 5000, "", "", Seq(), Seq()))
5858

5959
spanMap.foreach(sp => {
6060
val kvPairs = new ProtoSpanExtractor(ExtractorConfiguration(Format.JSON, spanValidationConfig), LoggerFactory.getLogger(classOf[ProtoSpanExtractor]), List()).extractKeyValuePairs(sp._2.toByteArray)

0 commit comments

Comments
 (0)