Skip to content

Commit 01dd8d1

Browse files
authored
Dart: Async queries to realtime servers. (#18241)
* Dart: Async queries to realtime servers.

Prior to this patch, queries from MSQ workers to data servers were initiated in the processing pool, and blocked the processing pool until results started coming in. This patch addresses that with the following strategy:

1) Update DataServerClient to return a future that resolves when the response starts being written.
2) Split DataServerQueryHandler into DartDataServerQueryHandler and IndexerDataServerQueryHandler. The Dart version doesn't do retries and doesn't follow segments to other data servers. It just returns the async future from DataServerClient. The Indexer (tasks) version retains the prior logic and isn't really async. I didn't attempt to asyncify its retry logic in this patch.
3) Add ReturnOrAwait.awaitAllFutures, which allows processors to wait for a future to resolve.
4) Update ScanQueryFrameProcessor and GroupByPreShuffleFrameProcessor to give up the processing thread when waiting for a data server query to come back.

Additionally, to simplify DataServerClient, cancellations are now issued without using a scheduled executor. The scheduled executor should not be needed, because the service client is async.

* Fix tests and checkstyle.
* Fix exception checking.
1 parent 5b6a2a2 commit 01dd8d1

File tree

26 files changed

+1338
-627
lines changed

26 files changed

+1338
-627
lines changed

extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.druid.msq.dart.guice;
2121

22+
import com.fasterxml.jackson.databind.ObjectMapper;
2223
import com.fasterxml.jackson.databind.jsontype.NamedType;
2324
import com.fasterxml.jackson.databind.module.SimpleModule;
2425
import com.google.common.util.concurrent.MoreExecutors;
@@ -35,8 +36,10 @@
3536
import org.apache.druid.guice.LifecycleModule;
3637
import org.apache.druid.guice.ManageLifecycle;
3738
import org.apache.druid.guice.ManageLifecycleAnnouncements;
39+
import org.apache.druid.guice.annotations.EscalatedGlobal;
3840
import org.apache.druid.guice.annotations.LoadScope;
3941
import org.apache.druid.guice.annotations.Self;
42+
import org.apache.druid.guice.annotations.Smile;
4043
import org.apache.druid.initialization.DruidModule;
4144
import org.apache.druid.java.util.common.StringUtils;
4245
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -50,6 +53,7 @@
5053
import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
5154
import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
5255
import org.apache.druid.msq.dart.worker.DartDataSegmentProvider;
56+
import org.apache.druid.msq.dart.worker.DartDataServerQueryHandlerFactory;
5357
import org.apache.druid.msq.dart.worker.DartWorkerContextFactory;
5458
import org.apache.druid.msq.dart.worker.DartWorkerContextFactoryImpl;
5559
import org.apache.druid.msq.dart.worker.DartWorkerRunner;
@@ -58,6 +62,8 @@
5862
import org.apache.druid.msq.querykit.DataSegmentProvider;
5963
import org.apache.druid.msq.rpc.ResourcePermissionMapper;
6064
import org.apache.druid.query.DruidProcessingConfig;
65+
import org.apache.druid.query.QueryToolChestWarehouse;
66+
import org.apache.druid.rpc.ServiceClientFactory;
6167
import org.apache.druid.server.DruidNode;
6268
import org.apache.druid.server.security.AuthorizerMapper;
6369

@@ -156,6 +162,20 @@ public Outbox<ControllerMessage> createOutbox()
156162
{
157163
return new OutboxImpl<>();
158164
}
165+
166+
@Provides
167+
public DartDataServerQueryHandlerFactory createDataServerQueryHandlerFactory(
168+
@EscalatedGlobal ServiceClientFactory serviceClientFactory,
169+
@Smile ObjectMapper smileMapper,
170+
QueryToolChestWarehouse queryToolChestWarehouse
171+
)
172+
{
173+
return new DartDataServerQueryHandlerFactory(
174+
serviceClientFactory,
175+
smileMapper,
176+
queryToolChestWarehouse
177+
);
178+
}
159179
}
160180

161181
@Override
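
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java

Lines changed: 153 additions & 0 deletions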
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.msq.dart.worker;
21+
22+
import com.fasterxml.jackson.databind.JavaType;
23+
import com.fasterxml.jackson.databind.ObjectMapper;
24+
import com.google.common.util.concurrent.ListenableFuture;
25+
import org.apache.druid.common.guava.FutureUtils;
26+
import org.apache.druid.discovery.DataServerClient;
27+
import org.apache.druid.error.DruidException;
28+
import org.apache.druid.java.util.common.StringUtils;
29+
import org.apache.druid.java.util.common.guava.Sequence;
30+
import org.apache.druid.java.util.common.guava.Yielder;
31+
import org.apache.druid.java.util.common.io.Closer;
32+
import org.apache.druid.msq.counters.ChannelCounters;
33+
import org.apache.druid.msq.exec.DataServerQueryHandler;
34+
import org.apache.druid.msq.exec.DataServerQueryHandlerUtils;
35+
import org.apache.druid.msq.exec.DataServerQueryResult;
36+
import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
37+
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
38+
import org.apache.druid.query.Queries;
39+
import org.apache.druid.query.Query;
40+
import org.apache.druid.query.QueryToolChest;
41+
import org.apache.druid.query.QueryToolChestWarehouse;
42+
import org.apache.druid.query.SegmentDescriptor;
43+
import org.apache.druid.query.aggregation.MetricManipulatorFns;
44+
import org.apache.druid.query.context.DefaultResponseContext;
45+
import org.apache.druid.query.context.ResponseContext;
46+
import org.apache.druid.rpc.ServiceClientFactory;
47+
import org.apache.druid.rpc.ServiceLocation;
48+
49+
import java.util.Collections;
50+
import java.util.List;
51+
import java.util.function.Function;
52+
import java.util.stream.Collectors;
53+
54+
/**
55+
* Dart implementation of {@link DataServerQueryHandler}. Issues queries asynchronously, with no retries.
56+
*/
57+
public class DartDataServerQueryHandler implements DataServerQueryHandler
58+
{
59+
private final String dataSource;
60+
private final ChannelCounters channelCounters;
61+
private final ServiceClientFactory serviceClientFactory;
62+
private final ObjectMapper objectMapper;
63+
private final QueryToolChestWarehouse warehouse;
64+
private final DataServerRequestDescriptor requestDescriptor;
65+
66+
public DartDataServerQueryHandler(
67+
String dataSource,
68+
ChannelCounters channelCounters,
69+
ServiceClientFactory serviceClientFactory,
70+
ObjectMapper objectMapper,
71+
QueryToolChestWarehouse warehouse,
72+
DataServerRequestDescriptor requestDescriptor
73+
)
74+
{
75+
this.dataSource = dataSource;
76+
this.channelCounters = channelCounters;
77+
this.serviceClientFactory = serviceClientFactory;
78+
this.objectMapper = objectMapper;
79+
this.warehouse = warehouse;
80+
this.requestDescriptor = requestDescriptor;
81+
}
82+
83+
/**
84+
* {@inheritDoc}
85+
*
86+
* This method returns immediately. The returned future resolves when the server has started sending back
87+
* its response.
88+
*
89+
* Queries are issued once, without retries.
90+
*/
91+
@Override
92+
public <RowType, QueryType> ListenableFuture<DataServerQueryResult<RowType>> fetchRowsFromDataServer(
93+
Query<QueryType> query,
94+
Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction,
95+
Closer closer
96+
)
97+
{
98+
final Query<QueryType> preparedQuery =
99+
Queries.withSpecificSegments(
100+
DataServerQueryHandlerUtils.prepareQuery(query, dataSource),
101+
requestDescriptor.getSegments()
102+
.stream()
103+
.map(RichSegmentDescriptor::toPlainDescriptor)
104+
.collect(Collectors.toList())
105+
);
106+
107+
final ServiceLocation serviceLocation =
108+
ServiceLocation.fromDruidServerMetadata(requestDescriptor.getServerMetadata());
109+
final DataServerClient dataServerClient = makeDataServerClient(serviceLocation);
110+
final QueryToolChest<QueryType, Query<QueryType>> toolChest = warehouse.getToolChest(query);
111+
final Function<QueryType, QueryType> preComputeManipulatorFn =
112+
toolChest.makePreComputeManipulatorFn(query, MetricManipulatorFns.deserializing());
113+
final JavaType queryResultType = toolChest.getBaseResultType();
114+
final ResponseContext responseContext = new DefaultResponseContext();
115+
116+
return FutureUtils.transform(
117+
dataServerClient.run(preparedQuery, responseContext, queryResultType, closer),
118+
resultSequence -> {
119+
final Yielder<RowType> yielder = DataServerQueryHandlerUtils.createYielder(
120+
resultSequence.map(preComputeManipulatorFn),
121+
mappingFunction,
122+
channelCounters
123+
);
124+
125+
final List<SegmentDescriptor> missingSegments =
126+
DataServerQueryHandlerUtils.getMissingSegments(responseContext);
127+
128+
if (!missingSegments.isEmpty()) {
129+
throw DruidException
130+
.forPersona(DruidException.Persona.USER)
131+
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
132+
.build(
133+
"Segment[%s]%s not found on server[%s]. Please retry your query.",
134+
missingSegments.get(0),
135+
missingSegments.size() > 1 ? StringUtils.format(" and[%d] others", missingSegments.size() - 1) : "",
136+
serviceLocation.getHostAndPort()
137+
);
138+
}
139+
140+
return new DataServerQueryResult<>(
141+
Collections.singletonList(yielder),
142+
Collections.emptyList(),
143+
dataSource
144+
);
145+
}
146+
);
147+
}
148+
149+
private DataServerClient makeDataServerClient(ServiceLocation serviceLocation)
150+
{
151+
return new DataServerClient(serviceClientFactory, serviceLocation, objectMapper);
152+
}
153+
}
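
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandlerFactory.java

Lines changed: 65 additions & 0 deletions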
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.msq.dart.worker;
21+
22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import org.apache.druid.msq.counters.ChannelCounters;
24+
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
25+
import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
26+
import org.apache.druid.query.QueryToolChestWarehouse;
27+
import org.apache.druid.rpc.ServiceClientFactory;
28+
29+
/**
30+
* Factory for {@link DartDataServerQueryHandler}.
31+
*/
32+
public class DartDataServerQueryHandlerFactory implements DataServerQueryHandlerFactory
33+
{
34+
private final ServiceClientFactory serviceClientFactory;
35+
private final ObjectMapper objectMapper;
36+
private final QueryToolChestWarehouse warehouse;
37+
38+
public DartDataServerQueryHandlerFactory(
39+
ServiceClientFactory serviceClientFactory,
40+
ObjectMapper objectMapper,
41+
QueryToolChestWarehouse warehouse
42+
)
43+
{
44+
this.serviceClientFactory = serviceClientFactory;
45+
this.objectMapper = objectMapper;
46+
this.warehouse = warehouse;
47+
}
48+
49+
@Override
50+
public DartDataServerQueryHandler createDataServerQueryHandler(
51+
String dataSource,
52+
ChannelCounters channelCounters,
53+
DataServerRequestDescriptor requestDescriptor
54+
)
55+
{
56+
return new DartDataServerQueryHandler(
57+
dataSource,
58+
channelCounters,
59+
serviceClientFactory,
60+
objectMapper,
61+
warehouse,
62+
requestDescriptor
63+
);
64+
}
65+
}

extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.fasterxml.jackson.databind.ObjectMapper;
2323
import com.google.inject.Inject;
2424
import com.google.inject.Injector;
25-
import org.apache.druid.client.coordinator.CoordinatorClient;
2625
import org.apache.druid.guice.annotations.EscalatedGlobal;
2726
import org.apache.druid.guice.annotations.Json;
2827
import org.apache.druid.guice.annotations.Self;
@@ -31,14 +30,12 @@
3130
import org.apache.druid.messages.server.Outbox;
3231
import org.apache.druid.msq.dart.Dart;
3332
import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
34-
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
3533
import org.apache.druid.msq.exec.MemoryIntrospector;
3634
import org.apache.druid.msq.exec.ProcessingBuffersProvider;
3735
import org.apache.druid.msq.exec.WorkerContext;
3836
import org.apache.druid.msq.querykit.DataSegmentProvider;
3937
import org.apache.druid.query.DruidProcessingConfig;
4038
import org.apache.druid.query.QueryContext;
41-
import org.apache.druid.query.QueryToolChestWarehouse;
4239
import org.apache.druid.query.groupby.GroupingEngine;
4340
import org.apache.druid.query.policy.PolicyEnforcer;
4441
import org.apache.druid.rpc.ServiceClientFactory;
@@ -65,8 +62,7 @@ public class DartWorkerContextFactoryImpl implements DartWorkerContextFactory
6562
private final MemoryIntrospector memoryIntrospector;
6663
private final ProcessingBuffersProvider processingBuffersProvider;
6764
private final Outbox<ControllerMessage> outbox;
68-
private final CoordinatorClient coordinatorClient;
69-
private final QueryToolChestWarehouse warehouse;
65+
private final DartDataServerQueryHandlerFactory dataServerQueryHandlerFactory;
7066
private final ServiceEmitter emitter;
7167

7268
@Inject
@@ -84,8 +80,7 @@ public DartWorkerContextFactoryImpl(
8480
MemoryIntrospector memoryIntrospector,
8581
@Dart ProcessingBuffersProvider processingBuffersProvider,
8682
Outbox<ControllerMessage> outbox,
87-
CoordinatorClient coordinatorClient,
88-
QueryToolChestWarehouse warehouse,
83+
DartDataServerQueryHandlerFactory dataServerQueryHandlerFactory,
8984
ServiceEmitter emitter
9085
)
9186
{
@@ -102,8 +97,7 @@ public DartWorkerContextFactoryImpl(
10297
this.memoryIntrospector = memoryIntrospector;
10398
this.processingBuffersProvider = processingBuffersProvider;
10499
this.outbox = outbox;
105-
this.coordinatorClient = coordinatorClient;
106-
this.warehouse = warehouse;
100+
this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;
107101
this.emitter = emitter;
108102
}
109103

@@ -132,12 +126,7 @@ public WorkerContext build(
132126
outbox,
133127
tempDir,
134128
queryContext,
135-
new DataServerQueryHandlerFactory(
136-
coordinatorClient,
137-
serviceClientFactory,
138-
jsonMapper,
139-
warehouse
140-
),
129+
dataServerQueryHandlerFactory,
141130
emitter
142131
);
143132
}

0 commit comments

Comments
 (0)