Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion rsky-relay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ ciborium = "0.2"
cid = { version = "0.10", features = ["serde-codec"] }
clap = { version = "4", features = ["derive", "env"] }
color-eyre = "0.6"
exponential-backoff = "2"
file-rotate = "0.8"
futures = { version = "0.3", default-features = false, features = ["std"] }
hashbrown = "0.15"
httparse = "1.10"
http = "1"
httparse = "1"
ipld-core = "0.4"
k256 = "0.13"
libc = "0.2"
lru = "0.14"
magnetic = "2"
mimalloc = "0.1"
Expand All @@ -36,6 +39,7 @@ serde_json = { version = "1", features = ["raw_value"] }
sha2 = "0.10"
signal-hook = { version = "0.3", features = ["extended-siginfo"] }
sled = { git = "https://github.com/spacejam/sled.git", rev = "005c023" }
socket2 = "0.5"
thingbuf = "0.1"
thiserror = "2"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }
Expand Down
109 changes: 109 additions & 0 deletions rsky-relay/src/crawler/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use std::io;
use std::net::{TcpStream, ToSocketAddrs};

use http::Uri;
use http::request::Parts;
use socket2::{Domain, Protocol, Socket, Type};
use tungstenite::client::{IntoClientRequest, uri_mode};
use tungstenite::client_tls_with_config;
use tungstenite::error::{Error, Result, UrlError};
use tungstenite::handshake::client::Request;
use tungstenite::protocol::WebSocketConfig;
use tungstenite::stream::{Mode, NoDelay};

use crate::crawler::types::{DecomposeError, HandshakeResult};

/// Connect to the given WebSocket in blocking mode.
///
/// The URL may be either ws:// or wss://.
/// To support wss:// URLs, feature `native-tls` or `rustls-tls` must be turned on.
///
/// This function "just works" for those who wants a simple blocking solution
/// similar to `std::net::TcpStream`. If you want a non-blocking or other
/// custom stream, call `client` instead.
///
/// This function uses `native_tls` or `rustls` to do TLS depending on the feature flags enabled. If
/// you want to use other TLS libraries, use `client` instead. There is no need to enable any of
/// the `*-tls` features if you don't call `connect` since it's the only function that uses them.
pub fn connect<Req: IntoClientRequest>(request: Req) -> HandshakeResult {
connect_with_config(request, None, 3)
}

// Ref: https://github.com/snapview/tungstenite-rs/blob/master/src/client.rs
#[expect(
clippy::expect_used,
clippy::ignored_unit_patterns,
clippy::redundant_clone,
clippy::redundant_else
)]
pub fn connect_with_config<Req: IntoClientRequest>(
request: Req, config: Option<WebSocketConfig>, max_redirects: u8,
) -> HandshakeResult {
fn try_client_handshake(request: Request, config: Option<WebSocketConfig>) -> HandshakeResult {
let uri = request.uri();
let mode = uri_mode(uri)?;

let host = request.uri().host().ok_or(Error::Url(UrlError::NoHostName))?;
let host = if host.starts_with('[') { &host[1..host.len() - 1] } else { host };
let port = uri.port_u16().unwrap_or(match mode {
Mode::Plain => 80,
Mode::Tls => 443,
});
let mut stream = connect_to_some((host, port), request.uri())?;
NoDelay::set_nodelay(&mut stream, true)?;

client_tls_with_config(request, stream, config, None).decompose()
}

fn create_request(parts: &Parts, uri: &Uri) -> Request {
let mut builder =
Request::builder().uri(uri.clone()).method(parts.method.clone()).version(parts.version);
*builder.headers_mut().expect("Failed to create `Request`") = parts.headers.clone();
builder.body(()).expect("Failed to create `Request`")
}

let (parts, _) = request.into_client_request()?.into_parts();
let mut uri = parts.uri.clone();

for attempt in 0..=max_redirects {
let request = create_request(&parts, &uri);

match try_client_handshake(request, config) {
Err(Error::Http(res)) if res.status().is_redirection() && attempt < max_redirects => {
if let Some(location) = res.headers().get("Location") {
uri = location.to_str()?.parse::<Uri>()?;
// debug!("Redirecting to {uri:?}");
continue;
} else {
// warn!("No `Location` found in redirect");
return Err(Error::Http(res));
}
}
other => return other,
}
}

unreachable!("Bug in a redirect handling logic")
}

fn connect_to_some(addrs: impl ToSocketAddrs, uri: &Uri) -> Result<TcpStream> {
fn is_blocking_error(error: &io::Error) -> bool {
matches!(
error.kind(),
io::ErrorKind::Interrupted | io::ErrorKind::NotConnected | io::ErrorKind::WouldBlock
) || matches!(error.raw_os_error(), Some(libc::EINPROGRESS))
}

for addr in addrs.to_socket_addrs()? {
// debug!("Trying to contact {uri} at {addr}...");
let socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?;
socket.set_nonblocking(true)?;
match socket.connect(&addr.into()) {
Ok(()) => {}
Err(e) if is_blocking_error(&e) => {}
Err(_) => continue,
}
return Ok(socket.into());
}
Err(Error::Url(UrlError::UnableToConnect(uri.to_string())))
}
27 changes: 10 additions & 17 deletions rsky-relay/src/crawler/connection.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::io;
use std::net::TcpStream;
use std::os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd};

use thingbuf::mpsc;
use thiserror::Error;
use tungstenite::Message;
use tungstenite::stream::MaybeTlsStream;
use tungstenite::{Message, WebSocket};
use url::Url;

use crate::crawler::client;
use crate::crawler::types::{HandshakeResult, WebSocketClient};
use crate::types::{Cursor, MessageSender};

#[derive(Debug, Error)]
Expand All @@ -22,7 +23,7 @@ pub enum ConnectionError {

pub struct Connection {
pub(crate) hostname: String,
client: WebSocket<MaybeTlsStream<TcpStream>>,
client: WebSocketClient,
message_tx: MessageSender,
}

Expand All @@ -49,26 +50,18 @@ impl AsRawFd for Connection {
}

impl Connection {
pub fn connect(
hostname: String, cursor: Option<Cursor>, message_tx: MessageSender,
) -> Result<Self, ConnectionError> {
pub const fn new(hostname: String, client: WebSocketClient, message_tx: MessageSender) -> Self {
Self { hostname, client, message_tx }
}

pub fn connect(hostname: &str, cursor: Option<Cursor>) -> HandshakeResult {
#[expect(clippy::unwrap_used)]
let mut url =
Url::parse(&format!("wss://{hostname}/xrpc/com.atproto.sync.subscribeRepos")).unwrap();
if let Some(cursor) = cursor {
url.query_pairs_mut().append_pair("cursor", &cursor.to_string());
}
let (client, _) = tungstenite::connect(url)?;
match client.get_ref() {
MaybeTlsStream::Rustls(stream) => {
stream.get_ref().set_nonblocking(true)?;
}
MaybeTlsStream::Plain(stream) => {
stream.set_nonblocking(true)?;
}
_ => {}
}
Ok(Self { hostname, client, message_tx })
client::connect(url)
}

pub fn close(&mut self) -> Result<(), ConnectionError> {
Expand Down
108 changes: 61 additions & 47 deletions rsky-relay/src/crawler/manager.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use std::collections::BTreeMap;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::{Duration, Instant};
use std::{io, thread};

use exponential_backoff::{Backoff, IntoIter as BackoffIter};
use hashbrown::HashMap;
use magnetic::Consumer;
use magnetic::buffer::dynamic::DynamicBufferP2;
use rusqlite::{Connection, OpenFlags, OptionalExtension};
use rusqlite::{Connection, ErrorCode, OpenFlags, OptionalExtension};
use thiserror::Error;

use crate::SHUTDOWN;
use crate::crawler::RequestCrawl;
use crate::crawler::types::{Command, CommandSender, RequestCrawlReceiver, Status, StatusReceiver};
use crate::crawler::worker::{Worker, WorkerError};
use crate::types::MessageSender;
use crate::types::{Cursor, MessageSender};

const CAPACITY: usize = 1024;
const CAPACITY: usize = 1 << 12;
const SLEEP: Duration = Duration::from_millis(10);

#[derive(Debug, Error)]
Expand Down Expand Up @@ -45,6 +48,8 @@ struct WorkerHandle {
pub struct Manager {
workers: Box<[WorkerHandle]>,
next_id: usize,
hosts: HashMap<String, [BackoffIter; 2]>,
retries: BTreeMap<Instant, (usize, String)>,
conn: Connection,
request_crawl_rx: RequestCrawlReceiver,
status_rx: StatusReceiver,
Expand Down Expand Up @@ -76,38 +81,24 @@ impl Manager {
Ok(Self {
workers: workers.into_boxed_slice(),
next_id: 0,
hosts: HashMap::new(),
retries: BTreeMap::new(),
conn,
request_crawl_rx,
status_rx,
})
}

pub fn run(mut self) -> Result<(), ManagerError> {
let mut requests = Vec::new();
{
let mut stmt = self.conn.prepare_cached("SELECT host, cursor FROM hosts")?;
let mut rows = stmt.query(())?;
while let Some(row) = rows.next()? {
let hostname = row.get_unwrap("host");
let cursor: u64 = row.get_unwrap("cursor");
requests.push(RequestCrawl { hostname, cursor: Some(cursor.into()) });
}
}
for request in requests {
self.handle_connect(request)?;
}
while self.update()? {
thread::sleep(SLEEP);
}
tracing::info!("shutting down crawler");
SHUTDOWN.store(true, Ordering::Relaxed);
self.shutdown()
}

pub fn shutdown(mut self) -> Result<(), ManagerError> {
for worker in &mut self.workers {
worker.command_tx.push(Command::Shutdown)?;
}
pub fn shutdown(self) -> Result<(), ManagerError> {
SHUTDOWN.store(true, Ordering::Relaxed);
for (id, worker) in self.workers.into_iter().enumerate() {
if let Err(err) = worker.thread_handle.join().map_err(|_| ManagerError::Join)? {
tracing::warn!(%id, %err, "crawler worker error");
Expand All @@ -121,51 +112,74 @@ impl Manager {
return Ok(false);
}

if let Ok(status) = self.status_rx.try_pop() {
if !self.handle_status(status)? {
return Ok(false);
if let Some(entry) = self.retries.first_entry() {
if *entry.key() < Instant::now() {
let (id, hostname) = entry.remove();
let prev = self.next_id;
self.next_id = id;
self.handle_connect(RequestCrawl { hostname, cursor: None })?;
self.next_id = prev;
}
}

if let Ok(status) = self.status_rx.try_pop() {
self.handle_status(status);
}

if let Ok(request_crawl) = self.request_crawl_rx.pop() {
let exists = {
let mut stmt = self.conn.prepare_cached("SELECT * FROM hosts WHERE host = ?1")?;
stmt.exists((&request_crawl.hostname,))?
};
if !exists {
thread::sleep(SLEEP);
if !self.hosts.contains_key(&request_crawl.hostname) {
self.handle_connect(request_crawl)?;
}
}

Ok(true)
}

fn handle_status(&mut self, status: Status) -> Result<bool, ManagerError> {
fn handle_status(&mut self, status: Status) {
match status {
Status::Disconnected(id, hostname) => {
// TODO: add proper backoff
thread::sleep(SLEEP * 1000);
let prev = self.next_id;
self.next_id = id;
self.handle_connect(RequestCrawl { hostname, cursor: None })?;
self.next_id = prev;
Status::Disconnected { worker_id: id, hostname, connected } => {
#[expect(clippy::unwrap_used)]
let backoff =
self.hosts.get_mut(&hostname).unwrap().get_mut(usize::from(connected)).unwrap();
let Some(Some(delay)) = backoff.next() else { unreachable!() };
let next = Instant::now() + delay;
assert!(self.retries.insert(next, (id, hostname)).is_none());
}
}
Ok(true)
}

fn handle_connect(&mut self, mut request_crawl: RequestCrawl) -> Result<(), ManagerError> {
let cursor: Option<u64> = {
let mut stmt = self.conn.prepare_cached("SELECT * FROM hosts WHERE host = ?1")?;
stmt.query_row((&request_crawl.hostname,), |row| Ok(row.get_unwrap("cursor")))
.optional()?
};
if let Some(cursor) = cursor {
request_crawl.cursor = Some(cursor.into());
self.hosts.entry(request_crawl.hostname.clone()).or_insert_with(|| {
let backoff_connect =
Backoff::new(u32::MAX, Duration::from_secs(60), Duration::from_secs(60 * 60 * 6));
let backoff_reconnect =
Backoff::new(u32::MAX, Duration::from_secs(1), Duration::from_secs(60 * 60));
[backoff_connect.iter(), backoff_reconnect.iter()]
});
if request_crawl.cursor.is_none() {
request_crawl.cursor = loop {
match self.get_cursor(&request_crawl.hostname) {
Ok(cursor) => break cursor,
Err(ManagerError::Sqlite(err))
if err.sqlite_error_code() == Some(ErrorCode::DatabaseLocked) =>
{
continue;
}
Err(err) => Err(err)?,
}
};
}
self.workers[self.next_id].command_tx.push(Command::Connect(request_crawl))?;
self.next_id = (self.next_id + 1) % self.workers.len();
thread::sleep(SLEEP);
Ok(())
}

fn get_cursor(&self, host: &str) -> Result<Option<Cursor>, ManagerError> {
let mut stmt = self.conn.prepare_cached("SELECT * FROM hosts WHERE host = ?1")?;
Ok(stmt
.query_row((&host,), |row| Ok(row.get_unwrap::<_, u64>("cursor")))
.optional()?
.map(Into::into))
}
}
1 change: 1 addition & 0 deletions rsky-relay/src/crawler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod client;
mod connection;
mod manager;
mod types;
Expand Down
Loading