Skip to content

Commit 86e41e4

Browse files
authored
Merge pull request #17957 from MinaProtocol/dkijania/rocksdb-test-ci
implement RocksDB compatibility test in Python
2 parents e9b29f4 + e8b4c80 commit 86e41e4

File tree

7 files changed

+286
-0
lines changed

7 files changed

+286
-0
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
let S = ../../Lib/SelectFiles.dhall
2+
3+
let Pipeline = ../../Pipeline/Dsl.dhall
4+
5+
let PipelineTag = ../../Pipeline/Tag.dhall
6+
7+
let JobSpec = ../../Pipeline/JobSpec.dhall
8+
9+
let Command = ../../Command/Base.dhall
10+
11+
let Size = ../../Command/Size.dhall
12+
13+
let Cmd = ../../Lib/Cmds.dhall
14+
15+
let Docker = ../../Command/Docker/Type.dhall
16+
17+
-- Shell commands for the single step of this job, listed in execution order:
-- install system packages, build/install RocksDB, install the Python
-- requirements, then run the compatibility test script.
let commands =
      [ Cmd.run
          "apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install --no-install-recommends --quiet --yes python3 python3-pip build-essential sudo curl"
      , Cmd.run "./scripts/rocksdb-compatibility/install-rocksdb.sh"
      , Cmd.run
          "pip3 install --break-system-packages -r ./scripts/rocksdb-compatibility/requirements.txt"
      , Cmd.run "python3 ./scripts/rocksdb-compatibility/test.py"
      ]

-- Pipeline definition for the "RocksDBLedgerTarCompatibilityTest" job.
in Pipeline.build
      Pipeline.Config::{
      , spec = JobSpec::{
        -- File selectors: anything under scripts/rocksdb-compatibility and
        -- this job's own .dhall file.
        -- NOTE(review): presumably these are the paths whose changes trigger
        -- the job; confirm against Pipeline/JobSpec.dhall.
        , dirtyWhen =
          [ S.strictlyStart (S.contains "scripts/rocksdb-compatibility")
          , S.exactly
              "buildkite/src/Jobs/Test/RocksDBLedgerTarCompatibilityTest"
              "dhall"
          ]
        , path = "Test"
        , name = "RocksDBLedgerTarCompatibilityTest"
        , tags =
          [ PipelineTag.Type.Fast
          , PipelineTag.Type.Test
          , PipelineTag.Type.Stable
          ]
        }
      , steps =
        [ Command.build
            Command.Config::{
            , commands = commands
            , label = "Check RocksDB Ledger Tar Compatibility"
            , key = "test"
            , target = Size.Multi
            -- The commands above run inside a stock ubuntu:noble image.
            , docker = Some Docker::{ image = "ubuntu:noble" }
            }
        ]
      }
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
/.venv
2+
/__pycache__
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
To run this test:
2+
1. Run `install-rocksdb.sh` (preferably inside a Docker container, because it installs into system library directories) to ensure the RocksDB dynamic library is installed.
3+
2. Run `test.py` inside a virtual environment (venv) in which everything listed in `requirements.txt` is installed.
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#!/usr/bin/env bash

# Download the RocksDB sources for ROCKSDB_VERSION, build the shared library
# and install it system-wide (uses sudo for the install and ldconfig steps).

set -euox pipefail

ROCKSDB_VERSION=10.5.1

ROCKSDB_SOURCE=$(mktemp -d --tmpdir "rocksdb-$ROCKSDB_VERSION.XXXXXX")

# Remove the temporary source tree on exit. Single quotes defer expansion to
# the moment the trap fires, and the inner quotes keep the path one word.
trap 'rm -rf "$ROCKSDB_SOURCE"' EXIT

# --fail makes curl exit non-zero on an HTTP error instead of piping the error
# page into tar; together with `pipefail` this aborts the script early.
curl -fL "https://github.com/facebook/rocksdb/archive/refs/tags/v${ROCKSDB_VERSION}.tar.gz" | tar xz -C "$ROCKSDB_SOURCE"

cd "$ROCKSDB_SOURCE/rocksdb-${ROCKSDB_VERSION}"

# NOTE:
# `-Wno-unused-parameter` is to fix this error:
# util/compression.cc:684:40: error: unused parameter ‘args’ [-Werror=unused-parameter]
#   684 |   Status ExtractUncompressedSize(Args& args) override {
#       |                                  ~~~~~~^~~~
sudo EXTRA_CXXFLAGS="-Wno-unused-parameter" make -j"$(nproc)" install-shared

# Refresh the LD cache so follow-up programs can locate the dynamic library
sudo ldconfig
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
cffi==2.0.0
2+
tqdm==4.65
3+
pycurl==7.45.7
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
from cffi import FFI
2+
from contextlib import contextmanager
3+
4+
# C declarations for the subset of the RocksDB C API that this module calls:
# options, open/close, read options and forward iteration.
_ROCKSDB_C_DECLARATIONS = """
typedef struct rocksdb_t rocksdb_t;
typedef struct rocksdb_options_t rocksdb_options_t;
typedef struct rocksdb_readoptions_t rocksdb_readoptions_t;
typedef struct rocksdb_iterator_t rocksdb_iterator_t;

rocksdb_options_t* rocksdb_options_create(void);
void rocksdb_options_destroy(rocksdb_options_t*);
void rocksdb_options_set_create_if_missing(rocksdb_options_t*, unsigned char);

rocksdb_t* rocksdb_open(const rocksdb_options_t* options, const char* name, char** errptr);
void rocksdb_close(rocksdb_t* db);

rocksdb_readoptions_t* rocksdb_readoptions_create(void);
void rocksdb_readoptions_destroy(rocksdb_readoptions_t*);

rocksdb_iterator_t* rocksdb_create_iterator(rocksdb_t* db, const rocksdb_readoptions_t* options);
void rocksdb_iter_destroy(rocksdb_iterator_t* iter);
void rocksdb_iter_seek_to_first(rocksdb_iterator_t* iter);
unsigned char rocksdb_iter_valid(const rocksdb_iterator_t* iter);
void rocksdb_iter_next(rocksdb_iterator_t* iter);
const char* rocksdb_iter_key(const rocksdb_iterator_t* iter, size_t* klen);
const char* rocksdb_iter_value(const rocksdb_iterator_t* iter, size_t* vlen);
"""

ffi = FFI()
ffi.cdef(_ROCKSDB_C_DECLARATIONS)

# Load the shared library by name; it must be resolvable by the dynamic loader.
rocksdb = ffi.dlopen("librocksdb.so")
33+
34+
@contextmanager
def rocksdb_options(create_if_missing=False):
    """
    Context manager that yields a newly created RocksDB options handle.

    Args:
        create_if_missing (bool): Converted to an int and passed to
            rocksdb_options_set_create_if_missing.

    Yields:
        rocksdb_options_t*: The options handle; it is destroyed when the
        context exits.
    """
    handle = rocksdb.rocksdb_options_create()
    flag = int(create_if_missing)
    rocksdb.rocksdb_options_set_create_if_missing(handle, flag)
    try:
        yield handle
    finally:
        rocksdb.rocksdb_options_destroy(handle)
42+
43+
@contextmanager
def open_db(path, options):
    """
    Context manager that opens a RocksDB database and closes it on exit.

    Args:
        path (str): Path of the database; encoded as UTF-8 before it is
            handed to rocksdb_open.
        options (rocksdb_options_t*): Options handle, e.g. the value yielded
            by rocksdb_options().

    Yields:
        rocksdb_t*: The open database handle.

    Raises:
        RuntimeError: If rocksdb_open reports an error message, or returns a
            NULL handle without one.
    """
    err_ptr = ffi.new("char**")
    db = rocksdb.rocksdb_open(options, path.encode('utf-8'), err_ptr)
    if err_ptr[0] != ffi.NULL:
        # errors="replace": an undecodable message must not hide the real
        # failure behind a UnicodeDecodeError.
        message = ffi.string(err_ptr[0]).decode("utf-8", errors="replace")
        raise RuntimeError("Open error: " + message)
    if db == ffi.NULL:
        # Never yield a NULL handle; it would later be passed to rocksdb_close.
        raise RuntimeError("Open error: rocksdb_open returned NULL without an error message")
    try:
        yield db
    finally:
        rocksdb.rocksdb_close(db)
53+
54+
def read_iter(db):
    """
    Generator that yields (key, value) pairs from a RocksDB database.

    Iteration starts at the first key and proceeds forward. The iterator and
    its read options are destroyed when the generator finishes or is closed.

    Args:
        db (rocksdb_t*): A RocksDB database handle.

    Yields:
        tuple[bytes, bytes]: The (key, value) pairs from the database.
    """
    # Length out-parameters: allocated once and overwritten by every
    # rocksdb_iter_key / rocksdb_iter_value call.
    klen = ffi.new("size_t*")
    vlen = ffi.new("size_t*")

    ropts = rocksdb.rocksdb_readoptions_create()
    try:
        it = rocksdb.rocksdb_create_iterator(db, ropts)
        try:
            rocksdb.rocksdb_iter_seek_to_first(it)
            while rocksdb.rocksdb_iter_valid(it):
                key_ptr = rocksdb.rocksdb_iter_key(it, klen)
                val_ptr = rocksdb.rocksdb_iter_value(it, vlen)
                # bytes() copies the data, so the yielded pair stays valid
                # after the iterator advances.
                yield (
                    bytes(ffi.buffer(key_ptr, klen[0])),
                    bytes(ffi.buffer(val_ptr, vlen[0])),
                )
                rocksdb.rocksdb_iter_next(it)
        finally:
            rocksdb.rocksdb_iter_destroy(it)
    finally:
        # Outer finally: the read options are released even if creating the
        # iterator itself raised.
        rocksdb.rocksdb_readoptions_destroy(ropts)
81+
82+
def test(path, rounds):
    """
    Iterate over a RocksDB database and print key-value pairs in hexadecimal.

    Args:
        path (str): Path to the RocksDB database.
        rounds (int): Maximum number of key-value pairs to read from the start
            of the database. Values <= 0 read nothing.

    Behavior:
        - Opens an existing database with create_if_missing disabled, so a
          missing database is an error rather than being created.
        - Uses a RocksDB iterator to traverse from the first key.
        - Prints each key-value pair as hexadecimal strings.
        - Stops early if the iterator reaches the end of the DB before 'rounds' entries.
    """
    # Function-scope imports keep this block self-contained.
    from contextlib import closing
    from itertools import islice

    limit = max(rounds, 0)
    with rocksdb_options(create_if_missing=False) as opts, open_db(path, opts) as db:
        # closing() destroys the iterator deterministically before the
        # database handle is closed, instead of relying on garbage collection.
        with closing(read_iter(db)) as pairs:
            for key, val in islice(pairs, limit):
                print(f"Found KV-pair: {key.hex()} -> {val.hex()}")
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import os
2+
import random
3+
import tarfile
4+
import tempfile
5+
import xml.etree.ElementTree as ET
6+
from io import BytesIO
7+
from typing import List
8+
from urllib.parse import urljoin
9+
10+
import pycurl
11+
from tqdm import tqdm
12+
13+
import rocksdb
14+
15+
# Upper bound on how many ledger tarballs are randomly sampled per run.
NUM_LEDGER_TARS = 5
# Number of key-value pairs read from each extracted ledger database.
NUM_KV_PER_LEDGER = 10
17+
18+
# Match keys starting with "genesis_ledger" or "epoch_ledger" and ending with ".tar.gz"
19+
def matches_pattern(key: str) -> bool:
    """Return True if *key* names a genesis/epoch ledger ``.tar.gz`` archive."""
    has_ledger_prefix = key.startswith(("genesis_ledger", "epoch_ledger"))
    return has_ledger_prefix and key.endswith(".tar.gz")
21+
22+
23+
def download_file(url: str, dest_path: str) -> None:
    """
    Download *url* to *dest_path*, showing a tqdm progress bar.

    Redirects are followed. The curl handle and the progress bar are closed
    even if the transfer fails.

    Raises:
        pycurl.error: If the transfer itself fails.
        RuntimeError: If the final HTTP status is not 200. Without this check
            an error page would be saved as if it were the requested file.
    """
    with open(dest_path, "wb") as f:
        # Create a progress bar (tqdm)
        pbar = tqdm(unit="B", unit_scale=True, unit_divisor=1024, ncols=80)
        try:

            def progress(download_t, download_d, _upload_t, _upload_d):
                _ = (_upload_t, _upload_d)  # Make pyright happier
                if download_t > 0:
                    pbar.total = download_t
                # tqdm.update() takes an increment; curl reports the running total.
                pbar.update(download_d - pbar.n)

            c = pycurl.Curl()
            try:
                c.setopt(pycurl.URL, url)
                c.setopt(pycurl.WRITEDATA, f)
                c.setopt(pycurl.FOLLOWLOCATION, True)
                c.setopt(pycurl.NOPROGRESS, False)
                c.setopt(pycurl.XFERINFOFUNCTION, progress)
                c.perform()
                status_code = c.getinfo(pycurl.RESPONSE_CODE)
            finally:
                c.close()
        finally:
            pbar.close()

    if status_code != 200:
        raise RuntimeError(f"Failed to download {url}: {status_code}")
44+
45+
46+
def extract_tar_gz(tar_path: str, target_dir: str) -> None:
    """
    Extract the gzip-compressed tar archive at *tar_path* into *target_dir*.

    The archive comes from the network, so the "data" extraction filter is
    used when the running Python provides it. That filter rejects members
    that would be written outside *target_dir*.

    Raises:
        tarfile.TarError: If the archive is invalid or a member is rejected
            by the extraction filter.
    """
    with tarfile.open(tar_path, "r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            tar.extractall(path=target_dir, filter="data")
        else:
            # Older Python without extraction filters: keep legacy behaviour.
            tar.extractall(path=target_dir)
49+
50+
# TODO: figure out how to enable SSL here
51+
def list_s3_keys(url, matches_pattern) -> List[str]:
    """
    Fetch an S3 bucket listing and return the keys accepted by *matches_pattern*.

    Only the keys present in this single listing response are considered;
    no follow-up requests are made.

    Args:
        url: URL of the bucket listing (an XML document).
        matches_pattern: Callable taking a key string and returning whether
            to keep it.

    Returns:
        The matching keys, in document order.

    Raises:
        pycurl.error: If the transfer itself fails.
        RuntimeError: If the HTTP status is not 200.
    """
    buffer = BytesIO()
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, url)
        c.setopt(pycurl.WRITEDATA, buffer)
        c.setopt(pycurl.FOLLOWLOCATION, True)
        # SECURITY: certificate and host verification are disabled here, so
        # the listing is not authenticated.
        c.setopt(pycurl.SSL_VERIFYPEER, False)
        c.setopt(pycurl.SSL_VERIFYHOST, 0)
        c.perform()
        status_code = c.getinfo(pycurl.RESPONSE_CODE)
    finally:
        # Release the curl handle even if the transfer raised.
        c.close()

    if status_code != 200:
        raise RuntimeError(f"Failed to list S3 bucket: {status_code}")

    data = buffer.getvalue()
    root = ET.fromstring(data)
    ns = {"s3": "http://s3.amazonaws.com/doc/2006-03-01/"}
    tar_keys = [
        text
        for elem in root.findall(".//s3:Contents/s3:Key", ns)
        if (text := elem.text) is not None and matches_pattern(text)
    ]
    return tar_keys
75+
76+
def main():
    """
    Sample ledger tarballs from the bucket and test each with RocksDB.

    For each sampled key the tarball is downloaded into a fresh temporary
    directory, extracted, and the extracted database is opened and read via
    rocksdb.test().

    Raises:
        RuntimeError: If the bucket listing contains no matching tarballs.
    """
    listing_url = "https://snark-keys.o1test.net.s3.amazonaws.com/"
    download_base = "https://s3-us-west-2.amazonaws.com/snark-keys.o1test.net/"

    ledger_keys = list_s3_keys(listing_url, matches_pattern)
    if not ledger_keys:
        raise RuntimeError("No ledger tar files found.")

    sample_size = min(NUM_LEDGER_TARS, len(ledger_keys))
    for ledger_key in random.sample(ledger_keys, sample_size):
        ledger_url = urljoin(download_base, ledger_key)
        print(f"Testing RocksDB compatibility on {ledger_url}")

        # Everything for one tarball lives in its own temporary directory,
        # removed when the block exits.
        with tempfile.TemporaryDirectory() as workdir:
            archive_path = os.path.join(workdir, os.path.basename(ledger_key))
            print(f"  Downloading to {archive_path}...")
            download_file(ledger_url, archive_path)

            extracted_path = os.path.join(workdir, "extracted")
            os.makedirs(extracted_path, exist_ok=True)
            print(f"  Extracting to {extracted_path}...")
            extract_tar_gz(archive_path, extracted_path)

            print(f"  Testing extracted RocksDB at {extracted_path}")
            rocksdb.test(extracted_path, NUM_KV_PER_LEDGER)


if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)