26
26
import java .util .Map ;
27
27
import java .util .Set ;
28
28
import java .util .concurrent .TimeUnit ;
29
- import java .util .concurrent .locks .Lock ;
30
- import java .util .concurrent .locks .ReentrantLock ;
31
29
import java .util .function .BiConsumer ;
32
30
import java .util .function .Consumer ;
33
31
import org .slf4j .Logger ;
34
32
import org .slf4j .LoggerFactory ;
35
33
36
34
public class DefaultPathwayContext implements PathwayContext {
37
35
private static final Logger log = LoggerFactory .getLogger (DefaultPathwayContext .class );
38
- private final Lock lock = new ReentrantLock ();
39
36
private final long hashOfKnownTags ;
40
37
private final TimeSource timeSource ;
41
38
private final String serviceNameOverride ;
@@ -107,91 +104,87 @@ public long getHash() {
107
104
}
108
105
109
106
@ Override
110
- public void setCheckpoint (DataStreamsContext context , Consumer <StatsPoint > pointConsumer ) {
107
+ public synchronized void setCheckpoint (
108
+ DataStreamsContext context , Consumer <StatsPoint > pointConsumer ) {
111
109
long startNanos = timeSource .getCurrentTimeNanos ();
112
110
long nanoTicks = timeSource .getNanoTicks ();
113
- lock .lock ();
114
- try {
115
- // So far, each tag key has only one tag value, so we're initializing the capacity to match
116
- // the number of tag keys for now. We should revisit this later if it's no longer the case.
117
- LinkedHashMap <String , String > sortedTags = context .sortedTags ();
118
- List <String > allTags = new ArrayList <>(sortedTags .size ());
119
- PathwayHashBuilder pathwayHashBuilder =
120
- new PathwayHashBuilder (hashOfKnownTags , serviceNameOverride );
121
- DataSetHashBuilder aggregationHashBuilder = new DataSetHashBuilder ();
122
-
123
- if (!started ) {
124
- long defaultTimestamp = context .defaultTimestamp ();
125
- if (defaultTimestamp == 0 ) {
126
- pathwayStartNanos = startNanos ;
127
- pathwayStartNanoTicks = nanoTicks ;
128
- edgeStartNanoTicks = nanoTicks ;
129
- } else {
130
- pathwayStartNanos = MILLISECONDS .toNanos (defaultTimestamp );
131
- pathwayStartNanoTicks =
132
- nanoTicks
133
- - MILLISECONDS .toNanos (timeSource .getCurrentTimeMillis () - defaultTimestamp );
134
- edgeStartNanoTicks = pathwayStartNanoTicks ;
135
- }
136
111
137
- hash = 0 ;
138
- started = true ;
139
- log .debug ("Started {}" , this );
112
+ // So far, each tag key has only one tag value, so we're initializing the capacity to match
113
+ // the number of tag keys for now. We should revisit this later if it's no longer the case.
114
+ LinkedHashMap <String , String > sortedTags = context .sortedTags ();
115
+ List <String > allTags = new ArrayList <>(sortedTags .size ());
116
+ PathwayHashBuilder pathwayHashBuilder =
117
+ new PathwayHashBuilder (hashOfKnownTags , serviceNameOverride );
118
+ DataSetHashBuilder aggregationHashBuilder = new DataSetHashBuilder ();
119
+
120
+ if (!started ) {
121
+ long defaultTimestamp = context .defaultTimestamp ();
122
+ if (defaultTimestamp == 0 ) {
123
+ pathwayStartNanos = startNanos ;
124
+ pathwayStartNanoTicks = nanoTicks ;
125
+ edgeStartNanoTicks = nanoTicks ;
126
+ } else {
127
+ pathwayStartNanos = MILLISECONDS .toNanos (defaultTimestamp );
128
+ pathwayStartNanoTicks =
129
+ nanoTicks - MILLISECONDS .toNanos (timeSource .getCurrentTimeMillis () - defaultTimestamp );
130
+ edgeStartNanoTicks = pathwayStartNanoTicks ;
140
131
}
141
132
142
- for (Map .Entry <String , String > entry : sortedTags .entrySet ()) {
143
- String tag = TagsProcessor .createTag (entry .getKey (), entry .getValue ());
144
- if (tag == null ) {
145
- continue ;
146
- }
147
- if (hashableTagKeys .contains (entry .getKey ())) {
148
- pathwayHashBuilder .addTag (tag );
149
- }
150
- if (extraAggregationTagKeys .contains (entry .getKey ())) {
151
- aggregationHashBuilder .addValue (tag );
152
- }
153
- allTags .add (tag );
154
- }
133
+ hash = 0 ;
134
+ started = true ;
135
+ log .debug ("Started {}" , this );
136
+ }
155
137
156
- long nodeHash = generateNodeHash (pathwayHashBuilder );
157
- // loop protection - a node should not be chosen as parent
158
- // for a sequential node with the same direction, as this
159
- // will cause a `cardinality explosion` for hash / parentHash tag values
160
- if (sortedTags .containsKey (TagsProcessor .DIRECTION_TAG )) {
161
- String direction = sortedTags .get (TagsProcessor .DIRECTION_TAG );
162
- if (direction .equals (previousDirection )) {
163
- hash = closestOppositeDirectionHash ;
164
- } else {
165
- previousDirection = direction ;
166
- closestOppositeDirectionHash = hash ;
167
- }
138
+ for (Map .Entry <String , String > entry : sortedTags .entrySet ()) {
139
+ String tag = TagsProcessor .createTag (entry .getKey (), entry .getValue ());
140
+ if (tag == null ) {
141
+ continue ;
142
+ }
143
+ if (hashableTagKeys .contains (entry .getKey ())) {
144
+ pathwayHashBuilder .addTag (tag );
168
145
}
146
+ if (extraAggregationTagKeys .contains (entry .getKey ())) {
147
+ aggregationHashBuilder .addValue (tag );
148
+ }
149
+ allTags .add (tag );
150
+ }
169
151
170
- long newHash = generatePathwayHash (nodeHash , hash );
171
- long aggregationHash = aggregationHashBuilder .addValue (String .valueOf (newHash ));
172
-
173
- long pathwayLatencyNano = nanoTicks - pathwayStartNanoTicks ;
174
- long edgeLatencyNano = nanoTicks - edgeStartNanoTicks ;
175
-
176
- StatsPoint point =
177
- new StatsPoint (
178
- allTags ,
179
- newHash ,
180
- hash ,
181
- aggregationHash ,
182
- startNanos ,
183
- pathwayLatencyNano ,
184
- edgeLatencyNano ,
185
- context .payloadSizeBytes (),
186
- serviceNameOverride );
187
- edgeStartNanoTicks = nanoTicks ;
188
- hash = newHash ;
189
-
190
- pointConsumer .accept (point );
191
- log .debug ("Checkpoint set {}, hash source: {}" , this , pathwayHashBuilder );
192
- } finally {
193
- lock .unlock ();
152
+ long nodeHash = generateNodeHash (pathwayHashBuilder );
153
+ // loop protection - a node should not be chosen as parent
154
+ // for a sequential node with the same direction, as this
155
+ // will cause a `cardinality explosion` for hash / parentHash tag values
156
+ if (sortedTags .containsKey (TagsProcessor .DIRECTION_TAG )) {
157
+ String direction = sortedTags .get (TagsProcessor .DIRECTION_TAG );
158
+ if (direction .equals (previousDirection )) {
159
+ hash = closestOppositeDirectionHash ;
160
+ } else {
161
+ previousDirection = direction ;
162
+ closestOppositeDirectionHash = hash ;
163
+ }
194
164
}
165
+
166
+ long newHash = generatePathwayHash (nodeHash , hash );
167
+ long aggregationHash = aggregationHashBuilder .addValue (newHash );
168
+
169
+ long pathwayLatencyNano = nanoTicks - pathwayStartNanoTicks ;
170
+ long edgeLatencyNano = nanoTicks - edgeStartNanoTicks ;
171
+
172
+ StatsPoint point =
173
+ new StatsPoint (
174
+ allTags ,
175
+ newHash ,
176
+ hash ,
177
+ aggregationHash ,
178
+ startNanos ,
179
+ pathwayLatencyNano ,
180
+ edgeLatencyNano ,
181
+ context .payloadSizeBytes (),
182
+ serviceNameOverride );
183
+ edgeStartNanoTicks = nanoTicks ;
184
+ hash = newHash ;
185
+
186
+ pointConsumer .accept (point );
187
+ log .debug ("Checkpoint set {}, hash source: {}" , this , pathwayHashBuilder );
195
188
}
196
189
197
190
@ Override
@@ -205,52 +198,42 @@ public StatsPoint getSavedStats() {
205
198
}
206
199
207
200
@ Override
208
- public String encode () throws IOException {
209
- lock .lock ();
210
- try {
211
- if (!started ) {
212
- throw new IllegalStateException ("Context must be started to encode" );
213
- }
201
+ public synchronized String encode () throws IOException {
202
+ if (!started ) {
203
+ throw new IllegalStateException ("Context must be started to encode" );
204
+ }
214
205
215
- outputBuffer .clear ();
216
- outputBuffer .writeLongLE (hash );
206
+ outputBuffer .clear ();
207
+ outputBuffer .writeLongLE (hash );
217
208
218
- long pathwayStartMillis = TimeUnit .NANOSECONDS .toMillis (pathwayStartNanos );
219
- VarEncodingHelper .encodeSignedVarLong (outputBuffer , pathwayStartMillis );
209
+ long pathwayStartMillis = TimeUnit .NANOSECONDS .toMillis (pathwayStartNanos );
210
+ VarEncodingHelper .encodeSignedVarLong (outputBuffer , pathwayStartMillis );
220
211
221
- long edgeStartMillis =
222
- pathwayStartMillis
223
- + TimeUnit .NANOSECONDS .toMillis (edgeStartNanoTicks - pathwayStartNanoTicks );
212
+ long edgeStartMillis =
213
+ pathwayStartMillis
214
+ + TimeUnit .NANOSECONDS .toMillis (edgeStartNanoTicks - pathwayStartNanoTicks );
224
215
225
- VarEncodingHelper .encodeSignedVarLong (outputBuffer , edgeStartMillis );
226
- byte [] base64 = Base64 .getEncoder ().encode (outputBuffer .trimmedCopy ());
227
- return new String (base64 , ISO_8859_1 );
228
- } finally {
229
- lock .unlock ();
230
- }
216
+ VarEncodingHelper .encodeSignedVarLong (outputBuffer , edgeStartMillis );
217
+ byte [] base64 = Base64 .getEncoder ().encode (outputBuffer .trimmedCopy ());
218
+ return new String (base64 , ISO_8859_1 );
231
219
}
232
220
233
221
@ Override
234
- public String toString () {
235
- lock .lock ();
236
- try {
237
- if (started ) {
238
- return "PathwayContext[ Hash "
239
- + Long .toUnsignedString (hash )
240
- + ", Start: "
241
- + pathwayStartNanos
242
- + ", StartTicks: "
243
- + pathwayStartNanoTicks
244
- + ", Edge Start Ticks: "
245
- + edgeStartNanoTicks
246
- + ", objectHashcode:"
247
- + hashCode ()
248
- + "]" ;
249
- } else {
250
- return "PathwayContext [Not Started]" ;
251
- }
252
- } finally {
253
- lock .unlock ();
222
+ public synchronized String toString () {
223
+ if (started ) {
224
+ return "PathwayContext[ Hash "
225
+ + Long .toUnsignedString (hash )
226
+ + ", Start: "
227
+ + pathwayStartNanos
228
+ + ", StartTicks: "
229
+ + pathwayStartNanoTicks
230
+ + ", Edge Start Ticks: "
231
+ + edgeStartNanoTicks
232
+ + ", objectHashcode:"
233
+ + hashCode ()
234
+ + "]" ;
235
+ } else {
236
+ return "PathwayContext [Not Started]" ;
254
237
}
255
238
}
256
239
@@ -334,6 +317,23 @@ public long addValue(String val) {
334
317
currentHash = FNV64Hash .generateHash (currentHash + val , FNV64Hash .Version .v1 );
335
318
return currentHash ;
336
319
}
320
+
321
+ public long addValue (long val ) {
322
+ byte [] b =
323
+ new byte [] {
324
+ (byte ) val ,
325
+ (byte ) (val >> 8 ),
326
+ (byte ) (val >> 16 ),
327
+ (byte ) (val >> 24 ),
328
+ (byte ) (val >> 32 ),
329
+ (byte ) (val >> 40 ),
330
+ (byte ) (val >> 48 ),
331
+ (byte ) (val >> 56 )
332
+ };
333
+
334
+ currentHash = FNV64Hash .continueHash (currentHash , b , FNV64Hash .Version .v1 );
335
+ return currentHash ;
336
+ }
337
337
}
338
338
339
339
private static class PathwayHashBuilder {
0 commit comments