Merged
Changes from 7 commits
1 change: 1 addition & 0 deletions apps/fortuna/Cargo.toml
@@ -41,6 +41,7 @@ url = "2.5.0"
chrono = { version = "0.4.38", features = [
"clock",
"std",
"serde"
], default-features = false }
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
thiserror = "1.0.61"
173 changes: 171 additions & 2 deletions apps/fortuna/src/api.rs
@@ -1,3 +1,8 @@
use chrono::DateTime;
use ethers::types::TxHash;
use serde::Serialize;
use std::collections::BTreeMap;
use utoipa::ToSchema;
use {
crate::{
chain::reader::{BlockNumber, BlockStatus, EntropyReader},
@@ -21,9 +26,10 @@ use {
tokio::sync::RwLock,
url::Url,
};
pub use {chain_ids::*, index::*, live::*, metrics::*, ready::*, revelation::*};
pub use {chain_ids::*, explorer::*, index::*, live::*, metrics::*, ready::*, revelation::*};

mod chain_ids;
mod explorer;
mod index;
mod live;
mod metrics;
@@ -41,10 +47,159 @@ pub struct ApiMetrics {
pub http_requests: Family<RequestLabel, Counter>,
}

#[derive(Clone, Debug, Serialize)]
pub enum JournalLog {
Observed { tx_hash: TxHash },
FailedToReveal { reason: String },
Revealed { tx_hash: TxHash },
Landed { block_number: BlockNumber },
Contributor: I don't understand why we need so many different types of log entries. At the end of the day (after we upgrade the contracts), requests can be in literally two states: pending (meaning we've seen the request but haven't sent the callback) or complete. If it's complete, the result may or may not be an error. The representation you are using for those two states (a vector of these log entries) has a much larger state space, and I don't see why that's necessary.

Contributor: Also, I presume you're going to add a bunch of additional fields to these log entries? E.g., all the stuff emitted in the request event and all the stuff emitted in the callback.

Contributor (author): Yes, more fields will be added. I think more verbosity is helpful for internal debugging and for calculating more metrics, such as our landing latency, observation latency, etc. Many things can go wrong between each of these steps, and knowing the latest state is very powerful.
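
A minimal sketch of the two-state representation the first comment describes (hypothetical, not part of this PR):

```rust
// Hypothetical alternative with a minimal state space: a request is either
// pending or complete, and a completed callback either landed (tx hash) or
// failed (error string).
#[derive(Clone, Debug, Serialize)]
pub enum RequestState {
    Pending,
    Complete { result: Result<TxHash, String> },
}
```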

}

impl JournalLog {
pub fn get_tx_hash(&self) -> Option<TxHash> {
match self {
JournalLog::Observed { tx_hash } => Some(*tx_hash),
JournalLog::FailedToReveal { .. } => None,
JournalLog::Revealed { tx_hash } => Some(*tx_hash),
JournalLog::Landed { .. } => None,
}
}
}

#[derive(Clone, Debug, Serialize)]
pub struct TimedJournalLog {
pub timestamp: DateTime<chrono::Utc>,
pub log: JournalLog,
}

impl TimedJournalLog {
pub fn with_current_time(log: JournalLog) -> Self {
TimedJournalLog {
timestamp: chrono::Utc::now(),
log,
}
}
}

#[derive(Clone, Debug, Serialize, ToSchema)]
pub struct RequestJournal {
pub chain_id: ChainId,
pub sequence: u64,
pub journal: Vec<TimedJournalLog>,
}

type RequestKey = (ChainId, u64);

#[derive(Default)]
pub struct History {
pub by_hash: HashMap<TxHash, Vec<RequestKey>>,
pub by_chain_and_time: BTreeMap<(ChainId, DateTime<chrono::Utc>), RequestKey>,
pub by_time: BTreeMap<DateTime<chrono::Utc>, RequestKey>,
pub by_request_key: HashMap<RequestKey, RequestJournal>,
Contributor: Doing all of this indexing manually kind of sucks. I think there are two other reasonable approaches:

1. Use a free-text search library like indicium (https://docs.rs/indicium/0.6.5/indicium/index.html) and throw all the searchable attributes in as strings. You could include a prefix to indicate type, like "chain_id:blast sequence:7 tx:01249e". You still have to do the time ordering and the eviction manually, but I think that's pretty easy: you can make by_time a BTreeMap<RequestKey, RequestJournal> and then implement Ord for RequestKey. indicium also gives you fuzzy matching and autocomplete, which are potentially useful.

2. Use sqlite and make an empty database on server start.
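
A rough sketch of the indicium approach from option 1 (assuming indicium's simple::SearchIndex API; the prefixed strings and the build_index helper are illustrative, not part of this PR):

```rust
use indicium::simple::{Indexable, SearchIndex};

// Make each journal searchable as a bag of prefixed strings, so a query can
// target one attribute, e.g. "chain_id:blast" or "tx:01249e".
impl Indexable for RequestJournal {
    fn strings(&self) -> Vec<String> {
        let mut strings = vec![
            format!("chain_id:{}", self.chain_id),
            format!("sequence:{}", self.sequence),
        ];
        strings.extend(
            self.journal
                .iter()
                .filter_map(|timed_log| timed_log.log.get_tx_hash())
                .map(|tx_hash| format!("tx:{:x}", tx_hash)),
        );
        strings
    }
}

// Index every journal under its RequestKey; index.search("chain_id:blast")
// then returns the matching keys.
fn build_index(history: &History) -> SearchIndex<RequestKey> {
    let mut index: SearchIndex<RequestKey> = SearchIndex::default();
    for (key, journal) in &history.by_request_key {
        index.insert(key, journal);
    }
    index
}
```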

}

impl History {
const MAX_HISTORY: usize = 1_000_000;
pub fn new() -> Self {
Self::default()
}

pub fn add(&mut self, (chain_id, sequence): RequestKey, request_journal_log: TimedJournalLog) {
let mut new_entry = false;
let entry = self
.by_request_key
.entry((chain_id.clone(), sequence))
.or_insert_with(|| {
new_entry = true;
RequestJournal {
chain_id: chain_id.clone(),
sequence,
journal: vec![],
}
});
if let Some(tx_hash) = request_journal_log.log.get_tx_hash() {
self.by_hash
.entry(tx_hash)
.or_default()
.push((chain_id.clone(), sequence));
}
entry.journal.push(request_journal_log);
if new_entry {
let current_time = chrono::Utc::now();
self.by_chain_and_time.insert(
(chain_id.clone(), current_time),
(chain_id.clone(), sequence),
);
self.by_time
.insert(current_time, (chain_id.clone(), sequence));

if self.by_time.len() > Self::MAX_HISTORY {
Contributor: At this point you're going to kick the earliest entry out of all of the different indexes?

Contributor (author): Yes, but it's not a priority TBH. As long as we restart every few months (which we definitely do), this will not become an issue.
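
A possible shape for the TODO below, evicting the oldest request from every index (a sketch only, assuming it runs inside History::add and that BTreeMap::pop_first is available, stable since Rust 1.66):

```rust
// Hypothetical eviction: drop the oldest entry from all four indexes at once.
if let Some((evicted_time, evicted_key)) = self.by_time.pop_first() {
    // by_chain_and_time was keyed with the same timestamp at insertion.
    self.by_chain_and_time
        .remove(&(evicted_key.0.clone(), evicted_time));
    if let Some(journal) = self.by_request_key.remove(&evicted_key) {
        // Unlink this request from every tx hash it was indexed under.
        for timed_log in &journal.journal {
            if let Some(tx_hash) = timed_log.log.get_tx_hash() {
                if let Some(keys) = self.by_hash.get_mut(&tx_hash) {
                    keys.retain(|key| key != &evicted_key);
                    if keys.is_empty() {
                        self.by_hash.remove(&tx_hash);
                    }
                }
            }
        }
    }
}
```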

// TODO
}
}
}

pub fn get_request_logs(&self, request_key: &RequestKey) -> Option<RequestJournal> {
self.by_request_key.get(request_key).cloned()
}

pub fn get_request_logs_by_tx_hash(&self, tx_hash: TxHash) -> Vec<RequestJournal> {
self.by_hash
.get(&tx_hash)
.map(|request_keys| {
request_keys
.iter()
.map(|request_key| self.by_request_key.get(request_key).unwrap().clone())
.collect()
})
.unwrap_or_default()
}

pub fn get_latest_requests(
&self,
chain_id: Option<&ChainId>,
limit: u64,
min_timestamp: Option<DateTime<chrono::Utc>>,
max_timestamp: Option<DateTime<chrono::Utc>>,
) -> Vec<RequestJournal> {
match chain_id {
Some(chain_id) => {
let range = self.by_chain_and_time.range(
(
chain_id.clone(),
min_timestamp.unwrap_or(DateTime::<chrono::Utc>::MIN_UTC),
)
..(
chain_id.clone(),
max_timestamp.unwrap_or(DateTime::<chrono::Utc>::MAX_UTC),
),
);
range
.rev()
.take(limit as usize)
.map(|(_, request_key)| self.by_request_key.get(request_key).unwrap().clone())
.collect()
}
None => self
.by_time
.range(
min_timestamp.unwrap_or(DateTime::<chrono::Utc>::MIN_UTC)
..max_timestamp.unwrap_or(DateTime::<chrono::Utc>::MAX_UTC),
)
.rev()
.take(limit as usize)
.map(|(_time, request_key)| self.by_request_key.get(request_key).unwrap().clone())
.collect::<Vec<_>>(),
}
}
}
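
An illustrative use of History (assuming ChainId is a String alias, as elsewhere in fortuna; the chain id, sequence number, and zero tx hash are made up):

```rust
let mut history = History::new();
history.add(
    ("blast".to_string(), 7),
    TimedJournalLog::with_current_time(JournalLog::Observed {
        tx_hash: TxHash::zero(),
    }),
);
// The new entry is visible through the time-ordered index.
let recent = history.get_latest_requests(None, 10, None, None);
assert_eq!(recent.len(), 1);
```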

#[derive(Clone)]
pub struct ApiState {
pub chains: Arc<RwLock<HashMap<ChainId, ApiBlockChainState>>>,

pub history: Arc<RwLock<History>>,

pub metrics_registry: Arc<RwLock<Registry>>,

/// Prometheus metrics
@@ -55,6 +210,7 @@ impl ApiState {
pub async fn new(
chains: Arc<RwLock<HashMap<ChainId, ApiBlockChainState>>>,
metrics_registry: Arc<RwLock<Registry>>,
history: Arc<RwLock<History>>,
) -> ApiState {
let metrics = ApiMetrics {
http_requests: Family::default(),
Expand All @@ -70,6 +226,7 @@ impl ApiState {
ApiState {
chains,
metrics: Arc::new(metrics),
history,
metrics_registry,
}
}
@@ -114,6 +271,7 @@ pub enum RestError {
/// The server cannot currently communicate with the blockchain, so is not able to verify
/// which random values have been requested.
TemporarilyUnavailable,
BadFilterParameters(String),
/// The server is not able to process the request because the blockchain initialization
/// has not been completed yet.
Uninitialized,
@@ -156,6 +314,11 @@ impl IntoResponse for RestError {
"An unknown error occurred processing the request",
)
.into_response(),
RestError::BadFilterParameters(message) => (
StatusCode::BAD_REQUEST,
format!("Invalid filter parameters: {}", message),
)
.into_response(),
}
}
}
@@ -167,6 +330,7 @@ pub fn routes(state: ApiState) -> Router<(), Body> {
.route("/metrics", get(metrics))
.route("/ready", get(ready))
.route("/v1/chains", get(chain_ids))
.route("/v1/explorer", get(explorer))
Contributor: The name of this endpoint should be a noun indicating the kind of thing being returned. (We follow REST conventions on HTTP endpoints: https://restfulapi.net/resource-naming/)

Contributor: In this case something like /v1/logs would be appropriate, and ? query parameters are fine.
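
Under that convention, queries would look something like this (hypothetical path and values):

```
GET /v1/logs?chain_id=blast&sequence_id=7
GET /v1/logs?tx_hash=<tx_hash>
```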

.route(
"/v1/chains/:chain_id/revelations/:sequence",
get(revelation),
@@ -252,7 +416,12 @@ mod test {
ApiBlockChainState::Initialized(avax_state),
);

let api_state = ApiState::new(Arc::new(RwLock::new(chains)), metrics_registry).await;
let api_state = ApiState::new(
Arc::new(RwLock::new(chains)),
metrics_registry,
Default::default(),
)
.await;

let app = api::routes(api_state);
(TestServer::new(app).unwrap(), eth_read, avax_read)
77 changes: 77 additions & 0 deletions apps/fortuna/src/api/explorer.rs
@@ -0,0 +1,77 @@
use crate::api::{ChainId, RequestJournal, RestError};
use axum::extract::{Query, State};
use axum::Json;
use ethers::types::TxHash;
use utoipa::{IntoParams, ToSchema};

#[derive(Debug, serde::Serialize, serde::Deserialize, IntoParams)]
#[into_params(parameter_in=Query)]
pub struct ExplorerQueryParams {
pub mode: ExplorerQueryParamsMode,

pub min_timestamp: Option<u64>,
pub max_timestamp: Option<u64>,
pub sequence_id: Option<u64>,
#[param(value_type = Option<String>)]
pub tx_hash: Option<TxHash>,
#[param(value_type = Option<String>)]
pub chain_id: Option<ChainId>,
}
#[derive(Debug, serde::Serialize, serde::Deserialize, ToSchema)]
#[serde(rename_all = "kebab-case")]
pub enum ExplorerQueryParamsMode {
TxHash,
ChainAndSequence,
ChainAndTimestamp,
Timestamp,
}
Contributor: I don't think you should take this mode parameter. Instead, take all the filters and intersect them; it's much more natural (and powerful).

Contributor: (Either of the indexing ideas above would make this functionality much easier to support.)

Contributor (author): With great power comes great responsibility :D I wanted to avoid footguns in the UI. For example, if we know the transaction hash but the chain id is mistakenly set to something else, I'd rather return an error than an empty set.
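
A sketch of the filter-intersection approach the reviewer suggests (the query_intersection helper is hypothetical; it assumes the mode field is dropped and leaves timestamp filters aside):

```rust
// Hypothetical: seed candidates from the most selective filter present, then
// require every other supplied filter to match (intersection semantics).
fn query_intersection(
    history: &History,
    tx_hash: Option<TxHash>,
    chain_id: Option<ChainId>,
    sequence_id: Option<u64>,
) -> Vec<RequestJournal> {
    let candidates = match (tx_hash, &chain_id) {
        (Some(tx_hash), _) => history.get_request_logs_by_tx_hash(tx_hash),
        (None, Some(chain)) => history.get_latest_requests(Some(chain), u64::MAX, None, None),
        (None, None) => history.get_latest_requests(None, u64::MAX, None, None),
    };
    candidates
        .into_iter()
        .filter(|j| chain_id.as_ref().map_or(true, |c| &j.chain_id == c))
        .filter(|j| sequence_id.map_or(true, |s| j.sequence == s))
        .collect()
}
```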


#[utoipa::path(
get,
path = "/v1/explorer",
responses(
(status = 200, description = "Request journals successfully retrieved", body = Vec<RequestJournal>)
),
params(ExplorerQueryParams)
)]
pub async fn explorer(
State(state): State<crate::api::ApiState>,
Query(query_params): Query<ExplorerQueryParams>,
) -> anyhow::Result<Json<Vec<RequestJournal>>, RestError> {
let result = match query_params.mode {
ExplorerQueryParamsMode::TxHash => {
let tx_hash = query_params.tx_hash.ok_or(RestError::BadFilterParameters(
"tx_hash is required when mode=tx-hash".to_string(),
))?;
state
.history
.read()
.await
.get_request_logs_by_tx_hash(tx_hash)
}
ExplorerQueryParamsMode::ChainAndSequence => {
let chain_id = query_params.chain_id.ok_or(RestError::BadFilterParameters(
"chain_id is required when mode=chain-and-sequence".to_string(),
))?;
let sequence_id = query_params
.sequence_id
.ok_or(RestError::BadFilterParameters(
"sequence_id is required when mode=chain-and-sequence".to_string(),
))?;
state
.history
.read()
.await
.get_request_logs(&(chain_id, sequence_id))
.into_iter()
.collect()
}
ExplorerQueryParamsMode::ChainAndTimestamp => {
// Not yet implemented in this revision.
vec![]
}
ExplorerQueryParamsMode::Timestamp => {
// Not yet implemented in this revision.
vec![]
}
};
Ok(Json(result))
}
6 changes: 4 additions & 2 deletions apps/fortuna/src/chain/ethereum.rs
@@ -1,3 +1,4 @@
use ethers::contract::LogMeta;
use {
crate::{
api::ChainId,
@@ -285,14 +286,15 @@ impl<T: JsonRpcClient + 'static> EntropyReader for PythRandom<Provider<T>> {
.to_block(to_block)
.topic1(provider);

let res: Vec<RequestedWithCallbackFilter> = event.query().await?;
let res: Vec<(RequestedWithCallbackFilter, LogMeta)> = event.query_with_meta().await?;

Ok(res
.iter()
.map(|r| RequestedWithCallbackEvent {
.map(|(r, meta)| RequestedWithCallbackEvent {
sequence_number: r.sequence_number,
user_random_number: r.user_random_number,
provider_address: r.request.provider,
tx_hash: meta.transaction_hash,
})
.filter(|r| r.provider_address == provider)
.collect())
2 changes: 2 additions & 0 deletions apps/fortuna/src/chain/reader.rs
@@ -1,3 +1,4 @@
use ethers::types::TxHash;
use {
anyhow::Result,
axum::async_trait,
@@ -34,6 +35,7 @@ pub struct RequestedWithCallbackEvent {
pub sequence_number: u64,
pub user_random_number: [u8; 32],
pub provider_address: Address,
pub tx_hash: TxHash,
}

/// EntropyReader is the read-only interface of the Entropy contract.