Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -370,50 +370,6 @@ private boolean peerEquals(PeerId p1, PeerId p2) {
}

private Replicator.State getReplicatorState(PeerId peerId) {
var replicateGroup = getReplicatorGroup();
if (replicateGroup == null) {
return null;
}

ThreadId threadId = replicateGroup.getReplicator(peerId);
if (threadId == null) {
return null;
} else {
Replicator r = (Replicator) threadId.lock();
if (r == null) {
return Replicator.State.Probe;
}
Replicator.State result = getState(r);
threadId.unlock();
return result;
}
}

private ReplicatorGroup getReplicatorGroup() {
var clz = this.raftNode.getClass();
try {
var f = clz.getDeclaredField("replicatorGroup");
f.setAccessible(true);
var group = (ReplicatorGroup) f.get(this.raftNode);
f.setAccessible(false);
return group;
} catch (NoSuchFieldException | IllegalAccessException e) {
log.info("getReplicatorGroup: error {}", e.getMessage());
return null;
}
}

private Replicator.State getState(Replicator r) {
var clz = r.getClass();
try {
var f = clz.getDeclaredField("state");
f.setAccessible(true);
var state = (Replicator.State) f.get(this.raftNode);
f.setAccessible(false);
return state;
} catch (NoSuchFieldException | IllegalAccessException e) {
log.info("getReplicatorGroup: error {}", e.getMessage());
return null;
}
return RaftReflectionUtil.getReplicatorState(this.raftNode, peerId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.pd.raft;

import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.ReplicatorGroup;
import com.alipay.sofa.jraft.core.Replicator;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.util.ThreadId;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RaftReflectionUtil {

public static Replicator.State getReplicatorState(Node node, PeerId peerId) {
if (node == null || peerId == null) {
return null;
}

// Get ReplicatorGroup from Node
var clz = node.getClass();
ReplicatorGroup replicateGroup = null;
try {
var f = clz.getDeclaredField("replicatorGroup");
f.setAccessible(true);
try {
replicateGroup = (ReplicatorGroup)f.get(node);
}
finally {
f.setAccessible(false);
}
}
catch (NoSuchFieldException | IllegalAccessException e) {
log.warn("Failed to get replicator state via reflection: {}", e.getMessage(), e);
return null;
}

if (replicateGroup == null) {
return null;
}

ThreadId threadId = replicateGroup.getReplicator(peerId);
if (threadId == null) {
return null;
}
else {
Replicator r = (Replicator)threadId.lock();
try {
if (r == null) {
return Replicator.State.Probe;
}
Replicator.State result = null;

// Get state from Replicator

var replicatorClz = r.getClass();
try {
var f = replicatorClz.getDeclaredField("state");
f.setAccessible(true);
try {
result = (Replicator.State)f.get(r);
}catch (Exception e){
log.warn("Failed to get replicator state for peerId: {}, error: {}", peerId, e.getMessage());
}
finally {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ 资源管理改进 - 使用 try-finally 确保清理

新代码正确地使用了 try-finally 块来确保:

  1. f.setAccessible(false) 总是被调用,即使在发生异常时
  2. threadId.unlock() 总是被调用,避免死锁

这是一个很好的改进,提高了代码的健壮性和安全性。

建议:考虑在异常处理时记录更详细的上下文信息(如 peerId),便于问题排查:

Suggested change
finally {
log.error("Failed to get replicator state for peerId: {}, error: {}", peerId, e.getMessage());

f.setAccessible(false);
}
}
catch (NoSuchFieldException e) {
log.warn("Failed to get replicator state via reflection: {}", e.getMessage(), e);
result = null;
}
return result;
} finally {
threadId.unlock();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.pd.raft;

import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.core.Replicator;
import com.alipay.sofa.jraft.entity.PeerId;

import org.junit.Assert;
import org.junit.Test;

import static org.mockito.Mockito.mock;

public class RaftReflectionUtilTest {

@Test
public void testGetReplicatorStateWithNullNode() {
// Setup
PeerId peerId = mock(PeerId.class);

// Run the test
Replicator.State result = RaftReflectionUtil.getReplicatorState(null, peerId);

// Verify the results
Assert.assertNull(result);
}

@Test
public void testGetReplicatorStateWithNullPeerId() {
// Setup
Node node = mock(Node.class);

// Run the test
Replicator.State result = RaftReflectionUtil.getReplicatorState(node, null);

// Verify the results
Assert.assertNull(result);
}

@Test
public void testGetReplicatorStateWithBothNull() {
// Run the test
Replicator.State result = RaftReflectionUtil.getReplicatorState(null, null);

// Verify the results
Assert.assertNull(result);
}
}
5 changes: 5 additions & 0 deletions hugegraph-store/hg-store-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@
<artifactId>hg-store-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hugegraph</groupId>
<artifactId>hg-pd-core</artifactId>
Copy link
Member

@imbajin imbajin Nov 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

‼️ 模块依赖问题 - 需要架构评审

在 hg-store-core 的 pom.xml 中添加了对 hg-pd-core 的编译时依赖:

<dependency>
    <groupId>org.apache.hugegraph</groupId>
    <artifactId>hg-pd-core</artifactId>
    <version>1.7.0</version>
    <scope>compile</scope>
</dependency>

潜在问题:

  1. 循环依赖风险:PartitionEngine 现在依赖 hg-pd-core 中的 RaftReflectionUtil。需要检查 hg-pd-core 是否依赖 hg-store-core,避免循环依赖
  2. 模块职责不清:RaftReflectionUtil 放在 hg-pd-core 中,但被 store 模块使用。考虑是否应该放在一个更通用的 common 模块中?
  3. 版本硬编码:版本号 1.7.0 是硬编码的,应该使用 ${project.version} 或在父 pom 中统一管理 (默认使用 ${revision} )

建议:

  • 确认这个新的依赖关系符合项目的模块架构设计, 是否应该把它放到一个更合适, 已被依赖引用的模块里?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. 目前hg-pd-core和hg-store-core之间没有循环依赖问题;
  2. 对于RaftReflectionUtil的模块问题,之前排除了hugegraph-common(因为这样做需要给hugegraph-common引入过多依赖),并且hg-store-core和hg-pd-core没有共同依赖的模块,因此选择直接让hg-pd-core依赖hg-pd-core,暂时没有想到更好的实现。
  3. 已修改为使用${revision}。

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. 目前hg-pd-core和hg-store-core之间没有循环依赖问题;
  2. 对于RaftReflectionUtil的模块问题,之前排除了hugegraph-common(因为这样做需要给hugegraph-common引入过多依赖),并且hg-store-core和hg-pd-core没有共同依赖的模块,因此选择直接让hg-pd-core依赖hg-pd-core,暂时没有想到更好的实现。
  3. 已修改为使用${revision}。

嗯嗯, 合理的, 另外 revision 应该是不需要声明的, 参考一下 server 的父子模块, 它默认会使用项目级别的版本号 (revision)

<version>${revision}</version>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.pd.grpc.MetaTask;
import org.apache.hugegraph.pd.grpc.Metapb;
import org.apache.hugegraph.pd.raft.RaftReflectionUtil;
import org.apache.hugegraph.store.business.BusinessHandler;
import org.apache.hugegraph.store.business.BusinessHandlerImpl;
import org.apache.hugegraph.store.cmd.HgCmdClient;
Expand Down Expand Up @@ -1146,51 +1147,7 @@ public Configuration getCurrentConf() {
}

private Replicator.State getReplicatorState(PeerId peerId) {
var replicateGroup = getReplicatorGroup();
if (replicateGroup == null) {
return null;
}

ThreadId threadId = replicateGroup.getReplicator(peerId);
if (threadId == null) {
return null;
} else {
Replicator r = (Replicator) threadId.lock();
if (r == null) {
return Replicator.State.Probe;
}
Replicator.State result = getState(r);
threadId.unlock();
return result;
}
}

private ReplicatorGroup getReplicatorGroup() {
var clz = this.raftNode.getClass();
try {
var f = clz.getDeclaredField("replicatorGroup");
f.setAccessible(true);
var group = (ReplicatorGroup) f.get(this.raftNode);
f.setAccessible(false);
return group;
} catch (NoSuchFieldException | IllegalAccessException e) {
log.info("getReplicatorGroup: error {}", e.getMessage());
return null;
}
}

private Replicator.State getState(Replicator r) {
var clz = r.getClass();
try {
var f = clz.getDeclaredField("state");
f.setAccessible(true);
var state = (Replicator.State) f.get(this.raftNode);
f.setAccessible(false);
return state;
} catch (NoSuchFieldException | IllegalAccessException e) {
log.info("getReplicatorGroup: error {}", e.getMessage());
return null;
}
return RaftReflectionUtil.getReplicatorState(this.raftNode, peerId);
}

class ReplicatorStateListener implements Replicator.ReplicatorStateListener {
Expand Down
Loading