
Commit 2534e62

fix error message for MSQ TooManyInputFilesFault (#18799)
1 parent af4e63d commit 2534e62

File tree

4 files changed: +21 -63 lines changed

multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryValidator.java

Lines changed: 0 additions & 25 deletions

@@ -19,20 +19,13 @@
 
 package org.apache.druid.msq.exec;
 
-import com.google.common.math.IntMath;
-import com.google.common.primitives.Ints;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.msq.indexing.error.MSQException;
 import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
 import org.apache.druid.msq.indexing.error.TooManyColumnsFault;
-import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
 import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
-import org.apache.druid.msq.input.InputSlice;
 import org.apache.druid.msq.kernel.QueryDefinition;
 import org.apache.druid.msq.kernel.StageDefinition;
-import org.apache.druid.msq.kernel.WorkOrder;
-
-import java.math.RoundingMode;
 
 public class QueryValidator
 {
@@ -68,22 +61,4 @@ public static void validateQueryDef(final QueryDefinition queryDef)
       }
     }
   }
-
-  /**
-   * Validate that a {@link WorkOrder} falls within the {@link Limits#MAX_INPUT_FILES_PER_WORKER} limit.
-   */
-  public static void validateWorkOrder(final WorkOrder order)
-  {
-    final int numInputFiles = Ints.checkedCast(order.getInputs().stream().mapToLong(InputSlice::fileCount).sum());
-
-    if (numInputFiles > Limits.MAX_INPUT_FILES_PER_WORKER) {
-      throw new MSQException(
-          new TooManyInputFilesFault(
-              numInputFiles,
-              Limits.MAX_INPUT_FILES_PER_WORKER,
-              IntMath.divide(numInputFiles, Limits.MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING)
-          )
-      );
-    }
-  }
 }

multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java

Lines changed: 18 additions & 2 deletions

@@ -23,6 +23,8 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.math.IntMath;
+import com.google.common.primitives.Ints;
 import it.unimi.dsi.fastutil.ints.Int2IntAVLTreeMap;
 import it.unimi.dsi.fastutil.ints.Int2IntMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
@@ -35,15 +37,17 @@
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.exec.ExtraInfoHolder;
+import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.exec.OutputChannelMode;
-import org.apache.druid.msq.exec.QueryValidator;
 import org.apache.druid.msq.indexing.error.CanceledFault;
 import org.apache.druid.msq.indexing.error.MSQException;
 import org.apache.druid.msq.indexing.error.MSQFault;
 import org.apache.druid.msq.indexing.error.MSQFaultUtils;
+import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
 import org.apache.druid.msq.indexing.error.UnknownFault;
 import org.apache.druid.msq.indexing.error.WorkerFailedFault;
 import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;
+import org.apache.druid.msq.input.InputSlice;
 import org.apache.druid.msq.input.InputSpecSlicerFactory;
 import org.apache.druid.msq.input.stage.ReadablePartitions;
 import org.apache.druid.msq.kernel.QueryDefinition;
@@ -56,6 +60,7 @@
 import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;
 
 import javax.annotation.Nullable;
+import java.math.RoundingMode;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -292,6 +297,8 @@ public Int2ObjectMap<WorkOrder> createWorkOrders(
     final WorkerInputs workerInputs = stageKernel.getWorkerInputs();
     final OutputChannelMode outputChannelMode = stageOutputChannelModes.get(stageKernel.getStageDefinition().getId());
 
+    int totalFileCount = 0;
+    boolean fault = false;
     for (int workerNumber : workerInputs.workers()) {
       final Object extraInfo = extraInfos != null ? extraInfos.get(workerNumber) : null;
 
@@ -310,9 +317,18 @@ public Int2ObjectMap<WorkOrder> createWorkOrders(
           config.getWorkerContextMap()
       );
 
-      QueryValidator.validateWorkOrder(workOrder);
+      final int numInputFiles = Ints.checkedCast(workOrder.getInputs().stream().mapToLong(InputSlice::fileCount).sum());
+      fault = fault || IntMath.divide(numInputFiles, Limits.MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING) > 1;
+      totalFileCount += numInputFiles;
       workerToWorkOrder.put(workerNumber, workOrder);
     }
+
+    final int requiredWorkers = IntMath.divide(totalFileCount, Limits.MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING);
+    if (fault) {
+      throw new MSQException(
+          new TooManyInputFilesFault(totalFileCount, Limits.MAX_INPUT_FILES_PER_WORKER, requiredWorkers)
+      );
+    }
     stageWorkOrders.put(new StageId(queryDef.getQueryId(), stageNumber), workerToWorkOrder);
     return workerToWorkOrder;
   }
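With this change the per-work-order check formerly in QueryValidator moves into the controller: ControllerQueryKernel sums input files across every work order for the stage and, if any single worker is over the per-worker limit, raises TooManyInputFilesFault with the stage-wide file count and the number of workers that total actually requires, rather than the figures for one work order. A minimal standalone sketch of that aggregation follows; the class name, method name, and the plain IllegalStateException are illustrative stand-ins, while Guava's IntMath ceiling division mirrors the production code.

import com.google.common.math.IntMath;

import java.math.RoundingMode;
import java.util.List;

public class InputFileLimitCheckSketch
{
  /**
   * Sums per-worker input file counts; fails if any single worker exceeds the limit,
   * reporting the stage-wide total and the worker count that total actually requires.
   */
  public static void checkInputFileLimit(List<Integer> fileCountsPerWorker, int maxFilesPerWorker)
  {
    int totalFileCount = 0;
    boolean anyWorkerOverLimit = false;

    for (int numInputFiles : fileCountsPerWorker) {
      // ceil(numInputFiles / maxFilesPerWorker) > 1 means this worker alone is over the limit.
      anyWorkerOverLimit = anyWorkerOverLimit
                           || IntMath.divide(numInputFiles, maxFilesPerWorker, RoundingMode.CEILING) > 1;
      totalFileCount += numInputFiles;
    }

    // Workers needed for the stage as a whole, which is what a user tuning maxNumTasks cares about.
    int requiredWorkers = IntMath.divide(totalFileCount, maxFilesPerWorker, RoundingMode.CEILING);

    if (anyWorkerOverLimit) {
      throw new IllegalStateException(
          "Too many input files [" + totalFileCount + "]: need at least ["
          + requiredWorkers + "] workers at [" + maxFilesPerWorker + "] files per worker"
      );
    }
  }
}

Compared to the removed QueryValidator.validateWorkOrder, which reported a single work order's file count and its own ceiling, the aggregated numbers describe the whole stage, so the worker count suggested in the error message is actually sufficient.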

multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java

Lines changed: 3 additions & 2 deletions

@@ -472,7 +472,7 @@ public void testTooManyInputFiles() throws IOException
   {
     RowSignature dummyRowSignature = RowSignature.builder().add("__time", ColumnType.LONG).build();
 
-    final int numFiles = 20000;
+    final int numFiles = 100000;
 
     final File toRead = getResourceAsTemporaryFile("/wikipedia-sampled.json");
     final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
@@ -492,9 +492,10 @@ public void testTooManyInputFiles() throws IOException
             + ") PARTITIONED by day",
             externalFiles
         ))
+        .setQueryContext(Map.of("maxNumTasks", 8))
        .setExpectedDataSource("foo1")
        .setExpectedRowSignature(dummyRowSignature)
-        .setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.MAX_INPUT_FILES_PER_WORKER, 2))
+        .setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.MAX_INPUT_FILES_PER_WORKER, 10))
        .verifyResults();
  }
 
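The updated expectation follows from the aggregation above. The test's numbers imply that Limits.MAX_INPUT_FILES_PER_WORKER is 10,000 (inferred from the old and new expected faults, not quoted from Limits): 100,000 input files need ceil(100000 / 10000) = 10 workers, while a maxNumTasks of 8 caps the worker count well below that, so each worker receives more than 10,000 files and the fault now reports the stage-wide total of 100,000 files and 10 required workers instead of per-work-order figures. A quick arithmetic check under that assumption:

import com.google.common.math.IntMath;

import java.math.RoundingMode;

public class TooManyInputFilesExpectationCheck
{
  public static void main(String[] args)
  {
    // Assumed value of Limits.MAX_INPUT_FILES_PER_WORKER, inferred from the test expectations.
    final int maxInputFilesPerWorker = 10_000;
    final int numFiles = 100_000;

    // ceil(100000 / 10000) = 10: the third argument of the expected TooManyInputFilesFault.
    final int requiredWorkers = IntMath.divide(numFiles, maxInputFilesPerWorker, RoundingMode.CEILING);
    System.out.println(requiredWorkers); // prints 10
  }
}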

multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java

Lines changed: 0 additions & 34 deletions

@@ -26,16 +26,13 @@
 import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
 import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.kernel.StageDefinitionBuilder;
-import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.querykit.common.OffsetLimitStageProcessor;
-import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.util.Collections;
 import java.util.UUID;
 import java.util.stream.IntStream;
 
@@ -95,37 +92,6 @@ public void testGreaterThanMaxColumns()
     QueryValidator.validateQueryDef(createQueryDefinition(Limits.MAX_FRAME_COLUMNS + 1, 1));
   }
 
-  @Test
-  public void testMoreInputFiles()
-  {
-    int numWorkers = 3;
-    int inputFiles = numWorkers * Limits.MAX_INPUT_FILES_PER_WORKER + 1;
-
-    final WorkOrder workOrder = new WorkOrder(
-        createQueryDefinition(inputFiles, numWorkers),
-        0,
-        0,
-        Collections.singletonList(() -> inputFiles), // Slice with a large number of inputFiles
-        null,
-        null,
-        null,
-        null
-    );
-
-    expectedException.expect(MSQException.class);
-    expectedException.expectMessage(StringUtils.format(
-        "Too many input files/segments [%d] encountered. Maximum input files/segments per worker is set to [%d]. Try"
-        + " breaking your query up into smaller queries, or increasing the number of workers to at least [%d] by"
-        + " setting %s in your query context",
-        inputFiles,
-        Limits.MAX_INPUT_FILES_PER_WORKER,
-        numWorkers + 1,
-        MultiStageQueryContext.CTX_MAX_NUM_TASKS
-    ));
-
-    QueryValidator.validateWorkOrder(workOrder);
-  }
-
   public static QueryDefinition createQueryDefinition(int numColumns, int numWorkers)
   {
     QueryDefinitionBuilder builder = QueryDefinition.builder(UUID.randomUUID().toString());