Skip to content

Commit e376c3c

Browse files
committed
MSQ: Use VSF, give stages control over inputs.
This patch integrates MSQ with virtual storage. It also refactors how MSQ reads inputs to give stages more control over how inputs are read and merged. In particular, stages are now able to fully control merging logic. The main changes: 1) Integrate with virtual storage: merge the two DataSegmentProvider impls (Dart and Task) into DataSegmentProviderImpl that relies on SegmentManager. 2) Give stages control over input merging: rework InputSliceReader to return ReadablePartitions directly, without embedding any merging logic. Break out StandardPartitionReader as a separate class. Other changes: 1) Move ReadableInput to the querykit package. It is no longer specific to the MSQ framework. 2) Remove StandardStageProcessor, refactoring dependent code to not require it. 3) Remove ExternalColumnSelectorFactory wrapper. Type casting is now handled directly by RowBasedColumnSelectorFactory. 4) Include full query context in worker context, rather than just a subset. Includes apache#18871.
1 parent bd986b4 commit e376c3c

File tree

109 files changed

+2325
-2482
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

109 files changed

+2325
-2482
lines changed

indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentCacheManagerFactory.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@ public SegmentCacheManagerFactory(
5454

5555
public SegmentCacheManager manufacturate(File storageDir)
5656
{
57-
final SegmentLoaderConfig loaderConfig = new SegmentLoaderConfig().withLocations(
58-
Collections.singletonList(new StorageLocationConfig(storageDir, null, null))
59-
);
57+
final SegmentLoaderConfig loaderConfig =
58+
new SegmentLoaderConfig()
59+
.setLocations(Collections.singletonList(new StorageLocationConfig(storageDir, null, null)))
60+
.setVirtualStorage(true, true);
6061
final List<StorageLocation> storageLocations = loaderConfig.toStorageLocations();
6162
return new SegmentLocalCacheManager(
6263
storageLocations,

multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,19 +52,20 @@
5252
import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
5353
import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
5454
import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
55-
import org.apache.druid.msq.dart.worker.DartDataSegmentProvider;
5655
import org.apache.druid.msq.dart.worker.DartDataServerQueryHandlerFactory;
5756
import org.apache.druid.msq.dart.worker.DartWorkerContextFactory;
5857
import org.apache.druid.msq.dart.worker.DartWorkerContextFactoryImpl;
5958
import org.apache.druid.msq.dart.worker.DartWorkerRunner;
6059
import org.apache.druid.msq.dart.worker.http.DartWorkerResource;
60+
import org.apache.druid.msq.exec.DataSegmentProviderImpl;
6161
import org.apache.druid.msq.exec.MemoryIntrospector;
62-
import org.apache.druid.msq.querykit.DataSegmentProvider;
62+
import org.apache.druid.msq.input.table.DataSegmentProvider;
6363
import org.apache.druid.msq.rpc.ResourcePermissionMapper;
6464
import org.apache.druid.query.DruidProcessingConfig;
6565
import org.apache.druid.query.QueryToolChestWarehouse;
6666
import org.apache.druid.rpc.ServiceClientFactory;
6767
import org.apache.druid.server.DruidNode;
68+
import org.apache.druid.server.SegmentManager;
6869
import org.apache.druid.server.security.AuthorizerMapper;
6970

7071
import java.io.File;
@@ -104,11 +105,6 @@ public void configure(Binder binder)
104105
.to(DartWorkerContextFactoryImpl.class)
105106
.in(LazySingleton.class);
106107

107-
binder.bind(DataSegmentProvider.class)
108-
.annotatedWith(Dart.class)
109-
.to(DartDataSegmentProvider.class)
110-
.in(LazySingleton.class);
111-
112108
binder.bind(ResourcePermissionMapper.class)
113109
.annotatedWith(Dart.class)
114110
.to(DartResourcePermissionMapper.class);
@@ -139,6 +135,14 @@ public DartWorkerRunner createWorkerRunner(
139135
);
140136
}
141137

138+
/**
 * Guice provider for the {@link Dart}-annotated {@link DataSegmentProvider}, scoped as a lazy singleton.
 * Builds a {@link DataSegmentProviderImpl} on top of the injected {@link SegmentManager}.
 *
 * @param segmentManager segment manager handed to the provider implementation
 * @return a new {@link DataSegmentProviderImpl}
 */
@Provides
139+
@LazySingleton
140+
@Dart
141+
public DataSegmentProvider createDataSegmentProvider(final SegmentManager segmentManager)
142+
{
143+
// NOTE(review): the second constructor argument is passed as null; its meaning is not visible in this
// diff -- confirm against DataSegmentProviderImpl.
return new DataSegmentProviderImpl(segmentManager, null);
144+
}
145+
142146
@Provides
143147
@Dart
144148
public MessageRelayMonitor createMessageRelayMonitor(

multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataSegmentProvider.java

Lines changed: 0 additions & 114 deletions
This file was deleted.

multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import org.apache.druid.msq.exec.WorkerMemoryParameters;
3131
import org.apache.druid.msq.exec.WorkerStorageParameters;
3232
import org.apache.druid.msq.kernel.StageId;
33-
import org.apache.druid.msq.querykit.DataSegmentProvider;
33+
import org.apache.druid.msq.input.table.DataSegmentProvider;
3434
import org.apache.druid.query.groupby.GroupingEngine;
3535
import org.apache.druid.query.policy.PolicyEnforcer;
3636
import org.apache.druid.segment.IndexIO;

multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import org.apache.druid.msq.exec.WorkerMemoryParameters;
4343
import org.apache.druid.msq.exec.WorkerStorageParameters;
4444
import org.apache.druid.msq.kernel.WorkOrder;
45-
import org.apache.druid.msq.querykit.DataSegmentProvider;
45+
import org.apache.druid.msq.input.table.DataSegmentProvider;
4646
import org.apache.druid.msq.util.MultiStageQueryContext;
4747
import org.apache.druid.query.DruidProcessingConfig;
4848
import org.apache.druid.query.QueryContext;

multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.apache.druid.msq.exec.MemoryIntrospector;
3434
import org.apache.druid.msq.exec.ProcessingBuffersProvider;
3535
import org.apache.druid.msq.exec.WorkerContext;
36-
import org.apache.druid.msq.querykit.DataSegmentProvider;
36+
import org.apache.druid.msq.input.table.DataSegmentProvider;
3737
import org.apache.druid.query.DruidProcessingConfig;
3838
import org.apache.druid.query.QueryContext;
3939
import org.apache.druid.query.groupby.GroupingEngine;

multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@
4141
import org.apache.druid.data.input.impl.DimensionsSpec;
4242
import org.apache.druid.error.DruidException;
4343
import org.apache.druid.frame.FrameType;
44-
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
44+
import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory;
4545
import org.apache.druid.frame.channel.ReadableConcatFrameChannel;
46+
import org.apache.druid.frame.channel.ReadableFrameChannel;
4647
import org.apache.druid.frame.key.ClusterBy;
4748
import org.apache.druid.frame.key.ClusterByPartition;
4849
import org.apache.druid.frame.key.ClusterByPartitions;
@@ -92,8 +93,7 @@
9293
import org.apache.druid.msq.counters.ChannelCounters;
9394
import org.apache.druid.msq.counters.CounterSnapshots;
9495
import org.apache.druid.msq.counters.CounterSnapshotsTree;
95-
import org.apache.druid.msq.indexing.InputChannelFactory;
96-
import org.apache.druid.msq.indexing.InputChannelsImpl;
96+
import org.apache.druid.msq.exec.std.StandardPartitionReader;
9797
import org.apache.druid.msq.indexing.LegacyMSQSpec;
9898
import org.apache.druid.msq.indexing.MSQControllerTask;
9999
import org.apache.druid.msq.indexing.MSQSpec;
@@ -142,14 +142,12 @@
142142
import org.apache.druid.msq.input.inline.InlineInputSpecSlicer;
143143
import org.apache.druid.msq.input.lookup.LookupInputSpec;
144144
import org.apache.druid.msq.input.lookup.LookupInputSpecSlicer;
145-
import org.apache.druid.msq.input.stage.InputChannels;
146145
import org.apache.druid.msq.input.stage.StageInputSpec;
147146
import org.apache.druid.msq.input.stage.StageInputSpecSlicer;
148147
import org.apache.druid.msq.input.table.TableInputSpec;
149148
import org.apache.druid.msq.kernel.QueryDefinition;
150149
import org.apache.druid.msq.kernel.StageDefinition;
151150
import org.apache.druid.msq.kernel.StageId;
152-
import org.apache.druid.msq.kernel.StagePartition;
153151
import org.apache.druid.msq.kernel.WorkOrder;
154152
import org.apache.druid.msq.kernel.controller.ControllerQueryKernel;
155153
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
@@ -2797,31 +2795,25 @@ private void startQueryResultsReader()
27972795
final FrameProcessorExecutor resultReaderExec = createResultReaderExec(queryId());
27982796
resultReaderExec.registerCancellationId(RESULT_READER_CANCELLATION_ID);
27992797

2800-
ReadableConcatFrameChannel resultsChannel = null;
2798+
ReadableFrameChannel resultsChannel = null;
28012799

28022800
try {
2803-
final InputChannels inputChannels = new InputChannelsImpl(
2801+
final StandardPartitionReader partitionReader = new StandardPartitionReader(
28042802
queryDef,
2805-
queryKernel.getResultPartitionsForStage(finalStageId),
28062803
inputChannelFactory,
28072804
FrameWriterSpec.fromContext(querySpec.getContext()),
2808-
() -> ArenaMemoryAllocator.createOnHeap(5_000_000),
28092805
resultReaderExec,
28102806
RESULT_READER_CANCELLATION_ID,
2811-
null
2807+
null,
2808+
new ArenaMemoryAllocatorFactory(MultiStageQueryContext.getFrameSize(querySpec.getContext()))
28122809
);
28132810

28142811
resultsChannel = ReadableConcatFrameChannel.open(
28152812
StreamSupport.stream(queryKernel.getResultPartitionsForStage(finalStageId).spliterator(), false)
28162813
.map(
28172814
readablePartition -> {
28182815
try {
2819-
return inputChannels.openChannel(
2820-
new StagePartition(
2821-
queryKernel.getStageDefinition(finalStageId).getId(),
2822-
readablePartition.getPartitionNumber()
2823-
)
2824-
);
2816+
return partitionReader.openChannel(readablePartition);
28252817
}
28262818
catch (IOException e) {
28272819
throw new RuntimeException(e);
@@ -2852,7 +2844,7 @@ private void startQueryResultsReader()
28522844
}
28532845
catch (Throwable e) {
28542846
// There was some issue setting up the result reader. Shut down the results channel and stop the executor.
2855-
final ReadableConcatFrameChannel finalResultsChannel = resultsChannel;
2847+
final ReadableFrameChannel finalResultsChannel = resultsChannel;
28562848
throw CloseableUtils.closeAndWrapInCatch(
28572849
e,
28582850
() -> CloseableUtils.closeAll(
multi-stage-query/src/main/java/org/apache/druid/msq/exec/CountingInputChannelFactory.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.msq.exec;
21+
22+
import com.google.common.base.Preconditions;
23+
import org.apache.druid.frame.channel.ReadableFrameChannel;
24+
import org.apache.druid.msq.counters.CounterNames;
25+
import org.apache.druid.msq.counters.CounterTracker;
26+
import org.apache.druid.msq.indexing.CountingReadableFrameChannel;
27+
import org.apache.druid.msq.input.InputSlice;
28+
import org.apache.druid.msq.input.stage.StageInputSlice;
29+
import org.apache.druid.msq.kernel.StageId;
30+
import org.apache.druid.msq.kernel.WorkOrder;
31+
32+
import java.io.IOException;
33+
import java.util.List;
34+
35+
/**
36+
* Wrapper around {@link InputChannelFactory} that increments counters as data is read.
* Every channel opened through this factory is wrapped in a {@link CountingReadableFrameChannel} bound to a
* channel counter obtained from the {@link CounterTracker}. The counter name is derived from the position of
* the matching {@link StageInputSlice} in {@link WorkOrder#getInputs()}.
37+
*/
38+
public class CountingInputChannelFactory implements InputChannelFactory
39+
{
40+
// Factory that actually opens the channels; this class only decorates what it returns.
private final InputChannelFactory baseFactory;
41+
// Work order whose input slices are used to map a stage number to an input number.
private final WorkOrder workOrder;
42+
// Source of the per-input channel counters.
private final CounterTracker counterTracker;
43+
44+
/**
 * Creates a counting wrapper around the given factory.
 *
 * @param baseFactory    factory that channel opening is delegated to
 * @param workOrder      work order whose inputs determine counter names
 * @param counterTracker tracker that supplies channel counters by name
 * @throws NullPointerException if any argument is null
 */
public CountingInputChannelFactory(
45+
InputChannelFactory baseFactory,
46+
WorkOrder workOrder,
47+
CounterTracker counterTracker
48+
)
49+
{
50+
this.baseFactory = Preconditions.checkNotNull(baseFactory, "baseFactory");
51+
this.workOrder = Preconditions.checkNotNull(workOrder, "workOrder");
52+
this.counterTracker = Preconditions.checkNotNull(counterTracker, "counterTracker");
53+
}
54+
55+
/**
 * Opens the requested channel on the base factory and wraps it in a {@link CountingReadableFrameChannel}
 * that uses the channel counter named for the input slice matching {@code stageId}.
 *
 * @param stageId         stage whose output is being read; its stage number selects the counter name
 * @param workerNumber    worker number, passed through to the base factory
 * @param partitionNumber partition number, passed to both the base factory and the counting wrapper
 * @return the counting wrapper around the base channel
 * @throws IOException as declared by the overridden method; may be propagated from the base factory
 */
@Override
56+
public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, int partitionNumber) throws IOException
57+
{
58+
return new CountingReadableFrameChannel(
59+
baseFactory.openChannel(stageId, workerNumber, partitionNumber),
60+
counterTracker.channel(getCounterNameForStage(stageId.getStageNumber())),
61+
partitionNumber
62+
);
63+
}
64+
65+
/**
66+
* Returns the counter name based on the input number (position in {@link WorkOrder#getInputs()}) for a stage.
67+
* If multiple inputs match, or if none matches, uses the size of the input list (which ends up not corresponding to
68+
* any input number, allowing detection of these situations when looking at counters).
*
* @param stageNumber stage number to look up among the {@link StageInputSlice} inputs
* @return name produced by {@link CounterNames#inputChannel} for the chosen input number
69+
*/
70+
private String getCounterNameForStage(int stageNumber)
71+
{
72+
final List<InputSlice> inputs = workOrder.getInputs();
73+
// Index of the single matching slice; null means "no match so far" or, after the loop, "no unique match".
Integer matchingSlice = null;
74+
for (int i = 0; i < inputs.size(); i++) {
75+
final InputSlice slice = inputs.get(i);
76+
if (slice instanceof StageInputSlice && ((StageInputSlice) slice).getStageNumber() == stageNumber) {
77+
if (matchingSlice == null) {
78+
matchingSlice = i;
79+
} else {
80+
// A second slice for the same stage makes the match ambiguous: reset and stop so the fallback is used.
matchingSlice = null;
81+
break;
82+
}
83+
}
84+
}
85+
86+
// Fallback input number is inputs.size(), which is one past the last valid input position.
return CounterNames.inputChannel(matchingSlice == null ? inputs.size() : matchingSlice);
87+
}
88+
}

0 commit comments

Comments
 (0)