Skip to content

Commit c341be2

Browse files
authored
Merge pull request #113 from utkarshgupta137/rsky-relay/all-pds
rsky-relay: connect to everything
2 parents fae43fc + ce5c4f8 commit c341be2

File tree

13 files changed

+311
-132
lines changed

13 files changed

+311
-132
lines changed

rsky-relay/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,15 @@ ciborium = "0.2"
1111
cid = { version = "0.10", features = ["serde-codec"] }
1212
clap = { version = "4", features = ["derive", "env"] }
1313
color-eyre = "0.6"
14+
exponential-backoff = "2"
1415
file-rotate = "0.8"
1516
futures = { version = "0.3", default-features = false, features = ["std"] }
1617
hashbrown = "0.15"
17-
httparse = "1.10"
18+
http = "1"
19+
httparse = "1"
1820
ipld-core = "0.4"
1921
k256 = "0.13"
22+
libc = "0.2"
2023
lru = "0.14"
2124
magnetic = "2"
2225
mimalloc = "0.1"
@@ -36,6 +39,7 @@ serde_json = { version = "1", features = ["raw_value"] }
3639
sha2 = "0.10"
3740
signal-hook = { version = "0.3", features = ["extended-siginfo"] }
3841
sled = { git = "https://github.com/spacejam/sled.git", rev = "005c023" }
42+
socket2 = "0.5"
3943
thingbuf = "0.1"
4044
thiserror = "2"
4145
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }

rsky-relay/src/crawler/client.rs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
use std::io;
2+
use std::net::{TcpStream, ToSocketAddrs};
3+
4+
use http::Uri;
5+
use http::request::Parts;
6+
use socket2::{Domain, Protocol, Socket, Type};
7+
use tungstenite::client::{IntoClientRequest, uri_mode};
8+
use tungstenite::client_tls_with_config;
9+
use tungstenite::error::{Error, Result, UrlError};
10+
use tungstenite::handshake::client::Request;
11+
use tungstenite::protocol::WebSocketConfig;
12+
use tungstenite::stream::{Mode, NoDelay};
13+
14+
use crate::crawler::types::{DecomposeError, HandshakeResult};
15+
16+
/// Connect to the given WebSocket in blocking mode.
17+
///
18+
/// The URL may be either ws:// or wss://.
19+
/// To support wss:// URLs, feature `native-tls` or `rustls-tls` must be turned on.
20+
///
21+
/// This function "just works" for those who wants a simple blocking solution
22+
/// similar to `std::net::TcpStream`. If you want a non-blocking or other
23+
/// custom stream, call `client` instead.
24+
///
25+
/// This function uses `native_tls` or `rustls` to do TLS depending on the feature flags enabled. If
26+
/// you want to use other TLS libraries, use `client` instead. There is no need to enable any of
27+
/// the `*-tls` features if you don't call `connect` since it's the only function that uses them.
28+
pub fn connect<Req: IntoClientRequest>(request: Req) -> HandshakeResult {
29+
connect_with_config(request, None, 3)
30+
}
31+
32+
// Ref: https://github.com/snapview/tungstenite-rs/blob/master/src/client.rs
33+
/// Performs the WebSocket client handshake for `request`, following HTTP
/// redirects.
///
/// * `request` - anything convertible into a client handshake request.
/// * `config` - optional WebSocket configuration forwarded to the handshake.
/// * `max_redirects` - how many redirect responses may be followed; at most
///   `max_redirects + 1` handshake attempts are made in total.
///
/// A redirect response without a `Location` header, or one received when no
/// redirects remain, is returned to the caller as the HTTP error it arrived as.
#[expect(
34+
clippy::expect_used,
35+
clippy::ignored_unit_patterns,
36+
clippy::redundant_clone,
37+
clippy::redundant_else
38+
)]
39+
pub fn connect_with_config<Req: IntoClientRequest>(
40+
request: Req, config: Option<WebSocketConfig>, max_redirects: u8,
41+
) -> HandshakeResult {
42+
// Opens the TCP stream for the request's URI and starts one handshake attempt.
fn try_client_handshake(request: Request, config: Option<WebSocketConfig>) -> HandshakeResult {
43+
let uri = request.uri();
44+
let mode = uri_mode(uri)?;
45+
46+
let host = request.uri().host().ok_or(Error::Url(UrlError::NoHostName))?;
47+
// A host starting with '[' is taken to be a bracketed literal; the
// surrounding brackets are stripped before address resolution.
let host = if host.starts_with('[') { &host[1..host.len() - 1] } else { host };
48+
// Fall back to the scheme's default port when the URI does not name one.
let port = uri.port_u16().unwrap_or(match mode {
49+
Mode::Plain => 80,
50+
Mode::Tls => 443,
51+
});
52+
let mut stream = connect_to_some((host, port), request.uri())?;
53+
NoDelay::set_nodelay(&mut stream, true)?;
54+
55+
// NOTE(review): `decompose` comes from `crate::crawler::types::DecomposeError`;
// presumably it maps tungstenite's handshake outcome into `HandshakeResult` - confirm there.
client_tls_with_config(request, stream, config, None).decompose()
56+
}
57+
58+
// Rebuilds a request from the original method, version and headers, targeting `uri`.
fn create_request(parts: &Parts, uri: &Uri) -> Request {
59+
let mut builder =
60+
Request::builder().uri(uri.clone()).method(parts.method.clone()).version(parts.version);
61+
*builder.headers_mut().expect("Failed to create `Request`") = parts.headers.clone();
62+
builder.body(()).expect("Failed to create `Request`")
63+
}
64+
65+
let (parts, _) = request.into_client_request()?.into_parts();
66+
let mut uri = parts.uri.clone();
67+
68+
// `0..=max_redirects` allows the initial attempt plus one attempt per permitted redirect.
for attempt in 0..=max_redirects {
69+
let request = create_request(&parts, &uri);
70+
71+
match try_client_handshake(request, config) {
72+
// Only follow a redirect while attempts remain; on the last attempt the
// redirect falls through to `other` and is returned unchanged.
Err(Error::Http(res)) if res.status().is_redirection() && attempt < max_redirects => {
73+
if let Some(location) = res.headers().get("Location") {
74+
uri = location.to_str()?.parse::<Uri>()?;
75+
// debug!("Redirecting to {uri:?}");
76+
continue;
77+
} else {
78+
// warn!("No `Location` found in redirect");
79+
return Err(Error::Http(res));
80+
}
81+
}
82+
other => return other,
83+
}
84+
}
85+
86+
// Every iteration either returns or continues, and `continue` is only
// reachable while `attempt < max_redirects`, so the final iteration always returns.
unreachable!("Bug in a redirect handling logic")
87+
}
88+
89+
fn connect_to_some(addrs: impl ToSocketAddrs, uri: &Uri) -> Result<TcpStream> {
90+
fn is_blocking_error(error: &io::Error) -> bool {
91+
matches!(
92+
error.kind(),
93+
io::ErrorKind::Interrupted | io::ErrorKind::NotConnected | io::ErrorKind::WouldBlock
94+
) || matches!(error.raw_os_error(), Some(libc::EINPROGRESS))
95+
}
96+
97+
for addr in addrs.to_socket_addrs()? {
98+
// debug!("Trying to contact {uri} at {addr}...");
99+
let socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?;
100+
socket.set_nonblocking(true)?;
101+
match socket.connect(&addr.into()) {
102+
Ok(()) => {}
103+
Err(e) if is_blocking_error(&e) => {}
104+
Err(_) => continue,
105+
}
106+
return Ok(socket.into());
107+
}
108+
Err(Error::Url(UrlError::UnableToConnect(uri.to_string())))
109+
}

rsky-relay/src/crawler/connection.rs

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
use std::io;
2-
use std::net::TcpStream;
32
use std::os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd};
43

54
use thingbuf::mpsc;
65
use thiserror::Error;
6+
use tungstenite::Message;
77
use tungstenite::stream::MaybeTlsStream;
8-
use tungstenite::{Message, WebSocket};
98
use url::Url;
109

10+
use crate::crawler::client;
11+
use crate::crawler::types::{HandshakeResult, WebSocketClient};
1112
use crate::types::{Cursor, MessageSender};
1213

1314
#[derive(Debug, Error)]
@@ -22,7 +23,7 @@ pub enum ConnectionError {
2223

2324
pub struct Connection {
2425
pub(crate) hostname: String,
25-
client: WebSocket<MaybeTlsStream<TcpStream>>,
26+
client: WebSocketClient,
2627
message_tx: MessageSender,
2728
}
2829

@@ -49,26 +50,18 @@ impl AsRawFd for Connection {
4950
}
5051

5152
impl Connection {
52-
pub fn connect(
53-
hostname: String, cursor: Option<Cursor>, message_tx: MessageSender,
54-
) -> Result<Self, ConnectionError> {
53+
/// Assembles a `Connection` from its three parts.
///
/// This only stores the given values; it performs no I/O itself.
pub const fn new(hostname: String, client: WebSocketClient, message_tx: MessageSender) -> Self {
54+
Self { hostname, client, message_tx }
55+
}
56+
57+
pub fn connect(hostname: &str, cursor: Option<Cursor>) -> HandshakeResult {
5558
#[expect(clippy::unwrap_used)]
5659
let mut url =
5760
Url::parse(&format!("wss://{hostname}/xrpc/com.atproto.sync.subscribeRepos")).unwrap();
5861
if let Some(cursor) = cursor {
5962
url.query_pairs_mut().append_pair("cursor", &cursor.to_string());
6063
}
61-
let (client, _) = tungstenite::connect(url)?;
62-
match client.get_ref() {
63-
MaybeTlsStream::Rustls(stream) => {
64-
stream.get_ref().set_nonblocking(true)?;
65-
}
66-
MaybeTlsStream::Plain(stream) => {
67-
stream.set_nonblocking(true)?;
68-
}
69-
_ => {}
70-
}
71-
Ok(Self { hostname, client, message_tx })
64+
client::connect(url)
7265
}
7366

7467
pub fn close(&mut self) -> Result<(), ConnectionError> {

rsky-relay/src/crawler/manager.rs

Lines changed: 61 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,22 @@
1+
use std::collections::BTreeMap;
12
use std::sync::atomic::Ordering;
2-
use std::time::Duration;
3+
use std::time::{Duration, Instant};
34
use std::{io, thread};
45

6+
use exponential_backoff::{Backoff, IntoIter as BackoffIter};
7+
use hashbrown::HashMap;
58
use magnetic::Consumer;
69
use magnetic::buffer::dynamic::DynamicBufferP2;
7-
use rusqlite::{Connection, OpenFlags, OptionalExtension};
10+
use rusqlite::{Connection, ErrorCode, OpenFlags, OptionalExtension};
811
use thiserror::Error;
912

1013
use crate::SHUTDOWN;
1114
use crate::crawler::RequestCrawl;
1215
use crate::crawler::types::{Command, CommandSender, RequestCrawlReceiver, Status, StatusReceiver};
1316
use crate::crawler::worker::{Worker, WorkerError};
14-
use crate::types::MessageSender;
17+
use crate::types::{Cursor, MessageSender};
1518

16-
const CAPACITY: usize = 1024;
19+
const CAPACITY: usize = 1 << 12;
1720
const SLEEP: Duration = Duration::from_millis(10);
1821

1922
#[derive(Debug, Error)]
@@ -45,6 +48,8 @@ struct WorkerHandle {
4548
pub struct Manager {
4649
workers: Box<[WorkerHandle]>,
4750
next_id: usize,
51+
hosts: HashMap<String, [BackoffIter; 2]>,
52+
retries: BTreeMap<Instant, (usize, String)>,
4853
conn: Connection,
4954
request_crawl_rx: RequestCrawlReceiver,
5055
status_rx: StatusReceiver,
@@ -76,38 +81,24 @@ impl Manager {
7681
Ok(Self {
7782
workers: workers.into_boxed_slice(),
7883
next_id: 0,
84+
hosts: HashMap::new(),
85+
retries: BTreeMap::new(),
7986
conn,
8087
request_crawl_rx,
8188
status_rx,
8289
})
8390
}
8491

8592
pub fn run(mut self) -> Result<(), ManagerError> {
86-
let mut requests = Vec::new();
87-
{
88-
let mut stmt = self.conn.prepare_cached("SELECT host, cursor FROM hosts")?;
89-
let mut rows = stmt.query(())?;
90-
while let Some(row) = rows.next()? {
91-
let hostname = row.get_unwrap("host");
92-
let cursor: u64 = row.get_unwrap("cursor");
93-
requests.push(RequestCrawl { hostname, cursor: Some(cursor.into()) });
94-
}
95-
}
96-
for request in requests {
97-
self.handle_connect(request)?;
98-
}
9993
while self.update()? {
10094
thread::sleep(SLEEP);
10195
}
10296
tracing::info!("shutting down crawler");
103-
SHUTDOWN.store(true, Ordering::Relaxed);
10497
self.shutdown()
10598
}
10699

107-
pub fn shutdown(mut self) -> Result<(), ManagerError> {
108-
for worker in &mut self.workers {
109-
worker.command_tx.push(Command::Shutdown)?;
110-
}
100+
pub fn shutdown(self) -> Result<(), ManagerError> {
101+
SHUTDOWN.store(true, Ordering::Relaxed);
111102
for (id, worker) in self.workers.into_iter().enumerate() {
112103
if let Err(err) = worker.thread_handle.join().map_err(|_| ManagerError::Join)? {
113104
tracing::warn!(%id, %err, "crawler worker error");
@@ -121,51 +112,74 @@ impl Manager {
121112
return Ok(false);
122113
}
123114

124-
if let Ok(status) = self.status_rx.try_pop() {
125-
if !self.handle_status(status)? {
126-
return Ok(false);
115+
if let Some(entry) = self.retries.first_entry() {
116+
if *entry.key() < Instant::now() {
117+
let (id, hostname) = entry.remove();
118+
let prev = self.next_id;
119+
self.next_id = id;
120+
self.handle_connect(RequestCrawl { hostname, cursor: None })?;
121+
self.next_id = prev;
127122
}
128123
}
129124

125+
if let Ok(status) = self.status_rx.try_pop() {
126+
self.handle_status(status);
127+
}
128+
130129
if let Ok(request_crawl) = self.request_crawl_rx.pop() {
131-
let exists = {
132-
let mut stmt = self.conn.prepare_cached("SELECT * FROM hosts WHERE host = ?1")?;
133-
stmt.exists((&request_crawl.hostname,))?
134-
};
135-
if !exists {
136-
thread::sleep(SLEEP);
130+
if !self.hosts.contains_key(&request_crawl.hostname) {
137131
self.handle_connect(request_crawl)?;
138132
}
139133
}
140134

141135
Ok(true)
142136
}
143137

144-
fn handle_status(&mut self, status: Status) -> Result<bool, ManagerError> {
138+
fn handle_status(&mut self, status: Status) {
145139
match status {
146-
Status::Disconnected(id, hostname) => {
147-
// TODO: add proper backoff
148-
thread::sleep(SLEEP * 1000);
149-
let prev = self.next_id;
150-
self.next_id = id;
151-
self.handle_connect(RequestCrawl { hostname, cursor: None })?;
152-
self.next_id = prev;
140+
Status::Disconnected { worker_id: id, hostname, connected } => {
141+
#[expect(clippy::unwrap_used)]
142+
let backoff =
143+
self.hosts.get_mut(&hostname).unwrap().get_mut(usize::from(connected)).unwrap();
144+
let Some(Some(delay)) = backoff.next() else { unreachable!() };
145+
let next = Instant::now() + delay;
146+
assert!(self.retries.insert(next, (id, hostname)).is_none());
153147
}
154148
}
155-
Ok(true)
156149
}
157150

158151
fn handle_connect(&mut self, mut request_crawl: RequestCrawl) -> Result<(), ManagerError> {
159-
let cursor: Option<u64> = {
160-
let mut stmt = self.conn.prepare_cached("SELECT * FROM hosts WHERE host = ?1")?;
161-
stmt.query_row((&request_crawl.hostname,), |row| Ok(row.get_unwrap("cursor")))
162-
.optional()?
163-
};
164-
if let Some(cursor) = cursor {
165-
request_crawl.cursor = Some(cursor.into());
152+
self.hosts.entry(request_crawl.hostname.clone()).or_insert_with(|| {
153+
let backoff_connect =
154+
Backoff::new(u32::MAX, Duration::from_secs(60), Duration::from_secs(60 * 60 * 6));
155+
let backoff_reconnect =
156+
Backoff::new(u32::MAX, Duration::from_secs(1), Duration::from_secs(60 * 60));
157+
[backoff_connect.iter(), backoff_reconnect.iter()]
158+
});
159+
if request_crawl.cursor.is_none() {
160+
request_crawl.cursor = loop {
161+
match self.get_cursor(&request_crawl.hostname) {
162+
Ok(cursor) => break cursor,
163+
Err(ManagerError::Sqlite(err))
164+
if err.sqlite_error_code() == Some(ErrorCode::DatabaseLocked) =>
165+
{
166+
continue;
167+
}
168+
Err(err) => Err(err)?,
169+
}
170+
};
166171
}
167172
self.workers[self.next_id].command_tx.push(Command::Connect(request_crawl))?;
168173
self.next_id = (self.next_id + 1) % self.workers.len();
174+
thread::sleep(SLEEP);
169175
Ok(())
170176
}
177+
178+
fn get_cursor(&self, host: &str) -> Result<Option<Cursor>, ManagerError> {
179+
let mut stmt = self.conn.prepare_cached("SELECT * FROM hosts WHERE host = ?1")?;
180+
Ok(stmt
181+
.query_row((&host,), |row| Ok(row.get_unwrap::<_, u64>("cursor")))
182+
.optional()?
183+
.map(Into::into))
184+
}
171185
}

rsky-relay/src/crawler/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
mod client;
12
mod connection;
23
mod manager;
34
mod types;

0 commit comments

Comments
 (0)