Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,15 @@ private void submitController()
controller.queryId()
);
}
catch (Throwable e) {
log.error(
e,
"Controller failed for sqlQueryId[%s], controllerHost[%s]",
plannerContext.getSqlQueryId(),
controller.queryId()
);
throw e;
}
finally {
controllerRegistry.deregister(controllerHolder);
Thread.currentThread().setName(threadName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,23 @@ public void run(final QueryListener queryListener) throws Exception
try (final Closer closer = Closer.create()) {
reportPayload = runInternal(queryListener, closer);
}
catch (Throwable e) {
log.error(e, "Controller internal execution encountered exception.");
queryListener.onQueryComplete(makeStatusReportForException(e));
throw e;
}
// Call onQueryComplete after Closer is fully closed, ensuring no controller-related processing is ongoing.
queryListener.onQueryComplete(reportPayload);
}


private MSQTaskReportPayload makeStatusReportForException(Throwable e)
{
  // Build a FAILED task report for an exception that escaped controller execution:
  // the throwable is wrapped as an UnknownFault-based error report (no stage/worker
  // attribution available), and all optional report sections are left empty.
  return new MSQTaskReportPayload(
      new MSQStatusReport(
          TaskState.FAILED,
          MSQErrorReport.fromFault(queryId(), null, null, UnknownFault.forException(e)),
          null,
          null,
          0,
          new HashMap<>(),
          0,
          0,
          null,
          null
      ),
      null,
      null,
      null
  );
}

@Override
public void stop(CancellationReason reason)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.junit.Test;

import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -328,7 +329,7 @@ public void test_run_prioritizesOlderIntervals()
emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10);

// Verify that the kill intervals are sorted with the oldest interval first
final List<StubServiceEmitter.ServiceMetricEventSnapshot> events =
final Queue<StubServiceEmitter.ServiceMetricEventSnapshot> events =
emitter.getMetricEvents().get(TaskMetrics.RUN_DURATION);
final List<Interval> killIntervals = events.stream().map(event -> {
final String taskId = (String) event.getUserDims().get(DruidMetrics.TASK_ID);
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1094,7 +1094,7 @@
<dependency>
<groupId>org.junit</groupId>
<artifactId>junit-bom</artifactId>
<version>5.10.2</version>
<version>5.13.3</version>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this related to anything or just opportunistic?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not closely related; in an earlier version I had tried out a JUnit 5.12.x feature, then undone it — but I retained the BOM upgrade as it might still be useful later (it didn't make things worse)

<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;
import java.util.Queue;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -65,7 +65,7 @@ public void testDoMonitor()

assertTrue(monitor.doMonitor(stubServiceEmitter));

final Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = stubServiceEmitter.getMetricEvents();
final Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = stubServiceEmitter.getMetricEvents();

assertMetricValue(metricEvents, "emitter/successfulSending/maxTimeMs", 0);
assertMetricValue(metricEvents, "emitter/events/emitted/delta", 100L);
Expand All @@ -83,8 +83,8 @@ public void testDoMonitor()
assertMetricValue(metricEvents, "emitter/failedSending/maxTimeMs", 0L);
}

private void assertMetricValue(Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents, String metricName, Number expectedValue)
private void assertMetricValue(Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents, String metricName, Number expectedValue)
{
assertEquals(metricEvents.get(metricName).get(0).getMetricEvent().getValue().doubleValue(), expectedValue.doubleValue());
assertEquals(metricEvents.get(metricName).peek().getMetricEvent().getValue().doubleValue(), expectedValue.doubleValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,24 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

/**
* Test implementation of {@link ServiceEmitter} that collects emitted metrics
* and alerts in lists.
*/
public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifier
{
private final List<Event> events = new ArrayList<>();
private final List<AlertEvent> alertEvents = new ArrayList<>();
private final ConcurrentHashMap<String, List<ServiceMetricEventSnapshot>> metricEvents = new ConcurrentHashMap<>();
private final Queue<Event> events = new ConcurrentLinkedDeque<>();
private final Queue<AlertEvent> alertEvents = new ConcurrentLinkedDeque<>();
private final ConcurrentHashMap<String, Queue<ServiceMetricEventSnapshot>> metricEvents = new ConcurrentHashMap<>();

public StubServiceEmitter()
{
Expand All @@ -55,7 +58,7 @@ public void emit(Event event)
{
if (event instanceof ServiceMetricEvent) {
ServiceMetricEvent metricEvent = (ServiceMetricEvent) event;
metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new ArrayList<>())
metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new ConcurrentLinkedDeque<>())
.add(new ServiceMetricEventSnapshot(metricEvent));
} else if (event instanceof AlertEvent) {
alertEvents.add((AlertEvent) event);
Expand All @@ -68,15 +71,15 @@ public void emit(Event event)
*/
public List<Event> getEvents()
{
return events;
return new ArrayList<>(events);
}

/**
* Gets all the metric events emitted since the previous {@link #flush()}.
*
* @return Map from metric name to list of events emitted for that metric.
*/
public Map<String, List<ServiceMetricEventSnapshot>> getMetricEvents()
public Map<String, Queue<ServiceMetricEventSnapshot>> getMetricEvents()
{
return metricEvents;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I guess we can later update this method to also return a map containing read-only lists rather than the Queue or maybe just update the method to accept a metric name.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes; some enhancements could be made there — actually, in the LatchingEmitter, even after this patch an asymptotically n^2 computation remained (over time)

I already had to undo a jfr fix in this patch as it was causing some other issues - and I'm afraid this won't fully solve the problem ; sometimes I could still reproduce the issue...with a lower probability....but its still there.

seems like recently it happens on apache/master more frequently - I have to collect more details

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually in the latchingemitter even after this patch an assimptotically n^2 computation remained (over time)

What is the n^2 computation, is it the evaluation of events against a given WaitCondition?
I had tried to keep it O(n); I might have missed some case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is O(n), but for every new event — which is technically n^2 over time

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Must be a bug then, because every WaitCondition tracks a processedUntil index.
And only processes new items after that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's a good pattern to expose internals. I don't think it should need this readuntil pattern

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it is just for tests and not really exposed outside the class, only for use by the sub-class.

The read until is actually very useful to process only the stuff that we need to process.

Otherwise, we would need to pipe every new event through each existing condition either sync or async.
But that would still not handle the case where a condition is registered later.
I guess we could maintain a list of historical events for that purpose.

But all of that just seems overkill for a simple testing use case.

Please let me know if you have some other suggestions to improve this flow.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reading pre-existing events registrations seems dodgy to me; I feel like that's an artifact of the fact that the usage pattern is like:

doSomething();
waitSomething();

which could have trouble if doSomething has already happened before the wait could even launch... the "read older events" mechanism is an attempt to fix this... however, the problem arises from the fact that the pattern itself is flawed...

it should be more like

w = registerWait()
doSomething()
wait(w)

or possibly with some try-with-resources sugar

try( waitForSomething() ) {
  doSomething();
}

so that the contract of waitForSomething would be:

  • only reads messages which are received after it is registered
  • no need to track read point/redispatch old events - dispatch only the event caught during the emit to the current readers
  • it won't even need access to how the events are stored by the StubEmitter

not sure if this would apply to all existing use cases - but I hope so...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I would have liked it to be something like that. But it would make the embedded tests (where the LatchableEmitter is actually used) much less readable.

Copy link
Contributor

@kfaraz kfaraz Jul 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original flow actually feels more intuitive to me.
We are essentially registering a consumer for the stream of metric events coming in.
And the last read point is just the offset that we have consumed so far.

(now that the events field is a queue, this is even more applicable)

}
Expand All @@ -86,7 +89,7 @@ public Map<String, List<ServiceMetricEventSnapshot>> getMetricEvents()
*/
public List<AlertEvent> getAlerts()
{
return alertEvents;
return new ArrayList<>(alertEvents);
}

@Override
Expand All @@ -96,8 +99,8 @@ public List<Number> getMetricValues(
)
{
final List<Number> values = new ArrayList<>();
final List<ServiceMetricEventSnapshot> events =
metricEvents.getOrDefault(metricName, Collections.emptyList());
final Queue<ServiceMetricEventSnapshot> events =
metricEvents.getOrDefault(metricName, new ArrayDeque<>());
final Map<String, Object> filters =
dimensionFilters == null ? Collections.emptyMap() : dimensionFilters;
for (ServiceMetricEventSnapshot event : events) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -162,7 +163,7 @@ public void testSimpleIngestion() throws Exception
).get();
Assert.assertEquals(
ImmutableMap.of("x", "3"),
(Map<String, String>) segmentsAndCommitMetadata.getCommitMetadata()
segmentsAndCommitMetadata.getCommitMetadata()
);
Assert.assertEquals(
IDENTIFIERS.subList(0, 2),
Expand Down Expand Up @@ -2278,7 +2279,7 @@ public void testQueryBySegments() throws Exception

private void verifySinkMetrics(StubServiceEmitter emitter, Set<String> segmentIds)
{
Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> events = emitter.getMetricEvents();
Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> events = emitter.getMetricEvents();
int segments = segmentIds.size();
Assert.assertEquals(4, events.size());
Assert.assertTrue(events.containsKey("query/cpu/time"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;

@RunWith(MockitoJUnitRunner.class)
Expand Down Expand Up @@ -91,14 +92,14 @@ public void testAuditMetricEventWithPayload() throws IOException
final AuditEntry entry = createAuditEntry("testKey", "testType", DateTimes.nowUtc());
auditManager.doAudit(entry);

Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = serviceEmitter.getMetricEvents();
Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = serviceEmitter.getMetricEvents();
Assert.assertEquals(1, metricEvents.size());

List<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = metricEvents.get("config/audit");
Queue<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = metricEvents.get("config/audit");
Assert.assertNotNull(auditMetricEvents);
Assert.assertEquals(1, auditMetricEvents.size());

ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent();
ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent();

final AuditEntry dbEntry = lookupAuditEntryForKey("testKey");
Assert.assertNotNull(dbEntry);
Expand All @@ -120,14 +121,14 @@ public void testCreateAuditEntry() throws IOException
Assert.assertEquals(entry, dbEntry);

// Verify emitted metrics
Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = serviceEmitter.getMetricEvents();
Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = serviceEmitter.getMetricEvents();
Assert.assertEquals(1, metricEvents.size());

List<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = metricEvents.get("config/audit");
Queue<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = metricEvents.get("config/audit");
Assert.assertNotNull(auditMetricEvents);
Assert.assertEquals(1, auditMetricEvents.size());

ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent();
ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent();
Assert.assertEquals(dbEntry.getKey(), metric.getUserDims().get("key"));
Assert.assertEquals(dbEntry.getType(), metric.getUserDims().get("type"));
Assert.assertNull(metric.getUserDims().get("payload"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,14 @@ private void evaluateWaitConditions()
return;
}

List<Event> events = getEvents();
for (WaitCondition condition : conditionsToEvaluate) {
final int currentNumberOfEvents = getEvents().size();
final int currentNumberOfEvents = events.size();

// Do not use an iterator over the list to avoid concurrent modification exceptions
// Evaluate new events against this condition
for (int i = condition.processedUntil; i < currentNumberOfEvents; ++i) {
if (condition.predicate.test(getEvents().get(i))) {
if (condition.predicate.test(events.get(i))) {
condition.countDownLatch.countDown();
}
}
Expand Down
Loading