Skip to content

Commit 201de30

Browse files
authored
Fix concurrency issues with StubServiceEmitter (#18249)
* Fixed a concurrency issue with StubServiceEmitter - #18121 added a few new metrics, which increased the load on it and caused random appearances of an issue arising from the fact that it used an `ArrayList` under the hood. * Added some catches to shut down queries properly in case unexpected exceptions occur - this could give better exceptions and reduce the time to fix in the future; it should also reduce the probability that `QTest` splits remain hanging.
1 parent 11fd7e6 commit 201de30

File tree

9 files changed

+54
-25
lines changed

9 files changed

+54
-25
lines changed

extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,15 @@ private void submitController()
405405
controller.queryId()
406406
);
407407
}
408+
catch (Throwable e) {
409+
log.error(
410+
e,
411+
"Controller failed for sqlQueryId[%s], controllerHost[%s]",
412+
plannerContext.getSqlQueryId(),
413+
controller.queryId()
414+
);
415+
throw e;
416+
}
408417
finally {
409418
controllerRegistry.deregister(controllerHolder);
410419
Thread.currentThread().setName(threadName);

extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,10 +343,23 @@ public void run(final QueryListener queryListener) throws Exception
343343
try (final Closer closer = Closer.create()) {
344344
reportPayload = runInternal(queryListener, closer);
345345
}
346+
catch (Throwable e) {
347+
log.error(e, "Controller internal execution encountered exception.");
348+
queryListener.onQueryComplete(makeStatusReportForException(e));
349+
throw e;
350+
}
346351
// Call onQueryComplete after Closer is fully closed, ensuring no controller-related processing is ongoing.
347352
queryListener.onQueryComplete(reportPayload);
348353
}
349354

355+
356+
private MSQTaskReportPayload makeStatusReportForException(Throwable e)
357+
{
358+
MSQErrorReport errorReport = MSQErrorReport.fromFault(queryId(), null, null, UnknownFault.forException(e));
359+
MSQStatusReport statusReport = new MSQStatusReport(TaskState.FAILED, errorReport, null, null, 0, new HashMap<>(), 0, 0, null, null);
360+
return new MSQTaskReportPayload(statusReport, null, null, null);
361+
}
362+
350363
@Override
351364
public void stop(CancellationReason reason)
352365
{

indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.junit.Test;
5353

5454
import java.util.List;
55+
import java.util.Queue;
5556
import java.util.Set;
5657
import java.util.stream.Collectors;
5758

@@ -328,7 +329,7 @@ public void test_run_prioritizesOlderIntervals()
328329
emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10);
329330

330331
// Verify that the kill intervals are sorted with the oldest interval first
331-
final List<StubServiceEmitter.ServiceMetricEventSnapshot> events =
332+
final Queue<StubServiceEmitter.ServiceMetricEventSnapshot> events =
332333
emitter.getMetricEvents().get(TaskMetrics.RUN_DURATION);
333334
final List<Interval> killIntervals = events.stream().map(event -> {
334335
final String taskId = (String) event.getUserDims().get(DruidMetrics.TASK_ID);

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1094,7 +1094,7 @@
10941094
<dependency>
10951095
<groupId>org.junit</groupId>
10961096
<artifactId>junit-bom</artifactId>
1097-
<version>5.10.2</version>
1097+
<version>5.13.3</version>
10981098
<type>pom</type>
10991099
<scope>import</scope>
11001100
</dependency>

processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import org.junit.jupiter.api.BeforeEach;
2626
import org.junit.jupiter.api.Test;
2727

28-
import java.util.List;
2928
import java.util.Map;
29+
import java.util.Queue;
3030

3131
import static org.junit.jupiter.api.Assertions.assertEquals;
3232
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -65,7 +65,7 @@ public void testDoMonitor()
6565

6666
assertTrue(monitor.doMonitor(stubServiceEmitter));
6767

68-
final Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = stubServiceEmitter.getMetricEvents();
68+
final Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = stubServiceEmitter.getMetricEvents();
6969

7070
assertMetricValue(metricEvents, "emitter/successfulSending/maxTimeMs", 0);
7171
assertMetricValue(metricEvents, "emitter/events/emitted/delta", 100L);
@@ -83,8 +83,8 @@ public void testDoMonitor()
8383
assertMetricValue(metricEvents, "emitter/failedSending/maxTimeMs", 0L);
8484
}
8585

86-
private void assertMetricValue(Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents, String metricName, Number expectedValue)
86+
private void assertMetricValue(Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents, String metricName, Number expectedValue)
8787
{
88-
assertEquals(metricEvents.get(metricName).get(0).getMetricEvent().getValue().doubleValue(), expectedValue.doubleValue());
88+
assertEquals(metricEvents.get(metricName).peek().getMetricEvent().getValue().doubleValue(), expectedValue.doubleValue());
8989
}
9090
}

processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,24 @@
2424
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
2525
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
2626

27+
import java.util.ArrayDeque;
2728
import java.util.ArrayList;
2829
import java.util.Collections;
2930
import java.util.List;
3031
import java.util.Map;
32+
import java.util.Queue;
3133
import java.util.concurrent.ConcurrentHashMap;
34+
import java.util.concurrent.ConcurrentLinkedDeque;
3235

3336
/**
3437
* Test implementation of {@link ServiceEmitter} that collects emitted metrics
3538
* and alerts in lists.
3639
*/
3740
public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifier
3841
{
39-
private final List<Event> events = new ArrayList<>();
40-
private final List<AlertEvent> alertEvents = new ArrayList<>();
41-
private final ConcurrentHashMap<String, List<ServiceMetricEventSnapshot>> metricEvents = new ConcurrentHashMap<>();
42+
private final Queue<Event> events = new ConcurrentLinkedDeque<>();
43+
private final Queue<AlertEvent> alertEvents = new ConcurrentLinkedDeque<>();
44+
private final ConcurrentHashMap<String, Queue<ServiceMetricEventSnapshot>> metricEvents = new ConcurrentHashMap<>();
4245

4346
public StubServiceEmitter()
4447
{
@@ -55,7 +58,7 @@ public void emit(Event event)
5558
{
5659
if (event instanceof ServiceMetricEvent) {
5760
ServiceMetricEvent metricEvent = (ServiceMetricEvent) event;
58-
metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new ArrayList<>())
61+
metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new ConcurrentLinkedDeque<>())
5962
.add(new ServiceMetricEventSnapshot(metricEvent));
6063
} else if (event instanceof AlertEvent) {
6164
alertEvents.add((AlertEvent) event);
@@ -68,15 +71,15 @@ public void emit(Event event)
6871
*/
6972
public List<Event> getEvents()
7073
{
71-
return events;
74+
return new ArrayList<>(events);
7275
}
7376

7477
/**
7578
* Gets all the metric events emitted since the previous {@link #flush()}.
7679
*
7780
* @return Map from metric name to list of events emitted for that metric.
7881
*/
79-
public Map<String, List<ServiceMetricEventSnapshot>> getMetricEvents()
82+
public Map<String, Queue<ServiceMetricEventSnapshot>> getMetricEvents()
8083
{
8184
return metricEvents;
8285
}
@@ -86,7 +89,7 @@ public Map<String, List<ServiceMetricEventSnapshot>> getMetricEvents()
8689
*/
8790
public List<AlertEvent> getAlerts()
8891
{
89-
return alertEvents;
92+
return new ArrayList<>(alertEvents);
9093
}
9194

9295
@Override
@@ -96,8 +99,8 @@ public List<Number> getMetricValues(
9699
)
97100
{
98101
final List<Number> values = new ArrayList<>();
99-
final List<ServiceMetricEventSnapshot> events =
100-
metricEvents.getOrDefault(metricName, Collections.emptyList());
102+
final Queue<ServiceMetricEventSnapshot> events =
103+
metricEvents.getOrDefault(metricName, new ArrayDeque<>());
101104
final Map<String, Object> filters =
102105
dimensionFilters == null ? Collections.emptyMap() : dimensionFilters;
103106
for (ServiceMetricEventSnapshot event : events) {

server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import java.util.HashSet;
7979
import java.util.List;
8080
import java.util.Map;
81+
import java.util.Queue;
8182
import java.util.Set;
8283
import java.util.concurrent.ConcurrentHashMap;
8384
import java.util.concurrent.ConcurrentMap;
@@ -162,7 +163,7 @@ public void testSimpleIngestion() throws Exception
162163
).get();
163164
Assert.assertEquals(
164165
ImmutableMap.of("x", "3"),
165-
(Map<String, String>) segmentsAndCommitMetadata.getCommitMetadata()
166+
segmentsAndCommitMetadata.getCommitMetadata()
166167
);
167168
Assert.assertEquals(
168169
IDENTIFIERS.subList(0, 2),
@@ -2278,7 +2279,7 @@ public void testQueryBySegments() throws Exception
22782279

22792280
private void verifySinkMetrics(StubServiceEmitter emitter, Set<String> segmentIds)
22802281
{
2281-
Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> events = emitter.getMetricEvents();
2282+
Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> events = emitter.getMetricEvents();
22822283
int segments = segmentIds.size();
22832284
Assert.assertEquals(4, events.size());
22842285
Assert.assertTrue(events.containsKey("query/cpu/time"));

server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.io.IOException;
4444
import java.util.List;
4545
import java.util.Map;
46+
import java.util.Queue;
4647
import java.util.TreeMap;
4748

4849
@RunWith(MockitoJUnitRunner.class)
@@ -91,14 +92,14 @@ public void testAuditMetricEventWithPayload() throws IOException
9192
final AuditEntry entry = createAuditEntry("testKey", "testType", DateTimes.nowUtc());
9293
auditManager.doAudit(entry);
9394

94-
Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = serviceEmitter.getMetricEvents();
95+
Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = serviceEmitter.getMetricEvents();
9596
Assert.assertEquals(1, metricEvents.size());
9697

97-
List<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = metricEvents.get("config/audit");
98+
Queue<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = metricEvents.get("config/audit");
9899
Assert.assertNotNull(auditMetricEvents);
99100
Assert.assertEquals(1, auditMetricEvents.size());
100101

101-
ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent();
102+
ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent();
102103

103104
final AuditEntry dbEntry = lookupAuditEntryForKey("testKey");
104105
Assert.assertNotNull(dbEntry);
@@ -120,14 +121,14 @@ public void testCreateAuditEntry() throws IOException
120121
Assert.assertEquals(entry, dbEntry);
121122

122123
// Verify emitted metrics
123-
Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = serviceEmitter.getMetricEvents();
124+
Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = serviceEmitter.getMetricEvents();
124125
Assert.assertEquals(1, metricEvents.size());
125126

126-
List<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = metricEvents.get("config/audit");
127+
Queue<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = metricEvents.get("config/audit");
127128
Assert.assertNotNull(auditMetricEvents);
128129
Assert.assertEquals(1, auditMetricEvents.size());
129130

130-
ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent();
131+
ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent();
131132
Assert.assertEquals(dbEntry.getKey(), metric.getUserDims().get("key"));
132133
Assert.assertEquals(dbEntry.getType(), metric.getUserDims().get("type"));
133134
Assert.assertNull(metric.getUserDims().get("payload"));

server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,13 +181,14 @@ private void evaluateWaitConditions()
181181
return;
182182
}
183183

184+
List<Event> events = getEvents();
184185
for (WaitCondition condition : conditionsToEvaluate) {
185-
final int currentNumberOfEvents = getEvents().size();
186+
final int currentNumberOfEvents = events.size();
186187

187188
// Do not use an iterator over the list to avoid concurrent modification exceptions
188189
// Evaluate new events against this condition
189190
for (int i = condition.processedUntil; i < currentNumberOfEvents; ++i) {
190-
if (condition.predicate.test(getEvents().get(i))) {
191+
if (condition.predicate.test(events.get(i))) {
191192
condition.countDownLatch.countDown();
192193
}
193194
}

0 commit comments

Comments
 (0)