Skip to content

Commit 5626213

Browse files
authored
add StorageMonitor to measure storage and virtual storage usage by segment cache (#18742)
changes: * adds `StorageLocationStats` and `VirtualStorageLocationStats` to abstract measuring segment cache/virtual segment cache usage * adds `StorageStats` to collect `StorageLocationStats` and `VirtualStorageLocationStats` into maps between location label and the stats of that location * adds new method `SegmentCacheManager.getStorageStats` to expose stats collection for segment cache manager implementations * adds new `StorageMonitor` to emit metrics for values in `StorageStats` * metrics for `StorageLocationStats`: - storage/used/bytes - storage/load/count - storage/load/bytes - storage/drop/count - storage/drop/bytes * metrics for `VirtualStorageLocationStats`: - storage/virtual/used/bytes - storage/virtual/hit/count - storage/virtual/load/count - storage/virtual/load/bytes - storage/virtual/evict/count - storage/virtual/evict/bytes - storage/virtual/reject/count
1 parent b8230eb commit 5626213

File tree

12 files changed

+669
-119
lines changed

12 files changed

+669
-119
lines changed

embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java

Lines changed: 80 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.druid.query.DefaultQueryMetrics;
3030
import org.apache.druid.query.DruidProcessingConfigTest;
3131
import org.apache.druid.server.metrics.LatchableEmitter;
32+
import org.apache.druid.server.metrics.StorageMonitor;
3233
import org.apache.druid.sql.calcite.planner.Calcites;
3334
import org.apache.druid.testing.embedded.EmbeddedBroker;
3435
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
@@ -51,7 +52,6 @@
5152
import java.io.IOException;
5253
import java.nio.file.Files;
5354
import java.util.Collections;
54-
import java.util.List;
5555
import java.util.concurrent.ThreadLocalRandom;
5656

5757
/**
@@ -102,6 +102,11 @@ public EmbeddedDruidCluster createCluster()
102102
.useDefaultTimeoutForLatchableEmitter(20)
103103
.addResource(storageResource)
104104
.addCommonProperty("druid.storage.zip", "false")
105+
.addCommonProperty("druid.monitoring.emissionPeriod", "PT1s")
106+
.addCommonProperty(
107+
"druid.monitoring.monitors",
108+
"[\"org.apache.druid.server.metrics.StorageMonitor\"]"
109+
)
105110
.addServer(coordinator)
106111
.addServer(overlord)
107112
.addServer(indexer)
@@ -152,90 +157,123 @@ void testQueryPartials()
152157
"select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 14:00:00' and __time < TIMESTAMP '2015-09-12 19:00:00'",
153158
"select count(*) from \"%s\" WHERE __time >= TIMESTAMP '2015-09-12 19:00:00' and __time < TIMESTAMP '2015-09-13 00:00:00'"
154159
};
155-
final long[] expectedResults = new long[] {
156-
9770,
157-
10524,
158-
10267,
159-
8683
160-
};
160+
final long[] expectedResults = new long[]{9770, 10524, 10267, 8683};
161+
final long[] expectedLoads = new long[]{8L, 6L, 5L, 5L};
162+
163+
164+
LatchableEmitter emitter = historical.latchableEmitter();
165+
// clear out the pipe to get zerod out storage monitor metrics
166+
ServiceMetricEvent monitorEvent = emitter.waitForNextEvent(event -> event.hasMetricName(StorageMonitor.VSF_LOAD_COUNT));
167+
while (monitorEvent != null && monitorEvent.getValue().longValue() > 0) {
168+
monitorEvent = emitter.waitForNextEvent(event -> event.hasMetricName(StorageMonitor.VSF_LOAD_COUNT));
169+
}
170+
// then flush (which clears out the internal events stores in test emitter) so we can do clean sums across them
171+
emitter.flush();
172+
173+
emitter.waitForNextEvent(event -> event.hasMetricName(StorageMonitor.VSF_LOAD_COUNT));
174+
long beforeLoads = emitter.getMetricEventLongSum(StorageMonitor.VSF_LOAD_COUNT);
175+
// confirm flushed
176+
Assertions.assertEquals(0, beforeLoads);
161177

178+
// run the queries in order
162179
Assertions.assertEquals(expectedResults[0], Long.parseLong(cluster.runSql(queries[0], dataSource)));
163-
assertMetrics(1, 8L);
180+
assertQueryMetrics(1, expectedLoads[0]);
164181
Assertions.assertEquals(expectedResults[1], Long.parseLong(cluster.runSql(queries[1], dataSource)));
165-
assertMetrics(2, 6L);
182+
assertQueryMetrics(2, expectedLoads[1]);
166183
Assertions.assertEquals(expectedResults[2], Long.parseLong(cluster.runSql(queries[2], dataSource)));
167-
assertMetrics(3, 5L);
184+
assertQueryMetrics(3, expectedLoads[2]);
168185
Assertions.assertEquals(expectedResults[3], Long.parseLong(cluster.runSql(queries[3], dataSource)));
169-
assertMetrics(4, 5L);
186+
assertQueryMetrics(4, expectedLoads[3]);
170187

188+
emitter.waitForNextEvent(event -> event.hasMetricName(StorageMonitor.VSF_LOAD_COUNT));
189+
long firstLoads = emitter.getMetricEventLongSum(StorageMonitor.VSF_LOAD_COUNT);
190+
Assertions.assertTrue(firstLoads >= 24, "expected " + 24 + " but only got " + firstLoads);
191+
192+
long expectedTotalHits = 0;
193+
long expectedTotalLoad = 0;
171194
for (int i = 0; i < 1000; i++) {
172195
int nextQuery = ThreadLocalRandom.current().nextInt(queries.length);
173196
Assertions.assertEquals(expectedResults[nextQuery], Long.parseLong(cluster.runSql(queries[nextQuery], dataSource)));
174-
assertMetrics(i + 5, null);
197+
assertQueryMetrics(i + 5, null);
198+
long actualLoads = getMetricLatestValue(emitter, DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_COUNT, i + 5);
199+
expectedTotalLoad += actualLoads;
200+
expectedTotalHits += (expectedLoads[nextQuery] - actualLoads);
175201
}
202+
203+
emitter.waitForNextEvent(event -> event.hasMetricName(StorageMonitor.VSF_HIT_COUNT));
204+
long hits = emitter.getMetricEventLongSum(StorageMonitor.VSF_HIT_COUNT);
205+
Assertions.assertTrue(hits >= expectedTotalHits, "expected " + expectedTotalHits + " but only got " + hits);
206+
emitter.waitForNextEvent(event -> event.hasMetricName(StorageMonitor.VSF_LOAD_COUNT));
207+
long loads = emitter.getMetricEventLongSum(StorageMonitor.VSF_LOAD_COUNT);
208+
Assertions.assertTrue(loads >= expectedTotalLoad, "expected " + expectedTotalLoad + " but only got " + loads);
209+
Assertions.assertTrue(emitter.getMetricEventLongSum(StorageMonitor.VSF_LOAD_BYTES) > 0);
210+
emitter.waitForNextEvent(event -> event.hasMetricName(StorageMonitor.VSF_EVICT_COUNT));
211+
Assertions.assertTrue(emitter.getMetricEventLongSum(StorageMonitor.VSF_EVICT_COUNT) >= 0);
212+
Assertions.assertTrue(emitter.getMetricEventLongSum(StorageMonitor.VSF_EVICT_BYTES) > 0);
213+
Assertions.assertEquals(0, emitter.getMetricEventLongSum(StorageMonitor.VSF_REJECT_COUNT));
214+
Assertions.assertTrue(emitter.getLatestMetricEventValue(StorageMonitor.VSF_USED_BYTES, 0).longValue() > 0);
176215
}
177216

178-
private void assertMetrics(int expectedEventCount, @Nullable Long expectedLoadCount)
217+
218+
private void assertQueryMetrics(int expectedEventCount, @Nullable Long expectedLoadCount)
179219
{
180220
LatchableEmitter emitter = historical.latchableEmitter();
181-
final int lastIndex = expectedEventCount - 1;
182221

183-
List<ServiceMetricEvent> countEvents = emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_COUNT);
184-
Assertions.assertEquals(expectedEventCount, countEvents.size());
222+
long loadCount = getMetricLatestValue(emitter, DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_COUNT, expectedEventCount);
185223
if (expectedLoadCount != null) {
186-
Assertions.assertEquals(expectedLoadCount, countEvents.get(lastIndex).getValue());
224+
Assertions.assertEquals(expectedLoadCount, loadCount);
187225
}
188-
boolean hasLoads = countEvents.get(lastIndex).getValue().longValue() > 0;
226+
boolean hasLoads = loadCount > 0;
189227

190-
List<ServiceMetricEvent> timeEvents = emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_BATCH_TIME);
191-
Assertions.assertEquals(expectedEventCount, timeEvents.size());
228+
long time = getMetricLatestValue(emitter, DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_BATCH_TIME, expectedEventCount);
192229
if (hasLoads) {
193-
Assertions.assertTrue(timeEvents.get(lastIndex).getValue().longValue() > 0);
230+
Assertions.assertTrue(time > 0);
194231
} else {
195-
Assertions.assertEquals(0, timeEvents.get(lastIndex).getValue().longValue());
232+
Assertions.assertEquals(0, time);
196233
}
197234

198-
List<ServiceMetricEvent> timeMaxEvents = emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_TIME_MAX);
199-
Assertions.assertEquals(expectedEventCount, timeMaxEvents.size());
235+
long maxTime = getMetricLatestValue(emitter, DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_TIME_MAX, expectedEventCount);
200236
if (hasLoads) {
201-
Assertions.assertTrue(timeMaxEvents.get(lastIndex).getValue().longValue() > 0);
237+
Assertions.assertTrue(maxTime > 0);
202238
} else {
203-
Assertions.assertEquals(0, timeMaxEvents.get(lastIndex).getValue().longValue());
239+
Assertions.assertEquals(0, maxTime);
204240
}
205241

206-
List<ServiceMetricEvent> timeAvgEvents = emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_TIME_AVG);
207-
Assertions.assertEquals(expectedEventCount, timeAvgEvents.size());
242+
long avgTime = getMetricLatestValue(emitter, DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_TIME_AVG, expectedEventCount);
208243
if (hasLoads) {
209-
Assertions.assertTrue(timeAvgEvents.get(lastIndex).getValue().longValue() > 0);
244+
Assertions.assertTrue(avgTime > 0);
210245
} else {
211-
Assertions.assertEquals(0, timeAvgEvents.get(lastIndex).getValue().longValue());
246+
Assertions.assertEquals(0, avgTime);
212247
}
213248

214-
List<ServiceMetricEvent> waitMaxEvents = emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_WAIT_TIME_MAX);
215-
Assertions.assertEquals(expectedEventCount, waitMaxEvents.size());
249+
long maxWait = getMetricLatestValue(emitter, DefaultQueryMetrics.QUERY_ON_DEMAND_WAIT_TIME_MAX, expectedEventCount);
216250
if (hasLoads) {
217-
Assertions.assertTrue(waitMaxEvents.get(lastIndex).getValue().longValue() >= 0);
251+
Assertions.assertTrue(maxWait >= 0);
218252
} else {
219-
Assertions.assertEquals(0, waitMaxEvents.get(lastIndex).getValue().longValue());
253+
Assertions.assertEquals(0, maxWait);
220254
}
221255

222-
List<ServiceMetricEvent> waitAvgEvents = emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_WAIT_TIME_AVG);
223-
Assertions.assertEquals(expectedEventCount, waitAvgEvents.size());
256+
long avgWait = getMetricLatestValue(emitter, DefaultQueryMetrics.QUERY_ON_DEMAND_WAIT_TIME_AVG, expectedEventCount);
224257
if (hasLoads) {
225-
Assertions.assertTrue(waitAvgEvents.get(lastIndex).getValue().longValue() >= 0);
258+
Assertions.assertTrue(avgWait >= 0);
226259
} else {
227-
Assertions.assertEquals(0, waitAvgEvents.get(lastIndex).getValue().longValue());
260+
Assertions.assertEquals(0, avgWait);
228261
}
229262

230-
List<ServiceMetricEvent> loadSizeEvents = emitter.getMetricEvents(DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_BYTES);
231-
Assertions.assertEquals(expectedEventCount, loadSizeEvents.size());
263+
long bytes = getMetricLatestValue(emitter, DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_BYTES, expectedEventCount);
232264
if (hasLoads) {
233-
Assertions.assertTrue(loadSizeEvents.get(lastIndex).getValue().longValue() > 0);
265+
Assertions.assertTrue(bytes > 0);
234266
} else {
235-
Assertions.assertEquals(0, loadSizeEvents.get(lastIndex).getValue().longValue());
267+
Assertions.assertEquals(0, bytes);
236268
}
237269
}
238270

271+
private long getMetricLatestValue(LatchableEmitter emitter, String metricName, int expectedCount)
272+
{
273+
Assertions.assertEquals(expectedCount, emitter.getMetricEventCount(metricName));
274+
return emitter.getLatestMetricEventValue(metricName, 0).longValue();
275+
}
276+
239277
private String createTestDatasourceName()
240278
{
241279
return "wiki-" + IdUtils.getRandomId();

processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@
2424
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
2525
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
2626

27+
import javax.annotation.Nullable;
2728
import java.util.ArrayDeque;
2829
import java.util.ArrayList;
2930
import java.util.Collections;
31+
import java.util.Deque;
3032
import java.util.List;
3133
import java.util.Map;
3234
import java.util.Queue;
@@ -39,9 +41,9 @@
3941
*/
4042
public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifier
4143
{
42-
private final Queue<Event> events = new ConcurrentLinkedDeque<>();
43-
private final Queue<AlertEvent> alertEvents = new ConcurrentLinkedDeque<>();
44-
private final ConcurrentHashMap<String, Queue<ServiceMetricEvent>> metricEvents = new ConcurrentHashMap<>();
44+
private final Deque<Event> events = new ConcurrentLinkedDeque<>();
45+
private final Deque<AlertEvent> alertEvents = new ConcurrentLinkedDeque<>();
46+
private final ConcurrentHashMap<String, Deque<ServiceMetricEvent>> metricEvents = new ConcurrentHashMap<>();
4547

4648
public StubServiceEmitter()
4749
{
@@ -123,6 +125,38 @@ public List<Number> getMetricValues(
123125
return values;
124126
}
125127

128+
public int getMetricEventCount(String metricName)
129+
{
130+
final Queue<ServiceMetricEvent> metricEventQueue = metricEvents.get(metricName);
131+
return metricEventQueue == null ? 0 : metricEventQueue.size();
132+
}
133+
134+
public long getMetricEventLongSum(String metricName)
135+
{
136+
final Queue<ServiceMetricEvent> metricEventQueue = metricEvents.get(metricName);
137+
long total = 0;
138+
for (ServiceMetricEvent event : metricEventQueue) {
139+
total += event.getValue().longValue();
140+
}
141+
return total;
142+
}
143+
144+
@Nullable
145+
public Number getLatestMetricEventValue(String metricName)
146+
{
147+
final Deque<ServiceMetricEvent> metricEventQueue = metricEvents.get(metricName);
148+
return metricEventQueue == null ? null : metricEventQueue.getLast().getValue();
149+
}
150+
151+
public Number getLatestMetricEventValue(String metricName, Number defaultValue)
152+
{
153+
final Number latest = getLatestMetricEventValue(metricName);
154+
if (latest == null) {
155+
return defaultValue;
156+
}
157+
return latest;
158+
}
159+
126160
@Override
127161
public void start()
128162
{

server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,4 +138,10 @@ public interface SegmentCacheManager
138138
void shutdownBootstrap();
139139

140140
void shutdown();
141+
142+
/**
143+
* Collect {@link StorageStats}, if available.
144+
*/
145+
@Nullable
146+
StorageStats getStorageStats();
141147
}

server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,10 @@
5353
import java.nio.file.Files;
5454
import java.nio.file.StandardCopyOption;
5555
import java.util.ArrayList;
56+
import java.util.HashMap;
5657
import java.util.Iterator;
5758
import java.util.List;
59+
import java.util.Map;
5860
import java.util.Objects;
5961
import java.util.Optional;
6062
import java.util.concurrent.ConcurrentHashMap;
@@ -619,6 +621,31 @@ public void shutdown()
619621
}
620622
}
621623

624+
@Nullable
625+
@Override
626+
public StorageStats getStorageStats()
627+
{
628+
if (config.isVirtualStorage()) {
629+
final Map<String, VirtualStorageLocationStats> locationStats = new HashMap<>();
630+
for (StorageLocation location : locations) {
631+
locationStats.put(location.getPath().toString(), location.resetWeakStats());
632+
}
633+
return new StorageStats(
634+
Map.of(),
635+
locationStats
636+
);
637+
} else {
638+
final Map<String, StorageLocationStats> locationStats = new HashMap<>();
639+
for (StorageLocation location : locations) {
640+
locationStats.put(location.getPath().toString(), location.resetStaticStats());
641+
}
642+
return new StorageStats(
643+
locationStats,
644+
Map.of()
645+
);
646+
}
647+
}
648+
622649
@VisibleForTesting
623650
public ConcurrentHashMap<DataSegment, ReferenceCountingLock> getSegmentLocks()
624651
{

0 commit comments

Comments
 (0)