@@ -19,20 +19,13 @@

package org.apache.druid.msq.exec;

import com.google.common.math.IntMath;
import com.google.common.primitives.Ints;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.WorkOrder;

import java.math.RoundingMode;

public class QueryValidator
{
@@ -68,22 +61,4 @@ public static void validateQueryDef(final QueryDefinition queryDef)
}
}
}

/**
* Validate that a {@link WorkOrder} falls within the {@link Limits#MAX_INPUT_FILES_PER_WORKER} limit.
*/
public static void validateWorkOrder(final WorkOrder order)
{
final int numInputFiles = Ints.checkedCast(order.getInputs().stream().mapToLong(InputSlice::fileCount).sum());

if (numInputFiles > Limits.MAX_INPUT_FILES_PER_WORKER) {
throw new MSQException(
new TooManyInputFilesFault(
numInputFiles,
Limits.MAX_INPUT_FILES_PER_WORKER,
IntMath.divide(numInputFiles, Limits.MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING)
)
);
}
}
}
@@ -23,6 +23,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.math.IntMath;
import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.ints.Int2IntAVLTreeMap;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
@@ -35,15 +37,17 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.exec.ExtraInfoHolder;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.exec.OutputChannelMode;
import org.apache.druid.msq.exec.QueryValidator;
import org.apache.druid.msq.indexing.error.CanceledFault;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.error.MSQFaultUtils;
import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.indexing.error.WorkerFailedFault;
import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSpecSlicerFactory;
import org.apache.druid.msq.input.stage.ReadablePartitions;
import org.apache.druid.msq.kernel.QueryDefinition;
@@ -56,6 +60,7 @@
import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;

import javax.annotation.Nullable;
import java.math.RoundingMode;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
@@ -292,6 +297,8 @@ public Int2ObjectMap<WorkOrder> createWorkOrders(
final WorkerInputs workerInputs = stageKernel.getWorkerInputs();
final OutputChannelMode outputChannelMode = stageOutputChannelModes.get(stageKernel.getStageDefinition().getId());

int totalFileCount = 0;
boolean fault = false;
for (int workerNumber : workerInputs.workers()) {
final Object extraInfo = extraInfos != null ? extraInfos.get(workerNumber) : null;

@@ -310,9 +317,18 @@
config.getWorkerContextMap()
);

QueryValidator.validateWorkOrder(workOrder);
final int numInputFiles = Ints.checkedCast(workOrder.getInputs().stream().mapToLong(InputSlice::fileCount).sum());
fault = fault || IntMath.divide(numInputFiles, Limits.MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING) > 1;
totalFileCount += numInputFiles;
workerToWorkOrder.put(workerNumber, workOrder);
}

final int requiredWorkers = IntMath.divide(totalFileCount, Limits.MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING);
if (fault) {
throw new MSQException(
new TooManyInputFilesFault(totalFileCount, Limits.MAX_INPUT_FILES_PER_WORKER, requiredWorkers)
);
}
stageWorkOrders.put(new StageId(queryDef.getQueryId(), stageNumber), workerToWorkOrder);
return workerToWorkOrder;
}
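
A minimal, self-contained sketch (not part of the patch) of the aggregated check introduced above. The names InputFileLimitSketch, checkInputFiles, perWorkerFileCounts, and maxFilesPerWorker are illustrative only; the real code operates on WorkOrder and InputSlice and throws MSQException with a TooManyInputFilesFault. The fault fires if any single worker's slices exceed the per-worker file limit, and the suggested worker count is the ceiling of the total file count across all work orders divided by that limit.

import com.google.common.math.IntMath;
import java.math.RoundingMode;

public class InputFileLimitSketch
{
  /**
   * Mirrors the control flow of the createWorkOrders change above, with plain
   * ints standing in for WorkOrder/InputSlice. Returns the suggested worker
   * count; throws if any single worker exceeds the per-worker file limit.
   */
  static int checkInputFiles(int[] perWorkerFileCounts, int maxFilesPerWorker)
  {
    int totalFileCount = 0;
    boolean fault = false;

    for (int numInputFiles : perWorkerFileCounts) {
      // A worker is over the limit when its own file count needs more than one "worker's worth" of capacity.
      fault = fault || IntMath.divide(numInputFiles, maxFilesPerWorker, RoundingMode.CEILING) > 1;
      totalFileCount += numInputFiles;
    }

    // The suggestion reported in the fault: ceiling of total files over the per-worker limit.
    final int requiredWorkers = IntMath.divide(totalFileCount, maxFilesPerWorker, RoundingMode.CEILING);
    if (fault) {
      // The real code throws MSQException(new TooManyInputFilesFault(...)); a plain exception keeps this self-contained.
      throw new IllegalStateException(
          "Too many input files [" + totalFileCount + "]; need at least [" + requiredWorkers + "] workers"
      );
    }
    return requiredWorkers;
  }

  public static void main(String[] args)
  {
    // All workers under a 10,000-file cap: total 22,000 files, suggestion is ceil(22,000 / 10,000) = 3.
    System.out.println(checkInputFiles(new int[]{9_000, 9_000, 4_000}, 10_000));

    // One worker over the cap: throws, reporting the total file count and the suggested worker count.
    try {
      checkInputFiles(new int[]{9_000, 12_000, 4_000}, 10_000);
    }
    catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}

Deferring the throw until after the loop lets the fault report the total file count and a worker-count suggestion derived from all work orders, rather than stopping at the first offending worker.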
@@ -472,7 +472,7 @@ public void testTooManyInputFiles() throws IOException
{
RowSignature dummyRowSignature = RowSignature.builder().add("__time", ColumnType.LONG).build();

final int numFiles = 20000;
final int numFiles = 100000;

final File toRead = getResourceAsTemporaryFile("/wikipedia-sampled.json");
final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
@@ -492,9 +492,10 @@
+ ") PARTITIONED by day",
externalFiles
))
.setQueryContext(Map.of("maxNumTasks", 8))
.setExpectedDataSource("foo1")
.setExpectedRowSignature(dummyRowSignature)
.setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.MAX_INPUT_FILES_PER_WORKER, 2))
.setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.MAX_INPUT_FILES_PER_WORKER, 10))
.verifyResults();
}
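
A note on the updated expectations, assuming Limits.MAX_INPUT_FILES_PER_WORKER is 10,000 (the value consistent with both the old and new numbers in this test) and that maxNumTasks includes the controller task, leaving at most 7 workers: 100,000 input files cannot be spread across 7 workers without breaching the per-worker limit, so the fault fires, and the suggested worker count it carries is ceil(100,000 / 10,000) = 10. Under the old per-order check with 20,000 files, the suggestion was ceil(20,000 / 10,000) = 2.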

@@ -26,16 +26,13 @@
import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageDefinitionBuilder;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.querykit.common.OffsetLimitStageProcessor;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.Collections;
import java.util.UUID;
import java.util.stream.IntStream;

@@ -95,37 +92,6 @@ public void testGreaterThanMaxColumns()
QueryValidator.validateQueryDef(createQueryDefinition(Limits.MAX_FRAME_COLUMNS + 1, 1));
}

@Test
public void testMoreInputFiles()
{
int numWorkers = 3;
int inputFiles = numWorkers * Limits.MAX_INPUT_FILES_PER_WORKER + 1;

final WorkOrder workOrder = new WorkOrder(
createQueryDefinition(inputFiles, numWorkers),
0,
0,
Collections.singletonList(() -> inputFiles), // Slice with a large number of inputFiles
null,
null,
null,
null
);

expectedException.expect(MSQException.class);
expectedException.expectMessage(StringUtils.format(
"Too many input files/segments [%d] encountered. Maximum input files/segments per worker is set to [%d]. Try"
+ " breaking your query up into smaller queries, or increasing the number of workers to at least [%d] by"
+ " setting %s in your query context",
inputFiles,
Limits.MAX_INPUT_FILES_PER_WORKER,
numWorkers + 1,
MultiStageQueryContext.CTX_MAX_NUM_TASKS
));

QueryValidator.validateWorkOrder(workOrder);
}

public static QueryDefinition createQueryDefinition(int numColumns, int numWorkers)
{
QueryDefinitionBuilder builder = QueryDefinition.builder(UUID.randomUUID().toString());