|
1 | | -use khttp::{Headers, Method::*, Server}; |
2 | | -use yarte::Serialize; |
| 1 | +use khttp::{Headers, Method::*, RequestContext, ResponseHandle, Server, Status}; |
| 2 | +use std::{ffi::CStr, io, ptr}; |
| 3 | +use yarte::{Serialize, ywrite_html}; |
3 | 4 |
|
4 | 5 | #[derive(Serialize)] |
5 | 6 | struct HelloMessage { |
@@ -36,5 +37,152 @@ fn main() { |
36 | 37 | res.ok(&headers, buf) |
37 | 38 | }); |
38 | 39 |
|
| 40 | + app.route(Get, "/fortunes", handle_fortunes); |
| 41 | + |
39 | 42 | app.build().serve_epoll().unwrap(); |
40 | 43 | } |
| 44 | + |
| 45 | +// --------------------------------------------------------------------- |
| 46 | +// GET /fortunes handler |
| 47 | +// --------------------------------------------------------------------- |
| 48 | + |
| 49 | +fn handle_fortunes(_ctx: RequestContext, res: &mut ResponseHandle) -> io::Result<()> { |
| 50 | + // headers |
| 51 | + let mut headers = Headers::new(); |
| 52 | + headers.add(Headers::CONTENT_TYPE, b"text/html; charset=utf-8"); |
| 53 | + headers.add("server", b"khttp"); |
| 54 | + |
| 55 | + // response |
| 56 | + match fetch_fortunes_html() { |
| 57 | + Ok(body) => res.ok(&headers, body), |
| 58 | + Err(_) => res.send0(&Status::INTERNAL_SERVER_ERROR, &headers), |
| 59 | + } |
| 60 | +} |
| 61 | + |
| 62 | +// --------------------------------------------------------------------- |
| 63 | +// /fortunes query implementation using postgres (libpq) |
| 64 | +// --------------------------------------------------------------------- |
| 65 | + |
| 66 | +use pq_sys::{ |
| 67 | + ConnStatusType, ExecStatusType, PGconn, PQclear, PQconnectdb, PQerrorMessage, PQexecPrepared, |
| 68 | + PQfinish, PQgetlength, PQgetvalue, PQntuples, PQprepare, PQresultStatus, PQstatus, |
| 69 | +}; |
| 70 | + |
| 71 | +const DB_CONNINFO: &CStr = c"postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world"; |
| 72 | +const PG_FORTUNES_SQL: &CStr = c"SELECT id, message FROM fortune"; |
| 73 | +const PG_FORTUNES_PREPARED_STMT: &CStr = c"s_fortunes"; |
| 74 | + |
| 75 | +#[derive(Serialize)] |
| 76 | +struct Fortune<'a> { |
| 77 | + id: i32, |
| 78 | + message: &'a str, |
| 79 | +} |
| 80 | + |
| 81 | +fn fetch_fortunes_html() -> Result<Vec<u8>, String> { |
| 82 | + PG_CONN.with(|pg| unsafe { |
| 83 | + let res = PQexecPrepared( |
| 84 | + pg.conn, |
| 85 | + PG_FORTUNES_PREPARED_STMT.as_ptr(), // stmtName |
| 86 | + 0, // nParams |
| 87 | + ptr::null(), // paramValues |
| 88 | + ptr::null(), // paramLengths |
| 89 | + ptr::null(), // paramFormats |
| 90 | + 1, // resultFormat = 1 (binary) |
| 91 | + ); |
| 92 | + if res.is_null() { |
| 93 | + return Err("PQexecPrepared returned null".to_owned()); |
| 94 | + } |
| 95 | + if PQresultStatus(res) != ExecStatusType::PGRES_TUPLES_OK { |
| 96 | + PQclear(res); |
| 97 | + return Err("PQexecPrepared non-ok result status".to_owned()); |
| 98 | + } |
| 99 | + |
| 100 | + let rows = PQntuples(res); |
| 101 | + let mut fortunes = Vec::with_capacity(rows as usize + 1); |
| 102 | + |
| 103 | + for i in 0..rows { |
| 104 | + // field 0: id (int) |
| 105 | + let id_ptr = PQgetvalue(res, i, 0) as *const i32; |
| 106 | + let id = i32::from_be(ptr::read_unaligned(id_ptr)); |
| 107 | + |
| 108 | + // field 1: message (text) |
| 109 | + let msg_len = PQgetlength(res, i, 1) as usize; |
| 110 | + let msg_ptr = PQgetvalue(res, i, 1) as *const u8; |
| 111 | + let msg_slice = std::slice::from_raw_parts(msg_ptr, msg_len); |
| 112 | + let message = std::str::from_utf8_unchecked(msg_slice); // message fields are stored in utf8 |
| 113 | + |
| 114 | + fortunes.push(Fortune { id, message }); |
| 115 | + } |
| 116 | + |
| 117 | + // add extra fortune |
| 118 | + fortunes.push(Fortune { |
| 119 | + id: 0, |
| 120 | + message: "Additional fortune added at request time.", |
| 121 | + }); |
| 122 | + |
| 123 | + // sort |
| 124 | + fortunes.sort_by(|a, b| a.message.cmp(b.message)); |
| 125 | + |
| 126 | + // render html template |
| 127 | + let mut buf = Vec::with_capacity(2048); |
| 128 | + ywrite_html!(buf, "{{> fortunes }}"); |
| 129 | + |
| 130 | + PQclear(res); |
| 131 | + Ok(buf) |
| 132 | + }) |
| 133 | +} |
| 134 | + |
| 135 | +// TLS: connection per thread |
| 136 | +thread_local! { |
| 137 | + static PG_CONN: PgConnection = PgConnection::new(); |
| 138 | +} |
| 139 | + |
| 140 | +struct PgConnection { |
| 141 | + conn: *mut PGconn, |
| 142 | +} |
| 143 | + |
| 144 | +impl PgConnection { |
| 145 | + fn new() -> Self { |
| 146 | + unsafe { |
| 147 | + // connect |
| 148 | + let conn = PQconnectdb(DB_CONNINFO.as_ptr()); |
| 149 | + if PQstatus(conn) != ConnStatusType::CONNECTION_OK { |
| 150 | + let err = get_pg_error_message(conn); |
| 151 | + PQfinish(conn); |
| 152 | + panic!("PQconnectdb failed: {err}"); |
| 153 | + } |
| 154 | + |
| 155 | + // prepare fortunes statement |
| 156 | + let res = PQprepare( |
| 157 | + conn, |
| 158 | + PG_FORTUNES_PREPARED_STMT.as_ptr(), |
| 159 | + PG_FORTUNES_SQL.as_ptr(), |
| 160 | + 0, |
| 161 | + ptr::null(), |
| 162 | + ); |
| 163 | + if res.is_null() { |
| 164 | + PQfinish(conn); |
| 165 | + panic!("PQprepare returned null"); |
| 166 | + } |
| 167 | + |
| 168 | + let st = PQresultStatus(res); |
| 169 | + PQclear(res); |
| 170 | + if st != ExecStatusType::PGRES_COMMAND_OK { |
| 171 | + let err = get_pg_error_message(conn); |
| 172 | + PQfinish(conn); |
| 173 | + panic!("prepare failed: {err}"); |
| 174 | + } |
| 175 | + |
| 176 | + PgConnection { conn } |
| 177 | + } |
| 178 | + } |
| 179 | +} |
| 180 | + |
| 181 | +#[cold] |
| 182 | +fn get_pg_error_message(conn: *mut PGconn) -> String { |
| 183 | + unsafe { |
| 184 | + CStr::from_ptr(PQerrorMessage(conn)) |
| 185 | + .to_string_lossy() |
| 186 | + .into_owned() |
| 187 | + } |
| 188 | +} |
0 commit comments