Skip to content

Commit cd7879c

Browse files
authored
fix(interactive): Fix Bugs of OOM in CBO (#4330)
<!-- Thanks for your contribution! Please review https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before opening an issue. --> ## What do these changes do? As titled: fix out-of-memory (OOM) bugs in the cost-based optimizer (CBO). <!-- Please give a short brief about these changes. --> ## Related issue number <!-- Are there any issues opened that will be resolved by merging this change? --> Fixes
1 parent d8e5c27 commit cd7879c

File tree

3 files changed

+75
-52
lines changed

3 files changed

+75
-52
lines changed

interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/meta/fetcher/DynamicIrMetaFetcher.java

Lines changed: 61 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.alibaba.graphscope.common.config.Configs;
2222
import com.alibaba.graphscope.common.config.GraphConfig;
2323
import com.alibaba.graphscope.common.config.PlannerConfig;
24+
import com.alibaba.graphscope.common.ir.meta.GraphId;
2425
import com.alibaba.graphscope.common.ir.meta.IrMeta;
2526
import com.alibaba.graphscope.common.ir.meta.IrMetaStats;
2627
import com.alibaba.graphscope.common.ir.meta.IrMetaTracker;
@@ -46,26 +47,27 @@ public class DynamicIrMetaFetcher extends IrMetaFetcher implements AutoCloseable
4647
private volatile IrMetaStats currentState;
4748
// To manage the state changes of statistics resulting from different update operations.
4849
private volatile StatsState statsState;
49-
private final boolean fetchStats;
50+
private volatile Boolean statsEnabled = null;
5051

5152
public DynamicIrMetaFetcher(Configs configs, IrMetaReader dataReader, IrMetaTracker tracker) {
5253
super(dataReader, tracker);
5354
this.scheduler = new ScheduledThreadPoolExecutor(1);
54-
this.scheduler.scheduleAtFixedRate(
55-
() -> syncMeta(),
56-
0,
57-
GraphConfig.GRAPH_META_SCHEMA_FETCH_INTERVAL_MS.get(configs),
58-
TimeUnit.MILLISECONDS);
59-
this.fetchStats =
55+
long schemaIntervalMS = GraphConfig.GRAPH_META_SCHEMA_FETCH_INTERVAL_MS.get(configs);
56+
if (schemaIntervalMS > 0) {
57+
logger.info("start to schedule the schema sync task per {} ms", schemaIntervalMS);
58+
this.scheduler.scheduleAtFixedRate(
59+
() -> syncMeta(), schemaIntervalMS, schemaIntervalMS, TimeUnit.MILLISECONDS);
60+
}
61+
boolean isCBOMode =
6062
PlannerConfig.GRAPH_PLANNER_IS_ON.get(configs)
61-
&& PlannerConfig.GRAPH_PLANNER_OPT.get(configs).equals("CBO");
62-
if (this.fetchStats) {
63-
logger.info("start to schedule statistics fetch task");
63+
&& PlannerConfig.GRAPH_PLANNER_OPT.get(configs).equalsIgnoreCase("CBO");
64+
long statsIntervalMS = GraphConfig.GRAPH_META_STATISTICS_FETCH_INTERVAL_MS.get(configs);
65+
if (!isCBOMode || statsIntervalMS <= 0) {
66+
this.statsEnabled = false;
67+
} else {
68+
logger.info("start to schedule the stats sync task per {} ms", statsIntervalMS);
6469
this.scheduler.scheduleAtFixedRate(
65-
() -> syncStats(),
66-
2000,
67-
GraphConfig.GRAPH_META_STATISTICS_FETCH_INTERVAL_MS.get(configs),
68-
TimeUnit.MILLISECONDS);
70+
() -> syncStats(), statsIntervalMS, statsIntervalMS, TimeUnit.MILLISECONDS);
6971
}
7072
}
7173

@@ -80,7 +82,6 @@ private synchronized void syncMeta() {
8082
logger.debug(
8183
"schema from remote: {}",
8284
(meta == null) ? null : meta.getSchema().getSchemaSpec(Type.IR_CORE_IN_JSON));
83-
GraphStatistics curStats;
8485
// if the graph id or schema version is changed, we need to update the statistics
8586
if (this.currentState == null
8687
|| !this.currentState.getGraphId().equals(meta.getGraphId())
@@ -89,58 +90,74 @@ private synchronized void syncMeta() {
8990
.getVersion()
9091
.equals(meta.getSchema().getVersion())) {
9192
this.statsState = StatsState.INITIALIZED;
92-
curStats = null;
93-
} else {
94-
curStats = this.currentState.getStatistics();
93+
this.currentState =
94+
new IrMetaStats(
95+
meta.getGraphId(),
96+
meta.getSnapshotId(),
97+
meta.getSchema(),
98+
meta.getStoredProcedures(),
99+
null);
95100
}
96-
this.currentState =
97-
new IrMetaStats(
98-
meta.getGraphId(),
99-
meta.getSnapshotId(),
100-
meta.getSchema(),
101-
meta.getStoredProcedures(),
102-
curStats);
103-
if (this.fetchStats && this.statsState != StatsState.SYNCED) {
104-
logger.info("start to schedule statistics fetch task");
101+
boolean statsEnabled = getStatsEnabled(this.currentState.getGraphId());
102+
if (statsEnabled && this.statsState != StatsState.SYNCED
103+
|| (!statsEnabled && this.statsState != StatsState.MOCKED)) {
104+
logger.debug("start to sync stats");
105105
syncStats();
106106
}
107107
} catch (Throwable e) {
108-
logger.warn("failed to read meta data", e);
108+
logger.warn("failed to read meta data, error is {}", e);
109+
}
110+
}
111+
112+
private boolean getStatsEnabled(GraphId graphId) {
113+
try {
114+
this.statsEnabled =
115+
(this.statsEnabled == null)
116+
? this.reader.syncStatsEnabled(graphId)
117+
: this.statsEnabled;
118+
return this.statsEnabled;
119+
} catch (
120+
Throwable e) { // if errors happen when reading stats enabled, we assume it is false
121+
logger.warn("failed to read stats enabled, error is {}", e);
122+
return false;
109123
}
110124
}
111125

112126
private synchronized void syncStats() {
113127
try {
114128
if (this.currentState != null) {
115-
GraphStatistics stats = this.reader.readStats(this.currentState.getGraphId());
116-
logger.debug("statistics from remote: {}", stats);
117-
if (stats != null && stats.getVertexCount() != 0) {
118-
this.currentState =
119-
new IrMetaStats(
120-
this.currentState.getSnapshotId(),
121-
this.currentState.getSchema(),
122-
this.currentState.getStoredProcedures(),
123-
stats);
124-
if (tracker != null) {
125-
logger.debug("start to update the glogue");
126-
tracker.onChanged(this.currentState);
129+
boolean statsEnabled = getStatsEnabled(this.currentState.getGraphId());
130+
if (statsEnabled) {
131+
GraphStatistics stats = this.reader.readStats(this.currentState.getGraphId());
132+
logger.debug("statistics from remote: {}", stats);
133+
if (stats != null && stats.getVertexCount() != 0) {
134+
this.currentState =
135+
new IrMetaStats(
136+
this.currentState.getSnapshotId(),
137+
this.currentState.getSchema(),
138+
this.currentState.getStoredProcedures(),
139+
stats);
140+
if (tracker != null) {
141+
logger.info("start to update the glogue");
142+
tracker.onChanged(this.currentState);
143+
}
144+
this.statsState = StatsState.SYNCED;
127145
}
128-
this.statsState = StatsState.SYNCED;
129146
}
130147
}
131148
} catch (Throwable e) {
132-
logger.warn("failed to read graph statistics, error is", e);
149+
logger.warn("failed to read graph statistics, error is {}", e);
133150
} finally {
134151
try {
135152
if (this.currentState != null
136153
&& tracker != null
137154
&& this.statsState == StatsState.INITIALIZED) {
138-
logger.debug("start to mock the glogue");
155+
logger.info("start to mock the glogue");
139156
tracker.onChanged(this.currentState);
140157
this.statsState = StatsState.MOCKED;
141158
}
142159
} catch (Throwable t) {
143-
logger.warn("failed to mock the glogue, error is", t);
160+
logger.warn("failed to mock the glogue, error is {}", t);
144161
}
145162
}
146163
}

interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/rules/JoinDecompositionRule.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,7 @@ public void onMatch(RelOptRuleCall relOptRuleCall) {
4949
// specific optimization for relational DB scenario.
5050
// 3. `JoinByEdge`: Split the pattern by edge, convert a triangle pattern to `JoinByEdge` to
5151
// support optimizations in Neo4j.
52-
if (getMaxEdgeNum(graphPattern.getPattern()) > 2) {
53-
(new JoinByVertex(graphPattern, mq, decompositionQueue, queueCapacity))
54-
.addDecompositions();
55-
}
52+
(new JoinByVertex(graphPattern, mq, decompositionQueue, queueCapacity)).addDecompositions();
5653
if (config.getForeignKeyMeta() != null) {
5754
(new JoinByForeignKey(graphPattern, mq, decompositionQueue, queueCapacity))
5855
.addDecompositions();
@@ -311,10 +308,13 @@ public JoinByVertex(
311308

312309
@Override
313310
public void addDecompositions() {
314-
List<GraphJoinDecomposition> queues = initDecompositions();
315-
while (!queues.isEmpty()) {
316-
List<GraphJoinDecomposition> nextCompositions = getDecompositions(queues.remove(0));
317-
queues.addAll(nextCompositions);
311+
if (getMaxEdgeNum(graphPattern.getPattern()) > 2) {
312+
List<GraphJoinDecomposition> queues = initDecompositions();
313+
while (!queues.isEmpty()) {
314+
List<GraphJoinDecomposition> nextCompositions =
315+
getDecompositions(queues.remove(0));
316+
queues.addAll(nextCompositions);
317+
}
318318
}
319319
addPxdInnerVDecompositions();
320320
}

interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/planner/volcano/VolcanoPlannerX.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.calcite.plan.Convention;
2323
import org.apache.calcite.plan.ConventionTraitDef;
2424
import org.apache.calcite.plan.RelOptCost;
25+
import org.apache.calcite.plan.RelOptSchema;
2526
import org.apache.calcite.plan.volcano.RelSubset;
2627
import org.apache.calcite.plan.volcano.VolcanoPlanner;
2728
import org.apache.calcite.rel.RelNode;
@@ -59,4 +60,9 @@ protected RelOptCost upperBoundForInputs(RelNode mExpr, RelOptCost upperBound) {
5960
if (relCost == null) return null;
6061
return cost.plus(relCost);
6162
}
63+
64+
@Override
65+
public void registerSchema(RelOptSchema schema) {
66+
// do nothing
67+
}
6268
}

0 commit comments

Comments
 (0)