Skip to content

Commit 4102755

Browse files
committed
add upload segments grpc function
1 parent cb18ffd commit 4102755

File tree

8 files changed

+418
-180
lines changed

8 files changed

+418
-180
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/rpc/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ tonic = { version = "0.9.2", features = ["transport"] }
3333
prost = "0.11.9"
3434
prost-types = "0.11.9"
3535
tonic-reflection = "0.9.2"
36+
ethereum-types = "0.14"
3637

3738
[build-dependencies]
3839
tonic-build = "0.9.2"

node/rpc/proto/zgs_grpc.proto

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,35 @@ syntax = "proto3";
22

33
package zgs_grpc;
44

option go_package = "github.com/0glabs/0g-storage-client/node/proto;zgs_grpc";

// An empty message, used where no response payload is needed.
message Empty {}

/// A 32-byte hash root.
message DataRoot {
  bytes value = 1;
}

/// A proof over a file-segment Merkle tree.
message FileProof {
  /// Sequence of 32-byte hashes.
  repeated bytes lemma = 1;
  /// Bit-paths (left = false, right = true) alongside the lemmas.
  repeated bool path = 2;
}

/// A file segment plus its Merkle proof.
message SegmentWithProof {
  DataRoot root = 1;     // file Merkle root
  bytes data = 2;        // raw segment bytes
  uint64 index = 3;      // segment index
  FileProof proof = 4;   // Merkle proof of this leaf
  uint64 file_size = 5;  // total file length
}

// Batch of segments to upload for a single transaction sequence number.
message UploadSegmentsByTxSeqRequest {
  repeated SegmentWithProof segments = 1;
  uint64 tx_seq = 2;
}
935

1036
message PingRequest {
@@ -14,3 +40,9 @@ message PingRequest {
1440
message PingReply {
1541
string message = 1;
1642
}
// A trivial ping service, extended with segment upload.
service ZgsGrpcService {
  rpc Ping (PingRequest) returns (PingReply);
  rpc UploadSegmentsByTxSeq (UploadSegmentsByTxSeqRequest) returns (Empty);
}

node/rpc/src/lib.rs

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,11 @@ mod middleware;
1010
mod miner;
1111
pub mod types;
1212
mod zgs;
13+
mod zgs_grpc;
14+
mod rpc_helper;
1315

1416
use crate::miner::RpcServer as MinerRpcServer;
17+
use crate::types::SegmentWithProof;
1518
use crate::zgs_grpc::zgs_grpc::ZgsGrpcServiceImpl;
1619
use crate::zgs_grpc_proto::zgs_grpc_service_server::ZgsGrpcServiceServer;
1720
use admin::RpcServer as AdminRpcServer;
@@ -22,30 +25,26 @@ use jsonrpsee::core::RpcResult;
2225
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
2326
use network::{NetworkGlobals, NetworkMessage, NetworkSender};
2427
use std::error::Error;
28+
use std::fmt::{Debug, Formatter, Result as FmtResult};
2529
use std::sync::Arc;
2630
use storage_async::Store;
2731
use sync::{SyncRequest, SyncResponse, SyncSender};
2832
use task_executor::ShutdownReason;
2933
use tokio::sync::broadcast;
3034
use zgs::RpcServer as ZgsRpcServer;
3135
use zgs_miner::MinerMessage;
36+
use tonic_reflection::server::Builder as ReflectionBuilder;
37+
use tonic::transport::Server;
3238

3339
pub use admin::RpcClient as ZgsAdminRpcClient;
3440
pub use config::Config as RPCConfig;
3541
pub use miner::RpcClient as ZgsMinerRpcClient;
3642
pub use zgs::RpcClient as ZgsRPCClient;
37-
// bring in the reflection-builder
38-
use tonic_reflection::server::Builder as ReflectionBuilder;
39-
4043

4144
pub mod zgs_grpc_proto {
4245
tonic::include_proto!("zgs_grpc");
4346
}
4447

45-
mod zgs_grpc;
46-
47-
use tonic::transport::Server;
48-
4948
const DESCRIPTOR_SET: &[u8] = include_bytes!("../proto/zgs_grpc_descriptor.bin");
5049

5150
/// A wrapper around all the items required to spawn the HTTP server.
@@ -164,3 +163,60 @@ pub async fn run_grpc_server(ctx: Context) -> Result<(), Box<dyn Error>> {
164163
Ok(())
165164
}
166165

166+
/// Either a single segment index or an inclusive range of consecutive
/// indices, used to render segment lists compactly in logs.
enum SegmentIndex {
    Single(usize),
    /// Inclusive range `[start, end]`.
    Range(usize, usize),
}

impl Debug for SegmentIndex {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        match *self {
            SegmentIndex::Single(idx) => write!(f, "{}", idx),
            SegmentIndex::Range(first, last) => write!(f, "[{},{}]", first, last),
        }
    }
}
179+
180+
struct SegmentIndexArray {
181+
items: Vec<SegmentIndex>,
182+
}
183+
184+
impl Debug for SegmentIndexArray {
185+
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
186+
match self.items.first() {
187+
None => write!(f, "NULL"),
188+
Some(first) if self.items.len() == 1 => write!(f, "{:?}", first),
189+
_ => write!(f, "{:?}", self.items),
190+
}
191+
}
192+
}
193+
194+
impl SegmentIndexArray {
195+
fn new(segments: &[SegmentWithProof]) -> Self {
196+
let mut items = Vec::new();
197+
198+
let mut current = match segments.first() {
199+
None => return SegmentIndexArray { items },
200+
Some(seg) => SegmentIndex::Single(seg.index),
201+
};
202+
203+
for index in segments.iter().skip(1).map(|seg| seg.index) {
204+
match current {
205+
SegmentIndex::Single(val) if val + 1 == index => {
206+
current = SegmentIndex::Range(val, index)
207+
}
208+
SegmentIndex::Range(start, end) if end + 1 == index => {
209+
current = SegmentIndex::Range(start, index)
210+
}
211+
_ => {
212+
items.push(current);
213+
current = SegmentIndex::Single(index);
214+
}
215+
}
216+
}
217+
218+
items.push(current);
219+
220+
SegmentIndexArray { items }
221+
}
222+
}

node/rpc/src/rpc_helper.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
use crate::Context;
2+
use crate::types::SegmentWithProof;
3+
use crate::error;
4+
use chunk_pool::SegmentInfo;
5+
use jsonrpsee::core::RpcResult;
6+
use shared_types::Transaction;
7+
8+
/// Put a single segment (mirrors your old `put_segment`)
9+
pub async fn put_segment(
10+
ctx: &Context,
11+
segment: SegmentWithProof,
12+
) -> RpcResult<()> {
13+
debug!(root = %segment.root, index = %segment.index, "putSegment");
14+
15+
// fetch optional tx
16+
let maybe_tx = ctx
17+
.log_store
18+
.get_tx_by_data_root(&segment.root, false)
19+
.await?;
20+
21+
put_segment_with_maybe_tx(ctx, segment, maybe_tx).await
22+
}
23+
24+
/// Put a segment, given an optional Transaction (mirrors `put_segment_with_maybe_tx`)
25+
pub async fn put_segment_with_maybe_tx(
26+
ctx: &Context,
27+
segment: SegmentWithProof,
28+
maybe_tx: Option<Transaction>,
29+
) -> RpcResult<()> {
30+
ctx.chunk_pool.validate_segment_size(&segment.data)?;
31+
32+
if let Some(tx) = &maybe_tx {
33+
if tx.data_merkle_root != segment.root {
34+
return Err(error::internal_error("data root and tx seq not match"));
35+
}
36+
}
37+
38+
// decide cache vs write
39+
let need_cache = if ctx.chunk_pool.check_already_has_cache(&segment.root).await {
40+
true
41+
} else {
42+
check_need_cache(ctx, &maybe_tx, segment.file_size).await?
43+
};
44+
45+
segment.validate(ctx.config.chunks_per_segment)?;
46+
47+
let seg_info = SegmentInfo {
48+
root: segment.root,
49+
seg_data: segment.data,
50+
seg_proof: segment.proof,
51+
seg_index: segment.index,
52+
chunks_per_segment: ctx.config.chunks_per_segment,
53+
};
54+
55+
if need_cache {
56+
ctx.chunk_pool.cache_chunks(seg_info).await?;
57+
} else {
58+
let file_id = chunk_pool::FileID {
59+
root: seg_info.root,
60+
tx_id: maybe_tx.unwrap().id(),
61+
};
62+
ctx.chunk_pool
63+
.write_chunks(seg_info, file_id, segment.file_size)
64+
.await?;
65+
}
66+
67+
Ok(())
68+
}
69+
70+
/// The old `check_need_cache`
71+
pub async fn check_need_cache(
72+
ctx: &Context,
73+
maybe_tx: &Option<Transaction>,
74+
file_size: usize,
75+
) -> RpcResult<bool> {
76+
if let Some(tx) = maybe_tx {
77+
if tx.size != file_size as u64 {
78+
return Err(error::invalid_params(
79+
"file_size",
80+
"segment file size not matched with tx file size",
81+
));
82+
}
83+
if ctx.log_store.check_tx_completed(tx.seq).await? {
84+
return Err(error::invalid_params(
85+
"root",
86+
"already uploaded and finalized",
87+
));
88+
}
89+
if ctx.log_store.check_tx_pruned(tx.seq).await? {
90+
return Err(error::invalid_params("root", "already pruned"));
91+
}
92+
Ok(false)
93+
} else {
94+
if file_size > ctx.config.max_cache_file_size {
95+
return Err(error::invalid_params(
96+
"file_size",
97+
"caching of large file when tx is unavailable is not supported",
98+
));
99+
}
100+
Ok(true)
101+
}
102+
}

node/rpc/src/types.rs

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::error;
1+
use crate::{error, zgs_grpc_proto};
22
use append_merkle::ZERO_HASHES;
33
use jsonrpsee::core::RpcResult;
44
use merkle_light::hash::Algorithm;
@@ -11,12 +11,15 @@ use shared_types::{
1111
Transaction, CHUNK_SIZE,
1212
};
1313
use std::collections::HashSet;
14+
use std::convert::TryFrom;
1415
use std::hash::Hasher;
1516
use std::net::IpAddr;
1617
use std::time::Instant;
1718
use storage::config::ShardConfig;
1819
use storage::log_store::log_manager::bytes_to_entries;
1920
use storage::H256;
21+
use tonic::Status as GrpcStatus;
22+
use ethereum_types::H256 as EthH256;
2023

2124
const ZERO_HASH: [u8; 32] = [
2225
0xd3, 0x97, 0xb3, 0xb0, 0x43, 0xd8, 0x7f, 0xcd, 0x6f, 0xad, 0x12, 0x91, 0xff, 0xb, 0xfd, 0x16,
@@ -76,6 +79,79 @@ pub struct SegmentWithProof {
7679
pub file_size: usize,
7780
}
7881

82+
/// Convert the proto DataRoot → your app’s DataRoot
83+
impl TryFrom<zgs_grpc_proto::DataRoot> for DataRoot {
84+
type Error = GrpcStatus;
85+
86+
fn try_from(value: zgs_grpc_proto::DataRoot) -> Result<Self, GrpcStatus> {
87+
let bytes = value.value;
88+
if bytes.len() != 32 {
89+
return Err(GrpcStatus::invalid_argument(format!("Invalid hash length: got {}, want 32", bytes.len())));
90+
}
91+
// assume AppDataRoot is a newtype around H256:
92+
let mut arr = [0u8; 32];
93+
arr.copy_from_slice(&bytes);
94+
Ok(EthH256(arr))
95+
}
96+
}
97+
98+
/// Convert proto FileProof → your app’s FileProof
99+
impl TryFrom<zgs_grpc_proto::FileProof> for FileProof {
100+
type Error = GrpcStatus;
101+
102+
fn try_from(value: zgs_grpc_proto::FileProof) -> Result<Self, GrpcStatus> {
103+
// turn each `bytes` into an H256
104+
let mut lemma = Vec::with_capacity(value.lemma.len());
105+
for bin in value.lemma {
106+
if bin.len() != 32 {
107+
return Err(GrpcStatus::invalid_argument(format!("Invalid hash length: got {}, want 32", bin.len())));
108+
}
109+
let mut arr = [0u8; 32];
110+
arr.copy_from_slice(&bin);
111+
lemma.push(H256(arr));
112+
}
113+
114+
Ok(FileProof {
115+
lemma,
116+
path: value.path,
117+
})
118+
}
119+
}
120+
121+
/// Convert the full SegmentWithProof
122+
impl TryFrom<zgs_grpc_proto::SegmentWithProof> for SegmentWithProof {
123+
type Error = GrpcStatus;
124+
125+
fn try_from(grpc_segment: zgs_grpc_proto::SegmentWithProof) -> Result<Self, GrpcStatus> {
126+
let root = grpc_segment
127+
.root
128+
.unwrap()
129+
.try_into()
130+
.map_err(|e| e)?;
131+
let data = grpc_segment.data;
132+
// index is u64 in proto, usize in app
133+
let index = grpc_segment.index
134+
.try_into()
135+
.map_err(|_| GrpcStatus::invalid_argument(format!("Invalid segment index: {}", grpc_segment.index)))?;
136+
let proof = grpc_segment
137+
.proof
138+
.unwrap()
139+
.try_into()
140+
.map_err(|e| e)?;
141+
let file_size = grpc_segment.file_size
142+
.try_into()
143+
.map_err(|_| GrpcStatus::invalid_argument(format!("Invalid file size: {}", grpc_segment.file_size)))?;
144+
145+
Ok(SegmentWithProof {
146+
root,
147+
data,
148+
index,
149+
proof,
150+
file_size,
151+
})
152+
}
153+
}
154+
79155
impl SegmentWithProof {
80156
/// Splits file into segments and returns the total number of segments and the last segment size.
81157
pub fn split_file_into_segments(

0 commit comments

Comments
 (0)