Skip to content

Commit 27e3b8a

Browse files
author
Kishan Sairam Adapa
authored
fix: bug scheduling delay was missed (#435)
* fix: bug in proactive rescheduling not accounting for future timeout * update
1 parent 2ca25ad commit 27e3b8a

File tree

3 files changed

+11
-11
lines changed

3 files changed

+11
-11
lines changed

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ hypertrace-entity-service = "0.8.78"
33
hypertrace-config-service = "0.1.54"
44
hypertrace-grpc-utils = "0.12.4"
55
hypertrace-serviceFramework = "0.1.60"
6-
hypertrace-kafkaStreams = "0.4.1"
6+
hypertrace-kafkaStreams = "0.4.2"
77
hypertrace-view-generator = "0.4.19"
88
grpc = "1.57.2"
99

raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,16 +172,16 @@ public void process(Record<TraceIdentity, RawSpan> record) {
172172
.setTraceId(traceId)
173173
.setSpanIds(List.of(spanId))
174174
.build();
175-
traceEmitPunctuator.scheduleTask(currentTimeMs, key);
175+
traceEmitPunctuator.scheduleTask(currentTimeMs + groupingWindowTimeoutMs, key);
176176
} else {
177177
traceState.getSpanIds().add(spanId);
178178
long prevScheduleTimestamp = traceState.getTraceEndTimestamp() + groupingWindowTimeoutMs;
179179
traceState.setTraceEndTimestamp(currentTimeMs);
180180
if (!traceEmitPunctuator.rescheduleTask(
181181
prevScheduleTimestamp, currentTimeMs + groupingWindowTimeoutMs, key)) {
182-
logger.debug(
183-
"Failed to proactively reschedule task on getting span for trace key {}, schedule already dropped!",
184-
key);
182+
logger.warn(
183+
"Failed to proactively reschedule task on getting span for trace id {}, schedule already dropped!",
184+
HexUtils.getHex(traceState.getTraceId()));
185185
}
186186
}
187187

raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -231,13 +231,8 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
231231
messageTime = advanceAndSyncClockMock(messageTime, clock, 35_000);
232232
inputTopic.pipeInput(dummyTraceIdentity, dummySpan, messageTime);
233233

234-
// trace2 should have 1 span span3
235-
StructuredTrace trace = outputTopic.readValue();
236-
assertEquals(1, trace.getEventList().size());
237-
assertEquals("event-4", new String(trace.getEventList().get(0).getEventId().array()));
238-
239234
// trace1 should have 2 span span1, span2
240-
trace = outputTopic.readValue();
235+
StructuredTrace trace = outputTopic.readValue();
241236
assertEquals(2, trace.getEventList().size());
242237
Set<String> traceEventIds =
243238
trace.getEventList().stream()
@@ -246,6 +241,11 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
246241
assertTrue(traceEventIds.contains("event-1"));
247242
assertTrue(traceEventIds.contains("event-2"));
248243

244+
// trace2 should have 1 span span3
245+
trace = outputTopic.readValue();
246+
assertEquals(1, trace.getEventList().size());
247+
assertEquals("event-4", new String(trace.getEventList().get(0).getEventId().array()));
248+
249249
inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span3, messageTime);
250250
messageTime = advanceAndSyncClockMock(messageTime, clock, 45_000);
251251
inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span5, messageTime);

0 commit comments

Comments
 (0)