Skip to content

Commit e3e0483

Browse files
add per-request transfer byte tracking (#52)
Track response bytes per API key so we can see who is consuming transfer. Wraps the response body in a counting layer inside the limit middleware, records bytes into user_queries, and aggregates into daily_user_queries. Adds a TransferExceeded error variant for future enforcement. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 7d8bb7d commit e3e0483

File tree

7 files changed

+119
-33
lines changed

7 files changed

+119
-33
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

be/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ nonzero = "0.2.0"
6060
bytes = "1.9.0"
6161
serde_html_form = "0.2.7"
6262
dashmap = "6.1.0"
63+
http-body = "1"
64+
pin-project-lite = "0.2"
6365
time = { version = "0.3", features = ["serde", "formatting"] }
6466
handlebars = "6.3.2"
6567

be/src/api.rs

Lines changed: 77 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@ use std::{
33
convert::Infallible,
44
fmt::{self, Debug},
55
net::SocketAddr,
6-
sync::{Arc, Mutex},
6+
pin::Pin,
7+
sync::{
8+
atomic::{AtomicU64, Ordering},
9+
Arc, Mutex,
10+
},
11+
task::{Context, Poll},
712
};
813

914
use axum::{
@@ -26,7 +31,10 @@ use serde_json::{json, Value};
2631
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
2732
use url::Url;
2833

29-
use crate::{broadcast, gafe};
34+
use http_body::Frame;
35+
use pin_project_lite::pin_project;
36+
37+
use crate::{broadcast, gafe, user_query};
3038

3139
macro_rules! user_error {
3240
($e:expr) => {
@@ -127,6 +135,7 @@ pub enum Error {
127135
User(String),
128136
Timeout(Option<String>),
129137
TooManyRequests(Option<String>),
138+
TransferExceeded(Option<String>),
130139

131140
Server(Box<dyn std::error::Error + Send + Sync>),
132141
}
@@ -150,6 +159,10 @@ impl Serialize for Error {
150159
state.serialize_field("error", "too_many_requests")?;
151160
state.serialize_field("message", &opt_msg)?;
152161
}
162+
Error::TransferExceeded(opt_msg) => {
163+
state.serialize_field("error", "transfer_exceeded")?;
164+
state.serialize_field("message", &opt_msg)?;
165+
}
153166
Error::Server(err) => {
154167
state.serialize_field("error", "server")?;
155168
state.serialize_field("message", &err.to_string())?;
@@ -167,6 +180,8 @@ impl std::fmt::Display for Error {
167180
Error::Timeout(None) => write!(f, "Operation timed out"),
168181
Error::TooManyRequests(Some(msg)) => write!(f, "Too many requests: {msg}"),
169182
Error::TooManyRequests(None) => write!(f, "Too many requests"),
183+
Error::TransferExceeded(Some(msg)) => write!(f, "Transfer exceeded: {msg}"),
184+
Error::TransferExceeded(None) => write!(f, "Transfer limit exceeded"),
170185
Error::Server(err) => write!(f, "Server error: {err}"),
171186
}
172187
}
@@ -219,6 +234,12 @@ impl axum::response::IntoResponse for Error {
219234
StatusCode::TOO_MANY_REQUESTS,
220235
msg.unwrap_or(String::from("too many requests")),
221236
),
237+
Self::TransferExceeded(msg) => (
238+
StatusCode::TOO_MANY_REQUESTS,
239+
msg.unwrap_or(String::from(
240+
"Transfer limit exceeded. Upgrade at: https://www.indexsupply.net",
241+
)),
242+
),
222243
Self::User(msg) => (StatusCode::BAD_REQUEST, msg),
223244
Self::Server(e) => {
224245
tracing::error!(%e, "server-error={:?}", e);
@@ -285,10 +306,21 @@ pub async fn limit(
285306
"Rate limited. Create or upgrade API Key at: https://www.indexsupply.net",
286307
))));
287308
}
288-
match tokio::time::timeout(account_limit.timeout, next.run(request)).await {
289-
Ok(response) => Ok(response),
290-
Err(_) => Err(Error::Timeout(None)),
291-
}
309+
let log = request.extensions().get::<user_query::RequestLog>().cloned();
310+
let response = match tokio::time::timeout(account_limit.timeout, next.run(request)).await {
311+
Ok(response) => response,
312+
Err(_) => return Err(Error::Timeout(None)),
313+
};
314+
let (parts, body) = response.into_parts();
315+
let counting = CountingBody {
316+
inner: body,
317+
count: Arc::new(AtomicU64::new(0)),
318+
log,
319+
};
320+
Ok(axum::http::Response::from_parts(
321+
parts,
322+
axum::body::Body::new(counting),
323+
))
292324
}
293325

294326
#[derive(Clone, Copy, Default, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
@@ -496,19 +528,45 @@ pub async fn latency_header(
496528
Ok(response)
497529
}
498530

499-
pub async fn content_length_header(
500-
request: axum::extract::Request,
501-
next: axum::middleware::Next,
502-
) -> Result<axum::response::Response, Error> {
503-
let response = next.run(request).await;
504-
let span = tracing::Span::current();
505-
response
506-
.headers()
507-
.get("content-length")
508-
.and_then(|cl| cl.to_str().ok())
509-
.map(|cl| cl.parse::<u64>().ok())
510-
.map(|size| span.record("size", size));
511-
Ok(response)
531+
pin_project! {
532+
pub struct CountingBody<B> {
533+
#[pin]
534+
inner: B,
535+
count: Arc<AtomicU64>,
536+
log: Option<user_query::RequestLog>,
537+
}
538+
}
539+
540+
impl<B> http_body::Body for CountingBody<B>
541+
where
542+
B: http_body::Body<Data = bytes::Bytes>,
543+
{
544+
type Data = B::Data;
545+
type Error = B::Error;
546+
547+
fn poll_frame(
548+
self: Pin<&mut Self>,
549+
cx: &mut Context<'_>,
550+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
551+
let this = self.project();
552+
match this.inner.poll_frame(cx) {
553+
Poll::Ready(Some(Ok(frame))) => {
554+
if let Some(data) = frame.data_ref() {
555+
this.count.fetch_add(data.len() as u64, Ordering::Relaxed);
556+
}
557+
Poll::Ready(Some(Ok(frame)))
558+
}
559+
Poll::Ready(None) => {
560+
let total = this.count.load(Ordering::Relaxed);
561+
tracing::Span::current().record("size", total);
562+
if let Some(log) = this.log.take() {
563+
log.set_bytes(total);
564+
}
565+
Poll::Ready(None)
566+
}
567+
other => other,
568+
}
569+
}
512570
}
513571

514572
pub async fn log_fields(

be/src/main.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,6 @@ fn service(config: api::Config) -> IntoMakeServiceWithConnectInfo<Router, Socket
201201
.layer(axum::middleware::from_fn(api::latency_header))
202202
.layer(tracing)
203203
.layer(axum::middleware::from_fn(api::log_fields))
204-
.layer(axum::middleware::from_fn(api::content_length_header))
205204
.layer(HandleErrorLayer::new(api::handle_service_error))
206205
.load_shed()
207206
.concurrency_limit(1024)

be/src/user_query.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ pub struct Row {
1515
pub latency: u16,
1616
pub status: u32,
1717
pub qty: u16,
18+
pub bytes: u64,
1819
}
1920

2021
impl Row {
@@ -28,6 +29,7 @@ impl Row {
2829
latency: 0,
2930
status: 0,
3031
qty: 1,
32+
bytes: 0,
3133
}
3234
}
3335
}
@@ -72,6 +74,12 @@ impl RequestLog {
7274
}
7375
}
7476

77+
pub fn set_bytes(&self, bytes: u64) {
78+
for row in self.0.lock().unwrap().rows.iter_mut() {
79+
row.bytes = bytes;
80+
}
81+
}
82+
7583
async fn insert(self, pool: deadpool_postgres::Pool, status: u16, ip: String) {
7684
// only if no one else has the log
7785
if let Ok(log) = Arc::try_unwrap(self.0).map(Mutex::into_inner) {
@@ -125,8 +133,9 @@ pub async fn insert(pool: deadpool_postgres::Pool, row: Row) {
125133
latency,
126134
status,
127135
ip,
128-
qty
129-
) values ($1, $2, $3, $4, $5, $6, $7, $8)",
136+
qty,
137+
bytes
138+
) values ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
130139
&[
131140
&row.api_key,
132141
&U64::from(row.chain),
@@ -136,6 +145,7 @@ pub async fn insert(pool: deadpool_postgres::Pool, row: Row) {
136145
&(row.status as i16),
137146
&row.ip,
138147
&(row.qty as i16),
148+
&(row.bytes as i64),
139149
],
140150
)
141151
.await;

fe/src/main.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -276,23 +276,29 @@ async fn update_daily_user_queries(
276276
q.api_key,
277277
k.owner_email,
278278
date_trunc('day', q.created_at)::date as day,
279-
q.qty
279+
q.qty,
280+
coalesce(q.bytes, 0) as bytes
280281
from user_queries q
281282
join api_keys k on q.api_key = k.secret
282283
where q.created_at >= date_trunc('day', now())
283284
and q.created_at < date_trunc('day', now() + interval '1 day')
284285
),
285286
aggregated as (
286-
select owner_email, day, sum(qty)::int8 as total_qty
287+
select owner_email, day,
288+
sum(qty)::int8 as total_qty,
289+
sum(bytes)::int8 as total_bytes
287290
from one_day
288291
group by owner_email, day
289292
),
290293
upserted as (
291-
insert into daily_user_queries (owner_email, day, n, updated_at)
292-
select owner_email, day, total_qty, now()
294+
insert into daily_user_queries (owner_email, day, n, bytes, updated_at)
295+
select owner_email, day, total_qty, total_bytes, now()
293296
from aggregated
294297
on conflict (owner_email, day)
295-
do update set n = excluded.n, updated_at = excluded.updated_at
298+
do update set
299+
n = excluded.n,
300+
bytes = excluded.bytes,
301+
updated_at = excluded.updated_at
296302
)
297303
select count(*)::int8, coalesce(sum(qty), 0)::int8
298304
from one_day
@@ -321,23 +327,29 @@ async fn update_wl_daily_user_queries(
321327
k.provision_key,
322328
k.org,
323329
date_trunc('day', q.created_at)::date as day,
324-
q.qty
330+
q.qty,
331+
coalesce(q.bytes, 0) as bytes
325332
from user_queries q
326333
join wl_api_keys k on q.api_key = k.secret
327334
where q.created_at >= date_trunc('day', now())
328335
and q.created_at < date_trunc('day', now() + interval '1 day')
329336
),
330337
aggregated as (
331-
select provision_key, org, day, sum(qty)::int8 as total_qty
338+
select provision_key, org, day,
339+
sum(qty)::int8 as total_qty,
340+
sum(bytes)::int8 as total_bytes
332341
from one_day
333342
group by provision_key, org, day
334343
),
335344
upserted as (
336-
insert into wl_daily_user_queries (provision_key, org, day, n, updated_at)
337-
select provision_key, org, day, total_qty, now()
345+
insert into wl_daily_user_queries (provision_key, org, day, n, bytes, updated_at)
346+
select provision_key, org, day, total_qty, total_bytes, now()
338347
from aggregated
339348
on conflict (provision_key, org, day)
340-
do update set n = excluded.n, updated_at = excluded.updated_at
349+
do update set
350+
n = excluded.n,
351+
bytes = excluded.bytes,
352+
updated_at = excluded.updated_at
341353
)
342354
select count(*)::int8, coalesce(sum(qty), 0)::int8
343355
from one_day;

fe/src/schema.sql

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,13 +130,15 @@ create table if not exists user_queries(
130130
status int2,
131131
qty int2 default 1,
132132
created_at timestamptz default now(),
133-
ip text
133+
ip text,
134+
bytes int8 default 0
134135
);
135136

136137
create table if not exists daily_user_queries (
137138
owner_email text not null,
138139
day date not null,
139140
n int8 not null,
141+
bytes int8 not null default 0,
140142
updated_at timestamptz not null default now(),
141143
primary key (owner_email, day)
142144
);
@@ -146,6 +148,7 @@ create table if not exists wl_daily_user_queries(
146148
org text not null,
147149
day date not null,
148150
n int8 not null,
151+
bytes int8 not null default 0,
149152
updated_at timestamptz not null default now(),
150153
primary key (provision_key, org, day)
151154
);

0 commit comments

Comments
 (0)