Skip to content

Commit f113a86

Browse files
committed
contrib: add REPL to bundle-test-server
Add 'kill-server' and 'kill-streams' command to exercise bundle server edge cases.
1 parent 17da1ce commit f113a86

File tree

14 files changed

+199
-169
lines changed

14 files changed

+199
-169
lines changed

config/extra/with-handholding.mk

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ CPPFLAGS+=-DFD_SPAD_USE_HANDHOLDING=1
66
CPPFLAGS+=-DFD_TOWER_USE_HANDHOLDING=1
77
CPPFLAGS+=-DFD_TMPL_USE_HANDHOLDING=1
88
CPPFLAGS+=-DFD_TXN_HANDHOLDING=1
9-
CPPFLAGS+=-DFD_FUNK_HANDHOLDING=1
109
CPPFLAGS+=-DFD_RUNTIME_ERR_HANDHOLDING=1
1110
CPPFLAGS+=-DFD_FOREST_USE_HANDHOLDING=1
1211
CPPFLAGS+=-DFD_STAKES_USE_HANDHOLDING=1

contrib/bundle-test-server/Cargo.toml

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,26 @@ version = "0.1.0"
44
edition = "2021"
55

66
[dependencies]
7-
tonic = { version = "0.12.2", features = ["tls-roots", "tls", "tls-webpki-roots"] }
8-
prost = "0.13.3"
9-
prost-types = "0.13.3"
7+
tonic = { version = "0.14", features = ["tls-webpki-roots"] }
8+
tonic-prost = "0.14"
9+
prost = "0.14"
10+
prost-types = "0.14"
1011
log = "0.4.22"
11-
tokio = { version = "1.40.0", features = ["rt-multi-thread"] }
12+
tokio = "1.47"
1213
tokio-stream = "0.1"
13-
futures = "0.3.30"
14-
chrono = "0.4.38"
15-
thiserror = "1.0.64"
14+
futures = "0.3"
15+
chrono = "0.4"
16+
thiserror = "1.0"
1617
bs58 = "0.5.1"
17-
futures-util = "0.3.31"
18-
env_logger = "0.11.5"
19-
base64="0.22.1"
18+
futures-util = "0.3"
19+
env_logger = "0.11"
20+
base64 = "0.22"
21+
rustyline = "17.0"
2022

2123
[build-dependencies]
22-
tonic-build = "0.12.2"
23-
protobuf-src = "2.1.0"
24-
prost-types = "0.13.3"
24+
tonic-prost-build = "0.14"
25+
protobuf-src = "2.1"
26+
prost-types = "0.14"
2527

2628
[dev-dependencies]
2729
ed25519-dalek = "2.1.1"

contrib/bundle-test-server/build.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use tonic_build::configure;
2-
31
fn main() -> Result<(), std::io::Error> {
42
const PROTOC_ENVAR: &str = "PROTOC";
53
if std::env::var(PROTOC_ENVAR).is_err() {
@@ -23,7 +21,7 @@ fn main() -> Result<(), std::io::Error> {
2321
protos.push(proto);
2422
}
2523

26-
configure()
24+
tonic_prost_build::configure()
2725
.build_client(false)
2826
.build_server(true)
2927
.type_attribute(

contrib/bundle-test-server/src/main.rs

Lines changed: 148 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,73 @@
11
use std::pin::Pin;
22

3-
use crate::proto::auth::{self, Token};
43
use crate::proto::auth::auth_service_server::{AuthService, AuthServiceServer};
4+
use crate::proto::auth::{self, Token};
55
use crate::proto::bundle::{Bundle, BundleUuid};
66
use crate::proto::packet::{Packet, PacketBatch};
7+
use base64::prelude::*;
78
use chrono::{Duration, Utc};
9+
use futures::select;
10+
use futures::FutureExt;
11+
use futures_util::stream::Stream;
812
use log::info;
913
use prost_types::Timestamp;
14+
use rustyline::{error::ReadlineError, DefaultEditor};
15+
use std::net::SocketAddr;
16+
use std::sync::Arc;
17+
use tokio::sync::{broadcast, mpsc};
1018
use tonic::{transport::Server, Request, Response, Status};
11-
use futures_util::stream::Stream;
12-
use base64::prelude::*;
13-
use tokio::sync::mpsc;
1419

15-
use crate::proto::block_engine::block_engine_validator_server::{BlockEngineValidator, BlockEngineValidatorServer};
16-
use crate::proto::block_engine::{SubscribePacketsRequest, SubscribePacketsResponse, SubscribeBundlesRequest, SubscribeBundlesResponse, BlockBuilderFeeInfoRequest, BlockBuilderFeeInfoResponse};
20+
use crate::proto::block_engine::block_engine_validator_server::{
21+
BlockEngineValidator, BlockEngineValidatorServer,
22+
};
23+
use crate::proto::block_engine::{
24+
BlockBuilderFeeInfoRequest, BlockBuilderFeeInfoResponse, SubscribeBundlesRequest,
25+
SubscribeBundlesResponse, SubscribePacketsRequest, SubscribePacketsResponse,
26+
};
1727

18-
#[derive(Debug, Default)]
19-
pub struct BlockEngineValidatorService;
28+
pub struct Service {
29+
kill_streams: broadcast::Receiver<()>,
30+
}
31+
32+
#[derive(Clone)]
33+
pub struct ServiceHandle(Arc<Service>);
2034

21-
type PacketResponseStream = Pin<Box<dyn Stream<Item = Result<SubscribePacketsResponse, Status>> + Send>>;
22-
type BundleResponseStream = Pin<Box<dyn Stream<Item = Result<SubscribeBundlesResponse, Status>> + Send>>;
35+
type PacketResponseStream =
36+
Pin<Box<dyn Stream<Item = Result<SubscribePacketsResponse, Status>> + Send>>;
37+
type BundleResponseStream =
38+
Pin<Box<dyn Stream<Item = Result<SubscribeBundlesResponse, Status>> + Send>>;
2339

24-
pub mod proto {
25-
pub mod auth {
40+
pub(crate) mod proto {
41+
pub(crate) mod auth {
2642
tonic::include_proto!("auth");
2743
}
28-
pub mod block_engine {
44+
pub(crate) mod block_engine {
2945
tonic::include_proto!("block_engine");
3046
}
31-
pub mod bundle {
47+
pub(crate) mod bundle {
3248
tonic::include_proto!("bundle");
3349
}
34-
pub mod packet {
50+
pub(crate) mod packet {
3551
tonic::include_proto!("packet");
3652
}
37-
pub mod relayer {
53+
pub(crate) mod relayer {
3854
tonic::include_proto!("relayer");
3955
}
40-
pub mod shared {
56+
pub(crate) mod shared {
4157
tonic::include_proto!("shared");
4258
}
4359
}
4460

4561
#[tonic::async_trait]
46-
impl BlockEngineValidator for BlockEngineValidatorService {
62+
impl BlockEngineValidator for ServiceHandle {
4763
type SubscribePacketsStream = PacketResponseStream;
4864
type SubscribeBundlesStream = BundleResponseStream;
4965

5066
async fn subscribe_packets(
5167
&self,
5268
_request: Request<SubscribePacketsRequest>,
5369
) -> Result<Response<Self::SubscribePacketsStream>, Status> {
70+
let mut kill_streams = self.0.kill_streams.resubscribe();
5471
let (tx, rx) = mpsc::channel(16);
5572
tokio::spawn(async move {
5673
info!("Packet stream start");
@@ -70,19 +87,23 @@ impl BlockEngineValidator for BlockEngineValidatorService {
7087
}),
7188
};
7289
loop {
73-
if tx.send(Ok(msg.clone())).await.is_err() {
74-
info!("Packet stream stop");
75-
break;
90+
select! {
91+
_ = kill_streams.recv().fuse() => break,
92+
res = tx.send(Ok(msg.clone())).fuse() => if res.is_err() { break }
7693
}
7794
}
95+
info!("Packet stream stop");
7896
});
79-
Ok(Response::new(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx))))
97+
Ok(Response::new(Box::pin(
98+
tokio_stream::wrappers::ReceiverStream::new(rx),
99+
)))
80100
}
81101

82102
async fn subscribe_bundles(
83103
&self,
84104
_request: Request<SubscribeBundlesRequest>,
85105
) -> Result<Response<Self::SubscribeBundlesStream>, Status> {
106+
let mut kill_streams = self.0.kill_streams.resubscribe();
86107
let (tx, rx) = mpsc::channel(16);
87108
tokio::spawn(async move {
88109
info!("Bundle stream start");
@@ -111,13 +132,16 @@ impl BlockEngineValidator for BlockEngineValidatorService {
111132
]
112133
};
113134
loop {
114-
if tx.send(Ok(msg.clone())).await.is_err() {
115-
info!("Bundle stream stop");
116-
break;
135+
select! {
136+
_ = kill_streams.recv().fuse() => break,
137+
res = tx.send(Ok(msg.clone())).fuse() => if res.is_err() { break }
117138
}
118139
}
140+
info!("Bundle stream stop");
119141
});
120-
Ok(Response::new(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx))))
142+
Ok(Response::new(Box::pin(
143+
tokio_stream::wrappers::ReceiverStream::new(rx),
144+
)))
121145
}
122146

123147
async fn get_block_builder_fee_info(
@@ -133,17 +157,17 @@ impl BlockEngineValidator for BlockEngineValidatorService {
133157
}
134158
}
135159

136-
#[derive(Debug, Default)]
137-
pub struct Auth;
138-
139160
#[tonic::async_trait]
140-
impl AuthService for Auth {
161+
impl AuthService for ServiceHandle {
141162
async fn generate_auth_challenge(
142163
&self,
143164
request: Request<auth::GenerateAuthChallengeRequest>,
144165
) -> Result<Response<auth::GenerateAuthChallengeResponse>, Status> {
145166
let req_data = request.into_inner();
146-
info!("Received auth challenge request from {}", bs58::encode(&req_data.pubkey).into_string());
167+
info!(
168+
"Received auth challenge request from {}",
169+
bs58::encode(&req_data.pubkey).into_string()
170+
);
147171
Ok(Response::new(auth::GenerateAuthChallengeResponse {
148172
challenge: "012345678".to_string(),
149173
}))
@@ -187,18 +211,100 @@ impl AuthService for Auth {
187211
}
188212
}
189213

190-
#[tokio::main]
191-
async fn main() -> Result<(), Box<dyn std::error::Error>> {
192-
env_logger::init();
214+
struct Cnc {
215+
kill_streams_tx: broadcast::Sender<()>,
216+
kill_server_tx: broadcast::Sender<()>,
217+
}
193218

194-
let addr = "127.0.0.1:50051".parse()?;
195-
info!("Block Engine Validator Server listening on {}", addr);
219+
fn handle_line(cnc: &mut Cnc, line: &str) -> bool {
220+
match line {
221+
"" => return false,
222+
"help" => {
223+
println!("Available commands:");
224+
println!(" help - Show this help message");
225+
println!(" exit - Exit the server");
226+
println!(" kill-streams - Kill all active streams");
227+
println!(" kill-server - Kill and restart the server");
228+
}
229+
"exit" | "quit" => {
230+
println!("Exiting...");
231+
std::process::exit(0);
232+
}
233+
"kill-streams" => {
234+
let _ = cnc.kill_streams_tx.send(());
235+
}
236+
"kill-server" => {
237+
let _ = cnc.kill_server_tx.send(());
238+
}
239+
cmd => {
240+
println!("Unknown command: {}", cmd);
241+
return false;
242+
}
243+
}
244+
true
245+
}
196246

197-
Server::builder()
198-
.add_service(BlockEngineValidatorServer::new(BlockEngineValidatorService::default()))
199-
.add_service(AuthServiceServer::new(Auth::default()))
200-
.serve(addr)
201-
.await?;
247+
async fn run_server(
248+
service: ServiceHandle,
249+
listen_addr: SocketAddr,
250+
mut kill_signal: broadcast::Receiver<()>,
251+
) {
252+
loop {
253+
let server = Server::builder()
254+
.add_service(BlockEngineValidatorServer::new(service.clone()))
255+
.add_service(AuthServiceServer::new(service.clone()))
256+
.serve_with_shutdown(listen_addr.clone(), kill_signal.recv().map(|_| ()));
257+
server.await.unwrap();
258+
info!("Restarting server");
259+
}
260+
}
261+
262+
fn main() {
263+
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
264+
265+
let (kill_streams_tx, kill_streams_rx) = broadcast::channel(2);
266+
let (kill_server_tx, kill_server_rx) = broadcast::channel(2);
267+
268+
let mut cnc = Cnc {
269+
kill_streams_tx,
270+
kill_server_tx,
271+
};
272+
273+
// Spawn a thread handling all gRPC I/O
274+
let addr: SocketAddr = "127.0.0.1:50051".parse().unwrap();
275+
let handle = std::thread::spawn(move || {
276+
let rt = tokio::runtime::Builder::new_current_thread()
277+
.enable_all()
278+
.build()
279+
.unwrap();
202280

203-
Ok(())
281+
let service = ServiceHandle(Arc::new(Service {
282+
kill_streams: kill_streams_rx,
283+
}));
284+
rt.block_on(run_server(service, addr, kill_server_rx));
285+
});
286+
287+
// Run a REPL on the current thread
288+
let mut rl = match DefaultEditor::new() {
289+
Ok(rl) => rl,
290+
Err(_) => {
291+
handle.join().unwrap();
292+
std::process::exit(1);
293+
}
294+
};
295+
println!("Block Engine Validator Server listening on {}", addr);
296+
loop {
297+
let readline = rl.readline("");
298+
match readline {
299+
Ok(line) => {
300+
if handle_line(&mut cnc, &line) {
301+
let _ = rl.add_history_entry(&line);
302+
}
303+
}
304+
Err(ReadlineError::Interrupted) | Err(ReadlineError::Eof) => {
305+
std::process::exit(0);
306+
}
307+
Err(err) => panic!("Unexpected error: {}", err),
308+
}
309+
}
204310
}

src/discof/restore/fd_snapin_tile.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ handle_control_frag( fd_snapin_tile_t * ctx,
241241

242242
fd_funk_txn_xid_t incremental_xid = fd_funk_generate_xid();
243243
ctx->funk_txn = fd_funk_txn_prepare( ctx->funk, ctx->funk_txn, &incremental_xid, 0 );
244+
if( FD_UNLIKELY( !ctx->funk_txn ) ) FD_LOG_ERR(( "fd_funk_txn_prepare failed" ));
244245
ctx->full = 0;
245246
ctx->state = FD_SNAPIN_STATE_LOADING;
246247
break;

src/flamenco/runtime/fd_runtime.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1954,6 +1954,7 @@ fd_migrate_builtin_to_core_bpf( fd_exec_slot_ctx_t * slot_ctx,
19541954
fd_funk_txn_xid_t migration_xid = fd_funk_generate_xid();
19551955
fd_funk_txn_start_write( slot_ctx->funk );
19561956
slot_ctx->funk_txn = fd_funk_txn_prepare( slot_ctx->funk, slot_ctx->funk_txn, &migration_xid, 0UL );
1957+
if( FD_UNLIKELY( !slot_ctx->funk_txn ) ) FD_LOG_ERR(( "fd_funk_txn_prepare failed" ));
19571958
fd_funk_txn_end_write( slot_ctx->funk );
19581959

19591960
/* Attempt serialization of program account. If the program is
@@ -2804,6 +2805,7 @@ fd_runtime_read_genesis( fd_exec_slot_ctx_t * slot_ctx,
28042805
xid.ul[1] = 0UL;
28052806
xid.ul[0] = 0UL;
28062807
slot_ctx->funk_txn = fd_funk_txn_prepare( slot_ctx->funk, NULL, &xid, 1 );
2808+
if( FD_UNLIKELY( !slot_ctx->funk_txn ) ) FD_LOG_ERR(( "fd_funk_txn_prepare failed" ));
28072809
fd_funk_txn_end_write( slot_ctx->funk );
28082810

28092811
fd_runtime_init_bank_from_genesis( slot_ctx,

0 commit comments

Comments
 (0)