Skip to content

Commit 020112b

Browse files
feat: daily active agents
1 parent 1d487b4 commit 020112b

File tree

6 files changed

+333
-8
lines changed

6 files changed

+333
-8
lines changed

crates/api/src/openapi.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ use utoipa::OpenApi;
100100
crate::routes::admin::bi_deployment_summary,
101101
crate::routes::admin::bi_status_history,
102102
crate::routes::admin::bi_usage,
103+
crate::routes::admin::bi_daily_active_agents,
103104
crate::routes::admin::bi_top_consumers,
104105
crate::routes::agents::start_instance,
105106
crate::routes::agents::stop_instance,
@@ -213,6 +214,8 @@ use utoipa::OpenApi;
213214
services::bi_metrics::DeploymentStatusCount,
214215
services::bi_metrics::DeploymentSummary,
215216
services::bi_metrics::StatusChangeRecord,
217+
services::bi_metrics::DailyActiveAgentsByType,
218+
services::bi_metrics::DailyActiveAgentsPoint,
216219
services::bi_metrics::UsageAggregation,
217220
services::bi_metrics::UsageGroupBy,
218221
services::bi_metrics::UsageRankBy,

crates/api/src/routes/admin.rs

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ use chrono::{DateTime, Utc};
1616
use serde::{Deserialize, Serialize};
1717
use services::analytics::{ActivityLogEntry, AnalyticsSummary, TopActiveUsersResponse};
1818
use services::bi_metrics::{
19-
DeploymentFilter, DeploymentRecord, DeploymentSummary, StatusChangeRecord, TopConsumer,
20-
TopConsumerFilter, TopConsumerGroupBy, UsageAggregation, UsageFilter, UsageGroupBy,
21-
UsageRankBy as BiUsageRankBy, UserSummary,
19+
DailyActiveAgentsPoint, DeploymentFilter, DeploymentRecord, DeploymentSummary,
20+
StatusChangeRecord, TopConsumer, TopConsumerFilter, TopConsumerGroupBy, UsageAggregation,
21+
UsageFilter, UsageGroupBy, UsageRankBy as BiUsageRankBy, UserSummary,
2222
};
2323

2424
/// Maximum rows for BI usage aggregation queries.
@@ -2376,6 +2376,42 @@ pub async fn bi_top_consumers(
23762376
}))
23772377
}
23782378

2379+
/// Get daily active agents (with usage > 0) time series (BI). Requires admin authentication.
2380+
/// Dates are UTC. Maximum range 365 days.
2381+
#[utoipa::path(
2382+
get,
2383+
path = "/v1/admin/bi/usage/daily-active-agents",
2384+
tag = "Admin",
2385+
params(BiSummaryQuery),
2386+
responses(
2387+
(status = 200, description = "Daily active agents time series", body = [DailyActiveAgentsPoint]),
2388+
(status = 400, description = "Bad request", body = crate::error::ApiErrorResponse),
2389+
(status = 401, description = "Unauthorized", body = crate::error::ApiErrorResponse),
2390+
(status = 403, description = "Forbidden - Admin access required", body = crate::error::ApiErrorResponse),
2391+
(status = 500, description = "Internal server error", body = crate::error::ApiErrorResponse)
2392+
),
2393+
security(
2394+
("session_token" = [])
2395+
)
2396+
)]
2397+
pub async fn bi_daily_active_agents(
2398+
State(app_state): State<AppState>,
2399+
Query(params): Query<BiSummaryQuery>,
2400+
) -> Result<Json<Vec<DailyActiveAgentsPoint>>, ApiError> {
2401+
validate_date_range(params.start_date, params.end_date)?;
2402+
2403+
let points = app_state
2404+
.bi_metrics_service
2405+
.get_daily_active_agents(params.start_date, params.end_date)
2406+
.await
2407+
.map_err(|e| {
2408+
tracing::error!("Failed to get daily active agents: {}", e);
2409+
ApiError::internal_server_error("Failed to get daily active agents")
2410+
})?;
2411+
2412+
Ok(Json(points))
2413+
}
2414+
23792415
/// Create admin router with all admin routes (requires admin authentication)
23802416
pub fn create_admin_router() -> Router<AppState> {
23812417
Router::new()
@@ -2424,6 +2460,7 @@ pub fn create_admin_router() -> Router<AppState> {
24242460
.route("/deployments", get(bi_list_deployments))
24252461
.route("/deployments/summary", get(bi_deployment_summary))
24262462
.route("/deployments/{id}/status-history", get(bi_status_history))
2463+
.route("/usage/daily-active-agents", get(bi_daily_active_agents))
24272464
.route("/usage", get(bi_usage))
24282465
.route("/usage/top", get(bi_top_consumers)),
24292466
)

crates/api/tests/bi_metrics_tests.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
mod common;
22

3+
use chrono::{Duration, Utc};
34
use common::{create_test_server_and_db, mock_login, TestServerConfig};
45
use services::user::ports::UserRepository;
56
use uuid::Uuid;
@@ -724,6 +725,128 @@ async fn test_bi_usage_with_type_filter() {
724725
cleanup(&db, inst1_id, inst2_id).await;
725726
}
726727

728+
// =============================================================================
729+
// Daily active agents endpoint
730+
// =============================================================================
731+
732+
#[tokio::test]
733+
async fn test_bi_daily_active_agents_with_data() {
734+
let (server, db) = server_and_db().await;
735+
let admin_token = mock_login(&server, "bi_admin_daily_agents@admin.org").await;
736+
737+
let user = db
738+
.user_repository()
739+
.get_user_by_email("bi_admin_daily_agents@admin.org")
740+
.await
741+
.unwrap()
742+
.unwrap();
743+
744+
let (inst1_id, inst2_id) = seed_bi_test_data(&db, user.id.0).await;
745+
746+
// Date range that includes today (UTC) so the seeded usage events are counted
747+
let now = Utc::now();
748+
let start = (now - Duration::days(1))
749+
.format("%Y-%m-%dT00:00:00Z")
750+
.to_string();
751+
let end = (now + Duration::days(1))
752+
.format("%Y-%m-%dT00:00:00Z")
753+
.to_string();
754+
let url = format!(
755+
"/v1/admin/bi/usage/daily-active-agents?start_date={}&end_date={}",
756+
start, end,
757+
);
758+
759+
let response = server
760+
.get(&url)
761+
.add_header(AUTH, auth_header(&admin_token))
762+
.await;
763+
764+
assert_eq!(
765+
response.status_code(),
766+
200,
767+
"daily-active-agents should return 200"
768+
);
769+
770+
let body: serde_json::Value = response.json();
771+
let points = body.as_array().expect("response should be array of points");
772+
assert!(
773+
!points.is_empty(),
774+
"Should have at least one day in the range"
775+
);
776+
777+
// Today (UTC) should have 2 active agents (both instances have usage > 0)
778+
let today_str = now.format("%Y-%m-%d").to_string();
779+
let today_point = points
780+
.iter()
781+
.find(|p| p.get("date").and_then(|v| v.as_str()).unwrap_or("") == today_str);
782+
assert!(
783+
today_point.is_some(),
784+
"Should have a point for today {} in {:?}",
785+
today_str,
786+
points
787+
.iter()
788+
.map(|p| p.get("date").and_then(|v| v.as_str()).unwrap_or("?"))
789+
.collect::<Vec<_>>()
790+
);
791+
let count = today_point
792+
.unwrap()
793+
.get("active_agents_count")
794+
.unwrap()
795+
.as_i64()
796+
.unwrap();
797+
assert!(
798+
count >= 2,
799+
"Today should have at least 2 active agents (we seeded 2 instances with usage), got {}",
800+
count
801+
);
802+
803+
// Breakdown by instance type: we seeded one openclaw and one ironclaw with usage
804+
let by_type = today_point
805+
.unwrap()
806+
.get("by_instance_type")
807+
.and_then(|v| v.as_array())
808+
.unwrap_or(&vec![]);
809+
let openclaw_count = by_type
810+
.iter()
811+
.find(|e| e.get("instance_type").and_then(|v| v.as_str()) == Some("openclaw"))
812+
.and_then(|e| e.get("active_agents_count").and_then(|v| v.as_i64()))
813+
.unwrap_or(0);
814+
let ironclaw_count = by_type
815+
.iter()
816+
.find(|e| e.get("instance_type").and_then(|v| v.as_str()) == Some("ironclaw"))
817+
.and_then(|e| e.get("active_agents_count").and_then(|v| v.as_i64()))
818+
.unwrap_or(0);
819+
assert!(
820+
openclaw_count >= 1,
821+
"Today should have at least 1 openclaw active agent, by_instance_type={:?}",
822+
by_type
823+
);
824+
assert!(
825+
ironclaw_count >= 1,
826+
"Today should have at least 1 ironclaw active agent, by_instance_type={:?}",
827+
by_type
828+
);
829+
830+
cleanup(&db, inst1_id, inst2_id).await;
831+
}
832+
833+
#[tokio::test]
834+
async fn test_bi_daily_active_agents_requires_date_range() {
835+
let (server, _db) = server_and_db().await;
836+
let admin_token = mock_login(&server, "bi_admin_daily_no_range@admin.org").await;
837+
838+
// No query params: validate_date_range passes (both None), repo returns []
839+
let response = server
840+
.get("/v1/admin/bi/usage/daily-active-agents")
841+
.add_header(AUTH, auth_header(&admin_token))
842+
.await;
843+
844+
assert_eq!(response.status_code(), 200);
845+
let body: serde_json::Value = response.json();
846+
let points = body.as_array().unwrap();
847+
assert!(points.is_empty(), "No date range should return empty array");
848+
}
849+
727850
// =============================================================================
728851
// Top consumers endpoint with seeded data
729852
// =============================================================================

crates/database/src/repositories/bi_metrics_repository.rs

Lines changed: 120 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
use crate::pool::DbPool;
22
use async_trait::async_trait;
3-
use chrono::{DateTime, Utc};
3+
use chrono::{DateTime, Duration, Utc};
44
use services::bi_metrics::{
5-
BiMetricsRepository, DeploymentFilter, DeploymentRecord, DeploymentStatusCount,
6-
DeploymentSummary, ListUsersFilter, ListUsersSort, StatusChangeRecord, TopConsumer,
7-
TopConsumerFilter, TopConsumerGroupBy, UsageAggregation, UsageFilter, UsageGroupBy,
8-
UsageRankBy, UserWithStats, UsersSortBy, UsersSortOrder,
5+
BiMetricsRepository, DailyActiveAgentsByType, DailyActiveAgentsPoint, DeploymentFilter,
6+
DeploymentRecord, DeploymentStatusCount, DeploymentSummary, ListUsersFilter, ListUsersSort,
7+
StatusChangeRecord, TopConsumer, TopConsumerFilter, TopConsumerGroupBy, UsageAggregation,
8+
UsageFilter, UsageGroupBy, UsageRankBy, UserWithStats, UsersSortBy, UsersSortOrder,
99
};
1010
use services::user::ports::User;
1111
use services::UserId;
@@ -17,6 +17,9 @@ const AGGREGATION_TIMEOUT_MS: u32 = 30_000;
1717
/// Hard limit for unbounded queries (status history, usage aggregation).
1818
const MAX_UNBOUNDED_ROWS: i64 = 1000;
1919

20+
/// Max date range (days) for daily active agents time series.
21+
const MAX_DAILY_ACTIVE_AGENTS_RANGE_DAYS: i64 = 365;
22+
2023
pub struct PostgresBiMetricsRepository {
2124
pool: DbPool,
2225
}
@@ -744,4 +747,116 @@ WHERE 1=1
744747

745748
Ok((users, total_count as u64))
746749
}
750+
751+
async fn get_daily_active_agents(
752+
&self,
753+
start_date: Option<DateTime<Utc>>,
754+
end_date: Option<DateTime<Utc>>,
755+
) -> anyhow::Result<Vec<DailyActiveAgentsPoint>> {
756+
let (start, end) = match (start_date, end_date) {
757+
(Some(s), Some(e)) => {
758+
if s >= e {
759+
return Ok(Vec::new());
760+
}
761+
let cap_end = if (e - s).num_days() > MAX_DAILY_ACTIVE_AGENTS_RANGE_DAYS {
762+
s + Duration::days(MAX_DAILY_ACTIVE_AGENTS_RANGE_DAYS)
763+
} else {
764+
e
765+
};
766+
(s, cap_end)
767+
}
768+
_ => return Ok(Vec::new()),
769+
};
770+
771+
let mut client = self.pool.get().await?;
772+
let tx = client.build_transaction().start().await?;
773+
tx.execute(
774+
&format!("SET LOCAL statement_timeout = '{AGGREGATION_TIMEOUT_MS}'"),
775+
&[],
776+
)
777+
.await?;
778+
779+
// Total per day: generate all days, left join to count of distinct instance_id
780+
let rows = tx
781+
.query(
782+
r#"
783+
WITH days AS (
784+
SELECT d::date AS day
785+
FROM generate_series($1::timestamptz, $2::timestamptz, '1 day'::interval) AS d
786+
),
787+
daily_counts AS (
788+
SELECT (created_at AT TIME ZONE 'UTC')::date AS day,
789+
COUNT(DISTINCT instance_id)::bigint AS cnt
790+
FROM user_usage_event
791+
WHERE instance_id IS NOT NULL
792+
AND (quantity > 0 OR COALESCE(cost_nano_usd, 0) > 0)
793+
AND created_at >= $1
794+
AND created_at < $2 + interval '1 day'
795+
GROUP BY (created_at AT TIME ZONE 'UTC')::date
796+
)
797+
SELECT days.day, COALESCE(daily_counts.cnt, 0)::bigint
798+
FROM days
799+
LEFT JOIN daily_counts ON days.day = daily_counts.day
800+
ORDER BY days.day
801+
"#,
802+
&[&start, &end],
803+
)
804+
.await?;
805+
806+
// Per day per instance_type: join to agent_instances to get type
807+
let by_type_rows = tx
808+
.query(
809+
r#"
810+
SELECT (u.created_at AT TIME ZONE 'UTC')::date AS day,
811+
ai.type AS instance_type,
812+
COUNT(DISTINCT u.instance_id)::bigint AS cnt
813+
FROM user_usage_event u
814+
JOIN agent_instances ai ON u.instance_id = ai.id
815+
WHERE u.instance_id IS NOT NULL
816+
AND (u.quantity > 0 OR COALESCE(u.cost_nano_usd, 0) > 0)
817+
AND u.created_at >= $1
818+
AND u.created_at < $2 + interval '1 day'
819+
GROUP BY (u.created_at AT TIME ZONE 'UTC')::date, ai.type
820+
ORDER BY day, instance_type
821+
"#,
822+
&[&start, &end],
823+
)
824+
.await?;
825+
826+
tx.commit().await?;
827+
828+
// Index by day: day -> Vec<(instance_type, count)>
829+
let mut by_type_per_day: std::collections::HashMap<
830+
chrono::NaiveDate,
831+
Vec<DailyActiveAgentsByType>,
832+
> = std::collections::HashMap::new();
833+
for r in by_type_rows {
834+
let day: chrono::NaiveDate = r.get(0);
835+
let instance_type: String = r.get(1);
836+
let count: i64 = r.get(2);
837+
by_type_per_day
838+
.entry(day)
839+
.or_default()
840+
.push(DailyActiveAgentsByType {
841+
instance_type,
842+
active_agents_count: count,
843+
});
844+
}
845+
846+
let points = rows
847+
.into_iter()
848+
.map(|r| {
849+
let day: chrono::NaiveDate = r.get(0);
850+
let count: i64 = r.get(1);
851+
let by_instance_type = by_type_per_day.remove(&day).unwrap_or_default();
852+
DailyActiveAgentsPoint {
853+
date: day.format("%Y-%m-%d").to_string(),
854+
active_agents_count: count,
855+
by_instance_type,
856+
}
857+
})
858+
.collect();
859+
860+
Ok(points)
861+
}
747862
}

0 commit comments

Comments
 (0)