Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions rsky-relay/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ You can test the WebSocket connection using `websocat`:
websocat -k wss://localhost:9000/xrpc/com.atproto.sync.subscribeRepos?cursor=0
```

You can test the HTTP endpoints using `curl`:

```bash
curl https://localhost:9000/xrpc/com.atproto.sync.listHosts?limit=10
```

## Command-Line Options

- `-c, --cert <FILE>`: Path to SSL certificate file
Expand Down
2 changes: 1 addition & 1 deletion rsky-relay/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ pub async fn main() -> Result<()> {
thingbuf::mpsc::blocking::with_recycle(CAPACITY_MSGS, MessageRecycle);
let (request_crawl_tx, request_crawl_rx) = rtrb::RingBuffer::new(CAPACITY_REQS);
let (subscribe_repos_tx, subscribe_repos_rx) = rtrb::RingBuffer::new(CAPACITY_REQS);
let validator = ValidatorManager::new(message_rx)?;
let server =
Server::new(args.certs.zip(args.private_key), request_crawl_tx, subscribe_repos_tx)?;
let validator = ValidatorManager::new(message_rx)?;
let handle = tokio::spawn(validator.run());
let crawler = CrawlerManager::new(WORKERS_CRAWLERS, &message_tx, request_crawl_rx)?;
let publisher = PublisherManager::new(WORKERS_PUBLISHERS, subscribe_repos_rx)?;
Expand Down
106 changes: 103 additions & 3 deletions rsky-relay/src/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use std::time::{Duration, Instant};
use color_eyre::Result;
use color_eyre::eyre::eyre;
use httparse::{EMPTY_HEADER, Status};
#[cfg(feature = "labeler")]
#[cfg(not(feature = "labeler"))]
use rusqlite::named_params;
use rusqlite::{Connection, OpenFlags};
use rustls::{ServerConfig, ServerConnection, StreamOwned};
use thiserror::Error;
Expand All @@ -24,7 +25,7 @@ use crate::config::{HOSTS_MIN_ACCOUNTS, HOSTS_RELAY};
use crate::crawler::{RequestCrawl, RequestCrawlSender};
use crate::publisher::{MaybeTlsStream, SubscribeRepos, SubscribeReposSender};
#[cfg(not(feature = "labeler"))]
use crate::server::types::{HostStatus, ListHosts};
use crate::server::types::{Host, HostStatus, ListHosts};

const SLEEP: Duration = Duration::from_millis(10);

Expand Down Expand Up @@ -72,7 +73,6 @@ pub enum ServerError {
PushError(#[from] rtrb::PushError<RequestCrawl>),
#[error("url parse error: {0}")]
UrlParse(#[from] url::ParseError),
#[cfg(feature = "labeler")]
#[error("sqlite error: {0}")]
Sqlite(#[from] rusqlite::Error),
}
Expand Down Expand Up @@ -101,6 +101,8 @@ pub struct Server {
last: Instant,
#[cfg(feature = "labeler")]
conn: Connection,
#[cfg(not(feature = "labeler"))]
relay_conn: Connection,
request_crawl_tx: RequestCrawlSender,
subscribe_repos_tx: SubscribeReposSender,
}
Expand Down Expand Up @@ -130,6 +132,12 @@ impl Server {
let base_url = Url::parse("http://example.com")?;
let now = Instant::now();
let last = now.checked_sub(HOSTS_INTERVAL).unwrap_or(now);
// Created by `ValidatorManager::new`.
#[cfg(not(feature = "labeler"))]
let relay_conn = Connection::open_with_flags(
"relay.db",
OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
)?;
#[cfg(feature = "labeler")]
let conn = Connection::open_with_flags(
"plc_directory.db",
Expand All @@ -143,6 +151,8 @@ impl Server {
last,
#[cfg(feature = "labeler")]
conn,
#[cfg(not(feature = "labeler"))]
relay_conn,
request_crawl_tx,
subscribe_repos_tx,
})
Expand Down Expand Up @@ -226,6 +236,38 @@ impl Server {
stream.shutdown()?;
Ok(())
}
#[cfg(not(feature = "labeler"))]
("GET", PATH_LIST_HOSTS) => {
let (status, body) = match self.list_hosts(&url) {
Ok(hosts) => ("200 OK", serde_json::to_string(&hosts)?),
Err(e) => {
let error = serde_json::json!({
"error": "BadRequest",
"message": e.to_string(),
});
("400 Bad Request", serde_json::to_string(&error)?)
}
};

let response = format!(
"HTTP/1.1 {}\r\n\
Content-Type: application/json; charset=utf-8\r\n\
Content-Length: {}\r\n\
Connection: close\r\n\
\r\n\
{}",
status,
body.len(),
body
);

#[expect(clippy::unwrap_used)]
let mut stream = stream.0.take().unwrap();
stream.write_all(response.as_bytes())?;
stream.flush()?;
stream.shutdown()?;
Ok(())
}
("GET", PATH_SUBSCRIBE) => {
let mut cursor = None;
for (key, value) in url.query_pairs() {
Expand Down Expand Up @@ -262,6 +304,64 @@ impl Server {
}
}

#[cfg(not(feature = "labeler"))]
fn list_hosts(&mut self, url: &Url) -> Result<ListHosts> {
// Default query parameters.
let mut limit = 200;
let mut cursor = None;

for (key, value) in url.query_pairs() {
match key.as_ref() {
"limit" => match value.parse::<u16>() {
Ok(l @ 1..=1000) => limit = l,
_ => {
return Err(eyre!("limit parameter invalid or out of range: {value}"));
}
},
"cursor" => match value.parse::<i64>() {
Ok(c) => cursor = Some(c),
Err(_) => {
return Err(eyre!("cursor parameter invalid: {value}"));
}
},
// Ignore unknown query parameters.
_ => (),
}
}

let mut stmt_hosts = self.relay_conn.prepare_cached(
"SELECT rowid, host, cursor
FROM hosts
WHERE :cursor is NULL OR rowid > :cursor
LIMIT :limit;",
)?;
let hosts = stmt_hosts
.query_map(
named_params! {
":cursor": cursor,
":limit": limit,
},
|row| Ok((row.get::<_, i64>("rowid")?, row.get("host")?, row.get("cursor")?)),
)?
.collect::<Result<Vec<_>, _>>()?;

let cursor = hosts.last().map(|(rowid, ..)| rowid.to_string());

let hosts = hosts
.into_iter()
.map(|(_, hostname, seq)| Host {
// TODO: Track host account counts.
account_count: 0,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indigo updates its counter for a PDS host whenever an account is first observed on a PDS or is observed to move between PDSs. AFAICT rsky-relay doesn't have any tracking of which host a repo is on, so I haven't hooked this up to anything and it instead reports the same default value as indigo.

hostname,
seq,
// TODO: Track status of hosts.
status: HostStatus::Active,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indigo does things like mark a host as offline if it fails repeated connection attempts, or mark hosts that are actually relays as banned. I couldn't find any logic in rsky-relay that did this, so for now this reports the same default value as indigo. This does mean rsky-relay is over-reporting which hosts it is actively connected to, but that can be fixed by someone more familiar with the stack.

})
.collect();

Ok(ListHosts { cursor, hosts })
}

#[cfg(not(feature = "labeler"))]
fn query_hosts(&mut self) -> Result<()> {
let client = reqwest::blocking::Client::builder()
Expand Down
2 changes: 1 addition & 1 deletion rsky-satnav/src/components/hero.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ pub fn Hero() -> Element {
}
}
}
}
}
12 changes: 6 additions & 6 deletions rsky-satnav/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -787,21 +787,21 @@ fn App() -> Element {
link { rel: "icon", href: FAVICON }
link { rel: "stylesheet", href: TAILWIND_CSS }
link { rel: "stylesheet", href: MAIN_CSS }

// ⬆ Landing page
Hero {}

// ⬇ The actual tool, reachable via “#viewer”
section {
id: "viewer",
class: "py-16 px-4 sm:px-8 bg-white", // white to contrast the hero’s tinted band

div {
class: "max-w-4xl mx-auto",

h2 { class: "text-3xl font-bold mb-6 text-gray-900",
"Explore your CAR" }

// File pickers (unchanged)
input {
r#type: "file",
Expand All @@ -816,7 +816,7 @@ fn App() -> Element {
onchange: on_file_change(car_data2.clone()),
class: "bg-indigo-600 hover:bg-indigo-700 text-white font-bold py-2 px-4 rounded"
}

div { class: "mt-8", {content} }
}
}
Expand Down
Loading