Skip to content

Commit 339c1a8

Browse files
authored
evals on query engine; more datasets apis (#1021)
* evals on query engine; more datasets apis * fix build, address dataset id comment * minor fix in types
1 parent 026da38 commit 339c1a8

File tree

14 files changed

+1007
-364
lines changed

14 files changed

+1007
-364
lines changed

app-server/src/api/v1/datasets.rs

Lines changed: 135 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,33 @@ use crate::{
1919

2020
#[derive(Deserialize)]
2121
#[serde(rename_all = "camelCase")]
22-
pub struct GetDatapointsRequestParams {
23-
#[serde(alias = "dataset_name", alias = "name")]
24-
dataset_name: String,
22+
struct GetDatasetsRequest {
23+
#[serde(default)]
24+
id: Option<Uuid>,
25+
#[serde(default)]
26+
name: Option<String>,
27+
}
28+
29+
#[get("/datasets")]
30+
async fn get_datasets(
31+
db: web::Data<DB>,
32+
project_api_key: ProjectApiKey,
33+
req: web::Query<GetDatasetsRequest>,
34+
) -> ResponseResult {
35+
let project_id = project_api_key.project_id;
36+
let db = db.into_inner();
37+
let request = req.into_inner();
38+
let datasets =
39+
db::datasets::get_datasets(&db.pool, project_id, request.id, request.name).await?;
40+
41+
Ok(HttpResponse::Ok().json(datasets))
42+
}
43+
44+
#[derive(Deserialize)]
45+
#[serde(rename_all = "camelCase")]
46+
struct GetDatapointsRequestParams {
47+
#[serde(flatten)]
48+
dataset: DatasetIdentifier,
2549
limit: i64,
2650
offset: i64,
2751
}
@@ -46,13 +70,19 @@ async fn get_datapoints(
4670
let query_engine = query_engine.into_inner().as_ref().clone();
4771
let query = params.into_inner();
4872

49-
let dataset_id =
50-
db::datasets::get_dataset_id_by_name(&db.pool, &query.dataset_name, project_id).await?;
51-
52-
let Some(dataset_id) = dataset_id else {
53-
return Ok(HttpResponse::NotFound().json(serde_json::json!({
54-
"error": "Dataset not found"
55-
})));
73+
let dataset_id = match query.dataset {
74+
DatasetIdentifier::Name(name) => {
75+
let Some(dataset_id) =
76+
db::datasets::get_dataset_id_by_name(&db.pool, &name.dataset_name, project_id)
77+
.await?
78+
else {
79+
return Ok(HttpResponse::NotFound().json(serde_json::json!({
80+
"error": "Dataset not found"
81+
})));
82+
};
83+
dataset_id
84+
}
85+
DatasetIdentifier::Id(id) => id.dataset_id,
5686
};
5787

5888
let select_query = "
@@ -129,28 +159,48 @@ async fn get_datapoints(
129159

130160
#[derive(Deserialize)]
131161
#[serde(rename_all = "camelCase")]
132-
pub struct CreateDatapointsRequest {
133-
// The alias is added to support the old endpoint (dataset_name)
134-
#[serde(alias = "dataset_name")]
162+
pub struct DatasetName {
163+
#[serde(alias = "dataset_name", alias = "name")]
135164
pub dataset_name: String,
136-
pub datapoints: Vec<CreateDatapointRequest>,
137165
}
138166

139167
#[derive(Deserialize)]
140-
pub struct CreateDatapointRequest {
168+
#[serde(rename_all = "camelCase")]
169+
pub struct DatasetId {
170+
#[serde(alias = "dataset_id")]
171+
pub dataset_id: Uuid,
172+
}
173+
174+
#[derive(Deserialize)]
175+
#[serde(rename_all = "camelCase")]
176+
#[serde(untagged)]
177+
enum DatasetIdentifier {
178+
Name(DatasetName),
179+
Id(DatasetId),
180+
}
181+
182+
#[derive(Deserialize)]
183+
#[serde(rename_all = "camelCase")]
184+
struct CreateDatapointsRequest {
185+
#[serde(flatten)]
186+
dataset: DatasetIdentifier,
187+
datapoints: Vec<RequestDatapoint>,
188+
#[serde(default)]
189+
create_dataset: bool,
190+
}
191+
192+
#[derive(Deserialize)]
193+
#[serde(rename_all = "camelCase")]
194+
struct RequestDatapoint {
141195
#[serde(default)]
142-
pub id: Option<Uuid>,
143-
pub data: serde_json::Value,
144-
pub target: Option<serde_json::Value>,
196+
id: Option<Uuid>,
197+
data: serde_json::Value,
198+
target: Option<serde_json::Value>,
145199
#[serde(default)]
146-
pub metadata: std::collections::HashMap<String, serde_json::Value>,
200+
metadata: std::collections::HashMap<String, serde_json::Value>,
147201
}
148202

149203
/// Create datapoints in a dataset
150-
///
151-
/// Request body should contain:
152-
/// - dataset_name: The name of the dataset to add datapoints to
153-
/// - datapoints: Array of datapoint objects with data, optional target, and optional metadata
154204
#[post("/datasets/datapoints")]
155205
async fn create_datapoints(
156206
req: web::Json<CreateDatapointsRequest>,
@@ -162,6 +212,7 @@ async fn create_datapoints(
162212
let db = db.into_inner();
163213
let clickhouse = clickhouse.into_inner().as_ref().clone();
164214
let request = req.into_inner();
215+
let mut created = false;
165216

166217
// Validate that we have datapoints to insert
167218
if request.datapoints.is_empty() {
@@ -170,21 +221,55 @@ async fn create_datapoints(
170221
})));
171222
}
172223

173-
let dataset_id =
174-
db::datasets::get_dataset_id_by_name(&db.pool, &request.dataset_name, project_id).await?;
175-
176-
let Some(dataset_id) = dataset_id else {
177-
return Ok(HttpResponse::NotFound().json(serde_json::json!({
178-
"error": "Dataset not found"
179-
})));
224+
let dataset_id = match request.dataset {
225+
DatasetIdentifier::Name(name) => {
226+
match db::datasets::get_dataset_id_by_name(&db.pool, &name.dataset_name, project_id)
227+
.await?
228+
{
229+
Some(dataset_id) => {
230+
if request.create_dataset {
231+
return Ok(HttpResponse::Conflict().json(serde_json::json!({
232+
"error": "Dataset with this name already exists"
233+
})));
234+
}
235+
dataset_id
236+
}
237+
None => {
238+
if request.create_dataset {
239+
let dataset =
240+
db::datasets::create_dataset(&db.pool, &name.dataset_name, project_id)
241+
.await?;
242+
created = true;
243+
dataset.id
244+
} else {
245+
return Ok(HttpResponse::NotFound().json(serde_json::json!({
246+
"error": "Dataset not found"
247+
})));
248+
}
249+
}
250+
}
251+
}
252+
DatasetIdentifier::Id(id) => {
253+
if request.create_dataset {
254+
return Ok(HttpResponse::BadRequest().json(serde_json::json!({
255+
"error": "When creating a new dataset, the name must be provided"
256+
})));
257+
}
258+
if !db::datasets::dataset_exists(&db.pool, id.dataset_id, project_id).await? {
259+
return Ok(HttpResponse::NotFound().json(serde_json::json!({
260+
"error": "Dataset not found"
261+
})));
262+
}
263+
id.dataset_id
264+
}
180265
};
181266

182267
// Convert request datapoints to Datapoint structs
183268
let datapoints: Vec<Datapoint> = request
184269
.datapoints
185270
.into_iter()
186271
.map(|dp_req| Datapoint {
187-
// now_v7 is guaranteed to be sorted by creation time
272+
// `now_v7` is guaranteed to be sorted by creation time
188273
id: dp_req.id.unwrap_or(Uuid::now_v7()),
189274
created_at: Utc::now(),
190275
dataset_id,
@@ -201,9 +286,26 @@ async fn create_datapoints(
201286

202287
ch_datapoints::insert_datapoints(clickhouse, ch_datapoints).await?;
203288

204-
Ok(HttpResponse::Created().json(serde_json::json!({
289+
let datapoint_info = datapoints
290+
.iter()
291+
.map(|dp| {
292+
serde_json::json!({
293+
"id": dp.id,
294+
"createdAt": dp.created_at,
295+
})
296+
})
297+
.collect::<Vec<_>>();
298+
299+
let mut response = if created {
300+
HttpResponse::Created()
301+
} else {
302+
HttpResponse::Ok()
303+
};
304+
Ok(response.json(serde_json::json!({
205305
"message": "Datapoints created successfully",
206-
"count": datapoints.len()
306+
"datasetId": dataset_id,
307+
"count": datapoints.len(),
308+
"datapointInfo": datapoint_info,
207309
})))
208310
}
209311

app-server/src/ch/evaluation_datapoints.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ pub struct CHEvaluationDatapoint {
3131
pub data: String,
3232
pub target: String,
3333
pub metadata: String,
34+
#[serde(with = "clickhouse::serde::uuid")]
35+
pub dataset_id: Uuid,
36+
#[serde(with = "clickhouse::serde::uuid")]
37+
pub dataset_datapoint_id: Uuid,
38+
pub dataset_datapoint_created_at: i64,
3439
}
3540

3641
#[derive(Row, Deserialize)]
@@ -57,6 +62,22 @@ impl CHEvaluationDatapoint {
5762
metadata: json_value_to_string(
5863
&serde_json::to_value(result.metadata.unwrap_or_default()).unwrap_or_default(),
5964
),
65+
dataset_id: result
66+
.dataset_link
67+
.as_ref()
68+
.map(|link| link.dataset_id)
69+
.unwrap_or_default(),
70+
dataset_datapoint_id: result
71+
.dataset_link
72+
.as_ref()
73+
.map(|link| link.datapoint_id)
74+
.unwrap_or_default(),
75+
dataset_datapoint_created_at: chrono_to_nanoseconds(
76+
result
77+
.dataset_link
78+
.map(|link| link.created_at)
79+
.unwrap_or_default(),
80+
),
6081
}
6182
}
6283
}

app-server/src/db/datasets.rs

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,60 @@
11
use anyhow::Result;
2-
use sqlx::PgPool;
2+
use chrono::{DateTime, Utc};
3+
use serde::Serialize;
4+
use sqlx::{FromRow, PgPool, QueryBuilder};
35
use uuid::Uuid;
46

7+
#[derive(Serialize, FromRow)]
8+
#[serde(rename_all = "camelCase")]
9+
pub struct Dataset {
10+
pub id: Uuid,
11+
pub name: String,
12+
pub project_id: Uuid,
13+
pub created_at: DateTime<Utc>,
14+
}
15+
16+
/// Get datasets by project id. If ID or name is provided, filter the results accordingly.
17+
pub async fn get_datasets(
18+
pool: &PgPool,
19+
project_id: Uuid,
20+
id: Option<Uuid>,
21+
name: Option<String>,
22+
) -> Result<Vec<Dataset>> {
23+
let mut query_builder = QueryBuilder::new(
24+
r#"SELECT id, name, project_id, created_at FROM datasets WHERE project_id = "#,
25+
);
26+
27+
query_builder.push_bind(project_id);
28+
if let Some(id) = id {
29+
query_builder.push(" AND id = ");
30+
query_builder.push_bind(id);
31+
}
32+
if let Some(name) = name {
33+
query_builder.push(" AND name = ");
34+
query_builder.push_bind(name);
35+
}
36+
query_builder.push(" ORDER BY created_at DESC");
37+
let datasets = query_builder
38+
.build_query_as::<Dataset>()
39+
.fetch_all(pool)
40+
.await?;
41+
42+
Ok(datasets)
43+
}
44+
45+
pub async fn create_dataset(pool: &PgPool, name: &str, project_id: Uuid) -> Result<Dataset> {
46+
let dataset = sqlx::query_as::<_, Dataset>(
47+
"INSERT INTO datasets (name, project_id) VALUES ($1, $2)
48+
RETURNING id, name, project_id, created_at",
49+
)
50+
.bind(name)
51+
.bind(project_id)
52+
.fetch_one(pool)
53+
.await?;
54+
55+
Ok(dataset)
56+
}
57+
558
pub async fn get_dataset_id_by_name(
659
pool: &PgPool,
760
name: &str,
@@ -18,6 +71,18 @@ pub async fn get_dataset_id_by_name(
1871
Ok(dataset_id)
1972
}
2073

74+
pub async fn dataset_exists(pool: &PgPool, dataset_id: Uuid, project_id: Uuid) -> Result<bool> {
75+
let dataset_exists = sqlx::query_scalar::<_, bool>(
76+
"SELECT EXISTS(SELECT 1 FROM datasets WHERE id = $1 AND project_id = $2)",
77+
)
78+
.bind(dataset_id)
79+
.bind(project_id)
80+
.fetch_one(pool)
81+
.await?;
82+
83+
Ok(dataset_exists)
84+
}
85+
2186
pub async fn get_parquet_path(
2287
pool: &PgPool,
2388
project_id: Uuid,

app-server/src/evaluations/utils.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::collections::HashMap;
22

3+
use chrono::{DateTime, Utc};
34
use serde::Deserialize;
45
use serde_json::Value;
56
use uuid::Uuid;
@@ -8,6 +9,14 @@ use crate::utils::json_value_to_string;
89

910
const MAX_JSON_VALUE_LENGTH: usize = 1000;
1011

12+
#[derive(Deserialize)]
13+
#[serde(rename_all = "camelCase")]
14+
pub struct EvaluationDatapointDatasetLink {
15+
pub dataset_id: Uuid,
16+
pub datapoint_id: Uuid,
17+
pub created_at: DateTime<Utc>,
18+
}
19+
1120
#[derive(Deserialize)]
1221
#[serde(rename_all = "camelCase")]
1322
pub struct EvaluationDatapointResult {
@@ -26,6 +35,8 @@ pub struct EvaluationDatapointResult {
2635
pub trace_id: Uuid,
2736
#[serde(default)]
2837
pub scores: HashMap<String, Option<f64>>,
38+
#[serde(default)]
39+
pub dataset_link: Option<EvaluationDatapointDatasetLink>,
2940
}
3041

3142
pub struct DatapointColumns {

app-server/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,7 @@ fn main() -> anyhow::Result<()> {
797797
.service(
798798
web::scope("/v1")
799799
.wrap(project_auth.clone())
800+
.service(api::v1::datasets::get_datasets)
800801
.service(api::v1::datasets::get_datapoints)
801802
.service(api::v1::datasets::create_datapoints)
802803
.service(api::v1::datasets::get_parquet)

0 commit comments

Comments
 (0)