Skip to content

Commit 04bb3c3

Browse files
committed
cas-125: multiplexed remote peer connection support
1 parent 0d17f70 commit 04bb3c3

File tree

11 files changed

+2117
-365
lines changed

11 files changed

+2117
-365
lines changed

Cargo.lock

Lines changed: 2 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ tonic-prost-build = "0.14.0"
108108
vergen = "9.0.1"
109109
tikv-jemallocator = "0.6.1"
110110
libc = "0.2.177"
111-
yellowstone-jet-tpu-client = { path = "crates/tpu-client", version = "0.2.0" }
111+
yellowstone-jet-tpu-client = { path = "crates/tpu-client", version = "0.3.0" }
112112

113113
[workspace.lints.clippy]
114114
clone_on_ref_ptr = "deny"

apps/jet/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "yellowstone-jet"
3-
version = "14.9.1"
3+
version = "14.9.2"
44
description = "Yellowstone Jet"
55
edition.workspace = true
66
authors.workspace = true

apps/jet/src/transaction_handler.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use {
2020
std::sync::Arc,
2121
thiserror::Error,
2222
tokio::sync::mpsc,
23+
yellowstone_jet_tpu_client::core::PACKET_DATA_SIZE,
2324
};
2425

2526
#[derive(Debug, Error)]
@@ -109,7 +110,17 @@ impl TransactionHandler {
109110
}
110111

111112
let signature = transaction.signatures[0];
112-
let wire_transaction = bincode::serialize(&transaction)?;
113+
let mut wire_transaction = bincode::serialize(&transaction)?;
114+
if wire_transaction.len() > PACKET_DATA_SIZE {
115+
wire_transaction.shrink_to_fit();
116+
if wire_transaction.len() > PACKET_DATA_SIZE {
117+
return Err(TransactionHandlerError::InvalidTransaction(format!(
118+
"transaction size {} exceeds maximum allowed size of {} bytes",
119+
wire_transaction.len(),
120+
PACKET_DATA_SIZE
121+
)));
122+
}
123+
}
113124

114125
self.transaction_sink
115126
.send(Arc::new(SendTransactionRequest {

crates/tpu-client/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "yellowstone-jet-tpu-client"
3-
version = "0.2.0"
3+
version = "0.3.0"
44
edition.workspace = true
55
authors.workspace = true
66
license.workspace = true
@@ -51,6 +51,7 @@ yellowstone-grpc = [
5151
"dep:solana-rpc-client",
5252
]
5353
shield = ["dep:yellowstone-shield-store"]
54+
intg-testing = []
5455

5556
[dependencies]
5657
async-trait = { workspace = true }
@@ -94,7 +95,6 @@ solana-message = { workspace = true }
9495
solana-net-utils = { workspace = true }
9596
solana-packet = { workspace = true }
9697
solana-pubkey = { workspace = true }
97-
solana-quic-definitions = { workspace = true }
9898
solana-rpc-client = { workspace = true, optional = true }
9999
solana-rpc-client-api = { workspace = true }
100100
solana-signature = { workspace = true, features = ["rand"] }

crates/tpu-client/src/config.rs

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use {
2+
crate::core::{DEFAULT_UNUSED_CONNECTION_TTL, QUIC_MAX_TIMEOUT},
23
serde::{Deserialize, Deserializer, de},
34
solana_net_utils::{PortRange, VALIDATOR_PORT_RANGE},
45
solana_pubkey::Pubkey,
5-
solana_quic_definitions::QUIC_MAX_TIMEOUT,
66
std::{net::SocketAddr, num::NonZeroUsize, ops::Range, time::Duration},
77
};
88

@@ -191,9 +191,59 @@ pub struct TpuSenderConfig {
191191
///
192192
#[serde(default)]
193193
pub tpu_info_override: Vec<TpuOverrideInfo>,
194+
195+
///
196+
/// Duration after which an orphan connection is evicted.
197+
///
198+
/// # Note
199+
///
200+
/// An orphan connection is a connection that is established but have no sender tasks referencing it.
201+
/// Each remote peer identity multiplexed over a connection gets 1:1 mapping to a sender task.
202+
///
203+
/// When all sender tasks for a connection are dropped, the connection becomes an orphan.
204+
///
205+
#[serde(
206+
default = "TpuSenderConfig::default_unused_connection_ttl",
207+
with = "humantime_serde"
208+
)]
209+
pub orphan_connection_ttl: Duration,
210+
211+
///
212+
/// NO DOCUMENTATION!
213+
///
214+
/// IF YOU USE THIS FEATURE YOU SHOULD FEEL BAD.
215+
#[serde(
216+
skip_deserializing,
217+
default = "TpuSenderConfig::default_allow_arbitrary_txn_size"
218+
)]
219+
pub unsafe_allow_arbitrary_txn_size: bool,
194220
}
195221

196222
impl TpuSenderConfig {
223+
///
224+
/// # Safety
225+
///
226+
/// This function enables sending transactions of arbitrary size, which may lead to unexpected behavior or security vulnerabilities.
227+
/// It should only be used in controlled testing environments.
228+
///
229+
pub unsafe fn allow_arbitrary_txn_size(&mut self) {
230+
#[cfg(not(feature = "intg-testing"))]
231+
{
232+
panic!(
233+
"TpuSenderConfig::allow_arbitrary_txn_size can only be set to true in integration testing builds."
234+
);
235+
}
236+
self.unsafe_allow_arbitrary_txn_size = true;
237+
}
238+
239+
pub const fn default_allow_arbitrary_txn_size() -> bool {
240+
false
241+
}
242+
243+
pub const fn default_unused_connection_ttl() -> Duration {
244+
DEFAULT_UNUSED_CONNECTION_TTL
245+
}
246+
197247
pub const fn default_connection_timeout() -> Duration {
198248
DEFAULT_CONNECTION_TIMEOUT
199249
}
@@ -257,7 +307,7 @@ impl TpuSenderConfig {
257307
/// Talking with Anza, we should not open more than 5 endpoints to host QUIC connections.
258308
pub const DEFAULT_QUIC_DRIVER_ENDPOINT_COUNT: NonZeroUsize =
259309
NonZeroUsize::new(5).expect("default endpoint count must be non-zero");
260-
pub const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(2);
310+
pub const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(4);
261311
pub const DEFAULT_MAX_CONSECUTIVE_CONNECTION_ATTEMPT: usize = 3;
262312
pub const DEFAULT_PER_PEER_TRANSACTION_QUEUE_SIZE: usize = 10_000;
263313
pub const DEFAULT_MAX_CONCURRENT_CONNECTIONS: usize = 1024;
@@ -285,6 +335,8 @@ impl Default for TpuSenderConfig {
285335
send_timeout: DEFAULT_TX_SEND_TIMEOUT,
286336
leader_prediction_lookahead: Some(DEFAULT_LEADER_PREDICTION_LOOKAHEAD),
287337
tpu_info_override: Vec::new(),
338+
orphan_connection_ttl: DEFAULT_UNUSED_CONNECTION_TTL,
339+
unsafe_allow_arbitrary_txn_size: false,
288340
}
289341
}
290342
}

0 commit comments

Comments
 (0)