Skip to content

Commit 8723e12

Browse files
feat(pyth-lazer-agent): add HTTP proxy support for WebSocket connections (#3142)
1 parent 80ab24c commit 8723e12

File tree

7 files changed

+185
-14
lines changed

7 files changed

+185
-14
lines changed

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/pyth-lazer-agent/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pyth-lazer-agent"
3-
version = "0.7.1"
3+
version = "0.7.2"
44
edition = "2024"
55
description = "Pyth Lazer Agent"
66
license = "Apache-2.0"
@@ -32,6 +32,7 @@ serde_json = "1.0.140"
3232
soketto = { version = "0.8.1", features = ["http"] }
3333
solana-keypair = "2.2.1"
3434
tokio = { version = "1.44.1", features = ["full"] }
35+
tokio-native-tls = "0.3.1"
3536
tokio-tungstenite = { version = "0.26.2", features = ["native-tls", "url"] }
3637
tokio-util = { version = "0.7.14", features = ["compat"] }
3738
tracing = "0.1.41"

apps/pyth-lazer-agent/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ authorization_token = "your_token"
5050
listen_address = "0.0.0.0:8910"
5151
publish_interval_duration = "25ms"
5252
enable_update_deduplication = false
53+
# Optional proxy configuration
54+
# proxy_url = "http://proxy.example.com:8080"
55+
# proxy_url = "http://username:[email protected]:8080" # With authentication
5356
```
5457

5558
- `relayers_urls`: The Lazer team will provide these.
@@ -58,3 +61,4 @@ enable_update_deduplication = false
5861
- `listen_address`: The local port the agent will be listening on; can be anything you want.
5962
- `publisher_interval`: The agent will batch and send transaction bundles at this interval. The Lazer team will provide guidance here.
6063
- `enable_update_deduplication`: The agent will deduplicate updates based inside each batch before sending it to Lazer.
64+
- `proxy_url` (optional): HTTP/HTTPS proxy URL for WebSocket connections. Supports Basic authentication via URL credentials (e.g., `http://user:pass@proxy:port`).

apps/pyth-lazer-agent/src/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub struct Config {
2323
pub enable_update_deduplication: bool,
2424
#[serde(with = "humantime_serde", default = "default_update_deduplication_ttl")]
2525
pub update_deduplication_ttl: Duration,
26+
pub proxy_url: Option<Url>,
2627
}
2728

2829
#[derive(Deserialize, Derivative, Clone, PartialEq)]

apps/pyth-lazer-agent/src/jrpc_handle.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,7 @@ pub mod tests {
313313
history_service_url: None,
314314
enable_update_deduplication: false,
315315
update_deduplication_ttl: Default::default(),
316+
proxy_url: None,
316317
};
317318

318319
println!("{:?}", get_metadata(config).await.unwrap());

apps/pyth-lazer-agent/src/lazer_publisher.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ impl LazerPublisher {
8080
token: authorization_token.clone(),
8181
receiver: relayer_sender.subscribe(),
8282
is_ready: is_ready.clone(),
83+
proxy_url: config.proxy_url.clone(),
8384
};
8485
tokio::spawn(async move { task.run().await });
8586
}
@@ -301,6 +302,7 @@ mod tests {
301302
history_service_url: None,
302303
enable_update_deduplication: false,
303304
update_deduplication_ttl: Default::default(),
305+
proxy_url: None,
304306
};
305307

306308
let (relayer_sender, mut relayer_receiver) = broadcast::channel(CHANNEL_CAPACITY);

apps/pyth-lazer-agent/src/relayer_session.rs

Lines changed: 173 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
use anyhow::{Result, bail};
1+
use anyhow::{Context, Result, bail};
22
use backoff::ExponentialBackoffBuilder;
33
use backoff::backoff::Backoff;
4+
use base64::Engine;
45
use futures_util::stream::{SplitSink, SplitStream};
56
use futures_util::{SinkExt, StreamExt};
67
use http::HeaderValue;
@@ -9,32 +10,194 @@ use pyth_lazer_publisher_sdk::transaction::SignedLazerTransaction;
910
use std::sync::Arc;
1011
use std::sync::atomic::{AtomicBool, Ordering};
1112
use std::time::{Duration, Instant};
13+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
1214
use tokio::net::TcpStream;
1315
use tokio::select;
1416
use tokio::sync::broadcast;
1517
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
1618
use tokio_tungstenite::{
17-
MaybeTlsStream, WebSocketStream, connect_async_with_config,
19+
MaybeTlsStream, WebSocketStream, client_async, connect_async_with_config,
1820
tungstenite::Message as TungsteniteMessage,
1921
};
2022
use url::Url;
2123

2224
type RelayerWsSender = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>;
2325
type RelayerWsReceiver = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
2426

25-
async fn connect_to_relayer(url: Url, token: &str) -> Result<(RelayerWsSender, RelayerWsReceiver)> {
26-
tracing::info!("connecting to the relayer at {}", url);
27-
let mut req = url.clone().into_client_request()?;
27+
async fn connect_through_proxy(
28+
proxy_url: &Url,
29+
target_url: &Url,
30+
token: &str,
31+
) -> Result<(RelayerWsSender, RelayerWsReceiver)> {
32+
tracing::info!(
33+
"connecting to the relayer at {} via proxy {}",
34+
target_url,
35+
proxy_url
36+
);
37+
38+
let proxy_host = proxy_url.host_str().context("Proxy URL must have a host")?;
39+
let proxy_port = proxy_url
40+
.port()
41+
.unwrap_or(if proxy_url.scheme() == "https" {
42+
443
43+
} else {
44+
80
45+
});
46+
47+
let proxy_addr = format!("{proxy_host}:{proxy_port}");
48+
let mut stream = TcpStream::connect(&proxy_addr)
49+
.await
50+
.context(format!("Failed to connect to proxy at {proxy_addr}"))?;
51+
52+
let target_host = target_url
53+
.host_str()
54+
.context("Target URL must have a host")?;
55+
let target_port = target_url
56+
.port()
57+
.unwrap_or(if target_url.scheme() == "wss" {
58+
443
59+
} else {
60+
80
61+
});
62+
63+
let target_authority = format!("{target_host}:{target_port}");
64+
let mut request_parts = vec![format!("CONNECT {target_authority} HTTP/1.1")];
65+
request_parts.push(format!("Host: {target_authority}"));
66+
67+
let username = proxy_url.username();
68+
if !username.is_empty() {
69+
let password = proxy_url.password().unwrap_or("");
70+
let credentials = format!("{username}:{password}");
71+
let encoded = base64::engine::general_purpose::STANDARD.encode(credentials.as_bytes());
72+
request_parts.push(format!("Proxy-Authorization: Basic {encoded}"));
73+
}
74+
75+
request_parts.push("Proxy-Connection: Keep-Alive".to_string());
76+
request_parts.push(String::new()); // Empty line to end headers
77+
request_parts.push(String::new()); // CRLF to end request
78+
79+
let connect_request = request_parts.join("\r\n");
80+
81+
stream
82+
.write_all(connect_request.as_bytes())
83+
.await
84+
.context(format!(
85+
"Failed to send CONNECT request to proxy at {proxy_url}"
86+
))?;
87+
88+
let mut response_buffer = Vec::new();
89+
let mut temp_buf = [0u8; 1024];
90+
let mut headers_complete = false;
91+
92+
while !headers_complete {
93+
let n = stream.read(&mut temp_buf).await.context(format!(
94+
"Failed to read CONNECT response from proxy at {proxy_url}"
95+
))?;
96+
97+
if n == 0 {
98+
bail!("Proxy closed connection before sending complete response");
99+
}
100+
101+
response_buffer.extend_from_slice(temp_buf.get(..n).context("Invalid buffer slice")?);
102+
103+
if response_buffer.windows(4).any(|w| w == b"\r\n\r\n") {
104+
headers_complete = true;
105+
}
106+
}
107+
108+
let response_str = String::from_utf8_lossy(&response_buffer);
109+
110+
let status_line = response_str
111+
.lines()
112+
.next()
113+
.context("Empty response from proxy")?;
114+
115+
let parts: Vec<&str> = status_line.split_whitespace().collect();
116+
if parts.len() < 2 {
117+
bail!(
118+
"Invalid HTTP response from proxy at {}: {}",
119+
proxy_url,
120+
status_line
121+
);
122+
}
123+
124+
let status_code = parts
125+
.get(1)
126+
.context("Missing status code in proxy response")?
127+
.parse::<u16>()
128+
.context("Invalid status code in proxy response")?;
129+
130+
if status_code != 200 {
131+
let status_text = parts
132+
.get(2..)
133+
.map(|s| s.join(" "))
134+
.unwrap_or_else(|| "Unknown".to_string());
135+
bail!(
136+
"Proxy CONNECT failed with status {} {}: {}",
137+
status_code,
138+
status_text,
139+
status_line
140+
);
141+
}
142+
143+
tracing::info!("Successfully connected through proxy at {}", proxy_url);
144+
145+
let mut req = target_url.clone().into_client_request()?;
28146
let headers = req.headers_mut();
29147
headers.insert(
30148
"Authorization",
31149
HeaderValue::from_str(&format!("Bearer {token}"))?,
32150
);
33-
let (ws_stream, _) = connect_async_with_config(req, None, true).await?;
34-
tracing::info!("connected to the relayer at {}", url);
151+
152+
let maybe_tls_stream = if target_url.scheme() == "wss" {
153+
let tls_connector = tokio_native_tls::native_tls::TlsConnector::builder()
154+
.build()
155+
.context("Failed to build TLS connector")?;
156+
let tokio_connector = tokio_native_tls::TlsConnector::from(tls_connector);
157+
let domain = target_host;
158+
let tls_stream = tokio_connector
159+
.connect(domain, stream)
160+
.await
161+
.context("Failed to establish TLS connection")?;
162+
163+
MaybeTlsStream::NativeTls(tls_stream)
164+
} else {
165+
MaybeTlsStream::Plain(stream)
166+
};
167+
168+
let (ws_stream, _) = client_async(req, maybe_tls_stream)
169+
.await
170+
.context("Failed to complete WebSocket handshake")?;
171+
172+
tracing::info!(
173+
"WebSocket connection established to relayer at {} via proxy {}",
174+
target_url,
175+
proxy_url
176+
);
35177
Ok(ws_stream.split())
36178
}
37179

180+
async fn connect_to_relayer(
181+
url: Url,
182+
token: &str,
183+
proxy_url: Option<&Url>,
184+
) -> Result<(RelayerWsSender, RelayerWsReceiver)> {
185+
if let Some(proxy) = proxy_url {
186+
connect_through_proxy(proxy, &url, token).await
187+
} else {
188+
tracing::info!("connecting to the relayer at {}", url);
189+
let mut req = url.clone().into_client_request()?;
190+
let headers = req.headers_mut();
191+
headers.insert(
192+
"Authorization",
193+
HeaderValue::from_str(&format!("Bearer {token}"))?,
194+
);
195+
let (ws_stream, _) = connect_async_with_config(req, None, true).await?;
196+
tracing::info!("connected to the relayer at {}", url);
197+
Ok(ws_stream.split())
198+
}
199+
}
200+
38201
struct RelayerWsSession {
39202
ws_sender: RelayerWsSender,
40203
}
@@ -58,11 +221,11 @@ impl RelayerWsSession {
58221
}
59222

60223
pub struct RelayerSessionTask {
61-
// connection state
62224
pub url: Url,
63225
pub token: String,
64226
pub receiver: broadcast::Receiver<SignedLazerTransaction>,
65227
pub is_ready: Arc<AtomicBool>,
228+
pub proxy_url: Option<Url>,
66229
}
67230

68231
impl RelayerSessionTask {
@@ -108,10 +271,8 @@ impl RelayerSessionTask {
108271
}
109272

110273
pub async fn run_relayer_connection(&mut self) -> Result<()> {
111-
// Establish relayer connection
112-
// Relayer will drop the connection if no data received in 5s
113274
let (relayer_ws_sender, mut relayer_ws_receiver) =
114-
connect_to_relayer(self.url.clone(), &self.token).await?;
275+
connect_to_relayer(self.url.clone(), &self.token, self.proxy_url.as_ref()).await?;
115276
let mut relayer_ws_session = RelayerWsSession {
116277
ws_sender: relayer_ws_sender,
117278
};
@@ -236,11 +397,11 @@ mod tests {
236397
let (relayer_sender, relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY);
237398

238399
let mut relayer_session_task = RelayerSessionTask {
239-
// connection state
240400
url: Url::parse("ws://127.0.0.1:12346").unwrap(),
241401
token: "token1".to_string(),
242402
receiver: relayer_receiver,
243403
is_ready: Arc::new(AtomicBool::new(false)),
404+
proxy_url: None,
244405
};
245406
tokio::spawn(async move { relayer_session_task.run().await });
246407
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

0 commit comments

Comments
 (0)