Skip to content

Commit 769d214

Browse files
authored
Feat/support event db mgr (#581)
* feat: add doc store * feat: add index * test: add index test case * fix: update the error report * feat: upgrade version * fix: revert the key and case * feat: add delete event db * fix: revert evm key * fix: fix the meta and reconnect bug * feat: update docker release cd * feat: update the readme
1 parent 3ba0f9b commit 769d214

File tree

7 files changed

+187
-12
lines changed

7 files changed

+187
-12
lines changed

.github/workflows/docker_release_cd.yml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ jobs:
2828
sudo apt-get install protobuf-compiler
2929
protoc --version
3030
ROOT_DIR=`pwd`
31-
cd ${ROOT_DIR}/metadata && yarn install && npx hardhat test
31+
cd ${ROOT_DIR}/metadata && yarn install && npx hardhat compile
3232
test -e ${ROOT_DIR}/metadata/artifacts/contracts/DB3MetaStore.sol/DB3MetaStore.json && cp -f ${ROOT_DIR}/metadata/artifacts/contracts/DB3MetaStore.sol/DB3MetaStore.json ${ROOT_DIR}/abi/
33+
test -e ${ROOT_DIR}/metadata/artifacts/contracts/libraries/Events.sol/Events.json && cp -f ${ROOT_DIR}/metadata/artifacts/contracts/libraries/Events.sol/Events.json ${ROOT_DIR}/abi/
3334
cd ${ROOT_DIR} && cargo build --release
3435
cp ${ROOT_DIR}/target/release/db3 ${ROOT_DIR}/docker/
36+
cd ${ROOT_DIR}/sdk && yarn build && yarn link
3537
- name: docker login
3638
uses: docker/login-action@v1
3739
with:
@@ -41,8 +43,7 @@ jobs:
4143
- name: Build Docker image
4244
run: |
4345
ROOT_DIR=`pwd`
44-
cp ${ROOT_DIR}/docker/config.ts ${ROOT_DIR}/thirdparty/data-manager/src/data-context/
45-
cd ${ROOT_DIR}/thirdparty/data-manager && yarn && yarn build && mv -f dist ${ROOT_DIR}/docker/pages
46+
cd ${ROOT_DIR}/thirdparty/data-manager && yarn && yarn link db3.js && yarn build && mv -f dist ${ROOT_DIR}/docker/pages
4647
cd ${ROOT_DIR}
4748
RELEASE_NAME=${GITHUB_REF/refs\/tags\//}
4849
cd docker && docker build . -t ghcr.io/dbpunk-labs/db3:$RELEASE_NAME

README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ sudo docker run -p 26639:26639 -p 26619:26619 -p 26629:26629 \
3232
-e ADMIN_ADDR=0xF78c...29679 \ # use your own wallet address
3333
-it ghcr.io/dbpunk-labs/db3:latest
3434
```
35+
3536
you should see the following output
3637
```
3738
start store node...
@@ -41,19 +42,18 @@ start ar testnet ...
4142
10000000000000Start the local db3 nodes successfully
4243
The storage node url: http://127.0.0.1:26619
4344
The index node url: http://127.0.0.1:26639
44-
The console node url: http://127.0.0.1:26629/console
45-
The setup url: http://127.0.0.1:26629/welcome
45+
The console url: http://127.0.0.1:26629
4646
```
4747

4848
**2. Setup the node**
4949

50-
* open the `http://127.0.0.1:26629/welcome` to setup your node
51-
* open the `http://127.0.0.1:26629/console/database` to create database or collection
52-
* open the `http://127.0.0.1:26629/console/node/dashboard` to vist the dashboard
50+
* open the `http://127.0.0.1:26629/` to setup your node for the first time
51+
* open the `http://127.0.0.1:26629/database` to create database or collection
52+
* open the `http://127.0.0.1:26629/node/dashboard` to vist the dashboard
5353

5454
**3. Playground**
5555

56-
open the `http://127.0.0.1:26629/console/database` to create database or collection, then use the playgound `http://127.0.0.1:26629/console/database/playground` with the following code
56+
open the `http://127.0.0.1:26629/database` to create database or collection, then go the playgound
5757

5858
```typescript
5959
// create a account

src/node/src/indexer_impl.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,14 +150,15 @@ impl IndexerNodeImpl {
150150
Ok(handle) => {
151151
info!("listen and handle event message");
152152
let mut stream = handle.into_inner();
153-
while let Some(event) = stream.message().await.unwrap() {
153+
while let Ok(Some(event)) = stream.message().await {
154154
match self.handle_event(event, &store_sdk).await {
155155
Err(e) => {
156156
warn!("[IndexerBlockSyncer] handle event error: {:?}", e);
157157
}
158158
_ => {}
159159
}
160160
}
161+
sleep(Duration::from_millis(1000 * 5)).await;
161162
}
162163
Err(e) => {
163164
warn!("fail to subscribe block event for {e} and retry in 5 seconds");

src/node/src/storage_node_light_impl.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,27 @@ impl StorageNodeV2Impl {
180180
pub async fn start_bg_task(&self) {
181181
self.start_to_produce_block().await;
182182
self.start_to_rollup().await;
183+
self.start_flush_state().await;
184+
}
185+
186+
async fn start_flush_state(&self) {
187+
let local_db_store = self.db_store.clone();
188+
let local_running = self.running.clone();
189+
task::spawn(async move {
190+
info!("start the database meta flush thread");
191+
while local_running.load(Ordering::Relaxed) {
192+
sleep(TokioDuration::from_millis(60000)).await;
193+
match local_db_store.flush_database_state() {
194+
Ok(_) => {
195+
info!("flush database meta done");
196+
}
197+
Err(e) => {
198+
warn!("flush database meta error {e}");
199+
}
200+
}
201+
}
202+
info!("exit the flush thread");
203+
});
183204
}
184205

185206
async fn start_to_produce_block(&self) {

src/proto/proto/db3_database_v2.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ message DatabaseStatePersistence {
6565
map<string, CollectionState> collection_states = 4;
6666
int64 doc_order = 5;
6767
}
68+
6869
message BlockState {
6970
uint64 block = 1;
7071
uint32 order = 2;

src/storage/src/db_store_v2.rs

Lines changed: 152 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type DBRawIterator<'a> = DBRawIteratorWithThreadMode<'a, StorageEngine>;
4949
const STATE_CF: &str = "DB_STATE_CF";
5050
const BLOCK_STATE_CF: &str = "BLOCK_STATE_CF";
5151
const BLOCK_STATE_KEY: &str = "BLOCK_STATE_KEY";
52+
5253
#[derive(Clone)]
5354
pub struct DBStoreV2Config {
5455
pub db_path: String,
@@ -131,6 +132,28 @@ impl DBStoreV2 {
131132
.map_err(|e| DB3Error::WriteStoreError(format!("{e}")))
132133
}
133134

135+
pub fn flush_database_state(&self) -> Result<()> {
136+
let cf_handle = self
137+
.se
138+
.cf_handle(self.config.db_store_cf_name.as_str())
139+
.ok_or(DB3Error::ReadStoreError("cf is not found".to_string()))?;
140+
let mut it = self.se.raw_iterator_cf(&cf_handle);
141+
it.seek_to_first();
142+
loop {
143+
if !it.valid() {
144+
break;
145+
}
146+
if let Some(k) = it.key() {
147+
let addr = DB3Address::try_from(k)?;
148+
if let Some(state) = self.build_persistence_state(&addr) {
149+
self.put_entry::<DatabaseStatePersistence>(STATE_CF, k, state)?;
150+
}
151+
}
152+
it.next();
153+
}
154+
Ok(())
155+
}
156+
134157
pub fn get_event_db(&self, addr: &DB3Address) -> Result<Option<EventDatabase>> {
135158
let database = self.get_database(addr)?;
136159
if let Some(db) = database {
@@ -211,7 +234,7 @@ impl DBStoreV2 {
211234
self.db_state.insert(
212235
address_str,
213236
DatabaseState {
214-
doc_order: state.doc_order,
237+
doc_order: state.doc_order + 1,
215238
collection_state,
216239
total_doc_count: state.total_col_count,
217240
},
@@ -276,6 +299,7 @@ impl DBStoreV2 {
276299
fn store_block_state(&self, state: BlockState) -> Result<()> {
277300
self.put_entry(BLOCK_STATE_CF, BLOCK_STATE_KEY.as_ref(), state)
278301
}
302+
279303
fn recover_from_state(&self, address: &DB3Address) -> Result<Option<DatabaseStatePersistence>> {
280304
self.get_entry::<DatabaseStatePersistence>(STATE_CF, address.as_ref())
281305
}
@@ -494,6 +518,7 @@ impl DBStoreV2 {
494518
.map_err(|e| DB3Error::WriteStoreError(format!("{e}")))?;
495519
Ok(())
496520
}
521+
497522
fn get_entry<T>(&self, cf: &str, id: &[u8]) -> Result<Option<T>>
498523
where
499524
T: Message + std::default::Default,
@@ -609,6 +634,34 @@ impl DBStoreV2 {
609634
None
610635
}
611636

637+
fn build_persistence_state(&self, db_addr: &DB3Address) -> Option<DatabaseStatePersistence> {
638+
let db_addr_hex = db_addr.to_hex();
639+
if let Some(guard) = self.db_state.get(db_addr_hex.as_str()) {
640+
let database_state = guard.deref();
641+
let collection_states: HashMap<String, CollectionStateProto> = database_state
642+
.collection_state
643+
.iter()
644+
.map(|(key, value)| {
645+
(
646+
key.to_string(),
647+
CollectionStateProto {
648+
total_doc_count: value.total_doc_count,
649+
},
650+
)
651+
})
652+
.collect();
653+
Some(DatabaseStatePersistence {
654+
addr: db_addr_hex,
655+
total_doc_count: database_state.total_doc_count,
656+
total_col_count: database_state.collection_state.len() as u64,
657+
collection_states,
658+
doc_order: database_state.doc_order,
659+
})
660+
} else {
661+
None
662+
}
663+
}
664+
612665
pub fn get_database_state(&self, db_addr: &DB3Address) -> Option<DatabaseStateProto> {
613666
let db_addr_hex = db_addr.to_hex();
614667
if let Some(guard) = self.db_state.get(db_addr_hex.as_str()) {
@@ -1651,6 +1704,104 @@ mod tests {
16511704
assert_eq!(block_state, Some(BlockState { block: 1, order: 2 }));
16521705
}
16531706
}
1707+
#[test]
1708+
fn test_recover_db_state_with_persistence() {
1709+
let tmp_dir_path = TempDir::new("new_database").expect("create temp dir");
1710+
let real_path = tmp_dir_path.path().to_str().unwrap().to_string();
1711+
let mut address: Vec<DB3Address> = Vec::new();
1712+
1713+
{
1714+
let config = DBStoreV2Config {
1715+
db_path: real_path.to_string(),
1716+
db_store_cf_name: "db".to_string(),
1717+
doc_store_cf_name: "doc".to_string(),
1718+
collection_store_cf_name: "cf2".to_string(),
1719+
index_store_cf_name: "index".to_string(),
1720+
doc_owner_store_cf_name: "doc_owner".to_string(),
1721+
db_owner_store_cf_name: "db_owner".to_string(),
1722+
scan_max_limit: 50,
1723+
enable_doc_store: false,
1724+
doc_store_conf: DocStoreConfig::default(),
1725+
doc_start_id: 1000,
1726+
};
1727+
let result = DBStoreV2::new(config);
1728+
assert_eq!(result.is_ok(), true);
1729+
let db_m = DocumentDatabaseMutation {
1730+
db_desc: "test_desc".to_string(),
1731+
};
1732+
let db3_store = result.unwrap();
1733+
let result = db3_store.create_doc_database(&DB3Address::ZERO, &db_m, 1, 1, 1, 1);
1734+
assert_eq!(result.is_ok(), true);
1735+
let db_id = result.unwrap();
1736+
let result = db3_store.create_doc_database(&DB3Address::ZERO, &db_m, 2, 2, 2, 2);
1737+
assert_eq!(result.is_ok(), true);
1738+
let db_id2 = result.unwrap();
1739+
1740+
let collection = CollectionMutation {
1741+
index_fields: vec![],
1742+
collection_name: "col1".to_string(),
1743+
};
1744+
1745+
let result = db3_store.create_collection(
1746+
&DB3Address::ZERO,
1747+
db_id.address(),
1748+
&collection,
1749+
1,
1750+
1,
1751+
1,
1752+
);
1753+
assert!(result.is_ok());
1754+
let result = db3_store.create_collection(
1755+
&DB3Address::ZERO,
1756+
db_id2.address(),
1757+
&collection,
1758+
1,
1759+
1,
1760+
1,
1761+
);
1762+
assert!(result.is_ok());
1763+
let docs = vec!["{\"test\":0}".to_string()];
1764+
address.push(db_id.address().clone());
1765+
for _n in 0..1003 {
1766+
db3_store
1767+
.add_docs(db_id.address(), &DB3Address::ZERO, "col1", &docs, None)
1768+
.unwrap();
1769+
}
1770+
for _n in 0..91 {
1771+
db3_store
1772+
.add_docs(db_id2.address(), &DB3Address::ZERO, "col1", &docs, None)
1773+
.unwrap();
1774+
}
1775+
let result = db3_store.flush_database_state();
1776+
assert_eq!(result.is_ok(), true);
1777+
}
1778+
1779+
{
1780+
let config = DBStoreV2Config {
1781+
db_path: real_path,
1782+
db_store_cf_name: "db".to_string(),
1783+
doc_store_cf_name: "doc".to_string(),
1784+
collection_store_cf_name: "cf2".to_string(),
1785+
index_store_cf_name: "index".to_string(),
1786+
doc_owner_store_cf_name: "doc_owner".to_string(),
1787+
db_owner_store_cf_name: "db_owner".to_string(),
1788+
scan_max_limit: 50,
1789+
enable_doc_store: false,
1790+
doc_store_conf: DocStoreConfig::default(),
1791+
doc_start_id: 1000,
1792+
};
1793+
let result = DBStoreV2::new(config);
1794+
let db3_store = result.unwrap();
1795+
let result = db3_store.recover_db_state();
1796+
println!("{:?}", result);
1797+
assert_eq!(result.is_ok(), true);
1798+
let database_state_ret = db3_store.get_database_state(&address[0]);
1799+
println!("{:?}", database_state_ret);
1800+
let database_state = database_state_ret.unwrap();
1801+
assert_eq!(database_state.doc_order, 1004);
1802+
}
1803+
}
1804+
16541805
#[test]
16551806
fn test_recover_db_state() {
16561807
let tmp_dir_path = TempDir::new("new_database").expect("create temp dir");

thirdparty/data-manager

Submodule data-manager updated 103 files

0 commit comments

Comments
 (0)