diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryValidator.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryValidator.java index 75e5e156ca15..b322b70b07ad 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryValidator.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryValidator.java @@ -19,20 +19,13 @@ package org.apache.druid.msq.exec; -import com.google.common.math.IntMath; -import com.google.common.primitives.Ints; import org.apache.druid.java.util.common.ISE; import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault; import org.apache.druid.msq.indexing.error.TooManyColumnsFault; -import org.apache.druid.msq.indexing.error.TooManyInputFilesFault; import org.apache.druid.msq.indexing.error.TooManyWorkersFault; -import org.apache.druid.msq.input.InputSlice; import org.apache.druid.msq.kernel.QueryDefinition; import org.apache.druid.msq.kernel.StageDefinition; -import org.apache.druid.msq.kernel.WorkOrder; - -import java.math.RoundingMode; public class QueryValidator { @@ -68,22 +61,4 @@ public static void validateQueryDef(final QueryDefinition queryDef) } } } - - /** - * Validate that a {@link WorkOrder} falls within the {@link Limits#MAX_INPUT_FILES_PER_WORKER} limit. - */ - public static void validateWorkOrder(final WorkOrder order) - { - final int numInputFiles = Ints.checkedCast(order.getInputs().stream().mapToLong(InputSlice::fileCount).sum()); - - if (numInputFiles > Limits.MAX_INPUT_FILES_PER_WORKER) { - throw new MSQException( - new TooManyInputFilesFault( - numInputFiles, - Limits.MAX_INPUT_FILES_PER_WORKER, - IntMath.divide(numInputFiles, Limits.MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING) - ) - ); - } - } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java index 8ad4131d2a4e..f61022044b04 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java @@ -23,6 +23,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.math.IntMath; +import com.google.common.primitives.Ints; import it.unimi.dsi.fastutil.ints.Int2IntAVLTreeMap; import it.unimi.dsi.fastutil.ints.Int2IntMap; import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap; @@ -35,15 +37,17 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.exec.ExtraInfoHolder; +import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.exec.OutputChannelMode; -import org.apache.druid.msq.exec.QueryValidator; import org.apache.druid.msq.indexing.error.CanceledFault; import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.indexing.error.MSQFault; import org.apache.druid.msq.indexing.error.MSQFaultUtils; +import org.apache.druid.msq.indexing.error.TooManyInputFilesFault; import org.apache.druid.msq.indexing.error.UnknownFault; import org.apache.druid.msq.indexing.error.WorkerFailedFault; import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault; +import org.apache.druid.msq.input.InputSlice; import org.apache.druid.msq.input.InputSpecSlicerFactory; import org.apache.druid.msq.input.stage.ReadablePartitions; import org.apache.druid.msq.kernel.QueryDefinition; @@ -56,6 +60,7 @@ import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation; import javax.annotation.Nullable; +import java.math.RoundingMode; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.HashMap; @@ -292,6 +297,8 @@ public Int2ObjectMap createWorkOrders( final WorkerInputs workerInputs = stageKernel.getWorkerInputs(); final OutputChannelMode outputChannelMode = stageOutputChannelModes.get(stageKernel.getStageDefinition().getId()); + int totalFileCount = 0; + boolean fault = false; for (int workerNumber : workerInputs.workers()) { final Object extraInfo = extraInfos != null ? extraInfos.get(workerNumber) : null; @@ -310,9 +317,18 @@ public Int2ObjectMap createWorkOrders( config.getWorkerContextMap() ); - QueryValidator.validateWorkOrder(workOrder); + final int numInputFiles = Ints.checkedCast(workOrder.getInputs().stream().mapToLong(InputSlice::fileCount).sum()); + fault = fault || IntMath.divide(numInputFiles, Limits.MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING) > 1; + totalFileCount += numInputFiles; workerToWorkOrder.put(workerNumber, workOrder); } + + final int requiredWorkers = IntMath.divide(totalFileCount, Limits.MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING); + if (fault) { + throw new MSQException( + new TooManyInputFilesFault(totalFileCount, Limits.MAX_INPUT_FILES_PER_WORKER, requiredWorkers) + ); + } stageWorkOrders.put(new StageId(queryDef.getQueryId(), stageNumber), workerToWorkOrder); return workerToWorkOrder; } diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java index 2918391c0be0..7e85722e4805 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java @@ -472,7 +472,7 @@ public void testTooManyInputFiles() throws IOException { RowSignature dummyRowSignature = RowSignature.builder().add("__time", ColumnType.LONG).build(); - final int numFiles = 20000; + final int numFiles = 100000; final File toRead = getResourceAsTemporaryFile("/wikipedia-sampled.json"); final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath()); @@ -492,9 +492,10 @@ public void testTooManyInputFiles() throws IOException + ") PARTITIONED by day", externalFiles )) + .setQueryContext(Map.of("maxNumTasks", 8)) .setExpectedDataSource("foo1") .setExpectedRowSignature(dummyRowSignature) - .setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.MAX_INPUT_FILES_PER_WORKER, 2)) + .setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.MAX_INPUT_FILES_PER_WORKER, 10)) .verifyResults(); } diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java index b81f49b73ca7..0e521dafd1e1 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/QueryValidatorTest.java @@ -26,16 +26,13 @@ import org.apache.druid.msq.kernel.QueryDefinitionBuilder; import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.kernel.StageDefinitionBuilder; -import org.apache.druid.msq.kernel.WorkOrder; import org.apache.druid.msq.querykit.common.OffsetLimitStageProcessor; -import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import java.util.Collections; import java.util.UUID; import java.util.stream.IntStream; @@ -95,37 +92,6 @@ public void testGreaterThanMaxColumns() QueryValidator.validateQueryDef(createQueryDefinition(Limits.MAX_FRAME_COLUMNS + 1, 1)); } - @Test - public void testMoreInputFiles() - { - int numWorkers = 3; - int inputFiles = numWorkers * Limits.MAX_INPUT_FILES_PER_WORKER + 1; - - final WorkOrder workOrder = new WorkOrder( - createQueryDefinition(inputFiles, numWorkers), - 0, - 0, - Collections.singletonList(() -> inputFiles), // Slice with a large number of inputFiles - null, - null, - null, - null - ); - - expectedException.expect(MSQException.class); - expectedException.expectMessage(StringUtils.format( - "Too many input files/segments [%d] encountered. Maximum input files/segments per worker is set to [%d]. Try" - + " breaking your query up into smaller queries, or increasing the number of workers to at least [%d] by" - + " setting %s in your query context", - inputFiles, - Limits.MAX_INPUT_FILES_PER_WORKER, - numWorkers + 1, - MultiStageQueryContext.CTX_MAX_NUM_TASKS - )); - - QueryValidator.validateWorkOrder(workOrder); - } - public static QueryDefinition createQueryDefinition(int numColumns, int numWorkers) { QueryDefinitionBuilder builder = QueryDefinition.builder(UUID.randomUUID().toString());