Merged

@@ -102,6 +102,7 @@ public EmbeddedDruidCluster createCluster()
         .useDefaultTimeoutForLatchableEmitter(20)
         .addResource(storageResource)
         .addCommonProperty("druid.storage.zip", "false")
+        .addCommonProperty("druid.indexer.task.buildV10", "true")
         .addCommonProperty("druid.monitoring.emissionPeriod", "PT1s")
         .addCommonProperty(
             "druid.monitoring.monitors",
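
The hunk above opts every task in the embedded test cluster into the new segment format via a common property. Outside of tests, the same flag would go into a service's runtime.properties; a minimal sketch, assuming the property binds to TaskConfig's new buildV10 field through Druid's usual druid.indexer.task.* prefix:

    # Hypothetical runtime.properties snippet: opt all tasks into V10 segment building.
    druid.indexer.task.buildV10=true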

@@ -51,6 +51,7 @@
 import org.apache.druid.rpc.StandardRetryPolicy;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMergerV10Factory;
 import org.apache.druid.segment.IndexMergerV9Factory;
 import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
@@ -103,6 +104,7 @@ public class TaskToolboxFactory
   private final CacheConfig cacheConfig;
   private final CachePopulatorStats cachePopulatorStats;
   private final IndexMergerV9Factory indexMergerV9Factory;
+  private final IndexMergerV10Factory indexMergerV10Factory;
   private final DruidNodeAnnouncer druidNodeAnnouncer;
   private final DruidNode druidNode;
   private final LookupNodeService lookupNodeService;
@@ -151,6 +153,7 @@ public TaskToolboxFactory(
       CacheConfig cacheConfig,
       CachePopulatorStats cachePopulatorStats,
       IndexMergerV9Factory indexMergerV9Factory,
+      IndexMergerV10Factory indexMergerV10Factory,
       DruidNodeAnnouncer druidNodeAnnouncer,
       @RemoteChatHandler DruidNode druidNode,
       LookupNodeService lookupNodeService,
@@ -196,6 +199,7 @@ public TaskToolboxFactory(
     this.cacheConfig = cacheConfig;
     this.cachePopulatorStats = cachePopulatorStats;
     this.indexMergerV9Factory = indexMergerV9Factory;
+    this.indexMergerV10Factory = indexMergerV10Factory;
     this.druidNodeAnnouncer = druidNodeAnnouncer;
     this.druidNode = druidNode;
     this.lookupNodeService = lookupNodeService;
@@ -256,7 +260,9 @@ public TaskToolbox build(TaskConfig config, Task task)
         .cacheConfig(cacheConfig)
         .cachePopulatorStats(cachePopulatorStats)
         .indexMerger(
-            indexMergerV9Factory.create(
+            config.buildV10()
+            ? indexMergerV10Factory.create()
+            : indexMergerV9Factory.create(
                 task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, config.isStoreEmptyColumns())
             )
         )
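
Taken together, these hunks make the merger implementation a per-task decision. A standalone sketch of the same branch, for readers skimming the diff (the helper name and IndexMerger return type are illustrative; as above, IndexMergerV10Factory.create() is assumed to take no arguments, so the storeEmptyColumns task-context override only feeds the V9 path):

    // Sketch: pick the merger implementation for one task.
    private IndexMerger selectIndexMerger(TaskConfig config, Task task)
    {
      if (config.buildV10()) {
        return indexMergerV10Factory.create();
      }
      // V9 keeps honoring the per-task storeEmptyColumns override.
      return indexMergerV9Factory.create(
          task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, config.isStoreEmptyColumns())
      );
    }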

@@ -80,6 +80,9 @@ public class TaskConfig implements TaskDirectory
   @JsonProperty
   private final boolean allowHadoopTaskExecution;
 
+  @JsonProperty
+  private final boolean buildV10;
+
   @JsonCreator
   public TaskConfig(
       @JsonProperty("baseDir") String baseDir,
@@ -92,7 +95,8 @@ public TaskConfig(
       @JsonProperty("storeEmptyColumns") @Nullable Boolean storeEmptyColumns,
       @JsonProperty("encapsulatedTask") boolean enableTaskLevelLogPush,
       @JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask,
-      @JsonProperty("allowHadoopTaskExecution") boolean allowHadoopTaskExecution
+      @JsonProperty("allowHadoopTaskExecution") boolean allowHadoopTaskExecution,
+      @JsonProperty("buildV10") boolean buildV10
   )
   {
     this.baseDir = Configs.valueOrDefault(baseDir, System.getProperty("java.io.tmpdir"));
@@ -119,6 +123,7 @@ public TaskConfig(
     this.storeEmptyColumns = Configs.valueOrDefault(storeEmptyColumns, DEFAULT_STORE_EMPTY_COLUMNS);
     this.tmpStorageBytesPerTask = Configs.valueOrDefault(tmpStorageBytesPerTask, DEFAULT_TMP_STORAGE_BYTES_PER_TASK);
     this.allowHadoopTaskExecution = allowHadoopTaskExecution;
+    this.buildV10 = buildV10;
   }
 
   private TaskConfig(
@@ -132,7 +137,8 @@ private TaskConfig(
       boolean storeEmptyColumns,
       boolean encapsulatedTask,
       long tmpStorageBytesPerTask,
-      boolean allowHadoopTaskExecution
+      boolean allowHadoopTaskExecution,
+      boolean buildV10
   )
   {
     this.baseDir = baseDir;
@@ -146,6 +152,7 @@ private TaskConfig(
     this.encapsulatedTask = encapsulatedTask;
     this.tmpStorageBytesPerTask = tmpStorageBytesPerTask;
     this.allowHadoopTaskExecution = allowHadoopTaskExecution;
+    this.buildV10 = buildV10;
   }
 
   @JsonProperty
@@ -244,6 +251,12 @@ public boolean isAllowHadoopTaskExecution()
     return allowHadoopTaskExecution;
   }
 
+  @JsonProperty
+  public boolean buildV10()
+  {
+    return buildV10;
+  }
+
   private String defaultDir(@Nullable String configParameter, final String defaultVal)
   {
     if (configParameter == null) {
@@ -266,7 +279,8 @@ public TaskConfig withBaseTaskDir(File baseTaskDir)
         storeEmptyColumns,
         encapsulatedTask,
         tmpStorageBytesPerTask,
-        allowHadoopTaskExecution
+        allowHadoopTaskExecution,
+        buildV10
     );
   }
 
@@ -283,7 +297,8 @@ public TaskConfig withTmpStorageBytesPerTask(long tmpStorageBytesPerTask)
         storeEmptyColumns,
         encapsulatedTask,
         tmpStorageBytesPerTask,
-        allowHadoopTaskExecution
+        allowHadoopTaskExecution,
+        buildV10
     );
   }
 }
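
Because buildV10 is declared with @JsonProperty on both the creator parameter and the getter, it round-trips through serialized task configs like the neighboring flags; a hypothetical JSON form (other properties elided; as a primitive boolean it defaults to false when absent):

    {
      "baseDir": "/tmp",
      "buildV10": true
    }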

@@ -89,6 +89,7 @@
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.TuningConfig;
 import org.apache.druid.segment.loading.SegmentCacheManager;
+import org.apache.druid.segment.projections.AggregateProjectionSchema;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.transform.CompactionTransformSpec;
 import org.apache.druid.segment.transform.TransformSpec;
@@ -1134,7 +1135,7 @@ private void processProjections(final QueryableIndex index)
     if (metadata != null && metadata.getProjections() != null && !metadata.getProjections().isEmpty()) {
       projections = new HashMap<>();
       for (AggregateProjectionMetadata projectionMetadata : metadata.getProjections()) {
-        final AggregateProjectionMetadata.Schema schema = projectionMetadata.getSchema();
+        final AggregateProjectionSchema schema = projectionMetadata.getSchema();
         final QueryableIndex projectionIndex = Preconditions.checkNotNull(
             index.getProjectionQueryableIndex(schema.getName())
         );

@@ -148,6 +148,7 @@ public void setUp() throws IOException
         null,
         null,
         null,
+        null,
         new NoopTestTaskReportFileWriter(),
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,

@@ -37,6 +37,7 @@ public class TaskConfigBuilder
   private boolean enableTaskLevelLogPush;
   private Long tmpStorageBytesPerTask;
   private boolean allowHadoopTaskExecution;
+  private boolean buildV10;
 
   public TaskConfigBuilder setBaseDir(String baseDir)
   {
@@ -104,6 +105,12 @@ public TaskConfigBuilder setAllowHadoopTaskExecution(boolean allowHadoopTaskExec
     return this;
   }
 
+  public TaskConfigBuilder setBuildV10(boolean buildV10)
+  {
+    this.buildV10 = buildV10;
+    return this;
+  }
+
   public TaskConfig build()
   {
     return new TaskConfig(
@@ -117,7 +124,8 @@ public TaskConfig build()
         storeEmptyColumns,
         enableTaskLevelLogPush,
         tmpStorageBytesPerTask,
-        allowHadoopTaskExecution
+        allowHadoopTaskExecution,
+        buildV10
     );
  }
 }
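
For test setups, the new setter chains like the existing ones; a minimal usage sketch built only from methods visible in this diff:

    TaskConfig config = new TaskConfigBuilder()
        .setBaseDir("/tmp")
        .setBuildV10(true)
        .build();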

@@ -126,6 +126,7 @@ public void setup() throws IOException
         null,
         utils.getIndexMergerV9Factory(),
         null,
+        null,
         node,
         null,
         null,

@@ -599,6 +599,7 @@ public void announceSegment(DataSegment segment)
         new CacheConfig(),
         new CachePopulatorStats(),
         INDEX_MERGER_V9_FACTORY,
+        null,
         EasyMock.createNiceMock(DruidNodeAnnouncer.class),
         EasyMock.createNiceMock(DruidNode.class),
         new LookupNodeService("tier"),

@@ -107,6 +107,7 @@ public TestTaskToolboxFactory(
         bob.cacheConfig,
         bob.cachePopulatorStats,
         bob.indexMergerV9Factory,
+        null,
         bob.druidNodeAnnouncer,
         bob.druidNode,
         bob.lookupNodeService,

@@ -728,6 +728,7 @@ public void close()
         new CacheConfig(),
         new CachePopulatorStats(),
         testUtils.getIndexMergerV9Factory(),
+        null,
         EasyMock.createNiceMock(DruidNodeAnnouncer.class),
         EasyMock.createNiceMock(DruidNode.class),
         new LookupNodeService("tier"),

@@ -159,6 +159,7 @@ private WorkerTaskManager createWorkerTaskManager()
         null,
         null,
         null,
+        null,
         new NoopTestTaskReportFileWriter(),
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,

@@ -204,6 +204,7 @@ private WorkerTaskMonitor createTaskMonitor()
         null,
         null,
         null,
+        null,
         new NoopTestTaskReportFileWriter(),
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,

@@ -70,6 +70,7 @@
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.projections.AggregateProjectionSchema;
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.sql.calcite.planner.CatalogResolver;
 import org.apache.druid.sql.calcite.planner.ColumnMapping;
@@ -545,31 +546,31 @@ public void testInsertOnExternalDataSourceWithCatalogProjections(String contextN
         .add("delta", ColumnType.LONG)
         .build();
     AggregateProjectionMetadata expectedProjection = new AggregateProjectionMetadata(
-        AggregateProjectionMetadata.schemaBuilder("channel_added_hourly")
-            .timeColumnName(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME)
-            .virtualColumns(
+        AggregateProjectionSchema.schemaBuilder("channel_added_hourly")
+            .timeColumnName(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME)
+            .virtualColumns(
                 Granularities.toVirtualColumn(
                     Granularities.HOUR,
                     Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME
                 )
            )
-            .groupAndOrder(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME, "channel")
-            .aggregators(new LongSumAggregatorFactory("sum_added", "added"))
-            .build(),
+            .groupAndOrder(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME, "channel")
+            .aggregators(new LongSumAggregatorFactory("sum_added", "added"))
+            .build(),
         16
     );
     AggregateProjectionMetadata expectedProjection2 = new AggregateProjectionMetadata(
-        AggregateProjectionMetadata.schemaBuilder("channel_delta_daily")
-            .timeColumnName(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME)
-            .virtualColumns(
+        AggregateProjectionSchema.schemaBuilder("channel_delta_daily")
+            .timeColumnName(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME)
+            .virtualColumns(
                 Granularities.toVirtualColumn(
                     Granularities.DAY,
                     Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME
                 )
            )
-            .groupAndOrder(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME, "channel")
-            .aggregators(new LongSumAggregatorFactory("sum_delta", "delta"))
-            .build(),
+            .groupAndOrder(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME, "channel")
+            .aggregators(new LongSumAggregatorFactory("sum_delta", "delta"))
+            .build(),
         11
     );
 

@@ -34,11 +34,11 @@
 import org.apache.druid.query.OrderBy;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.filter.DimFilter;
-import org.apache.druid.segment.AggregateProjectionMetadata;
 import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.projections.AggregateProjectionSchema;
 import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.DateTimeZone;
 
@@ -162,9 +162,9 @@ public List<OrderBy> getOrdering()
   }
 
   @JsonIgnore
-  public AggregateProjectionMetadata.Schema toMetadataSchema()
+  public AggregateProjectionSchema toMetadataSchema()
   {
-    return new AggregateProjectionMetadata.Schema(
+    return new AggregateProjectionSchema(
         name,
         timeColumnName,
         filter,
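
Call sites of this conversion only see the declared type change, since the method name and arguments stay put; a one-line sketch (projectionSpec assumed to be an instance of the spec class above):

    AggregateProjectionSchema schema = projectionSpec.toMetadataSchema();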

@@ -29,13 +29,13 @@
 import org.apache.druid.query.OrderBy;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.expression.TimestampFloorExprMacro;
-import org.apache.druid.segment.AggregateProjectionMetadata;
 import org.apache.druid.segment.CursorBuildSpec;
 import org.apache.druid.segment.Cursors;
 import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.projections.AggregateProjectionSchema;
 import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
 
 import javax.annotation.Nullable;
@@ -182,7 +182,7 @@ public static ExpressionVirtualColumn toVirtualColumn(Granularity granularity, S
  * <p>
  * IMPORTANT - this method DOES NOT VERIFY that the virtual column has a single input that is a time column
  * ({@link ColumnHolder#TIME_COLUMN_NAME} or equivalent projection time column as defined by
- * {@link AggregateProjectionMetadata.Schema#getTimeColumnName()}). Callers must verify this externally before
+ * {@link AggregateProjectionSchema#getTimeColumnName()}). Callers must verify this externally before
  * calling this method by examining {@link VirtualColumn#requiredColumns()}.
  * <p>
  * This method also does not handle other time expressions, or if the virtual column is just an identifier for a