Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,35 @@
package org.apache.pinot.controller.helix.core.assignment.instance;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Variation of {@link InstanceReplicaGroupPartitionSelector} that uses the number of partitions from the stream
* to determine the number of partitions in each replica group.
* Variation of {@link InstanceReplicaGroupPartitionSelector} that uses the number and IDs of partitions
* from the stream to determine the partitions in each replica group. When the stream exposes partition IDs
* (e.g. Kafka configured with a partition subset), instance partitions are keyed by those IDs so non-contiguous subsets work.
*/
public class ImplicitRealtimeTablePartitionSelector extends InstanceReplicaGroupPartitionSelector {
private static final Logger LOGGER = LoggerFactory.getLogger(ImplicitRealtimeTablePartitionSelector.class);
private final int _numPartitions;
private final List<Integer> _partitionIds;

public ImplicitRealtimeTablePartitionSelector(TableConfig tableConfig,
InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, String tableNameWithType,
@Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
this(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement,
// Get the number of partitions from the first stream config
// TODO: Revisit this logic to better handle multiple streams in the future - either validate that they
// all have the same number of partitions and use that or disallow the use of this selector in case the
// partition counts differ.
StreamConsumerFactoryProvider.create(IngestionConfigUtils.getFirstStreamConfig(tableConfig))
.createStreamMetadataProvider(
ImplicitRealtimeTablePartitionSelector.class.getSimpleName() + "-" + tableNameWithType)
Expand All @@ -54,22 +59,36 @@ public ImplicitRealtimeTablePartitionSelector(TableConfig tableConfig,
String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement,
StreamMetadataProvider streamMetadataProvider) {
super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement);
_numPartitions = getStreamNumPartitions(streamMetadataProvider);
}

private int getStreamNumPartitions(StreamMetadataProvider streamMetadataProvider) {
List<Integer> partitionIds = null;
try (streamMetadataProvider) {
return streamMetadataProvider.fetchPartitionCount(10_000L);
_numPartitions = streamMetadataProvider.fetchPartitionCount(10_000L);
try {
Set<Integer> ids = streamMetadataProvider.fetchPartitionIds(10_000L);
if (ids != null && !ids.isEmpty()) {
partitionIds = new ArrayList<>(ids);
Collections.sort(partitionIds);
}
} catch (UnsupportedOperationException e) {
// Stream does not expose partition IDs; instance partitions will use 0..numPartitions-1
LOGGER.debug("Stream metadata provider does not support fetchPartitionIds; using default partition ids.");
}
} catch (Exception e) {
throw new RuntimeException("Failed to retrieve partition info for table: " + _tableNameWithType, e);
}
_partitionIds = partitionIds;
}

/**
 * Returns the partition count fetched from the stream metadata provider at construction time,
 * rather than deriving it from the table/replica-group configuration.
 */
@Override
protected int getNumPartitions() {
return _numPartitions;
}

/**
 * Returns the sorted stream partition IDs fetched at construction time, or {@code null} when the
 * stream did not expose explicit IDs (in which case the contiguous range 0..numPartitions-1 is used).
 */
@Nullable
@Override
protected List<Integer> getPartitionIds() {
return _partitionIds;
}

@Override
protected int getNumInstancesPerPartition(int numInstancesPerReplicaGroup) {
// This partition selector should only be used for CONSUMING instance partitions, and we enforce a single instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.helix.model.InstanceConfig;
Expand Down Expand Up @@ -74,6 +76,18 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceCon
}
}

/**
 * Looks up the instances previously assigned to the given partition and replica-group.
 * Falls back to an empty immutable list when there are no existing instance partitions at all,
 * or when {@link InstancePartitions#getInstances(int, int)} has no entry for the key.
 */
private List<String> getExistingInstancesOrEmpty(int partitionId, int replicaGroupId) {
  if (_existingInstancePartitions != null) {
    List<String> existing = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
    if (existing != null) {
      return existing;
    }
  }
  return List.of();
}

private void nonReplicaGroupBased(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
InstancePartitions instancePartitions, List<Integer> pools, int tableNameHash) {
// Pick one pool based on the table name hash
Expand Down Expand Up @@ -196,10 +210,12 @@ private void replicaGroupBasedSimple(Map<Integer, List<InstanceConfig>> poolToIn
// [i0, i1, i2, i3, i4]
// p0 p0 p0 p1 p1
// p1 p2 p2 p2
List<Integer> partitionIds = getPartitionIds() != null ? getPartitionIds()
: IntStream.range(0, numPartitions).boxed().collect(Collectors.toList());
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
String[] instancesInReplicaGroup = replicaGroupIdToInstancesMap[replicaGroupId];
int instanceIdInReplicaGroup = 0;
for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
for (Integer partitionId : partitionIds) {
List<String> instances = new ArrayList<>(numInstancesPerPartition);
for (int i = 0; i < numInstancesPerPartition; i++) {
instances.add(instancesInReplicaGroup[instanceIdInReplicaGroup]);
Expand Down Expand Up @@ -257,6 +273,16 @@ protected int getNumPartitions() {
return numPartitions;
}

/**
 * Returns the explicit list of partition IDs to use for instance partition keys, or null to use 0..numPartitions-1.
 * When non-null (e.g. Kafka subset [0, 2, 5]), instance partitions will have entries for these IDs so segment
 * assignment can look up by stream partition id.
 */
@Nullable
protected List<Integer> getPartitionIds() {
// Default: no explicit IDs — callers fall back to the contiguous range [0, numPartitions).
return null;
}

protected int getNumInstancesPerPartition(int numInstancesPerReplicaGroup) {
// Assign all instances within a replica-group to each partition if not configured
int numInstancesPerPartition = _replicaGroupPartitionConfig.getNumInstancesPerPartition();
Expand Down Expand Up @@ -298,7 +324,7 @@ private void replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>>
Map<Integer, Integer> poolToNumExistingInstancesMap = new TreeMap<>();
if (replicaGroupId < existingNumReplicaGroups) {
for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
List<String> existingInstances = getExistingInstancesOrEmpty(partitionId, replicaGroupId);
existingInstanceSet.addAll(existingInstances);
for (String existingInstance : existingInstances) {
Integer existingPool = instanceToPoolMap.get(existingInstance);
Expand Down Expand Up @@ -390,24 +416,29 @@ private void replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>>
}

if (numPartitions == 1) {
int partitionId = (getPartitionIds() != null && getPartitionIds().size() == 1)
? getPartitionIds().get(0) : 0;
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
List<String> instancesInReplicaGroup = replicaGroupIdToInstancesMap.get(replicaGroupId);
if (replicaGroupId < existingNumReplicaGroups) {
List<String> existingInstances = _existingInstancePartitions.getInstances(0, replicaGroupId);
List<String> existingInstances = getExistingInstancesOrEmpty(partitionId, replicaGroupId);
LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(instancesInReplicaGroup);
List<String> instances =
selectInstancesWithMinimumMovement(numInstancesPerReplicaGroup, candidateInstances, existingInstances);
LOGGER.info(
"Selecting instances: {} for replica-group: {}, partition: 0 for table: {}, existing instances: {}",
instances, replicaGroupId, _tableNameWithType, existingInstances);
instancePartitions.setInstances(0, replicaGroupId, instances);
"Selecting instances: {} for replica-group: {}, partition: {} for table: {}, existing instances: {}",
instances, replicaGroupId, partitionId, _tableNameWithType, existingInstances);
instancePartitions.setInstances(partitionId, replicaGroupId, instances);
} else {
LOGGER.info("Selecting instances: {} for replica-group: {}, partition: 0 for table: {}, "
+ "there is no existing instances", instancesInReplicaGroup, replicaGroupId, _tableNameWithType);
instancePartitions.setInstances(0, replicaGroupId, instancesInReplicaGroup);
LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}, "
+ "there is no existing instances", instancesInReplicaGroup, replicaGroupId, partitionId,
_tableNameWithType);
instancePartitions.setInstances(partitionId, replicaGroupId, instancesInReplicaGroup);
}
}
} else {
List<Integer> partitionIds = getPartitionIds() != null ? getPartitionIds()
: IntStream.range(0, numPartitions).boxed().collect(Collectors.toList());
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
List<String> instancesInReplicaGroup = replicaGroupIdToInstancesMap.get(replicaGroupId);
if (replicaGroupId < existingNumReplicaGroups) {
Expand All @@ -422,7 +453,8 @@ private void replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>>
List<List<String>> partitionIdToInstancesMap = new ArrayList<>(numPartitions);
List<Set<String>> partitionIdToInstanceSetMap = new ArrayList<>(numPartitions);
List<List<String>> partitionIdToExistingInstancesMap = new ArrayList<>(existingNumPartitions);
for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
for (int idx = 0; idx < partitionIds.size(); idx++) {
int partitionId = partitionIds.get(idx);
// Initialize the list with empty positions to fill
List<String> instances = new ArrayList<>(numInstancesPerPartition);
for (int i = 0; i < numInstancesPerPartition; i++) {
Expand All @@ -433,8 +465,8 @@ private void replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>>
partitionIdToInstanceSetMap.add(instanceSet);

// Keep the existing instances that are still alive
if (partitionId < existingNumPartitions) {
List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
if (idx < existingNumPartitions) {
List<String> existingInstances = getExistingInstancesOrEmpty(partitionId, replicaGroupId);
partitionIdToExistingInstancesMap.add(existingInstances);
int numInstancesToCheck = Math.min(numInstancesPerPartition, existingInstances.size());
for (int i = 0; i < numInstancesToCheck; i++) {
Expand All @@ -446,13 +478,16 @@ private void replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>>
instanceToNumPartitionsMap.put(existingInstance, numPartitionsOnInstance + 1);
}
}
} else {
partitionIdToExistingInstancesMap.add(List.of());
}
Comment on lines +481 to 483
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The else clause initializing partitionIdToExistingInstancesMap with an empty list should have been added in the original loop structure starting at line 456. This duplicates the pattern from line 470 where existing instances are added conditionally. Consider moving this initialization to ensure the list size matches partitionIds.size() consistently throughout the method.

Copilot uses AI. Check for mistakes.
}

// Fill the vacant positions with instance that serves the least partitions
for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
List<String> instances = partitionIdToInstancesMap.get(partitionId);
Set<String> instanceSet = partitionIdToInstanceSetMap.get(partitionId);
for (int idx = 0; idx < partitionIds.size(); idx++) {
int partitionId = partitionIds.get(idx);
List<String> instances = partitionIdToInstancesMap.get(idx);
Set<String> instanceSet = partitionIdToInstanceSetMap.get(idx);
int numInstancesToFill = numInstancesPerPartition - instanceSet.size();
if (numInstancesToFill > 0) {
// Triple stores (instance, numPartitionsOnInstance, instanceIndex) for sorting
Expand All @@ -477,11 +512,11 @@ private void replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>>
}
}

if (partitionId < existingNumPartitions) {
if (idx < existingNumPartitions) {
LOGGER.info(
"Selecting instances: {} for replica-group: {}, partition: {} for table: {}, existing instances: {}",
instances, replicaGroupId, partitionId, _tableNameWithType,
partitionIdToExistingInstancesMap.get(partitionId));
partitionIdToExistingInstancesMap.get(idx));
} else {
LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}, "
+ "there is no existing instances", instances, replicaGroupId, partitionId, _tableNameWithType);
Expand All @@ -491,7 +526,7 @@ private void replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>>
} else {
// Assign consecutive instances within a replica-group to each partition
int instanceIdInReplicaGroup = 0;
for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
for (Integer partitionId : partitionIds) {
List<String> instances = new ArrayList<>(numInstancesPerPartition);
for (int i = 0; i < numInstancesPerPartition; i++) {
instances.add(instancesInReplicaGroup.get(instanceIdInReplicaGroup));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,15 @@ protected List<String> assignConsumingSegment(int segmentPartitionId, InstancePa
instancesAssigned.add(instances.get(segmentPartitionId % instances.size()));
}
} else {
// Explicit partition:
// Assign segment to the first instance within the partition.

// Explicit partition: instance partitions are keyed by stream partition id (supports non-contiguous subset).
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
int partitionId = segmentPartitionId % numPartitions;
instancesAssigned.add(instancePartitions.getInstances(partitionId, replicaGroupId).get(0));
List<String> instances = instancePartitions.getInstances(segmentPartitionId, replicaGroupId);
Preconditions.checkState(instances != null && !instances.isEmpty(),
"No instances for partition %s in CONSUMING instance partitions (table: %s). "
+ "Check that the stream partition subset configuration (e.g. 'stream.kafka.partition.ids') "
+ "matches the instance partition selection in the table configuration.",
segmentPartitionId, _tableNameWithType);
instancesAssigned.add(instances.get(0));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,53 @@ public void testMinimizeDataMovementImplicitRealtimeTablePartitionSelector() {
assertEquals(instancePartitions.getInstances(8, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 17));
}

@Test
public void testSinglePartitionSubsetWithNonZeroIdAndMinimizeDataMovement() {
// Single-partition subset with non-zero stream partition ID (e.g. stream.kafka.partition.ids=5) must create
// instance partitions for partition 5, not 0, so consumption and assignment work.
int numReplicas = 2;
// Replica-group based config with 1 instance per partition and minimizeDataMovement enabled,
// using the implicit realtime table partition selector.
InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 0, 1, true, null);
InstanceAssignmentConfig instanceAssignmentConfig =
new InstanceAssignmentConfig(new InstanceTagPoolConfig(REALTIME_TAG, false, 0, null), null,
replicaGroupPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(), true);
TableConfig tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setServerTenant(TENANT_NAME)
.setNumReplicas(numReplicas)
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(), instanceAssignmentConfig))
.build();

// Register 4 tagged server instances as assignment candidates.
int numInstances = 4;
List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances);
for (int i = 0; i < numInstances; i++) {
InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
instanceConfig.addTag(REALTIME_TAG);
instanceConfigs.add(instanceConfig);
}

// Stream reports exactly one partition whose ID is 5 (a non-contiguous subset).
int singlePartitionId = 5;
StreamMetadataProvider streamMetadataProvider = mock(StreamMetadataProvider.class);
when(streamMetadataProvider.fetchPartitionCount(anyLong())).thenReturn(1);
when(streamMetadataProvider.fetchPartitionIds(anyLong())).thenReturn(Set.of(singlePartitionId));

// Run the assignment with no existing instance partitions (fresh table).
InstancePartitionSelector instancePartitionSelector =
new ImplicitRealtimeTablePartitionSelector(replicaGroupPartitionConfig, tableConfig.getTableName(), null, true,
streamMetadataProvider);
InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
InstancePartitions instancePartitions =
driver.getInstancePartitions(InstancePartitionsType.CONSUMING.getInstancePartitionsName(RAW_TABLE_NAME),
instanceAssignmentConfig, instanceConfigs, null, true, instancePartitionSelector);

assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
// NOTE(review): asserts getNumPartitions() == max partition id + 1 (5 -> 6), i.e. the key space is
// sized by the largest ID rather than the count of IDs — confirm against InstancePartitions semantics.
assertEquals(instancePartitions.getNumPartitions(), singlePartitionId + 1);

// Instance partitions must be keyed by stream partition id 5, not 0
assertEquals(instancePartitions.getInstances(singlePartitionId, 0), List.of(SERVER_INSTANCE_ID_PREFIX + 0));
assertEquals(instancePartitions.getInstances(singlePartitionId, 1), List.of(SERVER_INSTANCE_ID_PREFIX + 1));
assertNull(instancePartitions.getInstances(0, 0));
}

public void testMirrorServerSetBasedRandom()
throws FileNotFoundException {
testMirrorServerSetBasedRandomInner(10000000);
Expand Down
Loading