Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,7 @@ protected void setExpectedSystemSchemaObjects(String dataSource, String taskId)

adminServers =
"server,host,plaintext_port,tls_port,server_type,tier,curr_size,max_size,is_leader\n"
+ "localhost:8083,localhost,8083,-1,historical,_default_tier,1939,100000000,\n"
+ "localhost:8083,localhost,8083,-1,historical,_default_tier,2024,100000000,\n"
+ "localhost:8091,localhost,8091,-1,indexer,_default_tier,0,0,";

adminServerSegments = StringUtils.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public EmbeddedDruidCluster createCluster()
.useDefaultTimeoutForLatchableEmitter(20)
.addResource(storageResource)
.addCommonProperty("druid.storage.zip", "false")
.addCommonProperty("druid.indexer.task.buildV10", "true")
.addCommonProperty("druid.monitoring.emissionPeriod", "PT1s")
.addCommonProperty(
"druid.monitoring.monitors",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV10Factory;
import org.apache.druid.segment.IndexMergerV9Factory;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
Expand Down Expand Up @@ -103,6 +104,7 @@ public class TaskToolboxFactory
private final CacheConfig cacheConfig;
private final CachePopulatorStats cachePopulatorStats;
private final IndexMergerV9Factory indexMergerV9Factory;
private final IndexMergerV10Factory indexMergerV10Factory;
private final DruidNodeAnnouncer druidNodeAnnouncer;
private final DruidNode druidNode;
private final LookupNodeService lookupNodeService;
Expand Down Expand Up @@ -151,6 +153,7 @@ public TaskToolboxFactory(
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats,
IndexMergerV9Factory indexMergerV9Factory,
IndexMergerV10Factory indexMergerV10Factory,
DruidNodeAnnouncer druidNodeAnnouncer,
@RemoteChatHandler DruidNode druidNode,
LookupNodeService lookupNodeService,
Expand Down Expand Up @@ -196,6 +199,7 @@ public TaskToolboxFactory(
this.cacheConfig = cacheConfig;
this.cachePopulatorStats = cachePopulatorStats;
this.indexMergerV9Factory = indexMergerV9Factory;
this.indexMergerV10Factory = indexMergerV10Factory;
this.druidNodeAnnouncer = druidNodeAnnouncer;
this.druidNode = druidNode;
this.lookupNodeService = lookupNodeService;
Expand Down Expand Up @@ -256,7 +260,9 @@ public TaskToolbox build(TaskConfig config, Task task)
.cacheConfig(cacheConfig)
.cachePopulatorStats(cachePopulatorStats)
.indexMerger(
indexMergerV9Factory.create(
config.buildV10()
? indexMergerV10Factory.create()
: indexMergerV9Factory.create(
task.getContextValue(Tasks.STORE_EMPTY_COLUMNS_KEY, config.isStoreEmptyColumns())
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ public class TaskConfig implements TaskDirectory
@JsonProperty
private final boolean allowHadoopTaskExecution;

@JsonProperty
private final boolean buildV10;

@JsonCreator
public TaskConfig(
@JsonProperty("baseDir") String baseDir,
Expand All @@ -92,7 +95,8 @@ public TaskConfig(
@JsonProperty("storeEmptyColumns") @Nullable Boolean storeEmptyColumns,
@JsonProperty("encapsulatedTask") boolean enableTaskLevelLogPush,
@JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask,
@JsonProperty("allowHadoopTaskExecution") boolean allowHadoopTaskExecution
@JsonProperty("allowHadoopTaskExecution") boolean allowHadoopTaskExecution,
@JsonProperty("buildV10") boolean buildV10
)
{
this.baseDir = Configs.valueOrDefault(baseDir, System.getProperty("java.io.tmpdir"));
Expand All @@ -119,6 +123,7 @@ public TaskConfig(
this.storeEmptyColumns = Configs.valueOrDefault(storeEmptyColumns, DEFAULT_STORE_EMPTY_COLUMNS);
this.tmpStorageBytesPerTask = Configs.valueOrDefault(tmpStorageBytesPerTask, DEFAULT_TMP_STORAGE_BYTES_PER_TASK);
this.allowHadoopTaskExecution = allowHadoopTaskExecution;
this.buildV10 = buildV10;
}

private TaskConfig(
Expand All @@ -132,7 +137,8 @@ private TaskConfig(
boolean storeEmptyColumns,
boolean encapsulatedTask,
long tmpStorageBytesPerTask,
boolean allowHadoopTaskExecution
boolean allowHadoopTaskExecution,
boolean buildV10
)
{
this.baseDir = baseDir;
Expand All @@ -146,6 +152,7 @@ private TaskConfig(
this.encapsulatedTask = encapsulatedTask;
this.tmpStorageBytesPerTask = tmpStorageBytesPerTask;
this.allowHadoopTaskExecution = allowHadoopTaskExecution;
this.buildV10 = buildV10;
}

@JsonProperty
Expand Down Expand Up @@ -244,6 +251,12 @@ public boolean isAllowHadoopTaskExecution()
return allowHadoopTaskExecution;
}

@JsonProperty
public boolean buildV10()
{
return buildV10;
}

private String defaultDir(@Nullable String configParameter, final String defaultVal)
{
if (configParameter == null) {
Expand All @@ -266,7 +279,8 @@ public TaskConfig withBaseTaskDir(File baseTaskDir)
storeEmptyColumns,
encapsulatedTask,
tmpStorageBytesPerTask,
allowHadoopTaskExecution
allowHadoopTaskExecution,
buildV10
);
}

Expand All @@ -283,7 +297,8 @@ public TaskConfig withTmpStorageBytesPerTask(long tmpStorageBytesPerTask)
storeEmptyColumns,
encapsulatedTask,
tmpStorageBytesPerTask,
allowHadoopTaskExecution
allowHadoopTaskExecution,
buildV10
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.projections.AggregateProjectionSchema;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.transform.CompactionTransformSpec;
import org.apache.druid.segment.transform.TransformSpec;
Expand Down Expand Up @@ -1134,7 +1135,7 @@ private void processProjections(final QueryableIndex index)
if (metadata != null && metadata.getProjections() != null && !metadata.getProjections().isEmpty()) {
projections = new HashMap<>();
for (AggregateProjectionMetadata projectionMetadata : metadata.getProjections()) {
final AggregateProjectionMetadata.Schema schema = projectionMetadata.getSchema();
final AggregateProjectionSchema schema = projectionMetadata.getSchema();
final QueryableIndex projectionIndex = Preconditions.checkNotNull(
index.getProjectionQueryableIndex(schema.getName())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public void setUp() throws IOException
null,
null,
null,
null,
new NoopTestTaskReportFileWriter(),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.druid.rpc.indexing.NoopOverlordClient;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexMergerV10Factory;
import org.apache.druid.segment.IndexMergerV9Factory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnConfig;
Expand Down Expand Up @@ -68,6 +68,7 @@ public class TestUtils

private final ObjectMapper jsonMapper;
private final IndexMergerV9Factory indexMergerV9Factory;
private final IndexMergerV10Factory indexMergerV10Factory;
private final IndexIO indexIO;
private final RowIngestionMetersFactory rowIngestionMetersFactory;

Expand All @@ -80,6 +81,11 @@ public TestUtils()
indexIO,
OffHeapMemorySegmentWriteOutMediumFactory.instance()
);
this.indexMergerV10Factory = new IndexMergerV10Factory(
jsonMapper,
indexIO,
OffHeapMemorySegmentWriteOutMediumFactory.instance()
);

this.rowIngestionMetersFactory = new DropwizardRowIngestionMetersFactory();

Expand Down Expand Up @@ -121,14 +127,14 @@ public ObjectMapper getTestObjectMapper()
return jsonMapper;
}

public IndexMergerV9 getTestIndexMergerV9()
public IndexMergerV9Factory getIndexMergerV9Factory()
{
return indexMergerV9Factory.create(true);
return indexMergerV9Factory;
}

public IndexMergerV9Factory getIndexMergerV9Factory()
public IndexMergerV10Factory getIndexMergerV10Factory()
{
return indexMergerV9Factory;
return indexMergerV10Factory;
}

public IndexIO getTestIndexIO()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class TaskConfigBuilder
private boolean enableTaskLevelLogPush;
private Long tmpStorageBytesPerTask;
private boolean allowHadoopTaskExecution;
private boolean buildV10;

public TaskConfigBuilder setBaseDir(String baseDir)
{
Expand Down Expand Up @@ -104,6 +105,12 @@ public TaskConfigBuilder setAllowHadoopTaskExecution(boolean allowHadoopTaskExec
return this;
}

public TaskConfigBuilder setBuildV10(boolean buildV10)
{
this.buildV10 = buildV10;
return this;
}

public TaskConfig build()
{
return new TaskConfig(
Expand All @@ -117,7 +124,8 @@ public TaskConfig build()
storeEmptyColumns,
enableTaskLevelLogPush,
tmpStorageBytesPerTask,
allowHadoopTaskExecution
allowHadoopTaskExecution,
buildV10
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public void setup() throws IOException
null,
null,
utils.getIndexMergerV9Factory(),
utils.getIndexMergerV10Factory(),
null,
node,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.apache.druid.query.policy.NoopPolicyEnforcer;
import org.apache.druid.rpc.indexing.NoopOverlordClient;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV10Factory;
import org.apache.druid.segment.IndexMergerV9Factory;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.SegmentSchemaMapping;
Expand Down Expand Up @@ -176,13 +177,15 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
{
private static final ObjectMapper MAPPER;
private static final IndexMergerV9Factory INDEX_MERGER_V9_FACTORY;
private static final IndexMergerV10Factory INDEX_MERGER_V10_FACTORY;
private static final IndexIO INDEX_IO;
private static final TestUtils TEST_UTILS;

static {
TEST_UTILS = new TestUtils();
MAPPER = TEST_UTILS.getTestObjectMapper();
INDEX_MERGER_V9_FACTORY = TEST_UTILS.getIndexMergerV9Factory();
INDEX_MERGER_V10_FACTORY = TEST_UTILS.getIndexMergerV10Factory();
INDEX_IO = TEST_UTILS.getTestIndexIO();
}

Expand Down Expand Up @@ -599,6 +602,7 @@ public void announceSegment(DataSegment segment)
new CacheConfig(),
new CachePopulatorStats(),
INDEX_MERGER_V9_FACTORY,
INDEX_MERGER_V10_FACTORY,
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),
new LookupNodeService("tier"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.druid.rpc.indexing.NoopOverlordClient;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV10Factory;
import org.apache.druid.segment.IndexMergerV9Factory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.TestIndex;
Expand Down Expand Up @@ -107,6 +108,7 @@ public TestTaskToolboxFactory(
bob.cacheConfig,
bob.cachePopulatorStats,
bob.indexMergerV9Factory,
bob.indexMergerV10Factory,
bob.druidNodeAnnouncer,
bob.druidNode,
bob.lookupNodeService,
Expand Down Expand Up @@ -153,7 +155,16 @@ public static class Builder
private Cache cache;
private CacheConfig cacheConfig;
private CachePopulatorStats cachePopulatorStats;
private IndexMergerV9Factory indexMergerV9Factory = new IndexMergerV9Factory(jsonMapper, indexIO, OnHeapMemorySegmentWriteOutMediumFactory.instance());
private IndexMergerV9Factory indexMergerV9Factory = new IndexMergerV9Factory(
jsonMapper,
indexIO,
OnHeapMemorySegmentWriteOutMediumFactory.instance()
);
private IndexMergerV10Factory indexMergerV10Factory = new IndexMergerV10Factory(
jsonMapper,
indexIO,
OnHeapMemorySegmentWriteOutMediumFactory.instance()
);
private DruidNodeAnnouncer druidNodeAnnouncer;
private DruidNode druidNode;
private LookupNodeService lookupNodeService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,7 @@ public void close()
new CacheConfig(),
new CachePopulatorStats(),
testUtils.getIndexMergerV9Factory(),
testUtils.getIndexMergerV10Factory(),
EasyMock.createNiceMock(DruidNodeAnnouncer.class),
EasyMock.createNiceMock(DruidNode.class),
new LookupNodeService("tier"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ private WorkerTaskManager createWorkerTaskManager()
null,
null,
null,
null,
new NoopTestTaskReportFileWriter(),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ private WorkerTaskMonitor createTaskMonitor()
null,
null,
null,
null,
new NoopTestTaskReportFileWriter(),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.projections.AggregateProjectionSchema;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
Expand Down Expand Up @@ -545,31 +546,31 @@ public void testInsertOnExternalDataSourceWithCatalogProjections(String contextN
.add("delta", ColumnType.LONG)
.build();
AggregateProjectionMetadata expectedProjection = new AggregateProjectionMetadata(
AggregateProjectionMetadata.schemaBuilder("channel_added_hourly")
.timeColumnName(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME)
.virtualColumns(
AggregateProjectionSchema.schemaBuilder("channel_added_hourly")
.timeColumnName(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME)
.virtualColumns(
Granularities.toVirtualColumn(
Granularities.HOUR,
Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME
)
)
.groupAndOrder(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME, "channel")
.aggregators(new LongSumAggregatorFactory("sum_added", "added"))
.build(),
.groupAndOrder(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME, "channel")
.aggregators(new LongSumAggregatorFactory("sum_added", "added"))
.build(),
16
);
AggregateProjectionMetadata expectedProjection2 = new AggregateProjectionMetadata(
AggregateProjectionMetadata.schemaBuilder("channel_delta_daily")
.timeColumnName(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME)
.virtualColumns(
AggregateProjectionSchema.schemaBuilder("channel_delta_daily")
.timeColumnName(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME)
.virtualColumns(
Granularities.toVirtualColumn(
Granularities.DAY,
Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME
)
)
.groupAndOrder(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME, "channel")
.aggregators(new LongSumAggregatorFactory("sum_delta", "delta"))
.build(),
.groupAndOrder(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME, "channel")
.aggregators(new LongSumAggregatorFactory("sum_delta", "delta"))
.build(),
11
);

Expand Down
Loading
Loading