Skip to content

Commit 7b2cdaf

Browse files
committed
sqlite migration
1 parent 94eb034 commit 7b2cdaf

File tree

5 files changed

+132
-146
lines changed

5 files changed

+132
-146
lines changed

apps/fortuna/migrations/20250502164500_init.up.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
CREATE TABLE log(
2-
id INTEGER PRIMARY KEY AUTOINCREMENT,
2+
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
33
chain_id VARCHAR(255) NOT NULL,
44
sequence INTEGER NOT NULL,
55
timestamp DATETIME NOT NULL,

apps/fortuna/src/api/explorer.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::api::{ChainId, RestError};
2-
use crate::history::RequestJournal;
2+
use crate::history::RequestLog;
33
use axum::extract::{Query, State};
44
use axum::Json;
55
use ethers::types::TxHash;
@@ -38,7 +38,7 @@ pub enum ExplorerQueryParamsMode {
3838
pub async fn explorer(
3939
State(state): State<crate::api::ApiState>,
4040
Query(query_params): Query<ExplorerQueryParams>,
41-
) -> anyhow::Result<Json<Vec<RequestJournal>>, RestError> {
41+
) -> anyhow::Result<Json<Vec<RequestLog>>, RestError> {
4242
let result = match query_params.mode {
4343
ExplorerQueryParamsMode::TxHash => {
4444
let tx_hash = query_params.tx_hash.ok_or(RestError::BadFilterParameters(
@@ -49,6 +49,7 @@ pub async fn explorer(
4949
.read()
5050
.await
5151
.get_request_logs_by_tx_hash(tx_hash)
52+
.await
5253
}
5354
ExplorerQueryParamsMode::ChainAndSequence => {
5455
let chain_id = query_params.chain_id.ok_or(RestError::BadFilterParameters(
@@ -64,6 +65,7 @@ pub async fn explorer(
6465
.read()
6566
.await
6667
.get_request_logs(&(chain_id, sequence_id))
68+
.await
6769
.into_iter()
6870
.collect()
6971
}

apps/fortuna/src/command/run.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ pub async fn run_api(
4040
components(
4141
schemas(
4242
crate::api::GetRandomValueResponse,
43-
crate::history::RequestJournal,
43+
crate::history::RequestLog,
4444
crate::api::ExplorerQueryParamsMode,
4545
crate::api::Blob,
4646
crate::api::BinaryEncoding,

apps/fortuna/src/history.rs

Lines changed: 118 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -1,82 +1,70 @@
11
use crate::api::ChainId;
22
use chrono::{DateTime, NaiveDateTime};
3+
use ethers::abi::AbiEncode;
34
use ethers::prelude::TxHash;
45
use ethers::types::BlockNumber;
56
use serde::Serialize;
67
use sqlx::{Pool, Sqlite, SqlitePool};
78
use std::collections::{BTreeMap, HashMap};
89
use utoipa::ToSchema;
910

10-
#[derive(Clone, Debug, Serialize)]
11-
pub enum JournalLog {
11+
#[derive(Clone, Debug, Serialize, PartialEq)]
12+
pub enum RequestLogType {
1213
Observed { tx_hash: TxHash },
1314
FailedToReveal { reason: String },
1415
Revealed { tx_hash: TxHash },
1516
Landed { block_number: u64 },
1617
}
1718

18-
impl JournalLog {
19+
impl RequestLogType {
1920
pub fn get_tx_hash(&self) -> Option<TxHash> {
2021
match self {
21-
JournalLog::Observed { tx_hash } => Some(*tx_hash),
22-
JournalLog::FailedToReveal { .. } => None,
23-
JournalLog::Revealed { tx_hash } => Some(*tx_hash),
24-
JournalLog::Landed { .. } => None,
22+
RequestLogType::Observed { tx_hash } => Some(*tx_hash),
23+
RequestLogType::FailedToReveal { .. } => None,
24+
RequestLogType::Revealed { tx_hash } => Some(*tx_hash),
25+
RequestLogType::Landed { .. } => None,
2526
}
2627
}
2728

2829
pub fn get_info(&self) -> Option<String> {
2930
match self {
30-
JournalLog::Observed { tx_hash } => None,
31-
JournalLog::FailedToReveal { reason } => Some(reason.clone()),
32-
JournalLog::Revealed { tx_hash } => None,
33-
JournalLog::Landed { block_number } => None,
31+
RequestLogType::Observed { tx_hash } => None,
32+
RequestLogType::FailedToReveal { reason } => Some(reason.clone()),
33+
RequestLogType::Revealed { tx_hash } => None,
34+
RequestLogType::Landed { block_number } => None,
3435
}
3536
}
3637
pub fn get_type(&self) -> String {
3738
match self {
38-
JournalLog::Observed { .. } => "Observed".to_string(),
39-
JournalLog::FailedToReveal { .. } => "FailedToReveal".to_string(),
40-
JournalLog::Revealed { .. } => "Revealed".to_string(),
41-
JournalLog::Landed { .. } => "Landed".to_string(),
39+
RequestLogType::Observed { .. } => "Observed".to_string(),
40+
RequestLogType::FailedToReveal { .. } => "FailedToReveal".to_string(),
41+
RequestLogType::Revealed { .. } => "Revealed".to_string(),
42+
RequestLogType::Landed { .. } => "Landed".to_string(),
4243
}
4344
}
4445

4546
pub fn get_block_number(&self) -> Option<u64> {
4647
match self {
47-
JournalLog::Observed { .. } => None,
48-
JournalLog::FailedToReveal { .. } => None,
49-
JournalLog::Revealed { .. } => None,
50-
JournalLog::Landed { block_number } => Some(*block_number),
48+
RequestLogType::Observed { .. } => None,
49+
RequestLogType::FailedToReveal { .. } => None,
50+
RequestLogType::Revealed { .. } => None,
51+
RequestLogType::Landed { block_number } => Some(*block_number),
5152
}
5253
}
5354
}
5455

55-
#[derive(Clone, Debug, Serialize)]
56-
pub struct TimedJournalLog {
57-
pub timestamp: DateTime<chrono::Utc>,
58-
pub log: JournalLog,
59-
}
60-
61-
impl TimedJournalLog {
62-
pub fn with_current_time(log: JournalLog) -> Self {
63-
TimedJournalLog {
64-
timestamp: chrono::Utc::now(),
65-
log,
66-
}
67-
}
68-
}
69-
70-
#[derive(Clone, Debug, Serialize, ToSchema)]
71-
pub struct RequestJournal {
56+
#[derive(Clone, Debug, Serialize, ToSchema, PartialEq)]
57+
pub struct RequestLog {
7258
pub chain_id: ChainId,
7359
pub sequence: u64,
74-
pub journal: Vec<TimedJournalLog>,
60+
pub timestamp: DateTime<chrono::Utc>,
61+
pub log: RequestLogType,
7562
}
7663

7764
type RequestKey = (ChainId, u64);
7865

7966
struct LogRow {
67+
id: i64,
8068
chain_id: String,
8169
sequence: i64,
8270
timestamp: NaiveDateTime,
@@ -86,47 +74,59 @@ struct LogRow {
8674
tx_hash: Option<String>,
8775
}
8876

77+
impl From<LogRow> for RequestLog {
78+
fn from(row: LogRow) -> Self {
79+
let chain_id = row.chain_id;
80+
let sequence = row.sequence as u64;
81+
let timestamp = row.timestamp.and_utc();
82+
let log_type = match row.r#type.as_str() {
83+
"Observed" => RequestLogType::Observed {
84+
tx_hash: row.tx_hash.unwrap_or_default().parse().unwrap(),
85+
},
86+
"FailedToReveal" => RequestLogType::FailedToReveal {
87+
reason: row.info.unwrap_or_default(),
88+
},
89+
"Revealed" => RequestLogType::Revealed {
90+
tx_hash: row.tx_hash.unwrap_or_default().parse().unwrap(),
91+
},
92+
"Landed" => RequestLogType::Landed {
93+
block_number: row.block_number.unwrap_or_default() as u64,
94+
},
95+
_ => panic!("Unknown log type"),
96+
};
97+
Self {
98+
chain_id,
99+
sequence,
100+
timestamp,
101+
log: log_type,
102+
}
103+
}
104+
}
105+
89106
pub struct History {
90-
pub by_hash: HashMap<TxHash, Vec<RequestKey>>,
91-
pub by_chain_and_time: BTreeMap<(ChainId, DateTime<chrono::Utc>), RequestKey>,
92-
pub by_time: BTreeMap<DateTime<chrono::Utc>, RequestKey>,
93-
pub by_request_key: HashMap<RequestKey, RequestJournal>,
94107
pool: Pool<Sqlite>,
95108
}
96109

97110
impl History {
98111
const MAX_HISTORY: usize = 1_000_000;
99112
pub async fn new() -> Self {
100113
let pool = SqlitePool::connect("sqlite:fortuna.db").await.unwrap();
101-
Self {
102-
by_hash: HashMap::new(),
103-
by_chain_and_time: BTreeMap::new(),
104-
by_time: BTreeMap::new(),
105-
by_request_key: HashMap::new(),
106-
pool,
107-
}
114+
Self { pool }
108115
}
109116

110-
pub async fn add_to_db(
111-
&self,
112-
(chain_id, sequence): RequestKey,
113-
request_journal_log: TimedJournalLog,
114-
) {
115-
let sequence = sequence as i64;
116-
let log_type = request_journal_log.log.get_type();
117-
let block_number = request_journal_log
117+
pub async fn add_to_db(&self, log: RequestLog) {
118+
let sequence = log.sequence as i64;
119+
let log_type = log.log.get_type();
120+
let block_number = log
118121
.log
119122
.get_block_number()
120123
.map(|block_number| block_number as i64); // sqlite does not support u64
121-
let tx_hash = request_journal_log
122-
.log
123-
.get_tx_hash()
124-
.map(|tx_hash| tx_hash.to_string());
125-
let info = request_journal_log.log.get_info();
124+
let tx_hash = log.log.get_tx_hash().map(|tx_hash| tx_hash.encode_hex());
125+
let info = log.log.get_info();
126126
sqlx::query!("INSERT INTO log (chain_id, sequence, timestamp, type, block_number, info, tx_hash) VALUES (?, ?, ?, ?, ?, ?, ?)",
127-
chain_id,
127+
log.chain_id,
128128
sequence,
129-
request_journal_log.timestamp,
129+
log.timestamp,
130130
log_type,
131131
block_number,
132132
info,
@@ -136,81 +136,66 @@ impl History {
136136
.unwrap();
137137
}
138138

139-
pub async fn get_from_db(&self, (chain_id, sequence): RequestKey) -> Option<TimedJournalLog> {
139+
pub async fn get_from_db(&self, (chain_id, sequence): RequestKey) -> Vec<RequestLog> {
140140
let sequence = sequence as i64;
141-
let row = sqlx::query_as!(LogRow, "SELECT chain_id, sequence, timestamp, type, block_number, info, tx_hash FROM log WHERE chain_id = ? AND sequence = ?", chain_id, sequence)
142-
.fetch_optional(&self.pool)
143-
.await
144-
.unwrap();
145-
if let Some(row) = row {
146-
let ts = row.timestamp;
147-
Some(TimedJournalLog {
148-
timestamp: ts.and_utc(),
149-
log: JournalLog::Observed {
150-
tx_hash: TxHash::zero(),
151-
},
152-
})
153-
} else {
154-
None
155-
}
141+
let row = sqlx::query_as!(
142+
LogRow,
143+
"SELECT * FROM log WHERE chain_id = ? AND sequence = ?",
144+
chain_id,
145+
sequence
146+
)
147+
.fetch_all(&self.pool)
148+
.await
149+
.unwrap();
150+
row.into_iter().map(|row| row.into()).collect()
156151
}
157152

158-
pub fn add(&mut self, (chain_id, sequence): RequestKey, request_journal_log: TimedJournalLog) {
159-
self.add_to_db((chain_id, sequence), request_journal_log);
153+
pub fn add(&mut self, log: RequestLog) {
154+
self.add_to_db(log);
160155
}
161156

162-
pub fn get_request_logs(&self, request_key: &RequestKey) -> Option<RequestJournal> {
163-
self.by_request_key.get(request_key).cloned()
157+
pub async fn get_request_logs(&self, request_key: &RequestKey) -> Vec<RequestLog> {
158+
self.get_from_db(request_key.clone()).await
164159
}
165160

166-
pub fn get_request_logs_by_tx_hash(&self, tx_hash: TxHash) -> Vec<RequestJournal> {
167-
self.by_hash
168-
.get(&tx_hash)
169-
.map(|request_keys| {
170-
request_keys
171-
.iter()
172-
.map(|request_key| self.by_request_key.get(request_key).unwrap().clone())
173-
.collect()
174-
})
175-
.unwrap_or_default()
161+
pub async fn get_request_logs_by_tx_hash(&self, tx_hash: TxHash) -> Vec<RequestLog> {
162+
let tx_hash = tx_hash.encode_hex();
163+
let rows = sqlx::query_as!(LogRow, "SELECT * FROM log WHERE tx_hash = ?", tx_hash)
164+
.fetch_all(&self.pool)
165+
.await
166+
.unwrap();
167+
rows.into_iter().map(|row| row.into()).collect()
176168
}
177169

178-
pub fn get_latest_requests(
170+
pub async fn get_latest_requests(
179171
&self,
180172
chain_id: Option<&ChainId>,
181173
limit: u64,
182174
min_timestamp: Option<DateTime<chrono::Utc>>,
183175
max_timestamp: Option<DateTime<chrono::Utc>>,
184-
) -> Vec<RequestJournal> {
185-
match chain_id {
176+
) -> Vec<RequestLog> {
177+
let limit = limit as i64;
178+
let rows = match chain_id {
186179
Some(chain_id) => {
187-
let range = self.by_chain_and_time.range(
188-
(
189-
chain_id.clone(),
190-
min_timestamp.unwrap_or(DateTime::<chrono::Utc>::MIN_UTC),
191-
)
192-
..(
193-
chain_id.clone(),
194-
max_timestamp.unwrap_or(DateTime::<chrono::Utc>::MAX_UTC),
195-
),
196-
);
197-
range
198-
.rev()
199-
.take(limit as usize)
200-
.map(|(_, request_key)| self.by_request_key.get(request_key).unwrap().clone())
201-
.collect()
180+
let chain_id = chain_id.to_string();
181+
let min_timestamp = min_timestamp.unwrap_or(DateTime::<chrono::Utc>::MIN_UTC);
182+
let max_timestamp = max_timestamp.unwrap_or(DateTime::<chrono::Utc>::MAX_UTC);
183+
sqlx::query_as!(LogRow, "SELECT * FROM log WHERE chain_id = ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp DESC LIMIT ?",
184+
chain_id,
185+
min_timestamp,
186+
max_timestamp,
187+
limit).fetch_all(&self.pool).await
202188
}
203-
None => self
204-
.by_time
205-
.range(
206-
min_timestamp.unwrap_or(DateTime::<chrono::Utc>::MIN_UTC)
207-
..max_timestamp.unwrap_or(DateTime::<chrono::Utc>::MAX_UTC),
208-
)
209-
.rev()
210-
.take(limit as usize)
211-
.map(|(_time, request_key)| self.by_request_key.get(request_key).unwrap().clone())
212-
.collect::<Vec<_>>(),
213-
}
189+
None => {
190+
let min_timestamp = min_timestamp.unwrap_or(DateTime::<chrono::Utc>::MIN_UTC);
191+
let max_timestamp = max_timestamp.unwrap_or(DateTime::<chrono::Utc>::MAX_UTC);
192+
sqlx::query_as!(LogRow, "SELECT * FROM log WHERE timestamp >= ? AND timestamp <= ? ORDER BY timestamp DESC LIMIT ?",
193+
min_timestamp,
194+
max_timestamp,
195+
limit).fetch_all(&self.pool).await
196+
}
197+
};
198+
rows.unwrap().into_iter().map(|row| row.into()).collect()
214199
}
215200
}
216201

@@ -219,20 +204,17 @@ mod tests {
219204

220205
#[sqlx::test]
221206
async fn test_history(pool: Pool<Sqlite>) {
222-
let history = History {
223-
by_hash: HashMap::new(),
224-
by_chain_and_time: BTreeMap::new(),
225-
by_time: BTreeMap::new(),
226-
by_request_key: HashMap::new(),
227-
pool,
207+
let history = History { pool };
208+
let log = RequestLog {
209+
chain_id: "ethereum".to_string(),
210+
sequence: 1,
211+
timestamp: chrono::Utc::now(),
212+
log: RequestLogType::Observed {
213+
tx_hash: TxHash::zero(),
214+
},
228215
};
229-
history
230-
.add_to_db(
231-
("ethereum".to_string(), 1),
232-
TimedJournalLog::with_current_time(JournalLog::Observed {
233-
tx_hash: TxHash::zero(),
234-
}),
235-
)
236-
.await;
216+
history.add_to_db(log.clone()).await;
217+
let logs = history.get_request_logs(&("ethereum".to_string(), 1)).await;
218+
assert_eq!(logs, vec![log.clone()]);
237219
}
238220
}

0 commit comments

Comments
 (0)