Skip to content

Commit 431c309

Browse files
authored
Merge pull request #906 from lmnr-ai/dev
Realtime updates, move traces and spans to query engine, frontend fixes
2 parents 0268638 + 6e332c8 commit 431c309

File tree

142 files changed

+16536
-3510
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

142 files changed

+16536
-3510
lines changed

.env

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,21 @@ RABBITMQ_DEFAULT_USER=admin
88
RABBITMQ_DEFAULT_PASS=adminpasswd
99

1010
CLICKHOUSE_USER=ch_user
11+
CLICKHOUSE_RO_USER=ch_user
1112
CLICKHOUSE_PASSWORD=ch_passwd
13+
CLICKHOUSE_RO_PASSWORD=ch_passwd
1214
POSTGRES_PORT=5433
1315
# must be exactly 32 bytes (64 hex characters)
1416
AEAD_SECRET_KEY=0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef
17+
# could be any value
18+
NEXTAUTH_SECRET=0123456789abcdef0123456789abcdef
1519

1620

1721
# Uncomment and set these to override the default exposed ports.
18-
# APP_SERVER_HOST_PORT=8080
19-
# APP_SERVER_GRPC_HOST_PORT=8081
22+
# APP_SERVER_HOST_PORT=8000
23+
# APP_SERVER_GRPC_HOST_PORT=8001
2024
# FRONTEND_HOST_PORT=5667
21-
# AGENT_MANAGER_HOST_PORT=8901
25+
# QUERY_ENGINE_HOST_PORT=8903
2226

2327
# Uncomment and set this to enable OpenAI support. Currently used in Next.js
2428
# to generate chat names.

.github/workflows/build-push.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ jobs:
2020
- dockerfile: ./frontend/Dockerfile
2121
context: ./frontend
2222
image: ghcr.io/lmnr-ai/frontend
23+
- dockerfile: ./query-engine/Dockerfile
24+
context: ./query-engine
25+
image: ghcr.io/lmnr-ai/query-engine
2326
permissions:
2427
contents: read
2528
packages: write
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
name: Check Query Engine Tests
2+
3+
on:
4+
pull_request:
5+
types:
6+
- synchronize
7+
- opened
8+
- reopened
9+
paths:
10+
- 'query-engine/**'
11+
12+
jobs:
13+
test:
14+
runs-on: ubuntu-latest
15+
16+
steps:
17+
- name: Checkout code
18+
uses: actions/checkout@v4
19+
- name: Set up Python
20+
uses: actions/setup-python@v5
21+
with:
22+
python-version: 3.13
23+
- name: Install uv
24+
uses: astral-sh/setup-uv@v6
25+
with:
26+
activate-environment: true
27+
- name: Install the project
28+
working-directory: ./query-engine
29+
run: uv sync --all-extras --dev
30+
- name: Run tests
31+
working-directory: ./query-engine
32+
run: uv run pytest

app-server/.env.example

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ RABBITMQ_URL=amqp://admin:adminpasswd@localhost:5672/%2f
99
CLICKHOUSE_URL=http://localhost:8123
1010
CLICKHOUSE_USER=ch_user
1111
CLICKHOUSE_PASSWORD=ch_passwd
12+
CLICKHOUSE_RO_USER=ch_user
13+
CLICKHOUSE_RO_PASSWORD=ch_passwd
1214

1315
SHARED_SECRET_TOKEN=some_secret
1416
# must be exactly 32 bytes (64 hex characters)
@@ -18,5 +20,4 @@ ENVIRONMENT=FULL
1820
# Optional, if you want to use Redis for caching. Create a Redis/Valkey instance and set the URL here.
1921
# REDIS_URL=redis://localhost:6379
2022

21-
22-
QUERY_ENGINE_URL=
23+
QUERY_ENGINE_URL=http://localhost:8903

app-server/src/api/v1/evaluators.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use actix_web::{
24
HttpResponse, post,
35
web::{Data, Json},
@@ -12,9 +14,10 @@ use crate::{
1214
DB,
1315
evaluators::{EvaluatorScoreSource, insert_evaluator_score},
1416
project_api_keys::ProjectApiKey,
15-
spans::{get_root_span_id, is_span_in_project},
1617
},
18+
query_engine::QueryEngine,
1719
routes::types::ResponseResult,
20+
sql::{self, ClickhouseReadonlyClient},
1821
};
1922

2023
#[derive(Deserialize)]
@@ -55,9 +58,14 @@ pub async fn create_evaluator_score(
5558
req: Json<CreateEvaluatorScoreRequest>,
5659
db: Data<DB>,
5760
clickhouse: Data<clickhouse::Client>,
61+
clickhouse_ro: Data<Option<Arc<ClickhouseReadonlyClient>>>,
62+
query_engine: Data<Arc<QueryEngine>>,
5863
project_api_key: ProjectApiKey,
5964
) -> ResponseResult {
6065
let req = req.into_inner();
66+
let clickhouse_ro = clickhouse_ro.as_ref().clone().unwrap();
67+
let query_engine = query_engine.as_ref().clone();
68+
let clickhouse = clickhouse.as_ref().clone();
6169

6270
// Extract common fields from both variants
6371
let (name, metadata, score, source) = match &req {
@@ -85,11 +93,21 @@ pub async fn create_evaluator_score(
8593

8694
let span_id = match &req {
8795
CreateEvaluatorScoreRequest::WithTraceId(req) => {
88-
get_root_span_id(&db.pool, &req.trace_id, &project_api_key.project_id).await?
96+
sql::queries::get_top_span_id(
97+
clickhouse_ro,
98+
query_engine,
99+
req.trace_id,
100+
project_api_key.project_id,
101+
)
102+
.await?
89103
}
90104
CreateEvaluatorScoreRequest::WithSpanId(req) => {
91-
let exists =
92-
is_span_in_project(&db.pool, &req.span_id, &project_api_key.project_id).await?;
105+
let exists = crate::ch::spans::is_span_in_project(
106+
clickhouse.clone(),
107+
req.span_id,
108+
project_api_key.project_id,
109+
)
110+
.await?;
93111
if !exists {
94112
return Ok(HttpResponse::NotFound().body("No matching spans found"));
95113
}
@@ -118,7 +136,7 @@ pub async fn create_evaluator_score(
118136
.await?;
119137

120138
let _ = insert_evaluator_score_ch(
121-
clickhouse.into_inner().as_ref().clone(),
139+
clickhouse.clone(),
122140
score_id,
123141
project_id,
124142
name,

app-server/src/api/v1/tag.rs

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1+
use std::sync::Arc;
2+
13
use crate::{
2-
ch::spans::append_tags_to_span,
3-
db::{self, DB, project_api_keys::ProjectApiKey, tags::TagSource},
4+
ch::{spans::append_tags_to_span, tags::insert_tag},
5+
db::{project_api_keys::ProjectApiKey, tags::TagSource},
6+
query_engine::QueryEngine,
47
routes::types::ResponseResult,
5-
tags::insert_or_update_tag,
8+
sql::{self, ClickhouseReadonlyClient},
69
};
710
use actix_web::{
811
HttpResponse, post,
@@ -37,8 +40,9 @@ pub enum TagRequest {
3740
#[post("tag")]
3841
pub async fn tag_trace(
3942
req: Json<TagRequest>,
40-
db: web::Data<DB>,
4143
clickhouse: web::Data<clickhouse::Client>,
44+
clickhouse_ro: web::Data<Option<Arc<ClickhouseReadonlyClient>>>,
45+
query_engine: web::Data<Arc<QueryEngine>>,
4246
project_api_key: ProjectApiKey,
4347
) -> ResponseResult {
4448
let req = req.into_inner();
@@ -49,15 +53,27 @@ pub async fn tag_trace(
4953
if names.is_empty() {
5054
return Ok(HttpResponse::BadRequest().body("No names provided"));
5155
}
56+
let clickhouse_ro = clickhouse_ro.as_ref().clone().unwrap();
57+
let query_engine = query_engine.as_ref().clone();
58+
let clickhouse = clickhouse.as_ref().clone();
59+
5260
let span_id = match &req {
5361
TagRequest::WithTraceId(req) => {
54-
db::spans::get_root_span_id(&db.pool, &req.trace_id, &project_api_key.project_id)
55-
.await?
62+
sql::queries::get_top_span_id(
63+
clickhouse_ro,
64+
query_engine,
65+
req.trace_id,
66+
project_api_key.project_id,
67+
)
68+
.await?
5669
}
5770
TagRequest::WithSpanId(req) => {
58-
let exists =
59-
db::spans::is_span_in_project(&db.pool, &req.span_id, &project_api_key.project_id)
60-
.await?;
71+
let exists = crate::ch::spans::is_span_in_project(
72+
clickhouse.clone(),
73+
req.span_id,
74+
project_api_key.project_id,
75+
)
76+
.await?;
6177
if !exists {
6278
return Ok(HttpResponse::NotFound().body("No matching spans found"));
6379
}
@@ -69,26 +85,20 @@ pub async fn tag_trace(
6985
return Ok(HttpResponse::NotFound().body("No matching spans found"));
7086
};
7187

72-
let clickhouse = clickhouse.as_ref().clone();
73-
7488
let futures = names
7589
.iter()
7690
.map(|name| {
77-
insert_or_update_tag(
78-
&db.pool,
91+
insert_tag(
7992
clickhouse.clone(),
8093
project_api_key.project_id,
81-
Uuid::new_v4(),
82-
span_id,
83-
None,
84-
None,
8594
name.clone(),
8695
TagSource::CODE,
96+
span_id,
8797
)
8898
})
8999
.collect::<Vec<_>>();
90100

91-
let tags = futures_util::future::try_join_all(futures).await?;
101+
let tag_ids = futures_util::future::try_join_all(futures).await?;
92102

93103
append_tags_to_span(
94104
clickhouse.clone(),
@@ -98,14 +108,12 @@ pub async fn tag_trace(
98108
)
99109
.await?;
100110

101-
let response = tags
111+
let response = tag_ids
102112
.iter()
103-
.map(|tag| {
113+
.map(|id| {
104114
serde_json::json!({
105-
"id": tag.id,
106-
"spanId": tag.span_id,
107-
"createdAt": tag.created_at,
108-
"updatedAt": tag.updated_at,
115+
"id": id,
116+
"spanId": span_id,
109117
})
110118
})
111119
.collect::<Vec<_>>();

app-server/src/ch/spans.rs

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -99,17 +99,23 @@ impl CHSpan {
9999
let user_id = span.attributes.user_id();
100100
let path = span.attributes.flat_path();
101101

102-
let span_input_string = span
103-
.input
104-
.as_ref()
105-
.map(|input| sanitize_string(&input.to_string()))
106-
.unwrap_or(String::new());
107-
108-
let span_output_string = span
109-
.output
110-
.as_ref()
111-
.map(|output| sanitize_string(&output.to_string()))
112-
.unwrap_or(String::new());
102+
let span_input_string = if let Some(input_url) = &span.input_url {
103+
format!("<lmnr_payload_url>{}</lmnr_payload_url>", input_url)
104+
} else {
105+
span.input
106+
.as_ref()
107+
.map(|input| sanitize_string(&input.to_string()))
108+
.unwrap_or(String::new())
109+
};
110+
111+
let span_output_string = if let Some(output_url) = &span.output_url {
112+
format!("<lmnr_payload_url>{}</lmnr_payload_url>", output_url)
113+
} else {
114+
span.output
115+
.as_ref()
116+
.map(|output| sanitize_string(&output.to_string()))
117+
.unwrap_or(String::new())
118+
};
113119

114120
let trace_metadata = span.attributes.metadata().map_or(String::new(), |m| {
115121
serde_json::to_string(&m).unwrap_or_default()
@@ -135,18 +141,15 @@ impl CHSpan {
135141
.unwrap_or(String::from("")),
136142
request_model: usage.request_model.clone().unwrap_or(String::from("")),
137143
response_model: usage.response_model.clone().unwrap_or(String::from("")),
138-
session_id: session_id.unwrap_or(String::from("<null>")),
144+
session_id: session_id.unwrap_or(String::from("")),
139145
project_id: project_id,
140146
trace_id: span.trace_id,
141-
provider: usage
142-
.provider_name
143-
.clone()
144-
.unwrap_or(String::from("<null>")),
145-
user_id: user_id.unwrap_or(String::from("<null>")),
146-
path: path.unwrap_or(String::from("<null>")),
147+
provider: usage.provider_name.clone().unwrap_or(String::from("")),
148+
user_id: user_id.unwrap_or(String::from("")),
149+
path: path.unwrap_or(String::from("")),
147150
input: span_input_string,
148151
output: span_output_string,
149-
status: span.status.clone().unwrap_or(String::from("<null>")),
152+
status: span.status.clone().unwrap_or(String::from("")),
150153
size_bytes: size_bytes as u64,
151154
attributes: span.attributes.to_string(),
152155
trace_metadata,
@@ -215,3 +218,18 @@ pub async fn append_tags_to_span(
215218

216219
Ok(())
217220
}
221+
222+
pub async fn is_span_in_project(
223+
clickhouse: clickhouse::Client,
224+
span_id: Uuid,
225+
project_id: Uuid,
226+
) -> Result<bool> {
227+
let result = clickhouse
228+
.query("SELECT count(*) FROM spans WHERE span_id = ? AND project_id = ?")
229+
.bind(span_id)
230+
.bind(project_id)
231+
.fetch_one::<u64>()
232+
.await?;
233+
234+
Ok(result > 0)
235+
}

app-server/src/ch/tags.rs

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,11 @@ pub struct CHTag {
3636
}
3737

3838
impl CHTag {
39-
pub fn new(
40-
project_id: Uuid,
41-
class_id: Uuid,
42-
id: Uuid,
43-
name: String,
44-
source: TagSource,
45-
span_id: Uuid,
46-
) -> Self {
39+
pub fn new(project_id: Uuid, id: Uuid, name: String, source: TagSource, span_id: Uuid) -> Self {
4740
Self {
4841
project_id,
49-
class_id,
42+
// TODO: Remove this once we drop the class_id column
43+
class_id: Uuid::nil(),
5044
created_at: chrono_to_nanoseconds(Utc::now()),
5145
id,
5246
name,
@@ -59,20 +53,19 @@ impl CHTag {
5953
pub async fn insert_tag(
6054
client: clickhouse::Client,
6155
project_id: Uuid,
62-
class_id: Uuid,
63-
id: Uuid,
6456
name: String,
6557
source: TagSource,
6658
span_id: Uuid,
67-
) -> Result<()> {
68-
let tag = CHTag::new(project_id, class_id, id, name, source, span_id);
59+
) -> Result<Uuid> {
60+
let id = Uuid::new_v4();
61+
let tag = CHTag::new(project_id, id, name, source, span_id);
6962
let ch_insert = client.insert("tags");
7063
match ch_insert {
7164
Ok(mut ch_insert) => {
7265
ch_insert.write(&tag).await?;
7366
let ch_insert_end_res = ch_insert.end().await;
7467
match ch_insert_end_res {
75-
Ok(_) => Ok(()),
68+
Ok(_) => Ok(id),
7669
Err(e) => {
7770
return Err(anyhow::anyhow!("Clickhouse tag insertion failed: {:?}", e));
7871
}

0 commit comments

Comments
 (0)