Commit 1a1140c

versioned datasets v0 (#999)
* versioned datasets v0
* fix build
* order table by func on id, drop additional sort from views
* update get datapoints on app server, update view name
* fix view, accept id in post datapoints
1 parent 0e24130 commit 1a1140c

43 files changed: +1636 −753 lines


app-server/Cargo.toml

Lines changed: 1 addition & 1 deletion
```diff
@@ -56,7 +56,7 @@ tracing = {version = "0.1.41", features = ["attributes"]}
 tracing-opentelemetry = "0.30.0"
 tracing-subscriber = {version = "0.3", features = ["env-filter", "fmt"]}
 url = "2.5.4"
-uuid = {version = "1.18.1", features = ["v4", "fast-rng", "macro-diagnostics", "serde"]}
+uuid = {version = "1.18.1", features = ["v4", "fast-rng", "macro-diagnostics", "serde", "v7", "std"]}
 
 [build-dependencies]
 tonic-build = "0.13"
```
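The new `v7` feature flag enables `Uuid::now_v7()`, which `create_datapoints` (below) now uses for datapoint ids. A minimal sketch of why v7 ids can double as a creation-time sort key, assuming only the `uuid` crate as configured here; the short sleep just guarantees the second id lands in a later millisecond:

```rust
use std::{thread, time::Duration};

use uuid::Uuid;

fn main() {
    let first = Uuid::now_v7();
    thread::sleep(Duration::from_millis(2)); // land in a later millisecond
    let second = Uuid::now_v7();

    // UUIDv7 packs a millisecond Unix timestamp into the most significant
    // bits, so an id minted in a later millisecond compares greater as a
    // 128-bit integer -- the ordering that ORDER BY toUInt128(id) relies on.
    assert!(second.as_u128() > first.as_u128());
}
```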

app-server/src/api/v1/datasets.rs

Lines changed: 85 additions & 18 deletions
```diff
@@ -1,21 +1,27 @@
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
 
 use actix_web::{HttpResponse, get, post, web};
+use chrono::Utc;
 use futures_util::StreamExt;
 use serde::Deserialize;
+use serde_json::Value;
 use uuid::Uuid;
 
 use crate::{
-    ch::datapoints as ch_datapoints,
-    datasets::datapoints::Datapoint,
+    ch::datapoints::{self as ch_datapoints},
+    datasets::datapoints::{CHQueryEngineDatapoint, Datapoint},
     db::{self, DB, project_api_keys::ProjectApiKey},
+    query_engine::QueryEngine,
     routes::{PaginatedResponse, types::ResponseResult},
+    sql::{self, ClickhouseReadonlyClient},
     storage::{Storage, StorageTrait},
 };
 
 #[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
 pub struct GetDatapointsRequestParams {
-    name: String,
+    #[serde(alias = "dataset_name", alias = "name")]
+    dataset_name: String,
     limit: i64,
     offset: i64,
 }
@@ -24,39 +30,93 @@ pub struct GetDatapointsRequestParams {
 async fn get_datapoints(
     params: web::Query<GetDatapointsRequestParams>,
     db: web::Data<DB>,
-    clickhouse: web::Data<clickhouse::Client>,
+    clickhouse_ro: web::Data<Option<Arc<ClickhouseReadonlyClient>>>,
+    query_engine: web::Data<Arc<QueryEngine>>,
     project_api_key: ProjectApiKey,
 ) -> ResponseResult {
     let project_id = project_api_key.project_id;
     let db = db.into_inner();
-    let clickhouse = clickhouse.into_inner().as_ref().clone();
+    let clickhouse_ro = if let Some(clickhouse_ro) = clickhouse_ro.as_ref() {
+        clickhouse_ro.clone()
+    } else {
+        return Ok(HttpResponse::InternalServerError().json(serde_json::json!({
+            "error": "ClickHouse read-only client is not configured"
+        })));
+    };
+    let query_engine = query_engine.into_inner().as_ref().clone();
     let query = params.into_inner();
 
     let dataset_id =
-        db::datasets::get_dataset_id_by_name(&db.pool, &query.name, project_id).await?;
+        db::datasets::get_dataset_id_by_name(&db.pool, &query.dataset_name, project_id).await?;
 
     let Some(dataset_id) = dataset_id else {
         return Ok(HttpResponse::NotFound().json(serde_json::json!({
            "error": "Dataset not found"
         })));
     };
 
-    // Get datapoints from ClickHouse
-    let ch_datapoints = ch_datapoints::get_datapoints_paginated(
-        clickhouse.clone(),
+    let select_query = "
+        SELECT
+            id,
+            dataset_id,
+            created_at,
+            data,
+            target,
+            metadata
+        FROM dataset_datapoints
+        WHERE dataset_id = {dataset_id:UUID}
+        ORDER BY toUInt128(id) ASC
+        LIMIT {limit:Int64}
+        OFFSET {offset:Int64}
+    ";
+    let parameters = HashMap::from([
+        (
+            "dataset_id".to_string(),
+            Value::String(dataset_id.to_string()),
+        ),
+        ("limit".to_string(), Value::Number(query.limit.into())),
+        ("offset".to_string(), Value::Number(query.offset.into())),
+    ]);
+
+    let select_query_result = sql::execute_sql_query(
+        select_query.to_string(),
         project_id,
-        dataset_id,
-        Some(query.limit),
-        Some(query.offset),
+        parameters.clone(),
+        clickhouse_ro.clone(),
+        query_engine.clone(),
     )
     .await?;
 
-    let total_count = ch_datapoints::count_datapoints(clickhouse, project_id, dataset_id).await?;
+    let total_count_query = "
+        SELECT COUNT(*) as count FROM dataset_datapoints
+        WHERE dataset_id = {dataset_id:UUID}
+    ";
 
-    let datapoints: Vec<Datapoint> = ch_datapoints
+    let total_count_result = sql::execute_sql_query(
+        total_count_query.to_string(),
+        project_id,
+        HashMap::from([(
+            "dataset_id".to_string(),
+            Value::String(dataset_id.to_string()),
+        )]),
+        clickhouse_ro,
+        query_engine,
+    )
+    .await?;
+
+    let total_count = total_count_result
+        .first()
+        .and_then(|v| v.get("count").and_then(|v| v.as_i64()).map(|v| v as u64))
+        .unwrap_or_default();
+
+    let datapoints: Vec<Datapoint> = select_query_result
         .into_iter()
-        .map(|ch_dp| ch_dp.into())
-        .collect();
+        .map(|ch_dp| {
+            serde_json::from_value::<CHQueryEngineDatapoint>(ch_dp)
+                .map_err(anyhow::Error::from)
+                .and_then(|ch_dp| ch_dp.try_into())
+        })
+        .collect::<Result<Vec<Datapoint>, anyhow::Error>>()?;
 
     let response = PaginatedResponse {
         total_count,
@@ -68,13 +128,18 @@ async fn get_datapoints(
 }
 
 #[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
 pub struct CreateDatapointsRequest {
+    // The alias is added to support the old endpoint (dataset_name)
+    #[serde(alias = "dataset_name")]
     pub dataset_name: String,
     pub datapoints: Vec<CreateDatapointRequest>,
 }
 
 #[derive(Deserialize)]
 pub struct CreateDatapointRequest {
+    #[serde(default)]
+    pub id: Option<Uuid>,
     pub data: serde_json::Value,
     pub target: Option<serde_json::Value>,
     #[serde(default)]
@@ -119,7 +184,9 @@ async fn create_datapoints(
         .datapoints
         .into_iter()
         .map(|dp_req| Datapoint {
-            id: Uuid::new_v4(),
+            // now_v7 is guaranteed to be sorted by creation time
+            id: dp_req.id.unwrap_or(Uuid::now_v7()),
+            created_at: Utc::now(),
             dataset_id,
             data: dp_req.data,
             target: dp_req.target,
```
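A minimal sketch of the request-shape change in `GetDatapointsRequestParams`, using `serde_json` in place of actix-web's query extractor (both go through the same serde derive); the dataset name is a placeholder:

```rust
use serde::Deserialize;

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GetDatapointsRequestParams {
    // rename_all exposes this as `datasetName`; the aliases keep the old
    // `dataset_name` and `name` query params working.
    #[serde(alias = "dataset_name", alias = "name")]
    dataset_name: String,
    limit: i64,
    offset: i64,
}

fn main() -> Result<(), serde_json::Error> {
    // New camelCase shape...
    let new: GetDatapointsRequestParams =
        serde_json::from_str(r#"{"datasetName": "my-set", "limit": 10, "offset": 0}"#)?;
    // ...and the legacy field name, still accepted through the alias.
    let old: GetDatapointsRequestParams =
        serde_json::from_str(r#"{"name": "my-set", "limit": 10, "offset": 0}"#)?;
    assert_eq!(new.dataset_name, old.dataset_name);
    Ok(())
}
```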

app-server/src/ch/datapoints.rs

Lines changed: 8 additions & 70 deletions
```diff
@@ -1,8 +1,7 @@
 use anyhow::Result;
-use chrono::Utc;
+use chrono::{DateTime, Utc};
 use clickhouse::Row;
 use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
 use uuid::Uuid;
 
 use super::utils::chrono_to_nanoseconds;
@@ -27,18 +26,17 @@ pub struct CHDatapoint {
 impl From<CHDatapoint> for Datapoint {
     fn from(ch_datapoint: CHDatapoint) -> Self {
         // Parse JSON strings back to Values
-        let data = serde_json::from_str(&ch_datapoint.data).unwrap_or(serde_json::Value::Null);
-        let target = if ch_datapoint.target == "<null>" || ch_datapoint.target.is_empty() {
-            None
-        } else {
-            serde_json::from_str(&ch_datapoint.target).ok()
-        };
-        let metadata: HashMap<String, serde_json::Value> =
-            serde_json::from_str(&ch_datapoint.metadata).unwrap_or_default();
+        let (data, target, metadata) = Datapoint::parse_string_payloads(
+            ch_datapoint.data,
+            ch_datapoint.target,
+            ch_datapoint.metadata,
+        )
+        .unwrap();
 
         Datapoint {
             id: ch_datapoint.id,
             dataset_id: ch_datapoint.dataset_id,
+            created_at: DateTime::from_timestamp_nanos(ch_datapoint.created_at),
             data,
             target,
             metadata,
@@ -99,63 +97,3 @@
         )),
     }
 }
-
-/// Get paginated datapoints from ClickHouse
-pub async fn get_datapoints_paginated(
-    clickhouse: clickhouse::Client,
-    project_id: Uuid,
-    dataset_id: Uuid,
-    limit: Option<i64>,
-    offset: Option<i64>,
-) -> Result<Vec<CHDatapoint>> {
-    let mut query = String::from(
-        "SELECT
-            id,
-            dataset_id,
-            project_id,
-            created_at,
-            data,
-            target,
-            metadata
-        FROM dataset_datapoints
-        WHERE project_id = ? AND dataset_id = ?
-        ORDER BY created_at DESC",
-    );
-
-    if let Some(limit) = limit {
-        query.push_str(&format!(" LIMIT {}", limit));
-    }
-    if let Some(offset) = offset {
-        query.push_str(&format!(" OFFSET {}", offset));
-    }
-
-    let datapoints = clickhouse
-        .query(&query)
-        .bind(project_id)
-        .bind(dataset_id)
-        .fetch_all::<CHDatapoint>()
-        .await?;
-
-    Ok(datapoints)
-}
-
-#[derive(Row, Deserialize)]
-struct CountResult {
-    count: u64,
-}
-
-/// Count total datapoints in ClickHouse
-pub async fn count_datapoints(
-    clickhouse: clickhouse::Client,
-    project_id: Uuid,
-    dataset_id: Uuid,
-) -> Result<u64> {
-    let result = clickhouse
-        .query("SELECT COUNT(*) as count FROM dataset_datapoints WHERE project_id = ? AND dataset_id = ?")
-        .bind(project_id)
-        .bind(dataset_id)
-        .fetch_one::<CountResult>()
-        .await?;
-
-    Ok(result.count)
-}
```
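`created_at` is reconstructed with `DateTime::from_timestamp_nanos`, the read-side counterpart of the `chrono_to_nanoseconds` helper used by `insert_datapoints`. A round-trip sketch, assuming chrono 0.4.35+; the helper body here is a hypothetical stand-in for the one in `ch::utils`:

```rust
use chrono::{DateTime, Utc};

// Hypothetical stand-in for the chrono_to_nanoseconds helper in ch::utils:
// DateTime<Utc> -> i64 nanoseconds since the Unix epoch.
fn chrono_to_nanoseconds(dt: DateTime<Utc>) -> i64 {
    dt.timestamp_nanos_opt().unwrap_or_default()
}

fn main() {
    let written = Utc::now();
    let stored = chrono_to_nanoseconds(written); // write side: insert_datapoints
    let read = DateTime::from_timestamp_nanos(stored); // read side: From<CHDatapoint>
    assert_eq!(written, read); // lossless at nanosecond precision
}
```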

app-server/src/datasets/datapoints.rs

Lines changed: 54 additions & 1 deletion
```diff
@@ -1,6 +1,7 @@
 use std::collections::HashMap;
 
-use serde::Serialize;
+use chrono::{DateTime, NaiveDateTime, Utc};
+use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use uuid::Uuid;
 
@@ -12,4 +13,56 @@ pub struct Datapoint {
     pub data: Value,
     pub target: Option<Value>,
     pub metadata: HashMap<String, Value>,
+    pub created_at: DateTime<Utc>,
+}
+
+impl Datapoint {
+    pub fn parse_string_payloads(
+        data: String,
+        target: String,
+        metadata: String,
+    ) -> Result<(Value, Option<Value>, HashMap<String, Value>), serde_json::Error> {
+        let data = serde_json::from_str(&data).unwrap_or(serde_json::Value::Null);
+        let target = if target == "<null>" || target.is_empty() || target.to_lowercase() == "null" {
+            None
+        } else {
+            serde_json::from_str(&target).ok()
+        };
+        let metadata: HashMap<String, serde_json::Value> =
+            serde_json::from_str(&metadata).unwrap_or_default();
+
+        Ok((data, target, metadata))
+    }
+}
+
+// This struct is similar to CHDatapoint, but for some reason
+// uuid parsing on that doesn't work with the query engine
+// returned owned strings (internally it uses Uuid::from_str which expects a &str)
+#[derive(Deserialize)]
+pub struct CHQueryEngineDatapoint {
+    pub id: Uuid,
+    pub dataset_id: Uuid,
+    pub created_at: String,
+    pub data: String,
+    pub target: String,
+    pub metadata: String,
+}
+
+impl TryInto<Datapoint> for CHQueryEngineDatapoint {
+    type Error = anyhow::Error;
+    fn try_into(self) -> Result<Datapoint, Self::Error> {
+        let (data, target, metadata) =
+            Datapoint::parse_string_payloads(self.data, self.target, self.metadata)?;
+        Ok(Datapoint {
+            id: self.id,
+            dataset_id: self.dataset_id,
+            created_at: DateTime::from_naive_utc_and_offset(
+                NaiveDateTime::parse_from_str(&self.created_at, "%Y-%m-%d %H:%M:%S%.f")?,
+                Utc,
+            ),
+            data,
+            target,
+            metadata,
+        })
+    }
 }
```
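A standalone sketch of the `parse_string_payloads` contract introduced above, assuming only `serde_json`: ClickHouse hands the JSON columns back as strings, and an empty string, `"<null>"`, or any casing of `"null"` in `target` all collapse to `None`:

```rust
use std::collections::HashMap;

use serde_json::Value;

// Mirrors Datapoint::parse_string_payloads from this commit, inlined so the
// sketch runs without the rest of the crate.
fn parse_string_payloads(
    data: String,
    target: String,
    metadata: String,
) -> (Value, Option<Value>, HashMap<String, Value>) {
    let data = serde_json::from_str(&data).unwrap_or(Value::Null);
    // "<null>", "", and "null"/"NULL" are all treated as a missing target.
    let target = if target == "<null>" || target.is_empty() || target.to_lowercase() == "null" {
        None
    } else {
        serde_json::from_str(&target).ok()
    };
    let metadata: HashMap<String, Value> = serde_json::from_str(&metadata).unwrap_or_default();
    (data, target, metadata)
}

fn main() {
    let (data, target, metadata) =
        parse_string_payloads(r#"{"q":"hi"}"#.into(), "<null>".into(), "{}".into());
    assert_eq!(data["q"], "hi");
    assert!(target.is_none());
    assert!(metadata.is_empty());
}
```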
Lines changed: 22 additions & 0 deletions
```diff
@@ -0,0 +1,22 @@
+import { NextRequest, NextResponse } from "next/server";
+
+import { countDatapoints } from "@/lib/actions/datapoints";
+
+export async function GET(
+  req: NextRequest,
+  props: { params: Promise<{ projectId: string; datasetId: string }> }
+): Promise<Response> {
+  const params = await props.params;
+
+  try {
+    const countData = await countDatapoints({
+      projectId: params.projectId,
+      datasetId: params.datasetId,
+    });
+
+    return NextResponse.json(countData);
+  } catch (error) {
+    console.error("Error listing datapoints:", error);
+    return NextResponse.json({ error: "Internal server error" }, { status: 500 });
+  }
+}
```
