Skip to content

Commit 049986a

Browse files
siyuan0322 and lnfjpt authored
fix(interactive): Fix the occasionally incorrect pegasus server routing configuration (#3673)
Fixes an issue where `updatePeerView` may not be triggered at the proper time. --------- Co-authored-by: nengli.ln <nengli.ln@alibaba-inc.com>
1 parent 579ab9f commit 049986a

File tree

6 files changed

+27
-14
lines changed

6 files changed

+27
-14
lines changed

interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/ZkConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ public class ZkConfig {
2929
Config.intConfig("zk.session.timeout.ms", 30000);
3030

3131
public static final Config<Integer> ZK_BASE_SLEEP_MS =
32-
Config.intConfig("zk.base.sleep.ms", 1000);
32+
Config.intConfig("zk.base.sleep.ms", 10000);
3333

3434
public static final Config<Integer> ZK_MAX_SLEEP_MS =
35-
Config.intConfig("zk.max.sleep.ms", 45000);
35+
Config.intConfig("zk.max.sleep.ms", 60000);
3636

3737
public static final Config<Integer> ZK_MAX_RETRY = Config.intConfig("zk.max.retry", 29);
3838

interactive_engine/executor/engine/pegasus/network/src/transport/block/mod.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,16 @@ pub fn listen_on<A: ToSocketAddrs>(
107107
///
108108
/// 如果参数中的`server_id` 大于等于当前服务的id,并不会发起连接,返回`Ok(())`;
109109
///
110-
pub fn connect<A: ToSocketAddrs>(
111-
local_id: u64, remote_id: u64, params: ConnectionParams, addr: A,
110+
pub fn connect(
111+
local_id: u64, remote_id: u64, params: ConnectionParams, addr: SocketAddr,
112112
) -> Result<(), NetError> {
113113
// 连接请求可能会失败, 或许由于对端服务器未启动端口监听,调用方需要根据返回内容确定是否重试;
114-
let mut conn = TcpStream::connect(addr)?;
114+
info!("Try to connect to server {:?}", addr);
115+
let timeout = std::time::Duration::from_secs(10);
116+
let mut conn = TcpStream::connect_timeout(&addr, timeout)?;
117+
// let mut conn = TcpStream::connect(addr)?;
115118
let addr = conn.peer_addr()?;
116-
debug!("connect to server {:?};", addr);
119+
info!("connect to server {:?};", addr);
117120
let hb_sec = params.get_hb_interval_sec();
118121
super::setup_connection(local_id, hb_sec, &mut conn)?;
119122
info!("setup connection to {:?} success;", addr);

interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/ZkDiscovery.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public NodeChangeListener(RoleType roleType, ServiceCache<GrootNode> serviceCach
154154

155155
@Override
156156
public void cacheChanged() {
157-
logger.debug("cacheChanged. roleType [" + roleType.getName() + "]");
157+
logger.info("cacheChanged. roleType [" + roleType.getName() + "]");
158158
synchronized (lock) {
159159
Map<Integer, GrootNode> newRoleNodes = new HashMap<>();
160160
for (ServiceInstance<GrootNode> instance : this.serviceCache.getInstances()) {
@@ -207,7 +207,7 @@ private void notifyRemoved(RoleType role, Map<Integer, GrootNode> removed) {
207207
if (removed.isEmpty()) {
208208
return;
209209
}
210-
logger.debug("role [{}] remove nodes [{}]", role.getName(), removed.values());
210+
logger.info("role [{}] remove nodes [{}]", role.getName(), removed.values());
211211
for (Listener listener : this.listeners) {
212212
this.singleThreadExecutor.execute(
213213
() -> {
@@ -225,7 +225,7 @@ private void notifyAdded(RoleType role, Map<Integer, GrootNode> added) {
225225
if (added.isEmpty()) {
226226
return;
227227
}
228-
logger.debug("role [{}] add nodes [{}]", role.getName(), added.values());
228+
logger.info("role [{}] add nodes [{}]", role.getName(), added.values());
229229
for (Listener listener : this.listeners) {
230230
this.singleThreadExecutor.execute(
231231
() -> {

interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package com.alibaba.graphscope.groot.store.jna;
1515

16+
import com.alibaba.graphscope.groot.common.config.CommonConfig;
1617
import com.alibaba.graphscope.groot.common.config.Configs;
1718
import com.alibaba.graphscope.groot.common.config.StoreConfig;
1819
import com.alibaba.graphscope.groot.operation.OperationBatch;
@@ -61,8 +62,10 @@ public JnaGraphStore(Configs configs, int partitionId) throws IOException {
6162
Path walPath = Paths.get(walDir, "" + partitionId);
6263
builder.put(StoreConfig.STORE_WAL_DIR.getKey(), walPath.toString());
6364
}
64-
if (!Files.isDirectory(secondPath)) {
65-
Files.createDirectories(secondPath);
65+
if (CommonConfig.SECONDARY_INSTANCE_ENABLED.get(configs)) {
66+
if (!Files.isDirectory(secondPath)) {
67+
Files.createDirectories(secondPath);
68+
}
6669
}
6770
if (!Files.isDirectory(partitionPath)) {
6871
Files.createDirectories(partitionPath);

interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ private AdminClient createAdminWithRetry() throws InterruptedException {
161161
try {
162162
return AdminClient.create(adminConfig);
163163
} catch (Exception e) {
164-
logger.warn("Error creating Kafka AdminClient", e);
164+
logger.warn("Error creating Kafka AdminClient, servers: {}", servers, e);
165165
Thread.sleep(10000);
166166
}
167167
}

interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/GaiaEngine.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,17 @@ public void stop() {
104104
@Override
105105
public void nodesJoin(RoleType role, Map<Integer, GrootNode> nodes) {
106106
if (role == RoleType.GAIA_ENGINE) {
107-
this.engineNodes.putAll(nodes);
107+
for (Map.Entry<Integer, GrootNode> entry : nodes.entrySet()) {
108+
GrootNode node = entry.getValue();
109+
if (node.getRoleName().equals(RoleType.GAIA_ENGINE.getName())) {
110+
this.engineNodes.put(entry.getKey(), node);
111+
} else {
112+
logger.warn("Unexpected node joined: {}", node);
113+
}
114+
}
108115
if (this.engineNodes.size() == this.nodeCount) {
109116
String peerViewString =
110-
nodes.values().stream()
117+
engineNodes.values().stream()
111118
.map(
112119
n ->
113120
String.format(

0 commit comments

Comments
 (0)