Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions rsky-relay/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ You can test the WebSocket connection using `websocat`:
websocat -k wss://localhost:9000/xrpc/com.atproto.sync.subscribeRepos?cursor=0
```

You can test the HTTP endpoints using `curl`:

```bash
curl -k 'https://localhost:9000/xrpc/com.atproto.sync.listHosts?limit=10'
```

## Command-Line Options

- `-c, --cert <FILE>`: Path to SSL certificate file
Expand Down
2 changes: 1 addition & 1 deletion rsky-relay/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ pub async fn main() -> Result<()> {
thingbuf::mpsc::blocking::with_recycle(CAPACITY_MSGS, MessageRecycle);
let (request_crawl_tx, request_crawl_rx) = rtrb::RingBuffer::new(CAPACITY_REQS);
let (subscribe_repos_tx, subscribe_repos_rx) = rtrb::RingBuffer::new(CAPACITY_REQS);
let validator = ValidatorManager::new(message_rx)?;
let server =
Server::new(args.certs.zip(args.private_key), request_crawl_tx, subscribe_repos_tx)?;
let validator = ValidatorManager::new(message_rx)?;
let handle = tokio::spawn(validator.run());
let crawler = CrawlerManager::new(WORKERS_CRAWLERS, &message_tx, request_crawl_rx)?;
let publisher = PublisherManager::new(WORKERS_PUBLISHERS, subscribe_repos_rx)?;
Expand Down
106 changes: 103 additions & 3 deletions rsky-relay/src/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use std::time::{Duration, Instant};
use color_eyre::Result;
use color_eyre::eyre::eyre;
use httparse::{EMPTY_HEADER, Status};
#[cfg(feature = "labeler")]
#[cfg(not(feature = "labeler"))]
use rusqlite::named_params;
use rusqlite::{Connection, OpenFlags};
use rustls::{ServerConfig, ServerConnection, StreamOwned};
use thiserror::Error;
Expand All @@ -24,7 +25,7 @@ use crate::config::{HOSTS_MIN_ACCOUNTS, HOSTS_RELAY};
use crate::crawler::{RequestCrawl, RequestCrawlSender};
use crate::publisher::{MaybeTlsStream, SubscribeRepos, SubscribeReposSender};
#[cfg(not(feature = "labeler"))]
use crate::server::types::{HostStatus, ListHosts};
use crate::server::types::{Host, HostStatus, ListHosts};

const SLEEP: Duration = Duration::from_millis(10);

Expand Down Expand Up @@ -72,7 +73,6 @@ pub enum ServerError {
PushError(#[from] rtrb::PushError<RequestCrawl>),
#[error("url parse error: {0}")]
UrlParse(#[from] url::ParseError),
#[cfg(feature = "labeler")]
#[error("sqlite error: {0}")]
Sqlite(#[from] rusqlite::Error),
}
Expand Down Expand Up @@ -101,6 +101,8 @@ pub struct Server {
last: Instant,
#[cfg(feature = "labeler")]
conn: Connection,
#[cfg(not(feature = "labeler"))]
relay_conn: Connection,
request_crawl_tx: RequestCrawlSender,
subscribe_repos_tx: SubscribeReposSender,
}
Expand Down Expand Up @@ -130,6 +132,12 @@ impl Server {
let base_url = Url::parse("http://example.com")?;
let now = Instant::now();
let last = now.checked_sub(HOSTS_INTERVAL).unwrap_or(now);
// `relay.db` is created by `ValidatorManager::new`, which must therefore run before this
// read-only open.
#[cfg(not(feature = "labeler"))]
let relay_conn = Connection::open_with_flags(
"relay.db",
OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
)?;
#[cfg(feature = "labeler")]
let conn = Connection::open_with_flags(
"plc_directory.db",
Expand All @@ -143,6 +151,8 @@ impl Server {
last,
#[cfg(feature = "labeler")]
conn,
#[cfg(not(feature = "labeler"))]
relay_conn,
request_crawl_tx,
subscribe_repos_tx,
})
Expand Down Expand Up @@ -226,6 +236,38 @@ impl Server {
stream.shutdown()?;
Ok(())
}
#[cfg(not(feature = "labeler"))]
("GET", PATH_LIST_HOSTS) => {
let (status, body) = match self.list_hosts(&url) {
Ok(hosts) => ("200 OK", serde_json::to_string(&hosts)?),
Err(e) => {
let error = serde_json::json!({
"error": "BadRequest",
"message": e.to_string(),
});
("400 Bad Request", serde_json::to_string(&error)?)
}
};

let response = format!(
"HTTP/1.1 {}\r\n\
Content-Type: application/json; charset=utf-8\r\n\
Content-Length: {}\r\n\
Connection: close\r\n\
\r\n\
{}",
status,
body.len(),
body
);

#[expect(clippy::unwrap_used)]
let mut stream = stream.0.take().unwrap();
stream.write_all(response.as_bytes())?;
stream.flush()?;
stream.shutdown()?;
Ok(())
}
("GET", PATH_SUBSCRIBE) => {
let mut cursor = None;
for (key, value) in url.query_pairs() {
Expand Down Expand Up @@ -262,6 +304,64 @@ impl Server {
}
}

/// Builds the `ListHosts` response for the `GET` `PATH_LIST_HOSTS` route from the query
/// string of `url`.
///
/// Recognised query parameters:
/// - `limit`: number of rows to return; must parse as an integer in `1..=1000`, defaults
///   to 200.
/// - `cursor`: an `i64`; when present, only rows whose `rowid` is strictly greater are
///   returned.
///
/// Any other query parameter is ignored. If a parameter is repeated, the last occurrence
/// wins because each one overwrites the previous value.
///
/// The returned `cursor` is the `rowid` of the last row in this page rendered as a string,
/// or `None` when the page is empty.
///
/// # Errors
///
/// Returns an error if `limit` or `cursor` fails to parse or `limit` is out of range, or if
/// preparing or running the query against `relay_conn` fails.
///
/// NOTE(review): the caller turns every `Err` from this method into a `400 Bad Request`,
/// which includes database failures — confirm whether those should be reported differently.
#[cfg(not(feature = "labeler"))]
fn list_hosts(&mut self, url: &Url) -> Result<ListHosts> {
// Default query parameters.
let mut limit = 200;
let mut cursor = None;

for (key, value) in url.query_pairs() {
match key.as_ref() {
// Rejects 0, anything above 1000, and anything that does not parse as a `u16`.
"limit" => match value.parse::<u16>() {
Ok(l @ 1..=1000) => limit = l,
_ => {
return Err(eyre!("limit parameter invalid or out of range: {value}"));
}
},
"cursor" => match value.parse::<i64>() {
Ok(c) => cursor = Some(c),
Err(_) => {
return Err(eyre!("cursor parameter invalid: {value}"));
}
},
// Ignore unknown query parameters.
_ => (),
}
}

// With no cursor (`:cursor` bound to NULL) the filter matches every row; otherwise only
// rows after the cursor's rowid are selected.
// NOTE(review): the statement has no `ORDER BY`, so paging relies on rows coming back in
// ascending rowid order — confirm, or add `ORDER BY rowid`.
let mut stmt_hosts = self.relay_conn.prepare_cached(
"SELECT rowid, host, cursor
FROM hosts
WHERE :cursor is NULL OR rowid > :cursor
LIMIT :limit;",
)?;
// Each row becomes a `(rowid, host, cursor)` tuple; the first row-mapping or query error
// aborts the whole collection.
let hosts = stmt_hosts
.query_map(
named_params! {
":cursor": cursor,
":limit": limit,
},
|row| Ok((row.get::<_, i64>("rowid")?, row.get("host")?, row.get("cursor")?)),
)?
.collect::<Result<Vec<_>, _>>()?;

// The next-page cursor is the rowid of the last row returned, if any.
let cursor = hosts.last().map(|(rowid, _, _)| rowid.to_string());

// The rowid is only needed for the cursor; drop it when building the response entries.
let hosts = hosts
.into_iter()
.map(|(_, hostname, seq)| Host {
// TODO: Track host account counts.
account_count: 0,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indigo updates its counter for a PDS host whenever an account is first observed on a PDS or is observed to move between PDSs. AFAICT rsky-relay doesn't have any tracking of which host a repo is on, so I haven't hooked this up to anything and it instead reports the same default value as indigo.

hostname,
seq,
// TODO: Track status of hosts.
status: HostStatus::Active,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indigo does things like mark a host as offline if it fails repeated connection attempts, or mark hosts that are actually relays as banned. I couldn't find any logic in rsky-relay that did this, so for now this reports the same default value as indigo. This does mean rsky-relay is over-reporting which hosts it is actively connected to, but that can be fixed by someone more familiar with the stack.

})
.collect();

Ok(ListHosts { cursor, hosts })
}

#[cfg(not(feature = "labeler"))]
fn query_hosts(&mut self) -> Result<()> {
let client = reqwest::blocking::Client::builder()
Expand Down
Loading