Skip to content

Commit fab6d2a

Browse files
committed
Add RPC logging
1 parent 47a3f57 commit fab6d2a

File tree

8 files changed

+270
-18
lines changed

8 files changed

+270
-18
lines changed

migrations/89.sql

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
CREATE TABLE IF NOT EXISTS rpc_call (
2+
id INTEGER PRIMARY KEY AUTOINCREMENT,
3+
user_id INTEGER REFERENCES user(id),
4+
ip TEXT NOT NULL,
5+
method TEXT NOT NULL,
6+
params_json TEXT,
7+
created_at TEXT NOT NULL,
8+
processed_at TEXT NOT NULL,
9+
duration_ns INTEGER NOT NULL
10+
) STRICT;

src/db/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub mod migration;
1313
pub mod osm_user;
1414
pub mod place_submission;
1515
pub mod report;
16+
pub mod rpc_call;
1617
pub mod user;
1718
use crate::{service::filesystem::data_dir_file_path, Result};
1819
use deadpool_sqlite::{Config, Hook, Pool, Runtime};
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
use crate::db::rpc_call::schema::{self, Columns, RpcCall};
2+
use crate::Result;
3+
use geojson::JsonObject;
4+
use rusqlite::{named_params, Connection};
5+
use time::format_description::well_known::Rfc3339;
6+
use time::OffsetDateTime;
7+
8+
pub fn insert(
9+
user_id: i64,
10+
ip: String,
11+
method: String,
12+
params: Option<JsonObject>,
13+
created_at: OffsetDateTime,
14+
processed_at: OffsetDateTime,
15+
conn: &Connection,
16+
) -> Result<RpcCall> {
17+
let sql = format!(
18+
r#"
19+
INSERT INTO {table} (
20+
{user_id},
21+
{ip},
22+
{method},
23+
{params_json},
24+
{created_at},
25+
{processed_at},
26+
{duration_ns}
27+
) VALUES (
28+
:user_id,
29+
:ip,
30+
:method,
31+
:params_json,
32+
:created_at,
33+
:processed_at,
34+
:duration_ns
35+
)
36+
RETURNING {projection}
37+
"#,
38+
table = schema::TABLE_NAME,
39+
user_id = Columns::UserId.as_str(),
40+
ip = Columns::Ip.as_str(),
41+
method = Columns::Method.as_str(),
42+
params_json = Columns::ParamsJson.as_str(),
43+
created_at = Columns::CreatedAt.as_str(),
44+
processed_at = Columns::ProcessedAt.as_str(),
45+
duration_ns = Columns::DurationNs.as_str(),
46+
projection = RpcCall::projection(),
47+
);
48+
let params = named_params! {
49+
":user_id" : user_id,
50+
":ip" : ip,
51+
":method" : method,
52+
":params_json" : params.map(|it| serde_json::to_string(&it).unwrap()),
53+
":created_at" : created_at.format(&Rfc3339)?,
54+
":processed_at" : processed_at.format(&Rfc3339)?,
55+
":duration_ns" : (processed_at - created_at).whole_nanoseconds() as i64,
56+
};
57+
conn.query_row(&sql, params, RpcCall::mapper())
58+
.map_err(Into::into)
59+
}
60+
61+
#[cfg(test)]
62+
mod tests {
63+
use crate::db::test::conn;
64+
use serde_json::json;
65+
use std::time::Duration;
66+
use time::OffsetDateTime;
67+
68+
#[test]
69+
fn insert() {
70+
let conn = conn();
71+
72+
let user_id = 123;
73+
let ip = "192.168.1.100".to_string();
74+
let method = "get_element".to_string();
75+
let params = Some(
76+
json!({"id": 456, "name": "test"})
77+
.as_object()
78+
.unwrap()
79+
.clone(),
80+
);
81+
82+
let created_at = OffsetDateTime::now_utc();
83+
let processed_at = created_at + Duration::from_millis(150);
84+
85+
let result = super::insert(
86+
user_id,
87+
ip.clone(),
88+
method.clone(),
89+
params.clone(),
90+
created_at,
91+
processed_at,
92+
&conn,
93+
);
94+
95+
assert!(result.is_ok(), "insert should succeed");
96+
97+
let rpc_call = result.unwrap();
98+
99+
assert_eq!(rpc_call.user_id, Some(user_id));
100+
assert_eq!(rpc_call.ip, ip);
101+
assert_eq!(rpc_call.method, method);
102+
assert!(rpc_call.params_json.is_some());
103+
assert_eq!(rpc_call.params_json, params);
104+
105+
assert_eq!(rpc_call.created_at, created_at);
106+
assert_eq!(rpc_call.processed_at, processed_at);
107+
108+
let expected_ns = Duration::from_millis(150).as_nanos() as i64;
109+
assert_eq!(rpc_call.duration_ns, expected_ns);
110+
}
111+
}

src/db/rpc_call/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pub(super) mod blocking_queries;
2+
pub mod queries;
3+
pub mod schema;

src/db/rpc_call/queries.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use super::blocking_queries;
2+
use crate::{db::rpc_call::schema::RpcCall, Result};
3+
use deadpool_sqlite::Pool;
4+
use geojson::JsonObject;
5+
use time::OffsetDateTime;
6+
7+
pub async fn insert(
8+
user_id: i64,
9+
ip: String,
10+
method: String,
11+
params: Option<JsonObject>,
12+
created_at: OffsetDateTime,
13+
processed_at: OffsetDateTime,
14+
pool: &Pool,
15+
) -> Result<RpcCall> {
16+
pool.get()
17+
.await?
18+
.interact(move |conn| {
19+
blocking_queries::insert(user_id, ip, method, params, created_at, processed_at, conn)
20+
})
21+
.await?
22+
}

src/db/rpc_call/schema.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use geojson::JsonObject;
2+
use rusqlite::Row;
3+
use serde::{Deserialize, Serialize};
4+
use std::sync::OnceLock;
5+
use time::OffsetDateTime;
6+
7+
pub const TABLE_NAME: &str = "rpc_call";
8+
9+
pub enum Columns {
10+
Id,
11+
UserId,
12+
Ip,
13+
Method,
14+
ParamsJson,
15+
CreatedAt,
16+
ProcessedAt,
17+
DurationNs,
18+
}
19+
20+
impl Columns {
21+
pub fn as_str(&self) -> &'static str {
22+
match self {
23+
Columns::Id => "id",
24+
Columns::UserId => "user_id",
25+
Columns::Ip => "ip",
26+
Columns::Method => "method",
27+
Columns::ParamsJson => "params_json",
28+
Columns::CreatedAt => "created_at",
29+
Columns::ProcessedAt => "processed_at",
30+
Columns::DurationNs => "duration_ns",
31+
}
32+
}
33+
}
34+
35+
#[derive(Debug, Clone, Serialize, Deserialize)]
36+
pub struct RpcCall {
37+
pub id: i64,
38+
pub user_id: Option<i64>,
39+
pub ip: String,
40+
pub method: String,
41+
pub params_json: Option<JsonObject>,
42+
pub created_at: OffsetDateTime,
43+
pub processed_at: OffsetDateTime,
44+
pub duration_ns: i64,
45+
}
46+
47+
impl RpcCall {
48+
pub fn projection() -> &'static str {
49+
static PROJECTION: OnceLock<String> = OnceLock::new();
50+
PROJECTION.get_or_init(|| {
51+
[
52+
Columns::Id,
53+
Columns::UserId,
54+
Columns::Ip,
55+
Columns::Method,
56+
Columns::ParamsJson,
57+
Columns::CreatedAt,
58+
Columns::ProcessedAt,
59+
Columns::DurationNs,
60+
]
61+
.iter()
62+
.map(Columns::as_str)
63+
.collect::<Vec<_>>()
64+
.join(", ")
65+
})
66+
}
67+
68+
pub const fn mapper() -> fn(&Row) -> rusqlite::Result<RpcCall> {
69+
|row: &_| {
70+
let params_json: Option<String> = row.get(Columns::ParamsJson.as_str())?;
71+
let params_json = match params_json {
72+
Some(json_str) => match serde_json::from_str(&json_str) {
73+
Ok(json_obj) => Some(json_obj),
74+
Err(e) => {
75+
return Err(rusqlite::Error::FromSqlConversionFailure(
76+
4,
77+
rusqlite::types::Type::Text,
78+
Box::new(e),
79+
));
80+
}
81+
},
82+
None => None,
83+
};
84+
Ok(RpcCall {
85+
id: row.get(Columns::Id.as_str())?,
86+
user_id: row.get(Columns::UserId.as_str())?,
87+
ip: row.get(Columns::Ip.as_str())?,
88+
method: row.get(Columns::Method.as_str())?,
89+
params_json,
90+
created_at: row.get(Columns::CreatedAt.as_str())?,
91+
processed_at: row.get(Columns::ProcessedAt.as_str())?,
92+
duration_ns: row.get(Columns::DurationNs.as_str())?,
93+
})
94+
}
95+
}
96+
}

src/rpc/generate_element_icons.rs

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
2-
db::{self, conf::schema::Conf, user::schema::User},
3-
service::{discord, overpass::OverpassElement},
2+
db::{self},
3+
service::overpass::OverpassElement,
44
Result,
55
};
66
use deadpool_sqlite::Pool;
@@ -27,25 +27,13 @@ pub struct UpdatedElement {
2727
pub new_icon: String,
2828
}
2929

30-
pub async fn run(params: Params, requesting_user: &User, pool: &Pool, conf: &Conf) -> Result<Res> {
30+
pub async fn run(params: Params, pool: &Pool) -> Result<Res> {
3131
let started_at = OffsetDateTime::now_utc();
3232
let updated_elements =
3333
generate_element_icons(params.from_element_id, params.to_element_id, pool).await?;
34-
let time_s = (OffsetDateTime::now_utc() - started_at).as_seconds_f64();
35-
discord::send(
36-
format!(
37-
"{} generated element icons (id range {}..{}, elements affected: {})",
38-
requesting_user.name,
39-
params.from_element_id,
40-
params.to_element_id,
41-
updated_elements.len(),
42-
),
43-
discord::Channel::Api,
44-
conf,
45-
);
4634
Ok(Res {
4735
updated_elements,
48-
time_s,
36+
time_s: (OffsetDateTime::now_utc() - started_at).as_seconds_f64(),
4937
})
5038
}
5139

src/rpc/handler.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};
2222
use serde_json::{json, Map, Value};
2323
use std::collections::HashSet;
2424
use strum::VariantArray;
25+
use time::OffsetDateTime;
2526

2627
#[derive(Deserialize)]
2728
pub struct RpcRequest {
@@ -275,6 +276,10 @@ pub async fn handle(
275276
pool: Data<Pool>,
276277
conf: Data<Conf>,
277278
) -> Result<Json<RpcResponse>> {
279+
let conn_info = req.connection_info();
280+
let req_ip = conn_info.realip_remote_addr().unwrap_or("0.0.0.0");
281+
282+
let started_at = OffsetDateTime::now_utc();
278283
let matrix_client = service::matrix::client(&pool).await;
279284
let headers = req.headers();
280285
let Ok(req) = serde_json::from_str::<Map<String, Value>>(&req_body) else {
@@ -342,11 +347,14 @@ pub async fn handle(
342347
}
343348
Some(user)
344349
};
350+
let user_id = user.as_ref().map(|it| it.id);
345351

346352
if req.jsonrpc != "2.0" {
347353
return Ok(Json(RpcResponse::invalid_request(Value::Null)));
348354
}
349355

356+
let req_params = req.params.clone();
357+
350358
let res: RpcResponse = match req.method {
351359
RpcMethod::Whoami => RpcResponse::from(
352360
req.id.clone(),
@@ -400,8 +408,7 @@ pub async fn handle(
400408
),
401409
RpcMethod::GenerateElementIcons => RpcResponse::from(
402410
req.id.clone(),
403-
super::generate_element_icons::run(params(req.params)?, &user.unwrap(), &pool, &conf)
404-
.await?,
411+
super::generate_element_icons::run(params(req.params)?, &pool).await?,
405412
),
406413
RpcMethod::GenerateElementCategories => RpcResponse::from(
407414
req.id.clone(),
@@ -570,6 +577,20 @@ pub async fn handle(
570577
super::matrix::send_matrix_message::run(params(req.params)?, &matrix_client).await,
571578
),
572579
}?;
580+
581+
if let Some(user_id) = user_id {
582+
db::rpc_call::queries::insert(
583+
user_id,
584+
req_ip.to_string(),
585+
method.as_str().unwrap_or_default().to_string(),
586+
req_params.map(|it| it.as_object().map(|it| it.clone()).unwrap()),
587+
started_at,
588+
OffsetDateTime::now_utc(),
589+
&pool,
590+
)
591+
.await?;
592+
}
593+
573594
Ok(Json(res))
574595
}
575596

0 commit comments

Comments
 (0)