Skip to content

Commit e512820

Browse files
authored
fix(interactive): Fix several sparodic bugs in groot (#4595)
and bump up rustc version to 1.87.0 Signed-off-by: siyuan0322 <[email protected]>
1 parent 02d59f6 commit e512820

File tree

16 files changed

+78
-54
lines changed

16 files changed

+78
-54
lines changed

.github/workflows/gaia.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ jobs:
5555
- name: Install Rust
5656
uses: actions-rs/toolchain@v1
5757
with:
58-
toolchain: 1.81.0
58+
toolchain: 1.87.0
5959
override: true
6060

6161
- name: Rust Format Check

.github/workflows/gss.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ jobs:
7979
. ${HOME}/.graphscope_env
8080
export SCCACHE_DIR=~/.cache/sccache
8181
export RUSTC_WRAPPER=/usr/local/bin/sccache
82-
rustup toolchain install 1.81.0
83-
rustup default 1.81.0
82+
rustup toolchain install 1.87.0
83+
rustup default 1.87.0
8484
cd ${GITHUB_WORKSPACE}/interactive_engine
8585
mvn clean install -P groot -Drust.compile.mode=debug -DskipTests --quiet
8686
mvn clean install -Pgroot-data-load --quiet

.github/workflows/interactive.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ jobs:
129129
. /home/graphscope/.cargo/env
130130
which cargo
131131
132+
rustup toolchain install 1.87.0 && rustup default 1.87.0
132133
# build compiler
133134
cd ${GIE_HOME}/
134135
mvn clean install -Pexperimental -DskipTests -q

.github/workflows/k8s-ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ jobs:
285285
- name: Install Rust
286286
uses: actions-rs/toolchain@v1
287287
with:
288-
toolchain: 1.81.0
288+
toolchain: 1.87.0
289289
override: true
290290

291291
- name: Build Artifact
@@ -649,7 +649,7 @@ jobs:
649649
- name: Install Rust
650650
uses: actions-rs/toolchain@v1
651651
with:
652-
toolchain: 1.81.0
652+
toolchain: 1.87.0
653653
override: true
654654

655655
- name: Build GIE Experimental Artifacts

interactive_engine/executor/store/groot/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ grpcio = "0.10"
2222
grpcio-sys = { version = "0.10", features = ["openssl"] }
2323
# deactivation of bzip2 due to https://github.com/rust-rocksdb/rust-rocksdb/issues/609
2424
# deactivation of zstd due to the 'hidden symbol "ZSTD_maxCLevel" is referenced by DSO' error
25-
rocksdb = { version = "0.21.0", features = ["snappy", "lz4", "zlib"], default-features = false }
26-
#rocksdb = { git = "https://github.com/siyuan0322/rust-rocksdb.git", rev = "c44ea2b", features = ["snappy", "lz4", "zlib"], default-features = false }
25+
rocksdb = { version = "0.21.0", features = ["snappy", "lz4", "zlib", "multi-threaded-cf"], default-features = false }
2726
dyn_type = { path = "../../common/dyn_type" }
2827
rustversion = "1.0"
2928
mimalloc-rust = {version = "0.2.1", optional = true}

interactive_engine/executor/store/groot/src/db/graph/store.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -849,7 +849,9 @@ impl GraphStore {
849849

850850
fn check_si_guard(&self, si: SnapshotId) -> GraphResult<()> {
851851
let guard = self.si_guard.load(Ordering::Relaxed) as SnapshotId;
852-
if si < guard {
852+
// allow for a short duration inconsistent, due to different frontend may have minor
853+
// difference in timing
854+
if guard - si > 10 {
853855
let msg = format!("si#{} is less than current si_guard#{}", si, guard);
854856
let err = gen_graph_err!(ErrorCode::INVALID_OPERATION, msg);
855857
return Err(err);

interactive_engine/executor/store/groot/src/db/graph/tests/graph.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@ pub fn test_si_guard<G: MultiVersionGraph>(graph: G) {
1919
.unwrap();
2020
schema_version += 1;
2121
assert!(graph
22-
.create_vertex_type(11, schema_version, 4, &types::create_test_type_def(4), schema_version)
22+
.create_vertex_type(1, schema_version, 4, &types::create_test_type_def(4), schema_version)
2323
.is_err());
2424
schema_version += 1;
2525
graph
2626
.drop_vertex_type(15, schema_version, 1)
2727
.unwrap();
2828
schema_version += 1;
2929
assert!(graph
30-
.create_vertex_type(13, schema_version, 5, &types::create_test_type_def(5), schema_version)
30+
.create_vertex_type(3, schema_version, 5, &types::create_test_type_def(5), schema_version)
3131
.is_err());
3232
let properties: HashMap<PropertyId, Value> = HashMap::new();
3333
graph
@@ -38,21 +38,21 @@ pub fn test_si_guard<G: MultiVersionGraph>(graph: G) {
3838
.unwrap();
3939
schema_version += 1;
4040
assert!(graph
41-
.create_vertex_type(16, schema_version, 5, &types::create_test_type_def(5), schema_version)
41+
.create_vertex_type(6, schema_version, 5, &types::create_test_type_def(5), schema_version)
4242
.is_err());
4343
schema_version += 1;
4444
assert!(graph
45-
.drop_vertex_type(16, schema_version, 2)
45+
.drop_vertex_type(6, schema_version, 2)
4646
.is_err());
4747
schema_version += 1;
4848
assert!(graph
49-
.insert_overwrite_vertex(16, 1, 2, &properties)
49+
.insert_overwrite_vertex(6, 1, 2, &properties)
5050
.is_err());
5151
schema_version += 1;
52-
assert!(graph.delete_vertex(16, 1, 2).is_err());
52+
assert!(graph.delete_vertex(6, 1, 2).is_err());
5353
schema_version += 1;
5454
assert!(graph
55-
.insert_update_vertex(16, 1, 2, &properties)
55+
.insert_update_vertex(6, 1, 2, &properties)
5656
.is_err());
5757
schema_version += 1;
5858

@@ -66,7 +66,7 @@ pub fn test_si_guard<G: MultiVersionGraph>(graph: G) {
6666
.unwrap();
6767
schema_version += 1;
6868
assert!(graph
69-
.insert_overwrite_vertex(16, 1, 2, &properties)
69+
.insert_overwrite_vertex(6, 1, 2, &properties)
7070
.is_err());
7171
graph
7272
.insert_update_edge(20, EdgeId::new(1, 2, 3), &edge_type, true, &properties)
@@ -78,24 +78,24 @@ pub fn test_si_guard<G: MultiVersionGraph>(graph: G) {
7878
.insert_overwrite_edge(21, EdgeId::new(1, 2, 3), &edge_type, true, &properties)
7979
.unwrap();
8080
assert!(graph
81-
.insert_update_edge(20, EdgeId::new(1, 2, 3), &edge_type, true, &properties)
81+
.insert_update_edge(10, EdgeId::new(1, 2, 3), &edge_type, true, &properties)
8282
.is_err());
8383
assert!(graph
84-
.drop_edge_type(19, schema_version, 10)
84+
.drop_edge_type(9, schema_version, 10)
8585
.is_err());
8686
schema_version += 1;
8787
assert!(graph
88-
.remove_edge_kind(19, schema_version, &edge_type)
88+
.remove_edge_kind(9, schema_version, &edge_type)
8989
.is_err());
9090
schema_version += 1;
9191
assert!(graph
92-
.delete_edge(19, EdgeId::new(1, 2, 3), &edge_type, true)
92+
.delete_edge(9, EdgeId::new(1, 2, 3), &edge_type, true)
9393
.is_err());
9494
assert!(graph
95-
.create_edge_type(19, schema_version, 20, &types::create_test_type_def(2))
95+
.create_edge_type(9, schema_version, 20, &types::create_test_type_def(2))
9696
.is_err());
9797
schema_version += 1;
9898
assert!(graph
99-
.add_edge_kind(19, schema_version, &edge_type, schema_version)
99+
.add_edge_kind(9, schema_version, &edge_type, schema_version)
100100
.is_err());
101101
}

interactive_engine/executor/store/groot/src/db/storage/rocksdb.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::time::Duration;
77
use ::rocksdb::backup::{BackupEngine, BackupEngineOptions, RestoreOptions};
88
use ::rocksdb::{DBRawIterator, Env, IngestExternalFileOptions, Options, ReadOptions, DB};
99
use crossbeam_epoch::{self as epoch, Atomic, Owned};
10-
use rocksdb::WriteBatch;
10+
use rocksdb::{WriteBatch, WriteOptions};
1111

1212
use super::{StorageIter, StorageRes};
1313
use crate::db::api::*;
@@ -85,7 +85,12 @@ impl RocksDB {
8585

8686
let prev = self.db.swap(cur, Ordering::Release, guard);
8787
unsafe {
88-
drop(prev.into_owned());
88+
let prev = prev.into_owned();
89+
prev.cancel_all_background_work(true);
90+
std::thread::spawn(move || {
91+
std::thread::sleep(Duration::from_secs(120));
92+
drop(prev);
93+
});
8994
}
9095
info!("RocksDB replaced");
9196
}
@@ -109,7 +114,9 @@ impl RocksDB {
109114
return Ok(());
110115
}
111116
let db = self.get_db()?;
112-
db.put(key, val).map_err(|e| {
117+
let mut opt = WriteOptions::default();
118+
opt.set_sync(false);
119+
db.put_opt(key, val, &opt).map_err(|e| {
113120
let msg = format!("rocksdb.put failed because {}", e.into_string());
114121
gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg)
115122
})
@@ -381,8 +388,11 @@ fn init_options(options: &HashMap<String, String>) -> Options {
381388
// https://github.com/facebook/rocksdb/wiki/Basic-Operations#non-sync-writes
382389
opts.set_use_fsync(true);
383390
opts.set_max_write_buffer_number(4);
384-
391+
opts.set_allow_concurrent_memtable_write(true);
392+
opts.set_enable_write_thread_adaptive_yield(true);
385393
opts.set_bytes_per_sync(1048576);
394+
opts.set_enable_pipelined_write(true);
395+
opts.set_unordered_write(false);
386396

387397
if let Some(conf_str) = options.get("store.rocksdb.disable.auto.compactions") {
388398
let val = conf_str.parse().unwrap();

interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.opentelemetry.api.GlobalOpenTelemetry;
2020
import io.opentelemetry.api.metrics.Meter;
2121

22+
import org.apache.kafka.clients.consumer.ConsumerRecord;
2223
import org.slf4j.Logger;
2324
import org.slf4j.LoggerFactory;
2425

@@ -248,9 +249,9 @@ public List<Long> replayDMLRecordsFrom(long offset, long timestamp) throws IOExc
248249
try (LogReader logReader =
249250
this.logService.createReader(storeId, offset, timestamp)) {
250251
long batchSnapshotId = ingestSnapshotId.get();
251-
ReadLogEntry readLogEntry;
252-
while (!shouldStop && (readLogEntry = logReader.readNext()) != null) {
253-
LogEntry logEntry = readLogEntry.getLogEntry();
252+
ConsumerRecord<LogEntry, LogEntry> readLogEntry;
253+
while (!shouldStop && (readLogEntry = logReader.readNextRecord()) != null) {
254+
LogEntry logEntry = readLogEntry.value();
254255
OperationBatch batch =
255256
Utils.extractOperations(logEntry.getOperationBatch(), types);
256257
if (batch.getOperationCount() == 0) {

interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -372,16 +372,15 @@ public List<Long> replayDMLRecordsFrom(long offset, long timestamp) throws IOExc
372372
long batchSnapshotId;
373373
int replayCount = 0;
374374
try (LogReader logReader = this.logService.createReader(storeId, offset, timestamp)) {
375-
ReadLogEntry readLogEntry;
375+
ConsumerRecord<LogEntry, LogEntry> record;
376376
batchSnapshotId = latestSnapshotId.get();
377-
while (!shouldStop && (readLogEntry = logReader.readNext()) != null) {
378-
LogEntry logEntry = readLogEntry.getLogEntry();
377+
while (!shouldStop && (record = logReader.readNextRecord()) != null) {
378+
LogEntry logEntry = record.value();
379379
OperationBatch batch = Utils.extractOperations(logEntry.getOperationBatch(), types);
380380
if (batch.getOperationCount() == 0) {
381381
continue;
382382
}
383-
ReadLogEntry entry =
384-
new ReadLogEntry(readLogEntry.getOffset(), batchSnapshotId, batch);
383+
ReadLogEntry entry = new ReadLogEntry(record.offset(), batchSnapshotId, batch);
385384
replayQueue.put(entry);
386385
replayCount++;
387386
if (replayCount % 10000 == 0) {

0 commit comments

Comments
 (0)