|
30 | 30 | import com.arpnetworking.steno.LoggerFactory;
|
31 | 31 | import com.arpnetworking.tsdcore.model.MetricType;
|
32 | 32 | import com.arpnetworking.tsdcore.model.Quantity;
|
| 33 | +import com.arpnetworking.utility.RegexAndMapReplacer; |
33 | 34 | import com.google.common.base.MoreObjects;
|
34 | 35 | import com.google.common.base.Splitter;
|
35 | 36 | import com.google.common.collect.ImmutableList;
|
|
41 | 42 | import java.util.Map;
|
42 | 43 | import java.util.regex.Matcher;
|
43 | 44 | import java.util.regex.Pattern;
|
44 |
| -import java.util.stream.Collectors; |
45 | 45 |
|
46 | 46 | /**
|
47 | 47 | * Implementation of <code>Source</code> which wraps another <code>Source</code>
|
@@ -119,67 +119,68 @@ public void notify(final Observable observable, final Object event) {
|
119 | 119 |
|
120 | 120 | // Merge the metrics in the record together
|
121 | 121 | final Record record = (Record) event;
|
122 |
| - final Map<String, MergingMetric> mergedMetrics = Maps.newHashMap(); |
| 122 | + final Map<ImmutableMap<String, String>, Map<String, MergingMetric>> mergedMetrics = Maps.newHashMap(); |
123 | 123 | for (final Map.Entry<String, ? extends Metric> metric : record.getMetrics().entrySet()) {
|
124 | 124 | boolean found = false;
|
125 | 125 | for (final Map.Entry<Pattern, List<String>> findAndReplace : _findAndReplace.entrySet()) {
|
126 | 126 | final Matcher matcher = findAndReplace.getKey().matcher(metric.getKey());
|
127 | 127 | if (matcher.find()) {
|
128 | 128 | for (final String replacement : findAndReplace.getValue()) {
|
129 |
| - final String replacedString = matcher.replaceAll(replacement); |
| 129 | + final String replacedString = |
| 130 | + RegexAndMapReplacer.replaceAll(matcher, metric.getKey(), replacement, record.getDimensions()); |
130 | 131 |
|
131 | 132 | final int tagsStart = replacedString.indexOf(';');
|
132 | 133 | if (tagsStart == -1) {
|
133 | 134 | // We just have a metric name. Optimize for this common case
|
134 |
| - merge(metric.getValue(), replacedString, mergedMetrics); |
| 135 | + merge(metric.getValue(), replacedString, mergedMetrics, record.getDimensions()); |
135 | 136 | } else {
|
| 137 | + final String metricName = replacedString.substring(0, tagsStart); |
136 | 138 | final Map<String, String> parsedTags = TAG_SPLITTER.split(replacedString.substring(tagsStart + 1));
|
137 |
| - final Map<String, String> finalTags = Maps.newTreeMap(); |
| 139 | + final ImmutableMap.Builder<String, String> finalTags = ImmutableMap.builder(); |
138 | 140 | finalTags.putAll(record.getDimensions());
|
139 | 141 | finalTags.putAll(parsedTags);
|
140 |
| - final StringBuilder keyBuilder = new StringBuilder(); |
141 |
| - keyBuilder.append(replacedString.substring(0, tagsStart + 1)); |
142 |
| - keyBuilder.append( |
143 |
| - finalTags.entrySet() |
144 |
| - .stream() |
145 |
| - .map(entry -> String.format("%s=%s", entry.getKey(), entry.getValue())) |
146 |
| - .collect(Collectors.joining(";"))); |
147 |
| - |
148 |
| - merge(metric.getValue(), keyBuilder.toString(), mergedMetrics); |
| 142 | + |
| 143 | + merge(metric.getValue(), metricName, mergedMetrics, finalTags.build()); |
149 | 144 | }
|
150 | 145 | }
|
151 | 146 | //Having "found" set here means that mapping a metric to an empty list suppresses that metric
|
152 | 147 | found = true;
|
153 | 148 | }
|
154 | 149 | }
|
155 | 150 | if (!found) {
|
156 |
| - merge(metric.getValue(), metric.getKey(), mergedMetrics); |
| 151 | + merge(metric.getValue(), metric.getKey(), mergedMetrics, record.getDimensions()); |
157 | 152 | }
|
158 | 153 | }
|
159 | 154 |
|
160 | 155 | // Raise the merged record event with this source's observers
|
161 | 156 | // NOTE: Do not leak instances of MergingMetric since it is mutable
|
162 |
| - _source.notify( |
163 |
| - ThreadLocalBuilder.build( |
164 |
| - DefaultRecord.Builder.class, |
165 |
| - b1 -> b1.setMetrics( |
166 |
| - mergedMetrics.entrySet().stream().collect( |
167 |
| - ImmutableMap.toImmutableMap( |
168 |
| - Map.Entry::getKey, |
169 |
| - e -> ThreadLocalBuilder.clone( |
170 |
| - e.getValue(), |
171 |
| - DefaultMetric.Builder.class)))) |
172 |
| - .setId(record.getId()) |
173 |
| - .setTime(record.getTime()) |
174 |
| - .setAnnotations(record.getAnnotations()) |
175 |
| - .setDimensions(record.getDimensions()))); |
| 157 | + for (Map.Entry<ImmutableMap<String, String>, Map<String, MergingMetric>> entry : mergedMetrics.entrySet()) { |
| 158 | + _source.notify( |
| 159 | + ThreadLocalBuilder.build( |
| 160 | + DefaultRecord.Builder.class, |
| 161 | + b1 -> b1.setMetrics( |
| 162 | + entry.getValue().entrySet().stream().collect( |
| 163 | + ImmutableMap.toImmutableMap( |
| 164 | + Map.Entry::getKey, |
| 165 | + e -> ThreadLocalBuilder.clone( |
| 166 | + e.getValue(), |
| 167 | + DefaultMetric.Builder.class)))) |
| 168 | + .setId(record.getId()) |
| 169 | + .setTime(record.getTime()) |
| 170 | + .setAnnotations(record.getAnnotations()) |
| 171 | + .setDimensions(entry.getKey()))); |
| 172 | + } |
176 | 173 | }
|
177 | 174 |
|
178 |
| - private void merge(final Metric metric, final String key, final Map<String, MergingMetric> mergedMetrics) { |
179 |
| - final MergingMetric mergedMetric = mergedMetrics.get(key); |
| 175 | + private void merge(final Metric metric, final String key, |
| 176 | + final Map<ImmutableMap<String, String>, Map<String, MergingMetric>> mergedMetrics, |
| 177 | + final ImmutableMap<String, String> dimensions) { |
| 178 | + |
| 179 | + final Map<String, MergingMetric> mergedMetricsForDimensions = mergedMetrics.computeIfAbsent(dimensions, k -> Maps.newHashMap()); |
| 180 | + final MergingMetric mergedMetric = mergedMetricsForDimensions.get(key); |
180 | 181 | if (mergedMetric == null) {
|
181 | 182 | // This is the first time this metric is being merged into
|
182 |
| - mergedMetrics.put(key, new MergingMetric(metric)); |
| 183 | + mergedMetricsForDimensions.put(key, new MergingMetric(metric)); |
183 | 184 | } else if (!mergedMetric.isMergable(metric)) {
|
184 | 185 | // This instance of the metric is not mergable with previous
|
185 | 186 | LOGGER.error()
|
|
0 commit comments