Skip to content

runtime: fix missing funk_txn_prepare error checking #5943

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion config/extra/with-handholding.mk
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ CPPFLAGS+=-DFD_SPAD_USE_HANDHOLDING=1
CPPFLAGS+=-DFD_TOWER_USE_HANDHOLDING=1
CPPFLAGS+=-DFD_TMPL_USE_HANDHOLDING=1
CPPFLAGS+=-DFD_TXN_HANDHOLDING=1
CPPFLAGS+=-DFD_FUNK_HANDHOLDING=1
CPPFLAGS+=-DFD_RUNTIME_ERR_HANDHOLDING=1
CPPFLAGS+=-DFD_FOREST_USE_HANDHOLDING=1
CPPFLAGS+=-DFD_STAKES_USE_HANDHOLDING=1
28 changes: 15 additions & 13 deletions contrib/bundle-test-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,26 @@ version = "0.1.0"
edition = "2021"

[dependencies]
tonic = { version = "0.12.2", features = ["tls-roots", "tls", "tls-webpki-roots"] }
prost = "0.13.3"
prost-types = "0.13.3"
tonic = { version = "0.14", features = ["tls-webpki-roots"] }
tonic-prost = "0.14"
prost = "0.14"
prost-types = "0.14"
log = "0.4.22"
tokio = { version = "1.40.0", features = ["rt-multi-thread"] }
tokio = "1.47"
tokio-stream = "0.1"
futures = "0.3.30"
chrono = "0.4.38"
thiserror = "1.0.64"
futures = "0.3"
chrono = "0.4"
thiserror = "1.0"
bs58 = "0.5.1"
futures-util = "0.3.31"
env_logger = "0.11.5"
base64="0.22.1"
futures-util = "0.3"
env_logger = "0.11"
base64 = "0.22"
rustyline = "17.0"

[build-dependencies]
tonic-build = "0.12.2"
protobuf-src = "2.1.0"
prost-types = "0.13.3"
tonic-prost-build = "0.14"
protobuf-src = "2.1"
prost-types = "0.14"

[dev-dependencies]
ed25519-dalek = "2.1.1"
Expand Down
4 changes: 1 addition & 3 deletions contrib/bundle-test-server/build.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use tonic_build::configure;

fn main() -> Result<(), std::io::Error> {
const PROTOC_ENVAR: &str = "PROTOC";
if std::env::var(PROTOC_ENVAR).is_err() {
Expand All @@ -23,7 +21,7 @@ fn main() -> Result<(), std::io::Error> {
protos.push(proto);
}

configure()
tonic_prost_build::configure()
.build_client(false)
.build_server(true)
.type_attribute(
Expand Down
190 changes: 148 additions & 42 deletions contrib/bundle-test-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,56 +1,73 @@
use std::pin::Pin;

use crate::proto::auth::{self, Token};
use crate::proto::auth::auth_service_server::{AuthService, AuthServiceServer};
use crate::proto::auth::{self, Token};
use crate::proto::bundle::{Bundle, BundleUuid};
use crate::proto::packet::{Packet, PacketBatch};
use base64::prelude::*;
use chrono::{Duration, Utc};
use futures::select;
use futures::FutureExt;
use futures_util::stream::Stream;
use log::info;
use prost_types::Timestamp;
use rustyline::{error::ReadlineError, DefaultEditor};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use tonic::{transport::Server, Request, Response, Status};
use futures_util::stream::Stream;
use base64::prelude::*;
use tokio::sync::mpsc;

use crate::proto::block_engine::block_engine_validator_server::{BlockEngineValidator, BlockEngineValidatorServer};
use crate::proto::block_engine::{SubscribePacketsRequest, SubscribePacketsResponse, SubscribeBundlesRequest, SubscribeBundlesResponse, BlockBuilderFeeInfoRequest, BlockBuilderFeeInfoResponse};
use crate::proto::block_engine::block_engine_validator_server::{
BlockEngineValidator, BlockEngineValidatorServer,
};
use crate::proto::block_engine::{
BlockBuilderFeeInfoRequest, BlockBuilderFeeInfoResponse, SubscribeBundlesRequest,
SubscribeBundlesResponse, SubscribePacketsRequest, SubscribePacketsResponse,
};

#[derive(Debug, Default)]
pub struct BlockEngineValidatorService;
pub struct Service {
kill_streams: broadcast::Receiver<()>,
}

#[derive(Clone)]
pub struct ServiceHandle(Arc<Service>);

type PacketResponseStream = Pin<Box<dyn Stream<Item = Result<SubscribePacketsResponse, Status>> + Send>>;
type BundleResponseStream = Pin<Box<dyn Stream<Item = Result<SubscribeBundlesResponse, Status>> + Send>>;
type PacketResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribePacketsResponse, Status>> + Send>>;
type BundleResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeBundlesResponse, Status>> + Send>>;

pub mod proto {
pub mod auth {
pub(crate) mod proto {
pub(crate) mod auth {
tonic::include_proto!("auth");
}
pub mod block_engine {
pub(crate) mod block_engine {
tonic::include_proto!("block_engine");
}
pub mod bundle {
pub(crate) mod bundle {
tonic::include_proto!("bundle");
}
pub mod packet {
pub(crate) mod packet {
tonic::include_proto!("packet");
}
pub mod relayer {
pub(crate) mod relayer {
tonic::include_proto!("relayer");
}
pub mod shared {
pub(crate) mod shared {
tonic::include_proto!("shared");
}
}

#[tonic::async_trait]
impl BlockEngineValidator for BlockEngineValidatorService {
impl BlockEngineValidator for ServiceHandle {
type SubscribePacketsStream = PacketResponseStream;
type SubscribeBundlesStream = BundleResponseStream;

async fn subscribe_packets(
&self,
_request: Request<SubscribePacketsRequest>,
) -> Result<Response<Self::SubscribePacketsStream>, Status> {
let mut kill_streams = self.0.kill_streams.resubscribe();
let (tx, rx) = mpsc::channel(16);
tokio::spawn(async move {
info!("Packet stream start");
Expand All @@ -70,19 +87,23 @@ impl BlockEngineValidator for BlockEngineValidatorService {
}),
};
loop {
if tx.send(Ok(msg.clone())).await.is_err() {
info!("Packet stream stop");
break;
select! {
_ = kill_streams.recv().fuse() => break,
res = tx.send(Ok(msg.clone())).fuse() => if res.is_err() { break }
}
}
info!("Packet stream stop");
});
Ok(Response::new(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx))))
Ok(Response::new(Box::pin(
tokio_stream::wrappers::ReceiverStream::new(rx),
)))
}

async fn subscribe_bundles(
&self,
_request: Request<SubscribeBundlesRequest>,
) -> Result<Response<Self::SubscribeBundlesStream>, Status> {
let mut kill_streams = self.0.kill_streams.resubscribe();
let (tx, rx) = mpsc::channel(16);
tokio::spawn(async move {
info!("Bundle stream start");
Expand Down Expand Up @@ -111,13 +132,16 @@ impl BlockEngineValidator for BlockEngineValidatorService {
]
};
loop {
if tx.send(Ok(msg.clone())).await.is_err() {
info!("Bundle stream stop");
break;
select! {
_ = kill_streams.recv().fuse() => break,
res = tx.send(Ok(msg.clone())).fuse() => if res.is_err() { break }
}
}
info!("Bundle stream stop");
});
Ok(Response::new(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx))))
Ok(Response::new(Box::pin(
tokio_stream::wrappers::ReceiverStream::new(rx),
)))
}

async fn get_block_builder_fee_info(
Expand All @@ -133,17 +157,17 @@ impl BlockEngineValidator for BlockEngineValidatorService {
}
}

#[derive(Debug, Default)]
pub struct Auth;

#[tonic::async_trait]
impl AuthService for Auth {
impl AuthService for ServiceHandle {
async fn generate_auth_challenge(
&self,
request: Request<auth::GenerateAuthChallengeRequest>,
) -> Result<Response<auth::GenerateAuthChallengeResponse>, Status> {
let req_data = request.into_inner();
info!("Received auth challenge request from {}", bs58::encode(&req_data.pubkey).into_string());
info!(
"Received auth challenge request from {}",
bs58::encode(&req_data.pubkey).into_string()
);
Ok(Response::new(auth::GenerateAuthChallengeResponse {
challenge: "012345678".to_string(),
}))
Expand Down Expand Up @@ -187,18 +211,100 @@ impl AuthService for Auth {
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
struct Cnc {
kill_streams_tx: broadcast::Sender<()>,
kill_server_tx: broadcast::Sender<()>,
}

let addr = "127.0.0.1:50051".parse()?;
info!("Block Engine Validator Server listening on {}", addr);
fn handle_line(cnc: &mut Cnc, line: &str) -> bool {
match line {
"" => return false,
"help" => {
println!("Available commands:");
println!(" help - Show this help message");
println!(" exit - Exit the server");
println!(" kill-streams - Kill all active streams");
println!(" kill-server - Kill and restart the server");
}
"exit" | "quit" => {
println!("Exiting...");
std::process::exit(0);
}
"kill-streams" => {
let _ = cnc.kill_streams_tx.send(());
}
"kill-server" => {
let _ = cnc.kill_server_tx.send(());
}
cmd => {
println!("Unknown command: {}", cmd);
return false;
}
}
true
}

Server::builder()
.add_service(BlockEngineValidatorServer::new(BlockEngineValidatorService::default()))
.add_service(AuthServiceServer::new(Auth::default()))
.serve(addr)
.await?;
async fn run_server(
service: ServiceHandle,
listen_addr: SocketAddr,
mut kill_signal: broadcast::Receiver<()>,
) {
loop {
let server = Server::builder()
.add_service(BlockEngineValidatorServer::new(service.clone()))
.add_service(AuthServiceServer::new(service.clone()))
.serve_with_shutdown(listen_addr.clone(), kill_signal.recv().map(|_| ()));
server.await.unwrap();
info!("Restarting server");
}
}

fn main() {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();

let (kill_streams_tx, kill_streams_rx) = broadcast::channel(2);
let (kill_server_tx, kill_server_rx) = broadcast::channel(2);

let mut cnc = Cnc {
kill_streams_tx,
kill_server_tx,
};

// Spawn a thread handling all gRPC I/O
let addr: SocketAddr = "127.0.0.1:50051".parse().unwrap();
let handle = std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

Ok(())
let service = ServiceHandle(Arc::new(Service {
kill_streams: kill_streams_rx,
}));
rt.block_on(run_server(service, addr, kill_server_rx));
});

// Run a REPL on the current thread
let mut rl = match DefaultEditor::new() {
Ok(rl) => rl,
Err(_) => {
handle.join().unwrap();
std::process::exit(1);
}
};
println!("Block Engine Validator Server listening on {}", addr);
loop {
let readline = rl.readline("");
match readline {
Ok(line) => {
if handle_line(&mut cnc, &line) {
let _ = rl.add_history_entry(&line);
}
}
Err(ReadlineError::Interrupted) | Err(ReadlineError::Eof) => {
std::process::exit(0);
}
Err(err) => panic!("Unexpected error: {}", err),
}
}
}
1 change: 1 addition & 0 deletions src/discof/restore/fd_snapin_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ handle_control_frag( fd_snapin_tile_t * ctx,

fd_funk_txn_xid_t incremental_xid = fd_funk_generate_xid();
ctx->funk_txn = fd_funk_txn_prepare( ctx->funk, ctx->funk_txn, &incremental_xid, 0 );
if( FD_UNLIKELY( !ctx->funk_txn ) ) FD_LOG_ERR(( "fd_funk_txn_prepare failed" ));
ctx->full = 0;
ctx->state = FD_SNAPIN_STATE_LOADING;
break;
Expand Down
2 changes: 2 additions & 0 deletions src/flamenco/runtime/fd_runtime.c
Original file line number Diff line number Diff line change
Expand Up @@ -1954,6 +1954,7 @@ fd_migrate_builtin_to_core_bpf( fd_exec_slot_ctx_t * slot_ctx,
fd_funk_txn_xid_t migration_xid = fd_funk_generate_xid();
fd_funk_txn_start_write( slot_ctx->funk );
slot_ctx->funk_txn = fd_funk_txn_prepare( slot_ctx->funk, slot_ctx->funk_txn, &migration_xid, 0UL );
if( FD_UNLIKELY( !slot_ctx->funk_txn ) ) FD_LOG_ERR(( "fd_funk_txn_prepare failed" ));
fd_funk_txn_end_write( slot_ctx->funk );

/* Attempt serialization of program account. If the program is
Expand Down Expand Up @@ -2804,6 +2805,7 @@ fd_runtime_read_genesis( fd_exec_slot_ctx_t * slot_ctx,
xid.ul[1] = 0UL;
xid.ul[0] = 0UL;
slot_ctx->funk_txn = fd_funk_txn_prepare( slot_ctx->funk, NULL, &xid, 1 );
if( FD_UNLIKELY( !slot_ctx->funk_txn ) ) FD_LOG_ERR(( "fd_funk_txn_prepare failed" ));
fd_funk_txn_end_write( slot_ctx->funk );

fd_runtime_init_bank_from_genesis( slot_ctx,
Expand Down
Loading
Loading