Skip to content

Commit 3938846

Browse files
authored
[ENH]: Tracing for rust sysdb (#6333)
## Description of changes

This change adds tracing spans to various parts of the Rust sysdb. All methods in `spanner.rs` are instrumented. Some queries are instrumented with info spans, while most are instrumented with debug spans.

- Improvements & Bug fixes
  - ...
- New functionality
  - ...

## Test plan

Manual testing on Jaeger.

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Migration plan

_Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?_

## Observability plan

_What is the plan to instrument and monitor this change?_

## Documentation Changes

_Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_
1 parent 3063a3a commit 3938846

File tree

3 files changed

+108
-25
lines changed

3 files changed

+108
-25
lines changed

rust/rust-sysdb/src/server.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use tokio::{
5353
signal::unix::{signal, SignalKind},
5454
};
5555
use tonic::{transport::Server, Request, Response, Status};
56+
use tracing::instrument;
5657

5758
pub struct SysdbService {
5859
port: u16,
@@ -746,7 +747,7 @@ impl SysDb for SysdbService {
746747
.retry(backoff)
747748
.when(|e: &SysDbError| {
748749
if matches!(e, SysDbError::CollectionEntryIsStale) {
749-
tracing::info!(
750+
tracing::warn!(
750751
"Collection entry is stale, retrying flush collection compaction for collection_id: {}",
751752
collection_id
752753
);
@@ -841,6 +842,7 @@ impl SysDb for SysdbService {
841842

842843
impl SysdbService {
843844
/// Create a new version file in object storage
845+
#[instrument(skip(self, storage, segments), level = "info", fields(collection_id = %collection.collection_id))]
844846
async fn create_new_version_file(
845847
&self,
846848
storage: &Storage,

rust/rust-sysdb/src/spanner.rs

Lines changed: 80 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ use google_cloud_spanner::session::SessionConfig;
2727
use google_cloud_spanner::statement::Statement;
2828
use google_cloud_spanner::transaction_rw::ReadWriteTransaction;
2929
use thiserror::Error;
30+
use tracing::instrument;
31+
use tracing::Instrument;
3032
use uuid::Uuid;
3133

3234
use crate::config::{SpannerBackendConfig, SpannerConfig};
@@ -141,6 +143,7 @@ impl SpannerBackend {
141143
/// Uses commit timestamps for created_at and updated_at.
142144
/// Sets last_compaction_time to Unix epoch (0) by default.
143145
/// If the tenant already exists, does nothing (insert on conflict do nothing).
146+
#[instrument(skip(self), level = "info")]
144147
pub async fn create_tenant(
145148
&self,
146149
req: CreateTenantRequest,
@@ -156,7 +159,7 @@ impl SpannerBackend {
156159
);
157160
check_stmt.add_param("id", &tenant_id);
158161

159-
let mut iter = tx.query(check_stmt).await?;
162+
let mut iter = tx.query(check_stmt).instrument(tracing::debug_span!("create_tenant query")).await?;
160163

161164
// If tenant doesn't exist, insert it otherwise ignore for idempotency
162165
// Set last_compaction_time to Unix epoch (0) by default
@@ -184,6 +187,7 @@ impl SpannerBackend {
184187
/// Get tenants by ids.
185188
///
186189
/// Returns `SysDbError::NotFound` if any tenant does not exist or is marked as deleted.
190+
#[instrument(skip(self), level = "info")]
187191
pub async fn get_tenants(
188192
&self,
189193
req: GetTenantsRequest,
@@ -195,7 +199,10 @@ impl SpannerBackend {
195199

196200
let mut tx = self.client.single().await?;
197201

198-
let mut iter = tx.query(stmt).await?;
202+
let mut iter = tx
203+
.query(stmt)
204+
.instrument(tracing::debug_span!("get_tenants query"))
205+
.await?;
199206
let mut tenants = Vec::new();
200207

201208
// Get all rows
@@ -214,6 +221,7 @@ impl SpannerBackend {
214221
}
215222
}
216223

224+
#[instrument(skip(self, tx), level = "info")]
217225
async fn update_tenant(
218226
&self,
219227
tx: &mut ReadWriteTransaction,
@@ -240,6 +248,7 @@ impl SpannerBackend {
240248
///
241249
/// This mimics the Go RegisterFilePaths function which updates the file_paths
242250
/// column for each segment with the new file paths from compaction.
251+
#[instrument(skip(self, tx, req), fields(collection_id = %req.collection_id), level = "info")]
243252
async fn update_segments(
244253
&self,
245254
tx: &mut ReadWriteTransaction,
@@ -276,7 +285,13 @@ impl SpannerBackend {
276285
update_stmt.add_param("region", &self.local_region().as_str());
277286
update_stmt.add_param("file_paths", &file_paths_json);
278287

279-
let rows_affected = tx.update(update_stmt).await?;
288+
let rows_affected = tx
289+
.update(update_stmt)
290+
.instrument(tracing::info_span!(
291+
"update_segment_file_paths",
292+
segment_id = %flush_segment_compaction.segment_id
293+
))
294+
.await?;
280295
if rows_affected == 0 {
281296
return Err(SysDbError::NotFound(format!(
282297
"segment '{}' not found",
@@ -302,6 +317,7 @@ impl SpannerBackend {
302317
/// Validates that the database name is not empty and that the tenant exists.
303318
/// Uses commit timestamps for created_at and updated_at.
304319
/// All checks and the insert are done atomically in a single transaction.
320+
#[instrument(skip(self), level = "info")]
305321
pub async fn create_database(
306322
&self,
307323
req: CreateDatabaseRequest,
@@ -320,7 +336,10 @@ impl SpannerBackend {
320336
);
321337
tenant_check_stmt.add_param("id", &tenant_id);
322338

323-
let mut tenant_iter = tx.query(tenant_check_stmt).await?;
339+
let mut tenant_iter = tx
340+
.query(tenant_check_stmt)
341+
.instrument(tracing::debug_span!("check_tenant_exists"))
342+
.await?;
324343
if tenant_iter.next().await?.is_none() {
325344
return Err(SysDbError::NotFound(format!(
326345
"tenant '{}' not found",
@@ -335,7 +354,7 @@ impl SpannerBackend {
335354
name_check_stmt.add_param("name", &db_name);
336355
name_check_stmt.add_param("tenant_id", &tenant_id);
337356

338-
let mut name_iter = tx.query(name_check_stmt).await?;
357+
let mut name_iter = tx.query(name_check_stmt).instrument(tracing::debug_span!("check_database_name_exists")).await?;
339358
if name_iter.next().await?.is_some() {
340359
return Err(SysDbError::AlreadyExists(format!(
341360
"database with name '{}' already exists for tenant '{}'",
@@ -349,7 +368,10 @@ impl SpannerBackend {
349368
);
350369
check_stmt.add_param("id", &db_id);
351370

352-
let mut iter = tx.query(check_stmt).await?;
371+
let mut iter = tx
372+
.query(check_stmt)
373+
.instrument(tracing::debug_span!("check_database_exists"))
374+
.await?;
353375
if iter.next().await?.is_some() {
354376
return Err(SysDbError::AlreadyExists(format!(
355377
"database with id '{}' already exists",
@@ -366,7 +388,9 @@ impl SpannerBackend {
366388
insert_stmt.add_param("tenant_id", &tenant_id);
367389
insert_stmt.add_param("is_deleted", &false);
368390

369-
tx.update(insert_stmt).await?;
391+
tx.update(insert_stmt)
392+
.instrument(tracing::info_span!("insert_database"))
393+
.await?;
370394
tracing::info!("Created database: {} for tenant: {}", db_name, tenant_id);
371395

372396
Ok(())
@@ -383,6 +407,7 @@ impl SpannerBackend {
383407
/// Get a database by name and tenant.
384408
///
385409
/// Returns `SysDbError::NotFound` if the database does not exist or is marked as deleted.
410+
#[instrument(skip(self), level = "info")]
386411
pub async fn get_database(
387412
&self,
388413
req: GetDatabaseRequest,
@@ -396,7 +421,10 @@ impl SpannerBackend {
396421

397422
let mut tx = self.client.single().await?;
398423

399-
let mut iter = tx.query(stmt).await?;
424+
let mut iter = tx
425+
.query(stmt)
426+
.instrument(tracing::info_span!("get_database query"))
427+
.await?;
400428

401429
// Get the first row if it exists
402430
if let Some(row) = iter.next().await? {
@@ -411,6 +439,7 @@ impl SpannerBackend {
411439
}
412440
}
413441

442+
#[instrument(skip(self), level = "info")]
414443
pub async fn list_databases(&self, tenant: &str) -> Result<Vec<Database>, SysDbError> {
415444
let query = "SELECT id, name, tenant_id FROM databases \
416445
WHERE tenant_id = @tenant_id AND is_deleted = FALSE \
@@ -420,7 +449,10 @@ impl SpannerBackend {
420449
stmt.add_param("tenant_id", &tenant);
421450

422451
let mut tx = self.client.single().await?;
423-
let mut result_set = tx.query(stmt).await?;
452+
let mut result_set = tx
453+
.query(stmt)
454+
.instrument(tracing::info_span!("list_databases query"))
455+
.await?;
424456

425457
let mut databases = Vec::new();
426458
while let Some(row) = result_set.next().await? {
@@ -439,6 +471,7 @@ impl SpannerBackend {
439471
// Collection Operations
440472
// ============================================================
441473

474+
#[instrument(skip(self), level = "info")]
442475
pub async fn create_collection(
443476
&self,
444477
req: CreateCollectionRequest,
@@ -512,7 +545,7 @@ impl SpannerBackend {
512545
db_stmt.add_param("name", &database_name);
513546
db_stmt.add_param("tenant_id", &tenant_id_str);
514547

515-
let mut db_iter = tx.query(db_stmt).await?;
548+
let mut db_iter = tx.query(db_stmt).instrument(tracing::debug_span!("get_database_by_name")).await?;
516549
let db_row = db_iter.next().await?;
517550
let database_id = match db_row {
518551
Some(row) => {
@@ -535,7 +568,7 @@ impl SpannerBackend {
535568
);
536569
id_check_stmt.add_param("collection_id", &collection_id);
537570

538-
let mut id_iter = tx.query(id_check_stmt).await?;
571+
let mut id_iter = tx.query(id_check_stmt).instrument(tracing::debug_span!("check_collection_exists_by_id")).await?;
539572
if id_iter.next().await?.is_some() {
540573
if get_or_create {
541574
// Return the existing collection
@@ -558,7 +591,7 @@ impl SpannerBackend {
558591
check_stmt.add_param("database_name", &database_name);
559592
check_stmt.add_param("name", &collection_name);
560593

561-
let mut check_iter = tx.query(check_stmt).await?;
594+
let mut check_iter = tx.query(check_stmt).instrument(tracing::debug_span!("check_collection_exists_by_name")).await?;
562595
if let Some(existing_row) = check_iter.next().await? {
563596
// Collection with same name exists
564597
if get_or_create {
@@ -775,6 +808,7 @@ impl SpannerBackend {
775808
/// - `limit` and `offset`: Pagination
776809
///
777810
/// Returns a list of matching collections.
811+
#[instrument(skip(self), level = "info")]
778812
pub async fn get_collections(
779813
&self,
780814
req: GetCollectionsRequest,
@@ -898,7 +932,10 @@ impl SpannerBackend {
898932
}
899933

900934
let mut tx = self.client.single().await?;
901-
let mut result_set = tx.query(stmt).await?;
935+
let mut result_set = tx
936+
.query(stmt)
937+
.instrument(tracing::info_span!("get_collections query"))
938+
.await?;
902939

903940
// Collect all rows, grouped by collection_id, preserving query order (created_at ASC)
904941
let mut collection_order: Vec<String> = Vec::new();
@@ -1023,7 +1060,10 @@ impl SpannerBackend {
10231060
fetch_stmt.add_param("collection_id", &collection_id);
10241061
fetch_stmt.add_param("region", &region);
10251062

1026-
let mut fetch_iter = tx.query(fetch_stmt).await?;
1063+
let mut fetch_iter = tx
1064+
.query(fetch_stmt)
1065+
.instrument(tracing::info_span!("fetch_collection_in_tx query"))
1066+
.await?;
10271067

10281068
// Collect all rows and convert to Collection using TryFrom<Vec<Row>>
10291069
let mut rows = Vec::new();
@@ -1047,6 +1087,7 @@ impl SpannerBackend {
10471087
/// - Segments: deduplicated using HashMap
10481088
///
10491089
/// This approach fetches all data in a single network round trip.
1090+
#[instrument(skip(self), level = "info")]
10501091
pub async fn get_collection_with_segments(
10511092
&self,
10521093
req: GetCollectionWithSegmentsRequest,
@@ -1098,7 +1139,10 @@ impl SpannerBackend {
10981139
stmt.add_param("region", &region);
10991140

11001141
let mut tx = self.client.single().await?;
1101-
let mut result_set = tx.query(stmt).await?;
1142+
let mut result_set = tx
1143+
.query(stmt)
1144+
.instrument(tracing::debug_span!("get_collection_with_segments query"))
1145+
.await?;
11021146

11031147
// Collect all rows
11041148
let mut rows: Vec<Row> = Vec::new();
@@ -1161,6 +1205,7 @@ impl SpannerBackend {
11611205
/// - `new_configuration`: New configuration (selective update of hnsw, spann, or embedding function)
11621206
///
11631207
/// Returns the updated collection.
1208+
#[instrument(skip(self))]
11641209
pub async fn update_collection(
11651210
&self,
11661211
req: UpdateCollectionRequest,
@@ -1197,7 +1242,7 @@ impl SpannerBackend {
11971242
);
11981243
check_stmt.add_param("collection_id", &collection_id);
11991244

1200-
let mut check_iter = tx.query(check_stmt).await?;
1245+
let mut check_iter = tx.query(check_stmt).instrument(tracing::debug_span!("check_collection_metadata")).await?;
12011246
let (tenant_id, database_name): (String, String) = match check_iter.next().await? {
12021247
Some(row) => {
12031248
let tenant: String = row.column_by_name("tenant_id").map_err(SysDbError::FailedToReadColumn)?;
@@ -1222,7 +1267,7 @@ impl SpannerBackend {
12221267
name_check_stmt.add_param("name", new_name);
12231268
name_check_stmt.add_param("collection_id", &collection_id);
12241269

1225-
let mut name_iter = tx.query(name_check_stmt).await?;
1270+
let mut name_iter = tx.query(name_check_stmt).instrument(tracing::debug_span!("check_collection_name_exists")).await?;
12261271
if name_iter.next().await?.is_some() {
12271272
return Err(SysDbError::AlreadyExists(format!(
12281273
"collection with name '{}' already exists in database '{}'",
@@ -1263,7 +1308,7 @@ impl SpannerBackend {
12631308
"SELECT name, dimension FROM collections WHERE collection_id = @collection_id",
12641309
);
12651310
current_stmt.add_param("collection_id", &collection_id);
1266-
let mut current_iter = tx.query(current_stmt).await?;
1311+
let mut current_iter = tx.query(current_stmt).instrument(tracing::debug_span!("get_current_collection_info")).await?;
12671312
let (current_name, current_dimension): (String, Option<i64>) = match current_iter.next().await? {
12681313
Some(row) => {
12691314
let n: String = row.column_by_name("name").map_err(SysDbError::FailedToReadColumn)?;
@@ -1290,7 +1335,7 @@ impl SpannerBackend {
12901335
"SELECT key FROM collection_metadata WHERE collection_id = @collection_id",
12911336
);
12921337
keys_stmt.add_param("collection_id", &collection_id);
1293-
let mut keys_iter = tx.query(keys_stmt).await?;
1338+
let mut keys_iter = tx.query(keys_stmt).instrument(tracing::debug_span!("get_collection_metadata_keys")).await?;
12941339
while let Some(row) = keys_iter.next().await? {
12951340
let key: String = row.column_by_name("key").map_err(SysDbError::FailedToReadColumn)?;
12961341
mutations.push(delete(
@@ -1364,7 +1409,7 @@ impl SpannerBackend {
13641409
);
13651410
schema_stmt.add_param("collection_id", &collection_id);
13661411

1367-
let mut schema_iter = tx.query(schema_stmt).await?;
1412+
let mut schema_iter = tx.query(schema_stmt).instrument(tracing::debug_span!("get_collection_schemas")).await?;
13681413
let mut region_schemas: Vec<(String, String)> = Vec::new();
13691414

13701415
while let Some(row) = schema_iter.next().await? {
@@ -1453,6 +1498,7 @@ impl SpannerBackend {
14531498

14541499
/// Flush collection compaction results to the database.
14551500
/// This mimics the logic in go/pkg/sysdb/coordinator/table_catalog.go::FlushCollectionCompactionForVersionedCollection
1501+
#[instrument(skip(self, req))]
14561502
pub async fn flush_collection_compaction(
14571503
&self,
14581504
req: FlushCompactionRequest,
@@ -1605,7 +1651,15 @@ impl SpannerBackend {
16051651
update_stmt.add_param("schema_str", &schema_str);
16061652
update_stmt.add_param("region", &region.to_string());
16071653
update_stmt.add_param("old_version_file_name", &old_version_file_name);
1608-
let rows_affected = tx.update(update_stmt).await?;
1654+
let rows_affected = tx
1655+
.update(update_stmt)
1656+
.instrument(tracing::info_span!(
1657+
"flush_compaction update query",
1658+
collection_id = %collection_id,
1659+
version_file_path = %version_file_path,
1660+
region = %region,
1661+
))
1662+
.await?;
16091663

16101664
if rows_affected == 0 {
16111665
// CAS operation failed - collection was updated by another transaction
@@ -1659,13 +1713,17 @@ impl SpannerBackend {
16591713

16601714
/// Reset the database state by dropping all tables and re-running migrations.
16611715
/// This provides a completely clean slate for testing.
1716+
#[instrument(skip(self), level = "info")]
16621717
pub async fn reset(&self) -> Result<(), SysDbError> {
16631718
// Step 1: Get all indexes first
16641719
let get_indexes_stmt = Statement::new(
16651720
"SELECT table_name, index_name FROM INFORMATION_SCHEMA.INDEXES WHERE table_schema = ''",
16661721
);
16671722
let mut tx = self.client.single().await?;
1668-
let mut indexes_result = tx.query(get_indexes_stmt).await?;
1723+
let mut indexes_result = tx
1724+
.query(get_indexes_stmt)
1725+
.instrument(tracing::debug_span!("get_spanner_indexes"))
1726+
.await?;
16691727
let mut table_names = HashSet::new();
16701728

16711729
let mut indexes: Vec<(String, String)> = Vec::new();

0 commit comments

Comments
 (0)