Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,15 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.proc;

import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.resource.Tag;

import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Map;

/*
* show proc "/colocation_group";
*/
Expand All @@ -36,19 +31,16 @@ public class ColocationGroupProcDir implements ProcDirInterface {
.add("GroupId").add("GroupName").add("TableIds")
.add("BucketsNum").add("ReplicaAllocation").add("DistCols").add("IsStable")
.add("ErrorMsg").build();

@Override
public boolean register(String name, ProcNodeInterface node) {
return false;
}

@Override
public ProcNodeInterface lookup(String groupIdStr) throws AnalysisException {
String[] parts = groupIdStr.split("\\.");
if (parts.length != 2) {
throw new AnalysisException("Invalid group id: " + groupIdStr);
}

long dbId = -1;
long grpId = -1;
try {
Expand All @@ -61,17 +53,54 @@ public ProcNodeInterface lookup(String groupIdStr) throws AnalysisException {
GroupId groupId = new GroupId(dbId, grpId);
ColocateTableIndex index = Env.getCurrentColocateIndex();
Map<Tag, List<List<Long>>> beSeqs = index.getBackendsPerBucketSeq(groupId);
Map<Tag, List<List<Long>>> beSeqs;

// ==========Core modification: Distinguish between cloud/non-cloud environments to obtain the BE sequence==========
if (CloudReplica.isCloudEnv()) {
// Cloud environment: Call CloudReplica to obtain the Colocated BE ID and construct the compatible beSeqs structure
beSeqs = buildCloudBeSeqs(dbId, grpId);
} else {
// Non-cloud environment: Maintain the original logic
beSeqs = index.getBackendsPerBucketSeq(groupId);
}

return new ColocationGroupBackendSeqsProcNode(beSeqs);
}

/**
* Construct the beSeqs data structure in the cloud environment and match it with the format of the non-cloud environment.
* @param dbId dbId of the Colocation Group
* @param grpId grpId of Colocation Group
* @return Compatible Map structure of type <Tag, List<List<Long>>>
*/
private Map<Tag, List<List<Long>>> buildCloudBeSeqs(long dbId, long grpId) {
Map<Tag, List<List<Long>>> beSeqs = Maps.newHashMap();

// 1.Call the core function of the cloud environment to obtain the list of associated BE IDs
List<Long> colocatedBeIds = CloudReplica.getColocatedBeId(dbId, grpId);

// 2. Construct a structure that is compatible with the original format (the Tag is set to DEFAULT, and the Bucket sequence is organized according to the BE ID)
// Original format description:
// - Key: Tag(Resource Label)
// - Value: List<List<Long>> → The outer List corresponds to the Bucket sequence, and the inner List corresponds to the BE ID list of each Bucket.
Tag defaultTag = Tag.DEFAULT_TAG;
List<List<Long>> bucketSeqs = Lists.newArrayList();

// Simplified processing in the cloud environment: A single Bucket sequence contains all the associated BE IDs
bucketSeqs.add(colocatedBeIds);

beSeqs.put(defaultTag, bucketSeqs);
return beSeqs;
}


@Override
public ProcResult fetchResult() throws AnalysisException {
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);

ColocateTableIndex index = Env.getCurrentColocateIndex();
List<List<String>> infos = index.getInfos();
result.setRows(infos);
return result;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package org.apache.doris.common.proc;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.ColocationGroup;
import org.apache.doris.common.GroupId;
import org.apache.doris.datasource.CloudReplica;
import org.apache.doris.persist.ColocateTableIndex;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

// 使用PowerMockRunner是因为需要Mock静态方法 CloudReplica.isCloudEnv()
@RunWith(PowerMockRunner.class)
// 告诉PowerMock需要处理CloudReplica和Env这两个类
@PrepareForTest({CloudReplica.class, Env.class})
public class ColocationGroupProcDirTest {

private long dbId = 10001L;
private long grpId = 20001L;
private ColocationGroupProcDir procDir;

@Before
public void setUp() {
// 每个测试方法运行前都会执行这里
// 实例化要测试的类
procDir = new ColocationGroupProcDir(dbId, grpId);
}

/**
* 测试场景:云环境下,测试是否正确构建了BE序列
* 对应PR中新增的 buildCloudBeSeqs 逻辑
*/
@Test
public void testLookupInCloudEnv() throws Exception {
// 1. 准备模拟数据
// 假设云环境返回了3个BE ID: 101, 102, 103
List<Long> mockCloudBeIds = new ArrayList<>();
mockCloudBeIds.add(101L);
mockCloudBeIds.add(102L);
mockCloudBeIds.add(103L);

// 2. Mock静态类
PowerMockito.mockStatic(CloudReplica.class);

// 3. 设定行为:当调用 CloudReplica.isCloudEnv() 时,返回 true (模拟是云环境)
when(CloudReplica.isCloudEnv()).thenReturn(true);

// 4. 设定行为:当调用 CloudReplica.getColocatedBeId() 时,返回我们准备的数据
when(CloudReplica.getColocatedBeId(dbId, grpId)).thenReturn(mockCloudBeIds);

// 5. 执行测试:调用 lookup 方法(传入 "dbId.grpId" 格式的字符串)
String groupIdStr = dbId + "." + grpId;
ProcNodeInterface result = procDir.lookup(groupIdStr);

// 6. 验证结果
// 期望结果不为空
Assert.assertNotNull(result);
// 期望结果是 ColocationGroupBackendSeqsProcNode 类型
Assert.assertTrue(result instanceof ColocationGroupBackendSeqsProcNode);

}

/**
* 测试场景:非云环境下,是否保持原有的逻辑
*/
@Test
public void testLookupInNonCloudEnv() throws Exception {
// 1. Mock ColocateTableIndex (原有逻辑依赖这个类)
ColocateTableIndex mockIndex = mock(ColocateTableIndex.class);

// 2. Mock静态类
PowerMockito.mockStatic(CloudReplica.class);
PowerMockito.mockStatic(Env.class);

// 3. 设定行为:是非云环境
when(CloudReplica.isCloudEnv()).thenReturn(false);

// 4. 设定行为:Env.getCurrentColocateIndex() 返回我们的mock对象
when(Env.getCurrentColocateIndex()).thenReturn(mockIndex);

// 5. 执行测试
String groupIdStr = dbId + "." + grpId;
procDir.lookup(groupIdStr);


}
}
Loading