Skip to content

Commit cfe4b45

Browse files
authored
add api of getting available file info by root (#357)
* add api of getting available file info by root
1 parent d43a616 commit cfe4b45

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

43 files changed

+676
-340
lines changed

.github/workflows/tests.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ jobs:
4545
python-version: '3.9'
4646
cache: 'pip'
4747

48+
- name: Set up Go
49+
uses: actions/setup-go@v4
50+
with:
51+
go-version: '1.22'
52+
4853
- name: Install dependencies
4954
run: |
5055
python -m pip install --upgrade pip

node/rpc/src/zgs/api.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,11 @@ pub trait Rpc {
6363
async fn check_file_finalized(&self, tx_seq_or_root: TxSeqOrRoot) -> RpcResult<Option<bool>>;
6464

6565
#[method(name = "getFileInfo")]
66-
async fn get_file_info(&self, data_root: DataRoot) -> RpcResult<Option<FileInfo>>;
66+
async fn get_file_info(
67+
&self,
68+
data_root: DataRoot,
69+
need_available: bool,
70+
) -> RpcResult<Option<FileInfo>>;
6771

6872
#[method(name = "getFileInfoByTxSeq")]
6973
async fn get_file_info_by_tx_seq(&self, tx_seq: u64) -> RpcResult<Option<FileInfo>>;

node/rpc/src/zgs/impl.rs

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ impl RpcServer for RpcServerImpl {
9595
let tx_seq = try_option!(
9696
self.ctx
9797
.log_store
98-
.get_tx_seq_by_data_root(&data_root)
98+
.get_tx_seq_by_data_root(&data_root, true)
9999
.await?
100100
);
101101

@@ -121,7 +121,12 @@ impl RpcServer for RpcServerImpl {
121121
) -> RpcResult<Option<SegmentWithProof>> {
122122
info!(%data_root, %index, "zgs_downloadSegmentWithProof");
123123

124-
let tx = try_option!(self.ctx.log_store.get_tx_by_data_root(&data_root).await?);
124+
let tx = try_option!(
125+
self.ctx
126+
.log_store
127+
.get_tx_by_data_root(&data_root, true)
128+
.await?
129+
);
125130

126131
self.get_segment_with_proof_by_tx(tx, index).await
127132
}
@@ -144,7 +149,12 @@ impl RpcServer for RpcServerImpl {
144149
let seq = match tx_seq_or_root {
145150
TxSeqOrRoot::TxSeq(v) => v,
146151
TxSeqOrRoot::Root(v) => {
147-
try_option!(self.ctx.log_store.get_tx_seq_by_data_root(&v).await?)
152+
try_option!(
153+
self.ctx
154+
.log_store
155+
.get_tx_seq_by_data_root(&v, false)
156+
.await?
157+
)
148158
}
149159
};
150160

@@ -163,10 +173,19 @@ impl RpcServer for RpcServerImpl {
163173
}
164174
}
165175

166-
async fn get_file_info(&self, data_root: DataRoot) -> RpcResult<Option<FileInfo>> {
176+
async fn get_file_info(
177+
&self,
178+
data_root: DataRoot,
179+
need_available: bool,
180+
) -> RpcResult<Option<FileInfo>> {
167181
debug!(%data_root, "zgs_getFileInfo");
168182

169-
let tx = try_option!(self.ctx.log_store.get_tx_by_data_root(&data_root).await?);
183+
let tx = try_option!(
184+
self.ctx
185+
.log_store
186+
.get_tx_by_data_root(&data_root, need_available)
187+
.await?
188+
);
170189

171190
Ok(Some(self.get_file_info_by_tx(tx).await?))
172191
}
@@ -288,7 +307,7 @@ impl RpcServerImpl {
288307
let maybe_tx = self
289308
.ctx
290309
.log_store
291-
.get_tx_by_data_root(&segment.root)
310+
.get_tx_by_data_root(&segment.root, false)
292311
.await?;
293312

294313
self.put_segment_with_maybe_tx(segment, maybe_tx).await

node/storage-async/src/lib.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,23 @@ impl Store {
5959
delegate!(fn get_proof_at_root(root: Option<DataRoot>, index: u64, length: u64) -> Result<FlowRangeProof>);
6060
delegate!(fn get_context() -> Result<(DataRoot, u64)>);
6161

62-
pub async fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result<Option<u64>> {
62+
pub async fn get_tx_seq_by_data_root(
63+
&self,
64+
data_root: &DataRoot,
65+
need_available: bool,
66+
) -> Result<Option<u64>> {
6367
let root = *data_root;
64-
self.spawn(move |store| store.get_tx_seq_by_data_root(&root))
68+
self.spawn(move |store| store.get_tx_seq_by_data_root(&root, need_available))
6569
.await
6670
}
6771

68-
pub async fn get_tx_by_data_root(&self, data_root: &DataRoot) -> Result<Option<Transaction>> {
72+
pub async fn get_tx_by_data_root(
73+
&self,
74+
data_root: &DataRoot,
75+
need_available: bool,
76+
) -> Result<Option<Transaction>> {
6977
let root = *data_root;
70-
self.spawn(move |store| store.get_tx_by_data_root(&root))
78+
self.spawn(move |store| store.get_tx_by_data_root(&root, need_available))
7179
.await
7280
}
7381

node/storage/src/log_store/log_manager.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ impl LogStoreChunkRead for LogManager {
511511
index_start: usize,
512512
index_end: usize,
513513
) -> crate::error::Result<Option<ChunkArray>> {
514-
let tx_seq = try_option!(self.get_tx_seq_by_data_root(data_root)?);
514+
let tx_seq = try_option!(self.get_tx_seq_by_data_root(data_root, true)?);
515515
self.get_chunks_by_tx_and_index_range(tx_seq, index_start, index_end)
516516
}
517517

@@ -536,13 +536,27 @@ impl LogStoreRead for LogManager {
536536
self.tx_store.get_tx_by_seq_number(seq)
537537
}
538538

539-
fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> crate::error::Result<Option<u64>> {
539+
fn get_tx_seq_by_data_root(
540+
&self,
541+
data_root: &DataRoot,
542+
need_available: bool,
543+
) -> crate::error::Result<Option<u64>> {
540544
let seq_list = self.tx_store.get_tx_seq_list_by_data_root(data_root)?;
545+
let mut available_seq = None;
541546
for tx_seq in &seq_list {
542547
if self.tx_store.check_tx_completed(*tx_seq)? {
543548
// Return the first finalized tx if possible.
544549
return Ok(Some(*tx_seq));
545550
}
551+
if need_available
552+
&& available_seq.is_none()
553+
&& !self.tx_store.check_tx_pruned(*tx_seq)?
554+
{
555+
available_seq = Some(*tx_seq);
556+
}
557+
}
558+
if need_available {
559+
return Ok(available_seq);
546560
}
547561
// No tx is finalized, return the first one.
548562
Ok(seq_list.first().cloned())

node/storage/src/log_store/mod.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,22 @@ pub trait LogStoreRead: LogStoreChunkRead {
3131
fn get_tx_by_seq_number(&self, seq: u64) -> Result<Option<Transaction>>;
3232

3333
/// Get a transaction by the data root of its data.
34-
/// If all txs are not finalized, return the first one.
34+
/// If all txs are not finalized, return the first one if need available is false.
3535
/// Otherwise, return the first finalized tx.
36-
fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result<Option<u64>>;
36+
fn get_tx_seq_by_data_root(
37+
&self,
38+
data_root: &DataRoot,
39+
need_available: bool,
40+
) -> Result<Option<u64>>;
3741

38-
/// If all txs are not finalized, return the first one.
42+
/// If all txs are not finalized, return the first one if need available is false.
3943
/// Otherwise, return the first finalized tx.
40-
fn get_tx_by_data_root(&self, data_root: &DataRoot) -> Result<Option<Transaction>> {
41-
match self.get_tx_seq_by_data_root(data_root)? {
44+
fn get_tx_by_data_root(
45+
&self,
46+
data_root: &DataRoot,
47+
need_available: bool,
48+
) -> Result<Option<Transaction>> {
49+
match self.get_tx_seq_by_data_root(data_root, need_available)? {
4250
Some(seq) => self.get_tx_by_seq_number(seq),
4351
None => Ok(None),
4452
}

tests/cache_test.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ def run_test(self):
1717
self.contract.submit(submissions)
1818
wait_until(lambda: self.contract.num_submissions() == 1)
1919
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
20-
wait_until(lambda: not client.zgs_get_file_info(data_root)["isCached"] and client.zgs_get_file_info(data_root)["uploadedSegNum"] == 1)
20+
wait_until(
21+
lambda: not client.zgs_get_file_info(data_root)["isCached"]
22+
and client.zgs_get_file_info(data_root)["uploadedSegNum"] == 1
23+
)
2124
client.zgs_upload_segment(segments[1])
2225
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
2326

tests/config/node_config.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@
77
"log_config_file": "log_config",
88
"confirmation_block_count": 1,
99
"discv5_disable_ip_limit": True,
10-
"network_peer_manager": {
11-
"heartbeat_interval": "1s"
12-
},
10+
"network_peer_manager": {"heartbeat_interval": "1s"},
1311
"router": {
1412
"private_ip_enabled": True,
1513
},
@@ -22,7 +20,7 @@
2220
"auto_sync_idle_interval": "1s",
2321
"sequential_find_peer_timeout": "10s",
2422
"random_find_peer_timeout": "10s",
25-
}
23+
},
2624
}
2725

2826
CONFIG_DIR = os.path.dirname(__file__)
@@ -75,11 +73,12 @@
7573
NO_SEAL_FLAG = 0x1
7674
NO_MERKLE_PROOF_FLAG = 0x2
7775

76+
7877
def update_config(default: dict, custom: dict):
7978
"""
8079
Supports to update configurations with dict value.
8180
"""
82-
for (key, value) in custom.items():
81+
for key, value in custom.items():
8382
if default.get(key) is None or type(value) != dict:
8483
default[key] = value
8584
else:

tests/crash_test.py

Lines changed: 15 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,15 @@ def run_test(self):
2020

2121
segment = submit_data(self.nodes[0], chunk_data)
2222
self.log.info("segment: %s", segment)
23-
wait_until(lambda: self.nodes[0].zgs_get_file_info(data_root)["finalized"] is True)
23+
wait_until(
24+
lambda: self.nodes[0].zgs_get_file_info(data_root)["finalized"] is True
25+
)
2426

2527
for i in range(1, self.num_nodes):
26-
wait_until(
27-
lambda: self.nodes[i].zgs_get_file_info(data_root) is not None
28-
)
28+
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None)
2929
self.nodes[i].admin_start_sync_file(0)
3030
self.log.info("wait for node: %s", i)
31-
wait_until(
32-
lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"]
33-
)
31+
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"])
3432

3533
# 2: first node runnging, other nodes killed
3634
self.log.info("kill node")
@@ -56,22 +54,16 @@ def run_test(self):
5654
for i in range(2, self.num_nodes):
5755
self.start_storage_node(i)
5856
self.nodes[i].wait_for_rpc_connection()
59-
wait_until(
60-
lambda: self.nodes[i].zgs_get_file_info(data_root) is not None
61-
)
57+
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None)
6258
self.nodes[i].admin_start_sync_file(1)
6359

6460
self.nodes[i].stop(kill=True)
6561
self.start_storage_node(i)
6662
self.nodes[i].wait_for_rpc_connection()
67-
wait_until(
68-
lambda: self.nodes[i].zgs_get_file_info(data_root) is not None
69-
)
63+
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None)
7064
self.nodes[i].admin_start_sync_file(1)
71-
72-
wait_until(
73-
lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"]
74-
)
65+
66+
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"])
7567

7668
# 4: node[1..] synced contract entries and killed
7769
self.log.info("kill node 0")
@@ -96,13 +88,9 @@ def run_test(self):
9688
self.log.info("wait for node: %s", i)
9789
self.start_storage_node(i)
9890
self.nodes[i].wait_for_rpc_connection()
99-
wait_until(
100-
lambda: self.nodes[i].zgs_get_file_info(data_root) is not None
101-
)
91+
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None)
10292
self.nodes[i].admin_start_sync_file(2)
103-
wait_until(
104-
lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"]
105-
)
93+
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"])
10694

10795
# 5: node[1..] synced contract entries and killed, sync disorder
10896
self.nodes[0].stop(kill=True)
@@ -137,21 +125,13 @@ def run_test(self):
137125
self.log.info("wait for node: %s", i)
138126
self.start_storage_node(i)
139127
self.nodes[i].wait_for_rpc_connection()
140-
wait_until(
141-
lambda: self.nodes[i].zgs_get_file_info(data_root1) is not None
142-
)
128+
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root1) is not None)
143129
self.nodes[i].admin_start_sync_file(4)
144-
wait_until(
145-
lambda: self.nodes[i].zgs_get_file_info(data_root1)["finalized"]
146-
)
130+
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root1)["finalized"])
147131

148-
wait_until(
149-
lambda: self.nodes[i].zgs_get_file_info(data_root) is not None
150-
)
132+
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root) is not None)
151133
self.nodes[i].admin_start_sync_file(3)
152-
wait_until(
153-
lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"]
154-
)
134+
wait_until(lambda: self.nodes[i].zgs_get_file_info(data_root)["finalized"])
155135

156136

157137
if __name__ == "__main__":

tests/fuzz_test.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,9 @@ def run_chunk_size(sizes, nodes, contract, log):
4545
lock.release()
4646

4747
log.info("submit data via client %s", idx)
48-
wait_until(
49-
lambda: nodes[idx].zgs_get_file_info(data_root) is not None
50-
)
48+
wait_until(lambda: nodes[idx].zgs_get_file_info(data_root) is not None)
5149
segment = submit_data(nodes[idx], chunk_data)
52-
wait_until(
53-
lambda: nodes[idx].zgs_get_file_info(data_root)["finalized"]
54-
)
50+
wait_until(lambda: nodes[idx].zgs_get_file_info(data_root)["finalized"])
5551

5652
lock.acquire()
5753
nodes_index.append(idx)
@@ -65,17 +61,15 @@ def run_chunk_size(sizes, nodes, contract, log):
6561
lambda: nodes[idx].zgs_get_file_info(data_root) is not None
6662
)
6763

68-
def wait_finalized():
64+
def wait_finalized():
6965
ret = nodes[idx].zgs_get_file_info(data_root)
7066
if ret["finalized"]:
7167
return True
7268
else:
73-
nodes[idx].admin_start_sync_file(ret['tx']['seq'])
69+
nodes[idx].admin_start_sync_file(ret["tx"]["seq"])
7470
return False
7571

76-
wait_until(
77-
lambda: wait_finalized(), timeout = 180
78-
)
72+
wait_until(lambda: wait_finalized(), timeout=180)
7973

8074
def run_small_chunk_size(nodes, contract, log):
8175
sizes = [i for i in range(1, SAMLL_SIZE + 1)]
@@ -84,7 +78,7 @@ def run_small_chunk_size(nodes, contract, log):
8478
run_chunk_size(sizes, nodes, contract, log)
8579

8680
def run_large_chunk_size(nodes, contract, log):
87-
sizes = [i for i in range(256 * 1024 * 256 - LARGE_SIZE, 256 * 1024 * 256 )]
81+
sizes = [i for i in range(256 * 1024 * 256 - LARGE_SIZE, 256 * 1024 * 256)]
8882
random.shuffle(sizes)
8983

9084
run_chunk_size(sizes, nodes, contract, log)

0 commit comments

Comments
 (0)